diff --git a/server.go b/server.go index 2729fa76..4503dbce 100644 --- a/server.go +++ b/server.go @@ -1922,13 +1922,16 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) { // sub-systems of its demise, and finally handles re-connecting to the peer if // it's persistent. If the server intentionally disconnects a peer, it should // have a corresponding entry in the ignorePeerTermination map which will cause -// the cleanup routine to exit early. +// the cleanup routine to exit early. The passed `ready` chan is used to +// synchronize when WaitForDisconnect should begin watching on the peer's +// waitgroup. The ready chan should only be signaled if the peer starts +// successfully, otherwise the peer should be disconnected instead. // // NOTE: This MUST be launched as a goroutine. -func (s *server) peerTerminationWatcher(p *peer) { +func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { defer s.wg.Done() - p.WaitForDisconnect() + p.WaitForDisconnect(ready) srvrLog.Debugf("Peer %v has been disconnected", p) @@ -2121,7 +2124,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, addr := conn.RemoteAddr() pubKey := brontideConn.RemotePub() - srvrLog.Infof("finalizing connection to %x, inbound=%v", + srvrLog.Infof("Finalizing connection to %x, inbound=%v", pubKey.SerializeCompressed(), inbound) peerAddr := &lnwire.NetAddress{ @@ -2158,14 +2161,13 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, // TODO(roasbeef): update IP address for link-node // * also mark last-seen, do it one single transaction? - // Attempt to start the peer, if we're unable to do so, then disconnect - // this peer. - if err := p.Start(); err != nil { - p.Disconnect(errors.Errorf("unable to start peer: %v", err)) - return - } - s.addPeer(p) + + // Dispatch a goroutine to asynchronously start the peer. This process + // includes sending and receiving Init messages, which would be a DOS + // vector if we held the server's mutex throughout the procedure. + s.wg.Add(1) + go s.peerInitializer(p) } // shouldDropConnection determines if our local connection to a remote peer @@ -2453,15 +2455,49 @@ func (s *server) addPeer(p *peer) { } else { s.outboundPeers[pubStr] = p } +} - // Launch a goroutine to watch for the unexpected termination of this - // peer, which will ensure all resources are properly cleaned up, and - // re-establish persistent connections when necessary. The peer - // termination watcher will be short circuited if the peer is ever - // added to the ignorePeerTermination map, indicating that the server - // has already handled the removal of this peer. +// peerInitializer asynchronously starts a newly connected peer after it has +// been added to the server's peer map. This method sets up a +// peerTerminationWatcher for the given peer, and ensures that it executes even +// if the peer failed to start. In the event of a successful connection, this +// method reads the negotiated, local feature-bits and spawns the appropriate +// graph synchronization method. Any registered clients of NotifyWhenOnline will +// be signaled of the new peer once the method returns. +// +// NOTE: This MUST be launched as a goroutine. +func (s *server) peerInitializer(p *peer) { + defer s.wg.Done() + + // Avoid initializing peers while the server is exiting. + if s.Stopped() { + return + } + + // Create a channel that will be used to signal a successful start of + // the link. This prevents the peer termination watcher from beginning + // its duty too early. + ready := make(chan struct{}) + + // Before starting the peer, launch a goroutine to watch for the + // unexpected termination of this peer, which will ensure all resources + // are properly cleaned up, and re-establish persistent connections when + // necessary. The peer termination watcher will be short circuited if + // the peer is ever added to the ignorePeerTermination map, indicating + // that the server has already handled the removal of this peer. s.wg.Add(1) - go s.peerTerminationWatcher(p) + go s.peerTerminationWatcher(p, ready) + + // Start teh peer! If an error occurs, we Disconnect the peer, which + // will unblock the peerTerminationWatcher. + if err := p.Start(); err != nil { + p.Disconnect(errors.New("unable to start peer: %v")) + return + } + + // Otherwise, signal to the peerTerminationWatcher that the peer startup + // was successful, and to begin watching the peer's wait group. + close(ready) switch { // If the remote peer knows of the new gossip queries feature, then @@ -2490,6 +2526,11 @@ func (s *server) addPeer(p *peer) { go s.authGossiper.SynchronizeNode(p) } + pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + + s.mu.Lock() + defer s.mu.Unlock() + // Check if there are listeners waiting for this peer to come online. for _, peerChan := range s.peerConnectedListeners[pubStr] { select {