From a1abb11dc5441e327bf6dd2884a11843574cbc9b Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Mon, 30 Jul 2018 20:51:08 -0700
Subject: [PATCH 1/4] peer: add ready chan arg to WaitForDisconnect

This commit adds additional synchronization logic to WaitForDisconnect,
such that it can be spawned before Start has been executed by the
server. Without modification, the current version will return
immediately since no goroutines will have been spawned.

To solve this, we modify WaitForDisconnect to block until either:
1) the peer is disconnected, or 2) the peer is successfully started,
before watching the waitgroup. In the first case, the waitgroup will
block until all (if any) spawned goroutines have exited. Otherwise, if
Start is successful, we can switch to watching the waitgroup, knowing
that the waitgroup counter is positive.
---
 peer.go | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/peer.go b/peer.go
index 74ff4c74..1040674b 100644
--- a/peer.go
+++ b/peer.go
@@ -567,8 +567,18 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
 
 // WaitForDisconnect waits until the peer has disconnected. A peer may be
 // disconnected if the local or remote side terminating the connection, or an
-// irrecoverable protocol error has been encountered.
-func (p *peer) WaitForDisconnect() {
+// irrecoverable protocol error has been encountered. This method will only
+// begin watching the peer's waitgroup after the ready channel or the peer's
+// quit channel are signaled. The ready channel should only be signaled if a
+// call to Start returns no error. Otherwise, if the peer fails to start,
+// calling Disconnect will signal the quit channel and the method will not
+// block, since no goroutines were spawned.
+func (p *peer) WaitForDisconnect(ready chan struct{}) {
+	select {
+	case <-ready:
+	case <-p.quit:
+	}
+
 	p.wg.Wait()
 }
 

From 0ee0abc166c0558e2ff65df86636fe15d3378b6c Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Mon, 30 Jul 2018 20:58:16 -0700
Subject: [PATCH 2/4] server: add async peer Start() + safer cleanup

This commit adds asynchronous starting of peers, in order to avoid
potential DOS vectors. Currently, we block with the server's mutex held
while peers exchange Init messages and perform other setup. Thus, a
remote peer that does not reply with an init message will cause the
server to block for 15s per attempt.

We also modify the startup behavior to spawn peerTerminationWatchers
before starting the peer itself, ensuring that a peer is properly
cleaned up if the initialization fails. Currently, failing to start a
peer does not execute the bulk of the teardown logic, since the
termination watcher is not spawned until after a successful Start
occurs.
---
 server.go | 77 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 59 insertions(+), 18 deletions(-)

diff --git a/server.go b/server.go
index 2729fa76..4503dbce 100644
--- a/server.go
+++ b/server.go
@@ -1922,13 +1922,16 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
 // sub-systems of its demise, and finally handles re-connecting to the peer if
 // it's persistent. If the server intentionally disconnects a peer, it should
 // have a corresponding entry in the ignorePeerTermination map which will cause
-// the cleanup routine to exit early.
+// the cleanup routine to exit early. The passed `ready` chan is used to
+// synchronize when WaitForDisconnect should begin watching on the peer's
+// waitgroup. The ready chan should only be signaled if the peer starts
+// successfully, otherwise the peer should be disconnected instead.
 //
 // NOTE: This MUST be launched as a goroutine.
-func (s *server) peerTerminationWatcher(p *peer) {
+func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
 	defer s.wg.Done()
 
-	p.WaitForDisconnect()
+	p.WaitForDisconnect(ready)
 
 	srvrLog.Debugf("Peer %v has been disconnected", p)
 
@@ -2121,7 +2124,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	addr := conn.RemoteAddr()
 	pubKey := brontideConn.RemotePub()
 
-	srvrLog.Infof("finalizing connection to %x, inbound=%v",
+	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
 		pubKey.SerializeCompressed(), inbound)
 
 	peerAddr := &lnwire.NetAddress{
@@ -2158,14 +2161,13 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	// TODO(roasbeef): update IP address for link-node
 	//  * also mark last-seen, do it one single transaction?
 
-	// Attempt to start the peer, if we're unable to do so, then disconnect
-	// this peer.
-	if err := p.Start(); err != nil {
-		p.Disconnect(errors.Errorf("unable to start peer: %v", err))
-		return
-	}
-
 	s.addPeer(p)
+
+	// Dispatch a goroutine to asynchronously start the peer. This process
+	// includes sending and receiving Init messages, which would be a DOS
+	// vector if we held the server's mutex throughout the procedure.
+	s.wg.Add(1)
+	go s.peerInitializer(p)
 }
 
 // shouldDropConnection determines if our local connection to a remote peer
 // should be dropped in the case of concurrent connection establishment. In
 // order to deterministically decide which connection should be dropped, we'll
@@ -2453,15 +2455,49 @@ func (s *server) addPeer(p *peer) {
 	} else {
 		s.outboundPeers[pubStr] = p
 	}
+}
 
-	// Launch a goroutine to watch for the unexpected termination of this
-	// peer, which will ensure all resources are properly cleaned up, and
-	// re-establish persistent connections when necessary. The peer
-	// termination watcher will be short circuited if the peer is ever
-	// added to the ignorePeerTermination map, indicating that the server
-	// has already handled the removal of this peer.
+// peerInitializer asynchronously starts a newly connected peer after it has
+// been added to the server's peer map. This method sets up a
+// peerTerminationWatcher for the given peer, and ensures that it executes even
+// if the peer failed to start. In the event of a successful connection, this
+// method reads the negotiated, local feature-bits and spawns the appropriate
+// graph synchronization method. Any registered clients of NotifyWhenOnline will
+// be signaled of the new peer once the method returns.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerInitializer(p *peer) {
+	defer s.wg.Done()
+
+	// Avoid initializing peers while the server is exiting.
+	if s.Stopped() {
+		return
+	}
+
+	// Create a channel that will be used to signal a successful start of
+	// the link. This prevents the peer termination watcher from beginning
+	// its duty too early.
+	ready := make(chan struct{})
+
+	// Before starting the peer, launch a goroutine to watch for the
+	// unexpected termination of this peer, which will ensure all resources
+	// are properly cleaned up, and re-establish persistent connections when
+	// necessary. The peer termination watcher will be short circuited if
+	// the peer is ever added to the ignorePeerTermination map, indicating
+	// that the server has already handled the removal of this peer.
 	s.wg.Add(1)
-	go s.peerTerminationWatcher(p)
+	go s.peerTerminationWatcher(p, ready)
+
+	// Start the peer! If an error occurs, we Disconnect the peer, which
+	// will unblock the peerTerminationWatcher.
+	if err := p.Start(); err != nil {
+		p.Disconnect(errors.Errorf("unable to start peer: %v", err))
+		return
+	}
+
+	// Otherwise, signal to the peerTerminationWatcher that the peer startup
+	// was successful, and to begin watching the peer's wait group.
+	close(ready)
 
 	switch {
 	// If the remote peer knows of the new gossip queries feature, then
@@ -2490,6 +2526,11 @@ func (s *server) addPeer(p *peer) {
 		go s.authGossiper.SynchronizeNode(p)
 	}
 
+	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	// Check if there are listeners waiting for this peer to come online.
 	for _, peerChan := range s.peerConnectedListeners[pubStr] {
 		select {

From 121252934b862d89cf71495a524c64918c4c9874 Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Wed, 1 Aug 2018 17:31:26 -0700
Subject: [PATCH 3/4] server: rearrange peer lifecycle helpers for readability

---
 server.go | 412 +++++++++++++++++++++++++++---------------------------
 1 file changed, 206 insertions(+), 206 deletions(-)

diff --git a/server.go b/server.go
index 4503dbce..bf770092 100644
--- a/server.go
+++ b/server.go
@@ -1917,155 +1917,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
 	return peer, nil
 }
 
-// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
-// and then cleans up all resources allocated to the peer, notifies relevant
-// sub-systems of its demise, and finally handles re-connecting to the peer if
-// it's persistent. If the server intentionally disconnects a peer, it should
-// have a corresponding entry in the ignorePeerTermination map which will cause
-// the cleanup routine to exit early. The passed `ready` chan is used to
-// synchronize when WaitForDisconnect should begin watching on the peer's
-// waitgroup. The ready chan should only be signaled if the peer starts
-// successfully, otherwise the peer should be disconnected instead.
-//
-// NOTE: This MUST be launched as a goroutine.
-func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
-	defer s.wg.Done()
-
-	p.WaitForDisconnect(ready)
-
-	srvrLog.Debugf("Peer %v has been disconnected", p)
-
-	// If the server is exiting then we can bail out early ourselves as all
-	// the other sub-systems will already be shutting down.
-	if s.Stopped() {
-		return
-	}
-
-	// Next, we'll cancel all pending funding reservations with this node.
-	// If we tried to initiate any funding flows that haven't yet finished,
-	// then we need to unlock those committed outputs so they're still
-	// available for use.
-	s.fundingMgr.CancelPeerReservations(p.PubKey())
-
-	pubKey := p.addr.IdentityKey
-
-	// We'll also inform the gossiper that this peer is no longer active,
-	// so we don't need to maintain sync state for it any longer.
-	s.authGossiper.PruneSyncState(pubKey)
-
-	// Tell the switch to remove all links associated with this peer.
-	// Passing nil as the target link indicates that all links associated
-	// with this interface should be closed.
-	//
-	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
-	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
-	if err != nil {
-		srvrLog.Errorf("unable to get channel links: %v", err)
-	}
-
-	for _, link := range links {
-		p.server.htlcSwitch.RemoveLink(link.ChanID())
-	}
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// If the server has already removed this peer, we can short circuit the
-	// peer termination watcher and skip cleanup.
-	if _, ok := s.ignorePeerTermination[p]; ok {
-		delete(s.ignorePeerTermination, p)
-
-		pubKey := p.PubKey()
-		pubStr := string(pubKey[:])
-
-		// If a connection callback is present, we'll go ahead and
-		// execute it now that previous peer has fully disconnected. If
-		// the callback is not present, this likely implies the peer was
-		// purposefully disconnected via RPC, and that no reconnect
-		// should be attempted.
-		connCallback, ok := s.scheduledPeerConnection[pubStr]
-		if ok {
-			delete(s.scheduledPeerConnection, pubStr)
-			connCallback()
-		}
-		return
-	}
-
-	// First, cleanup any remaining state the server has regarding the peer
-	// in question.
-	s.removePeer(p)
-
-	// Next, check to see if this is a persistent peer or not.
-	pubStr := string(pubKey.SerializeCompressed())
-	_, ok := s.persistentPeers[pubStr]
-	if ok {
-		// We'll only need to re-launch a connection request if one
-		// isn't already currently pending.
-		if _, ok := s.persistentConnReqs[pubStr]; ok {
-			return
-		}
-
-		// We'll ensure that we locate an advertised address to use
-		// within the peer's address for reconnection purposes.
-		//
-		// TODO(roasbeef): use them all?
-		if p.inbound {
-			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
-				pubKey,
-			)
-			if err != nil {
-				srvrLog.Errorf("Unable to retrieve advertised "+
-					"address for node %x: %v",
-					pubKey.SerializeCompressed(), err)
-			} else {
-				p.addr.Address = advertisedAddr
-			}
-		}
-
-		// Otherwise, we'll launch a new connection request in order to
-		// attempt to maintain a persistent connection with this peer.
-		connReq := &connmgr.ConnReq{
-			Addr:      p.addr,
-			Permanent: true,
-		}
-		s.persistentConnReqs[pubStr] = append(
-			s.persistentConnReqs[pubStr], connReq)
-
-		// Record the computed backoff in the backoff map.
-		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
-		s.persistentPeersBackoff[pubStr] = backoff
-
-		// Initialize a retry canceller for this peer if one does not
-		// exist.
-		cancelChan, ok := s.persistentRetryCancels[pubStr]
-		if !ok {
-			cancelChan = make(chan struct{})
-			s.persistentRetryCancels[pubStr] = cancelChan
-		}
-
-		// We choose not to wait group this go routine since the Connect
-		// call can stall for arbitrarily long if we shutdown while an
-		// outbound connection attempt is being made.
-		go func() {
-			srvrLog.Debugf("Scheduling connection re-establishment to "+
-				"persistent peer %v in %s", p, backoff)
-
-			select {
-			case <-time.After(backoff):
-			case <-cancelChan:
-				return
-			case <-s.quit:
-				return
-			}
-
-			srvrLog.Debugf("Attempting to re-establish persistent "+
-				"connection to peer %v", p)
-
-			s.connMgr.Connect(connReq)
-		}()
-	}
-}
-
 // nextPeerBackoff computes the next backoff duration for a peer's pubkey using
 // exponential backoff. If no previous backoff was known, the default is
 // returned.
@@ -2113,63 +1964,6 @@ func (s *server) shouldRequestGraphSync() bool {
 	return len(s.peersByPub) <= 2
 }
 
-// peerConnected is a function that handles initialization a newly connected
-// peer by adding it to the server's global list of all active peers, and
-// starting all the goroutines the peer needs to function properly. The inbound
-// boolean should be true if the peer initiated the connection to us.
-func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
-	inbound bool) {
-
-	brontideConn := conn.(*brontide.Conn)
-	addr := conn.RemoteAddr()
-	pubKey := brontideConn.RemotePub()
-
-	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
-		pubKey.SerializeCompressed(), inbound)
-
-	peerAddr := &lnwire.NetAddress{
-		IdentityKey: pubKey,
-		Address:     addr,
-		ChainNet:    activeNetParams.Net,
-	}
-
-	// With the brontide connection established, we'll now craft the local
-	// feature vector to advertise to the remote node.
-	localFeatures := lnwire.NewRawFeatureVector()
-
-	// We'll signal that we understand the data loss protection feature,
-	// and also that we support the new gossip query features.
-	localFeatures.Set(lnwire.DataLossProtectOptional)
-	localFeatures.Set(lnwire.GossipQueriesOptional)
-
-	// We'll only request a full channel graph sync if we detect that that
-	// we aren't fully synced yet.
-	if s.shouldRequestGraphSync() {
-		// TODO(roasbeef): only do so if gossiper doesn't have active
-		// peers?
-		localFeatures.Set(lnwire.InitialRoutingSync)
-	}
-
-	// Now that we've established a connection, create a peer, and it to
-	// the set of currently active peers.
-	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
-	if err != nil {
-		srvrLog.Errorf("unable to create peer %v", err)
-		return
-	}
-
-	// TODO(roasbeef): update IP address for link-node
-	//  * also mark last-seen, do it one single transaction?
-
-	s.addPeer(p)
-
-	// Dispatch a goroutine to asynchronously start the peer. This process
-	// includes sending and receiving Init messages, which would be a DOS
-	// vector if we held the server's mutex throughout the procedure.
-	s.wg.Add(1)
-	go s.peerInitializer(p)
-}
-
 // shouldDropConnection determines if our local connection to a remote peer
 // should be dropped in the case of concurrent connection establishment. In
 // order to deterministically decide which connection should be dropped, we'll
@@ -2428,6 +2222,63 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
 	delete(s.persistentConnReqs, pubStr)
 }
 
+// peerConnected is a function that handles initialization a newly connected
+// peer by adding it to the server's global list of all active peers, and
+// starting all the goroutines the peer needs to function properly. The inbound
+// boolean should be true if the peer initiated the connection to us.
+func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
+	inbound bool) {
+
+	brontideConn := conn.(*brontide.Conn)
+	addr := conn.RemoteAddr()
+	pubKey := brontideConn.RemotePub()
+
+	srvrLog.Infof("Finalizing connection to %x, inbound=%v",
+		pubKey.SerializeCompressed(), inbound)
+
+	peerAddr := &lnwire.NetAddress{
+		IdentityKey: pubKey,
+		Address:     addr,
+		ChainNet:    activeNetParams.Net,
+	}
+
+	// With the brontide connection established, we'll now craft the local
+	// feature vector to advertise to the remote node.
+	localFeatures := lnwire.NewRawFeatureVector()
+
+	// We'll signal that we understand the data loss protection feature,
+	// and also that we support the new gossip query features.
+	localFeatures.Set(lnwire.DataLossProtectOptional)
+	localFeatures.Set(lnwire.GossipQueriesOptional)
+
+	// We'll only request a full channel graph sync if we detect that that
+	// we aren't fully synced yet.
+	if s.shouldRequestGraphSync() {
+		// TODO(roasbeef): only do so if gossiper doesn't have active
+		// peers?
+		localFeatures.Set(lnwire.InitialRoutingSync)
+	}
+
+	// Now that we've established a connection, create a peer, and it to
+	// the set of currently active peers.
+	p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
+	if err != nil {
+		srvrLog.Errorf("unable to create peer %v", err)
+		return
+	}
+
+	// TODO(roasbeef): update IP address for link-node
+	//  * also mark last-seen, do it one single transaction?
+
+	s.addPeer(p)
+
+	// Dispatch a goroutine to asynchronously start the peer. This process
+	// includes sending and receiving Init messages, which would be a DOS
+	// vector if we held the server's mutex throughout the procedure.
+	s.wg.Add(1)
+	go s.peerInitializer(p)
+}
+
 // addPeer adds the passed peer to the server's global state of all active
 // peers.
 func (s *server) addPeer(p *peer) {
@@ -2542,6 +2393,155 @@ func (s *server) peerInitializer(p *peer) {
 	delete(s.peerConnectedListeners, pubStr)
 }
 
+// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
+// and then cleans up all resources allocated to the peer, notifies relevant
+// sub-systems of its demise, and finally handles re-connecting to the peer if
+// it's persistent. If the server intentionally disconnects a peer, it should
+// have a corresponding entry in the ignorePeerTermination map which will cause
+// the cleanup routine to exit early. The passed `ready` chan is used to
+// synchronize when WaitForDisconnect should begin watching on the peer's
+// waitgroup. The ready chan should only be signaled if the peer starts
+// successfully, otherwise the peer should be disconnected instead.
+//
+// NOTE: This MUST be launched as a goroutine.
+func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
+	defer s.wg.Done()
+
+	p.WaitForDisconnect(ready)
+
+	srvrLog.Debugf("Peer %v has been disconnected", p)
+
+	// If the server is exiting then we can bail out early ourselves as all
+	// the other sub-systems will already be shutting down.
+	if s.Stopped() {
+		return
+	}
+
+	// Next, we'll cancel all pending funding reservations with this node.
+	// If we tried to initiate any funding flows that haven't yet finished,
+	// then we need to unlock those committed outputs so they're still
+	// available for use.
+	s.fundingMgr.CancelPeerReservations(p.PubKey())
+
+	pubKey := p.addr.IdentityKey
+
+	// We'll also inform the gossiper that this peer is no longer active,
+	// so we don't need to maintain sync state for it any longer.
+	s.authGossiper.PruneSyncState(pubKey)
+
+	// Tell the switch to remove all links associated with this peer.
+	// Passing nil as the target link indicates that all links associated
+	// with this interface should be closed.
+	//
+	// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
+	links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
+	if err != nil {
+		srvrLog.Errorf("unable to get channel links: %v", err)
+	}
+
+	for _, link := range links {
+		p.server.htlcSwitch.RemoveLink(link.ChanID())
+	}
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// If the server has already removed this peer, we can short circuit the
+	// peer termination watcher and skip cleanup.
+	if _, ok := s.ignorePeerTermination[p]; ok {
+		delete(s.ignorePeerTermination, p)
+
+		pubKey := p.PubKey()
+		pubStr := string(pubKey[:])
+
+		// If a connection callback is present, we'll go ahead and
+		// execute it now that previous peer has fully disconnected. If
+		// the callback is not present, this likely implies the peer was
+		// purposefully disconnected via RPC, and that no reconnect
+		// should be attempted.
+		connCallback, ok := s.scheduledPeerConnection[pubStr]
+		if ok {
+			delete(s.scheduledPeerConnection, pubStr)
+			connCallback()
+		}
+		return
+	}
+
+	// First, cleanup any remaining state the server has regarding the peer
+	// in question.
+	s.removePeer(p)
+
+	// Next, check to see if this is a persistent peer or not.
+	pubStr := string(pubKey.SerializeCompressed())
+	_, ok := s.persistentPeers[pubStr]
+	if ok {
+		// We'll only need to re-launch a connection request if one
+		// isn't already currently pending.
+		if _, ok := s.persistentConnReqs[pubStr]; ok {
+			return
+		}
+
+		// We'll ensure that we locate an advertised address to use
+		// within the peer's address for reconnection purposes.
+		//
+		// TODO(roasbeef): use them all?
+		if p.inbound {
+			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
+				pubKey,
+			)
+			if err != nil {
+				srvrLog.Errorf("Unable to retrieve advertised "+
+					"address for node %x: %v",
+					pubKey.SerializeCompressed(), err)
+			} else {
+				p.addr.Address = advertisedAddr
+			}
+		}
+
+		// Otherwise, we'll launch a new connection request in order to
+		// attempt to maintain a persistent connection with this peer.
+		connReq := &connmgr.ConnReq{
+			Addr:      p.addr,
+			Permanent: true,
+		}
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq)
+
+		// Record the computed backoff in the backoff map.
+		backoff := s.nextPeerBackoff(pubStr, p.StartTime())
+		s.persistentPeersBackoff[pubStr] = backoff
+
+		// Initialize a retry canceller for this peer if one does not
+		// exist.
+		cancelChan, ok := s.persistentRetryCancels[pubStr]
+		if !ok {
+			cancelChan = make(chan struct{})
+			s.persistentRetryCancels[pubStr] = cancelChan
+		}
+
+		// We choose not to wait group this go routine since the Connect
+		// call can stall for arbitrarily long if we shutdown while an
+		// outbound connection attempt is being made.
+		go func() {
+			srvrLog.Debugf("Scheduling connection re-establishment to "+
+				"persistent peer %v in %s", p, backoff)
+
+			select {
+			case <-time.After(backoff):
+			case <-cancelChan:
+				return
+			case <-s.quit:
+				return
+			}
+
+			srvrLog.Debugf("Attempting to re-establish persistent "+
+				"connection to peer %v", p)
+
+			s.connMgr.Connect(connReq)
+		}()
+	}
+}
+
 // removePeer removes the passed peer from the server's state of all active
 // peers.
 func (s *server) removePeer(p *peer) {

From d4d90979efd2d45335c06c093f83bb82e27c645a Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Tue, 31 Jul 2018 14:38:29 -0700
Subject: [PATCH 4/4] peer: increase peer write timeout to 50 seconds

Sometimes when performing an initial sync, the remote node isn't able
to pull messages off the wire because of long-running tasks, and its
queues become saturated. With a shorter write timeout, we will give up
trying to send messages and tear down the connection, even though the
peer is still active.
---
 peer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/peer.go b/peer.go
index 1040674b..6ce9302d 100644
--- a/peer.go
+++ b/peer.go
@@ -43,7 +43,7 @@ const (
 	idleTimeout = 5 * time.Minute
 
 	// writeMessageTimeout is the timeout used when writing a message to peer.
-	writeMessageTimeout = 10 * time.Second
+	writeMessageTimeout = 50 * time.Second
 
 	// outgoingQueueLen is the buffer size of the channel which houses
 	// messages to be sent across the wire, requested by objects outside
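A minimal sketch of the synchronization pattern described in PATCH 1 and PATCH 2, for illustration only (not part of the patch series): the `worker` type below and its `Start`, `Disconnect`, and `WaitForDisconnect` methods are placeholders standing in for the peer, not lnd code. It shows the ordering the commits rely on: the watcher goroutine is spawned before Start, the ready channel is closed only if Start succeeds, and the quit channel unblocks the watcher otherwise.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// worker is a stand-in for the peer: it owns a waitgroup for its spawned
// goroutines, a quit channel closed on Disconnect, and may fail to start.
type worker struct {
	wg   sync.WaitGroup
	quit chan struct{}
	fail bool
}

func newWorker(fail bool) *worker {
	return &worker{quit: make(chan struct{}), fail: fail}
}

// Start spawns the worker's goroutine, or returns an error without spawning
// anything at all.
func (w *worker) Start() error {
	if w.fail {
		return errors.New("unable to start")
	}
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-w.quit // run until disconnected
	}()
	return nil
}

// Disconnect signals any spawned goroutines to exit.
func (w *worker) Disconnect() {
	close(w.quit)
}

// WaitForDisconnect blocks until the worker is either started (ready) or
// disconnected (quit), and only then waits on the waitgroup, mirroring the
// ordering fix in PATCH 1: the waitgroup is never observed before the
// outcome of Start is known.
func (w *worker) WaitForDisconnect(ready chan struct{}) {
	select {
	case <-ready:
	case <-w.quit:
	}
	w.wg.Wait()
}

func main() {
	w := newWorker(false)

	// Spawn the watcher before Start, as peerInitializer does in PATCH 2.
	ready := make(chan struct{})
	done := make(chan struct{})
	go func() {
		w.WaitForDisconnect(ready)
		close(done)
	}()

	if err := w.Start(); err != nil {
		// A failed start tears the worker down immediately; the
		// watcher unblocks via the quit channel.
		w.Disconnect()
	} else {
		// Signal the watcher only after a successful start, then tear
		// the worker down shortly afterwards.
		close(ready)
		time.AfterFunc(10*time.Millisecond, w.Disconnect)
	}

	<-done
	fmt.Println("watcher observed full teardown")
}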