From 4cb76071a26185cfee20dc4e42ffdf4636f84552 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 23 Apr 2017 19:38:34 -0700 Subject: [PATCH] server+peer: re-write persistent connection handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The prior methods we employed to handle persistent connections could result in the following situation: both peers come up, and _concurrently_ establish connection to each other. With the prior logic, at this point, both connections would be terminated as each peer would go to kill the connection of the other peer. In order to resolve this issue in this commit, we’ve re-written the way we handle persistent connections. To eliminate the issue described above, in the case of concurrent peer connection, we now use a deterministic method to decide _which_ connection should be closed. The following rule governs which connection should be closed: the connection of the peer with the “smaller” public key should be closed. With this rule we now avoid the issue described above. Additionally, each peer now gains a peerTerminationWatcher which waits until a peer has been disconnected, and then cleans up all resources allocated to the peer, notifies relevant sub-systems of its demise, and finally handles re-connecting to the peer if it's persistent. This replaces the goroutine that was spawned in the old version of peer.Disconnect(). --- peer.go | 15 ---- server.go | 220 +++++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 185 insertions(+), 50 deletions(-) diff --git a/peer.go b/peer.go index fd7c0f89..c57c01fa 100644 --- a/peer.go +++ b/peer.go @@ -342,21 +342,6 @@ func (p *peer) Disconnect() { close(p.quit) - // If this connection was established persistently, then notify the - // connection manager that the peer has been disconnected. 
- if p.connReq != nil { - p.server.connMgr.Disconnect(p.connReq.ID()) - } - - // Launch a goroutine to clean up the remaining resources. - go func() { - // Tell the switch to unregister all links associated with this - // peer. Passing nil as the target link indicates that all - // links associated with this interface should be closed. - p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) - - p.server.donePeers <- p - }() } // String returns the string representation of this peer. diff --git a/server.go b/server.go index 0bce7ece..b60a258a 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "crypto/rand" "crypto/sha256" "encoding/hex" @@ -52,6 +53,10 @@ type server struct { peersByID map[int32]*peer peersByPub map[string]*peer + persistentPeers map[string]struct{} + inboundPeers map[string]*peer + outboundPeers map[string]*peer + rpcServer *rpcServer chainNotifier chainntnfs.ChainNotifier @@ -137,10 +142,13 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, sphinx: sphinx.NewRouter(privKey, activeNetParams.Params), lightningID: sha256.Sum256(serializedPubKey), + persistentPeers: make(map[string]struct{}), persistentConnReqs: make(map[string][]*connmgr.ConnReq), - peersByID: make(map[int32]*peer), - peersByPub: make(map[string]*peer), + peersByID: make(map[int32]*peer), + peersByPub: make(map[string]*peer), + inboundPeers: make(map[string]*peer), + outboundPeers: make(map[string]*peer), newPeers: make(chan *peer, 10), donePeers: make(chan *peer, 10), @@ -625,6 +633,55 @@ func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) { return peer, nil } +// peerTerminationWatcher waits until a peer has been disconnected, and then +// cleans up all resources allocated to the peer, notifies relevant sub-systems +// of its demise, and finally handles re-connecting to the peer if it's +// persistent. +// +// NOTE: This MUST be launched as a goroutine. 
+func (s *server) peerTerminationWatcher(p *peer) { + p.WaitForDisconnect() + + srvrLog.Debugf("Peer %v has been disconnected", p) + + // Tell the switch to unregister all links associated with this peer. + // Passing nil as the target link indicates that all + // links associated with this interface should be closed. + p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) + + // Send the peer to be garbage collected by the server. + p.server.donePeers <- p + + // If this peer had an active persistent connection request, then we + // can remove this as we manually decide below if we should attempt to + // re-connect. + if p.connReq != nil { + s.connMgr.Remove(p.connReq.ID()) + } + + // Next, check to see if this is a persistent peer or not. + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + if _, ok := s.persistentPeers[pubStr]; ok { + srvrLog.Debugf("Attempting to re-establish persistent "+ + "connection to peer %v", p) + + // If so, then we'll attempt to re-establish a persistent + // connection to the peer. + // TODO(roasbeef): get latest port info? + connReq := &connmgr.ConnReq{ + Addr: p.addr, + Permanent: true, + } + + s.pendingConnMtx.Lock() + s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr], + connReq) + s.pendingConnMtx.Unlock() + + go s.connMgr.Connect(connReq) + } +} + // peerConnected is a function that handles initialization of a newly connected // peer by adding it to the server's global list of all active peers, and // starting all the goroutines the peer needs to function properly. @@ -641,15 +698,14 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound p, err := newPeer(conn, connReq, s, peerAddr, inbound) if err != nil { srvrLog.Errorf("unable to create peer %v", err) - if connReq != nil { - s.connMgr.Remove(connReq.ID()) - } return } // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? 
+ // Attempt to start the peer, if we're unable to do so, then disconnect + // this peer. if err := p.Start(); err != nil { srvrLog.Errorf("unable to start peer: %v", err) p.Disconnect() @@ -659,6 +715,22 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound s.newPeers <- p } +// shouldDropLocalConnection determines if our local connection to a remote peer +// should be dropped in the case of concurrent connection establishment. In +// order to deterministically decide which connection should be dropped, we'll +// utilize the ordering of the local and remote public key. If we didn't use +// such a tie breaker, then we risk _both_ connections erroneously being +// dropped. +func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool { + localPubBytes := local.SerializeCompressed() + remotePubPbytes := remote.SerializeCompressed() + + // The connection that comes from the node with a "smaller" pubkey should + // be kept. Therefore, if our pubkey is "greater" than theirs, we should + // drop our established connection. + return bytes.Compare(localPubBytes, remotePubPbytes) > 0 +} + // inboundPeerConnected initializes a new peer in response to a new inbound // connection. func (s *server) inboundPeerConnected(conn net.Conn) { @@ -667,30 +739,45 @@ func (s *server) inboundPeerConnected(conn net.Conn) { srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr()) + localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() - // If we already have an outbound connection to this peer, simply drop - // the connection. + // Check to see if we should drop our connection, if not, then we'll + // close out this connection with the remote peer. This + // prevents us from having duplicate connections, or none. 
pubStr := string(nodePub.SerializeCompressed()) - if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Received inbound connection from peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) - conn.Close() - return + if connectedPeer, ok := s.peersByPub[pubStr]; ok { + // If the connection we've already established should be kept, + // then we'll close out this connection s.t there's only a + // single connection between us. + if !shouldDropLocalConnection(localPub, nodePub) { + srvrLog.Warnf("Received inbound connection from "+ + "peer %x, but already connected, dropping conn", + nodePub.SerializeCompressed()) + conn.Close() + return + } + + // Otherwise, if we should drop the connection, then we'll + // disconnect our already connected peer, and also send the + // peer to the peer garbage collection goroutine. + srvrLog.Debugf("Disconnecting stale connection to %v", + connectedPeer) + connectedPeer.Disconnect() + s.donePeers <- connectedPeer } - // However, if we receive an incoming connection from a peer we're - // attempting to maintain a persistent connection with then we need to - // cancel the ongoing connection attempts to ensure that we don't end - // up with a duplicate connecting to the same peer. - s.pendingConnMtx.RLock() + // Next, check to see if we have any outstanding persistent connection + // requests to this peer. If so, then we'll remove all of these + // connection requests, and also delete the entry from the map. 
+ s.pendingConnMtx.Lock() if connReqs, ok := s.persistentConnReqs[pubStr]; ok { for _, connReq := range connReqs { s.connMgr.Remove(connReq.ID()) } + delete(s.persistentConnReqs, pubStr) } - s.pendingConnMtx.RUnlock() + s.pendingConnMtx.Unlock() go s.peerConnected(conn, nil, false) } @@ -701,22 +788,64 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.peersMtx.Lock() defer s.peersMtx.Unlock() - srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) - + localPub := s.identityPriv.PubKey() nodePub := conn.(*brontide.Conn).RemotePub() - - // If we already have an inbound connection from this peer, simply drop - // the connection. pubStr := string(nodePub.SerializeCompressed()) - if _, ok := s.peersByPub[pubStr]; ok { - srvrLog.Errorf("Established outbound connection to peer %x, but "+ - "already connected, dropping conn", - nodePub.SerializeCompressed()) - s.connMgr.Remove(connReq.ID()) + + // If we already have an outbound connection to this peer, then ignore + // this new connection. + if _, ok := s.outboundPeers[pubStr]; ok { + srvrLog.Debugf("Ignoring duplicate outbound connection") conn.Close() return } + if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { + srvrLog.Debugf("Ignoring cancelled outbound connection") + conn.Close() + return + } + + srvrLog.Infof("Established connection to: %v", conn.RemoteAddr()) + + // As we've just established an outbound connection to this peer, we'll + // cancel all other persistent connection requests and eliminate the + // entry for this peer from the map. + s.pendingConnMtx.Lock() + if connReqs, ok := s.persistentConnReqs[pubStr]; ok { + for _, pConnReq := range connReqs { + if pConnReq.ID() != connReq.ID() { + s.connMgr.Remove(pConnReq.ID()) + } + } + delete(s.persistentConnReqs, pubStr) + } + s.pendingConnMtx.Unlock() + + // If we already have an inbound connection from this peer, then we'll + // check to see _which_ of our connections should be dropped. 
+ if connectedPeer, ok := s.peersByPub[pubStr]; ok { + // If our (this) connection should be dropped, then we'll do + // so, in order to ensure we don't have any duplicate + // connections. + if shouldDropLocalConnection(localPub, nodePub) { + srvrLog.Warnf("Established outbound connection to "+ + "peer %x, but already connected, dropping conn", + nodePub.SerializeCompressed()) + s.connMgr.Remove(connReq.ID()) + conn.Close() + return + } + + // Otherwise, _their_ connection should be dropped. So we'll + // disconnect the peer and send the now obsolete peer to the + // server for garbage collection. + srvrLog.Debugf("Disconnecting stale connection to %v", + connectedPeer) + connectedPeer.Disconnect() + s.donePeers <- connectedPeer + } + go s.peerConnected(conn, connReq, true) } @@ -729,7 +858,7 @@ func (s *server) addPeer(p *peer) { // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { - p.Stop() + p.Disconnect() return } @@ -738,10 +867,25 @@ func (s *server) addPeer(p *peer) { // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager s.peersMtx.Lock() + + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + s.peersByID[p.id] = p - s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p + s.peersByPub[pubStr] = p + + if p.inbound { + s.inboundPeers[pubStr] = p + } else { + s.outboundPeers[pubStr] = p + } + s.peersMtx.Unlock() + // Launch a goroutine to watch for the termination of this peer so we + // can ensure all resources are properly cleaned up and if need be + // connections are re-established. + go s.peerTerminationWatcher(p) + // Once the peer has been added to our indexes, send a message to the // channel router so we can synchronize our view of the channel graph // with this new peer. @@ -762,17 +906,23 @@ func (s *server) removePeer(p *peer) { // As the peer is now finished, ensure that the TCP connection is // closed and all of its related goroutines have exited. 
- if err := p.Stop(); err != nil { - peerLog.Errorf("unable to stop peer: %v", err) - } + p.Disconnect() // Ignore deleting peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { return } + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + delete(s.peersByID, p.id) - delete(s.peersByPub, string(p.addr.IdentityKey.SerializeCompressed())) + delete(s.peersByPub, pubStr) + + if p.inbound { + delete(s.inboundPeers, pubStr) + } else { + delete(s.outboundPeers, pubStr) + } } // connectPeerMsg is a message requesting the server to open a connection to a