server+peer: re-write persistent connection handling

The prior logic used to handle persistent connections could result in
the following situation: both peers come up and _concurrently_
establish connections to each other. Each peer would then try to kill
the other's connection, so both connections ended up terminated. To
resolve this issue, this commit re-writes the way we handle persistent
connections.

To eliminate the issue described above, in the case of a concurrent
peer connection we now use a deterministic rule to decide _which_
connection should be closed: the connection coming from the node with
the "smaller" public key is kept, and the other one is closed. Since
both peers apply the same rule, exactly one of the two duplicate
connections is torn down.
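
As an illustration (not part of this commit), both sides evaluate the
same comparison with the arguments swapped, so exactly one side ever
hangs up. A minimal runnable sketch, assuming the btcec v1 API
(btcec.S256()) and hypothetical key names:

package main

import (
	"bytes"
	"fmt"

	"github.com/btcsuite/btcd/btcec"
)

// shouldDropLocalConnection mirrors the tie-breaking rule in the diff
// below: the connection coming from the node with the lexicographically
// smaller compressed pubkey is kept, so we drop ours iff ours is greater.
func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
	return bytes.Compare(local.SerializeCompressed(),
		remote.SerializeCompressed()) > 0
}

func main() {
	alice, _ := btcec.NewPrivateKey(btcec.S256())
	bob, _ := btcec.NewPrivateKey(btcec.S256())

	// Exactly one of these prints true: only one side drops its
	// connection, so the concurrent-connect race always leaves a
	// single connection alive.
	fmt.Println(shouldDropLocalConnection(alice.PubKey(), bob.PubKey()))
	fmt.Println(shouldDropLocalConnection(bob.PubKey(), alice.PubKey()))
}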

Additionally, the server now spawns a peerTerminationWatcher for each
peer. This watcher waits until the peer has been disconnected, then
cleans up all resources allocated to it, notifies relevant sub-systems
of its demise, and finally handles re-connecting if the peer is
persistent. This replaces the goroutine that was previously spawned
inside peer.Disconnect().
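
For intuition, the watcher only needs two primitives: a way to block
until the peer dies, and a persistent connection request. The peer.go
implementation of WaitForDisconnect isn't shown in this diff; the
sketch below assumes it simply blocks on the quit channel that
Disconnect() closes:

// WaitForDisconnect blocks until the peer has been torn down. Assumes
// p.quit is the channel closed by Disconnect(); a closed channel is
// readable by every receiver, so all watcher goroutines unblock at once.
func (p *peer) WaitForDisconnect() {
	<-p.quit
}

Re-connection itself is delegated to the connection manager: a
connmgr.ConnReq with Permanent set to true is retried with back-off by
btcsuite's connmgr until it succeeds, which is why the watcher only
hands off the request instead of looping itself.
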
Olaoluwa Osuntokun 2017-04-23 19:38:34 -07:00
parent 00a7f140ff
commit 4cb76071a2
2 changed files with 185 additions and 50 deletions

peer.go (15 lines changed)

@@ -342,21 +342,6 @@ func (p *peer) Disconnect() {
 	close(p.quit)
 
-	// If this connection was established persistently, then notify the
-	// connection manager that the peer has been disconnected.
-	if p.connReq != nil {
-		p.server.connMgr.Disconnect(p.connReq.ID())
-	}
-
-	// Launch a goroutine to clean up the remaining resources.
-	go func() {
-		// Tell the switch to unregister all links associated with this
-		// peer. Passing nil as the target link indicates that all
-		// links associated with this interface should be closed.
-		p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil)
-
-		p.server.donePeers <- p
-	}()
 }
 
 // String returns the string representation of this peer.

server.go (220 lines changed)

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bytes"
 	"crypto/rand"
 	"crypto/sha256"
 	"encoding/hex"
@@ -52,6 +53,10 @@ type server struct {
 	peersByID  map[int32]*peer
 	peersByPub map[string]*peer
 	persistentPeers map[string]struct{}
+
+	inboundPeers  map[string]*peer
+	outboundPeers map[string]*peer
+
 	rpcServer *rpcServer
 
 	chainNotifier chainntnfs.ChainNotifier
@@ -137,10 +142,13 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
 		sphinx:      sphinx.NewRouter(privKey, activeNetParams.Params),
 		lightningID: sha256.Sum256(serializedPubKey),
 
 		persistentPeers:    make(map[string]struct{}),
+		persistentConnReqs: make(map[string][]*connmgr.ConnReq),
 
-		peersByID:  make(map[int32]*peer),
-		peersByPub: make(map[string]*peer),
+		peersByID:     make(map[int32]*peer),
+		peersByPub:    make(map[string]*peer),
+		inboundPeers:  make(map[string]*peer),
+		outboundPeers: make(map[string]*peer),
 
 		newPeers:  make(chan *peer, 10),
 		donePeers: make(chan *peer, 10),
@@ -625,6 +633,55 @@ func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) {
 	return peer, nil
 }
 
+// peerTerminationWatcher waits until a peer has been disconnected, and then
+// cleans up all resources allocated to the peer, notifies relevant sub-systems
+// of its demise, and finally handles re-connecting to the peer if it's
+// persistent.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerTerminationWatcher(p *peer) {
+	p.WaitForDisconnect()
+
+	srvrLog.Debugf("Peer %v has been disconnected", p)
+
+	// Tell the switch to unregister all links associated with this peer.
+	// Passing nil as the target link indicates that all links associated
+	// with this interface should be closed.
+	p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil)
+
+	// Send the peer to be garbage collected by the server.
+	p.server.donePeers <- p
+
+	// If this peer had an active persistent connection request, then we
+	// can remove this as we manually decide below if we should attempt to
+	// re-connect.
+	if p.connReq != nil {
+		s.connMgr.Remove(p.connReq.ID())
+	}
+
+	// Next, check to see if this is a persistent peer or not.
+	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
+	if _, ok := s.persistentPeers[pubStr]; ok {
+		srvrLog.Debugf("Attempting to re-establish persistent "+
+			"connection to peer %v", p)
+
+		// If so, then we'll attempt to re-establish a persistent
+		// connection to the peer.
+		// TODO(roasbeef): get latest port info?
+		connReq := &connmgr.ConnReq{
+			Addr:      p.addr,
+			Permanent: true,
+		}
+
+		s.pendingConnMtx.Lock()
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq,
+		)
+		s.pendingConnMtx.Unlock()
+
+		go s.connMgr.Connect(connReq)
+	}
+}
+
 // peerConnected is a function that handles initializing a newly connected
 // peer by adding it to the server's global list of all active peers, and
 // starting all the goroutines the peer needs to function properly.
@@ -641,15 +698,14 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
 	p, err := newPeer(conn, connReq, s, peerAddr, inbound)
 	if err != nil {
 		srvrLog.Errorf("unable to create peer %v", err)
-		if connReq != nil {
-			s.connMgr.Remove(connReq.ID())
-		}
 		return
 	}
 
 	// TODO(roasbeef): update IP address for link-node
 	//  * also mark last-seen, do it one single transaction?
 
+	// Attempt to start the peer, if we're unable to do so, then disconnect
+	// this peer.
 	if err := p.Start(); err != nil {
 		srvrLog.Errorf("unable to start peer: %v", err)
 		p.Disconnect()
@@ -659,6 +715,22 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
 	s.newPeers <- p
 }
 
+// shouldDropLocalConnection determines if our local connection to a remote
+// peer should be dropped in the case of concurrent connection establishment.
+// In order to deterministically decide which connection should be dropped,
+// we'll utilize the ordering of the local and remote public key. If we didn't
+// use such a tie breaker, then we risk _both_ connections erroneously being
+// dropped.
+func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
+	localPubBytes := local.SerializeCompressed()
+	remotePubBytes := remote.SerializeCompressed()
+
+	// The connection that comes from the node with a "smaller" pubkey
+	// should be kept. Therefore, if our pubkey is "greater" than theirs,
+	// we should drop our established connection.
+	return bytes.Compare(localPubBytes, remotePubBytes) > 0
+}
+
 // inboundPeerConnected initializes a new peer in response to a new inbound
 // connection.
 func (s *server) inboundPeerConnected(conn net.Conn) {
@@ -667,30 +739,45 @@ func (s *server) inboundPeerConnected(conn net.Conn) {
 	srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
 
+	localPub := s.identityPriv.PubKey()
 	nodePub := conn.(*brontide.Conn).RemotePub()
 
-	// If we already have an outbound connection to this peer, simply drop
-	// the connection.
+	// Check to see if we should drop our connection, if not, then we'll
+	// close out this connection with the remote peer. This prevents us
+	// from having duplicate connections, or none.
 	pubStr := string(nodePub.SerializeCompressed())
-	if _, ok := s.peersByPub[pubStr]; ok {
-		srvrLog.Errorf("Received inbound connection from peer %x, but "+
-			"already connected, dropping conn",
-			nodePub.SerializeCompressed())
-		conn.Close()
-		return
+	if connectedPeer, ok := s.peersByPub[pubStr]; ok {
+		// If the connection we've already established should be kept,
+		// then we'll close out this connection s.t. there's only a
+		// single connection between us.
+		if !shouldDropLocalConnection(localPub, nodePub) {
+			srvrLog.Warnf("Received inbound connection from "+
+				"peer %x, but already connected, dropping conn",
+				nodePub.SerializeCompressed())
+			conn.Close()
+			return
+		}
+
+		// Otherwise, if we should drop the connection, then we'll
+		// disconnect our already connected peer, and also send the
+		// peer to the peer garbage collection goroutine.
+		srvrLog.Debugf("Disconnecting stale connection to %v",
+			connectedPeer)
+		connectedPeer.Disconnect()
+		s.donePeers <- connectedPeer
 	}
 
-	// However, if we receive an incoming connection from a peer we're
-	// attempting to maintain a persistent connection with then we need to
-	// cancel the ongoing connection attempts to ensure that we don't end
-	// up with a duplicate connecting to the same peer.
-	s.pendingConnMtx.RLock()
+	// Next, check to see if we have any outstanding persistent connection
+	// requests to this peer. If so, then we'll remove all of these
+	// connection requests, and also delete the entry from the map.
+	s.pendingConnMtx.Lock()
 	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
 		for _, connReq := range connReqs {
 			s.connMgr.Remove(connReq.ID())
 		}
+		delete(s.persistentConnReqs, pubStr)
 	}
-	s.pendingConnMtx.RUnlock()
+	s.pendingConnMtx.Unlock()
 
 	go s.peerConnected(conn, nil, true)
 }
@@ -701,22 +788,64 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.peersMtx.Lock()
 	defer s.peersMtx.Unlock()
 
-	srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
-
+	localPub := s.identityPriv.PubKey()
 	nodePub := conn.(*brontide.Conn).RemotePub()
 
-	// If we already have an inbound connection from this peer, simply drop
-	// the connection.
 	pubStr := string(nodePub.SerializeCompressed())
-	if _, ok := s.peersByPub[pubStr]; ok {
-		srvrLog.Errorf("Established outbound connection to peer %x, but "+
-			"already connected, dropping conn",
-			nodePub.SerializeCompressed())
-		s.connMgr.Remove(connReq.ID())
+
+	// If we already have an outbound connection to this peer, then ignore
+	// this new connection.
+	if _, ok := s.outboundPeers[pubStr]; ok {
+		srvrLog.Debugf("Ignoring duplicate outbound connection")
 		conn.Close()
 		return
 	}
 
+	if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
+		srvrLog.Debugf("Ignoring cancelled outbound connection")
+		conn.Close()
+		return
+	}
+
+	srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
+
+	// As we've just established an outbound connection to this peer, we'll
+	// cancel all other persistent connection requests and eliminate the
+	// entry for this peer from the map.
+	s.pendingConnMtx.Lock()
+	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
+		for _, pConnReq := range connReqs {
+			if pConnReq.ID() != connReq.ID() {
+				s.connMgr.Remove(pConnReq.ID())
+			}
+		}
+		delete(s.persistentConnReqs, pubStr)
+	}
+	s.pendingConnMtx.Unlock()
+
+	// If we already have an inbound connection from this peer, then we'll
+	// check to see _which_ of our connections should be dropped.
+	if connectedPeer, ok := s.peersByPub[pubStr]; ok {
+		// If our (this) connection should be dropped, then we'll do
+		// so, in order to ensure we don't have any duplicate
+		// connections.
+		if shouldDropLocalConnection(localPub, nodePub) {
+			srvrLog.Warnf("Established outbound connection to "+
+				"peer %x, but already connected, dropping conn",
+				nodePub.SerializeCompressed())
+			s.connMgr.Remove(connReq.ID())
+			conn.Close()
+			return
+		}
+
+		// Otherwise, _their_ connection should be dropped. So we'll
+		// disconnect the peer and send the now obsolete peer to the
+		// server for garbage collection.
+		srvrLog.Debugf("Disconnecting stale connection to %v",
+			connectedPeer)
+		connectedPeer.Disconnect()
+		s.donePeers <- connectedPeer
+	}
 
 	go s.peerConnected(conn, connReq, false)
 }
@@ -729,7 +858,7 @@ func (s *server) addPeer(p *peer) {
 	// Ignore new peers if we're shutting down.
 	if atomic.LoadInt32(&s.shutdown) != 0 {
-		p.Stop()
+		p.Disconnect()
 		return
 	}
@@ -738,10 +867,25 @@ func (s *server) addPeer(p *peer) {
 	// TODO(roasbeef): pipe all requests through to the
 	// queryHandler/peerManager
 	s.peersMtx.Lock()
+	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
 	s.peersByID[p.id] = p
-	s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p
+	s.peersByPub[pubStr] = p
+
+	if p.inbound {
+		s.inboundPeers[pubStr] = p
+	} else {
+		s.outboundPeers[pubStr] = p
+	}
 	s.peersMtx.Unlock()
 
+	// Launch a goroutine to watch for the termination of this peer so we
+	// can ensure all resources are properly cleaned up and if need be
+	// connections are re-established.
+	go s.peerTerminationWatcher(p)
+
 	// Once the peer has been added to our indexes, send a message to the
 	// channel router so we can synchronize our view of the channel graph
 	// with this new peer.
@@ -762,17 +906,23 @@ func (s *server) removePeer(p *peer) {
 	// As the peer is now finished, ensure that the TCP connection is
 	// closed and all of its related goroutines have exited.
-	if err := p.Stop(); err != nil {
-		peerLog.Errorf("unable to stop peer: %v", err)
-	}
+	p.Disconnect()
 
 	// Ignore deleting peers if we're shutting down.
 	if atomic.LoadInt32(&s.shutdown) != 0 {
 		return
 	}
 
+	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
+
 	delete(s.peersByID, p.id)
-	delete(s.peersByPub, string(p.addr.IdentityKey.SerializeCompressed()))
+	delete(s.peersByPub, pubStr)
+
+	if p.inbound {
+		delete(s.inboundPeers, pubStr)
+	} else {
+		delete(s.outboundPeers, pubStr)
+	}
 }
// connectPeerMsg is a message requesting the server to open a connection to a