server: add async peer Start() + safer cleanup

This commit adds asynchronous starting of peers,
in order to avoid potential DOS vectors. Currently,
we block with the server's mutex while peers exchange
Init messages and perform other setup. Thus, a remote
peer that does not reply with an init message will
cause server to block for 15s per attempt.

We also modify the startup behavior to spawn
peerTerminationWatchers before starting the
peer itself, ensuring that a peer is properly
cleaned up if the initialization fails. Currently,
failing to start a peer does not execute the bulk
of the teardown logic, since it is not spawned
until after a successful Start occurs.
This commit is contained in:
Conner Fromknecht 2018-07-30 20:58:16 -07:00
parent a1abb11dc5
commit 0ee0abc166
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -1922,13 +1922,16 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
// sub-systems of its demise, and finally handles re-connecting to the peer if // sub-systems of its demise, and finally handles re-connecting to the peer if
// it's persistent. If the server intentionally disconnects a peer, it should // it's persistent. If the server intentionally disconnects a peer, it should
// have a corresponding entry in the ignorePeerTermination map which will cause // have a corresponding entry in the ignorePeerTermination map which will cause
// the cleanup routine to exit early. // the cleanup routine to exit early. The passed `ready` chan is used to
// synchronize when WaitForDisconnect should begin watching on the peer's
// waitgroup. The ready chan should only be signaled if the peer starts
// successfully, otherwise the peer should be disconnected instead.
// //
// NOTE: This MUST be launched as a goroutine. // NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer) { func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
defer s.wg.Done() defer s.wg.Done()
p.WaitForDisconnect() p.WaitForDisconnect(ready)
srvrLog.Debugf("Peer %v has been disconnected", p) srvrLog.Debugf("Peer %v has been disconnected", p)
@ -2121,7 +2124,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
addr := conn.RemoteAddr() addr := conn.RemoteAddr()
pubKey := brontideConn.RemotePub() pubKey := brontideConn.RemotePub()
srvrLog.Infof("finalizing connection to %x, inbound=%v", srvrLog.Infof("Finalizing connection to %x, inbound=%v",
pubKey.SerializeCompressed(), inbound) pubKey.SerializeCompressed(), inbound)
peerAddr := &lnwire.NetAddress{ peerAddr := &lnwire.NetAddress{
@ -2158,14 +2161,13 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
// TODO(roasbeef): update IP address for link-node // TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction? // * also mark last-seen, do it one single transaction?
// Attempt to start the peer, if we're unable to do so, then disconnect
// this peer.
if err := p.Start(); err != nil {
p.Disconnect(errors.Errorf("unable to start peer: %v", err))
return
}
s.addPeer(p) s.addPeer(p)
// Dispatch a goroutine to asynchronously start the peer. This process
// includes sending and receiving Init messages, which would be a DOS
// vector if we held the server's mutex throughout the procedure.
s.wg.Add(1)
go s.peerInitializer(p)
} }
// shouldDropConnection determines if our local connection to a remote peer // shouldDropConnection determines if our local connection to a remote peer
@ -2453,15 +2455,49 @@ func (s *server) addPeer(p *peer) {
} else { } else {
s.outboundPeers[pubStr] = p s.outboundPeers[pubStr] = p
} }
}
// Launch a goroutine to watch for the unexpected termination of this // peerInitializer asynchronously starts a newly connected peer after it has
// peer, which will ensure all resources are properly cleaned up, and // been added to the server's peer map. This method sets up a
// re-establish persistent connections when necessary. The peer // peerTerminationWatcher for the given peer, and ensures that it executes even
// termination watcher will be short circuited if the peer is ever // if the peer failed to start. In the event of a successful connection, this
// added to the ignorePeerTermination map, indicating that the server // method reads the negotiated, local feature-bits and spawns the appropriate
// has already handled the removal of this peer. // graph synchronization method. Any registered clients of NotifyWhenOnline will
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer) {
defer s.wg.Done()
// Avoid initializing peers while the server is exiting.
if s.Stopped() {
return
}
// Create a channel that will be used to signal a successful start of
// the link. This prevents the peer termination watcher from beginning
// its duty too early.
ready := make(chan struct{})
// Before starting the peer, launch a goroutine to watch for the
// unexpected termination of this peer, which will ensure all resources
// are properly cleaned up, and re-establish persistent connections when
// necessary. The peer termination watcher will be short circuited if
// the peer is ever added to the ignorePeerTermination map, indicating
// that the server has already handled the removal of this peer.
s.wg.Add(1) s.wg.Add(1)
go s.peerTerminationWatcher(p) go s.peerTerminationWatcher(p, ready)
// Start teh peer! If an error occurs, we Disconnect the peer, which
// will unblock the peerTerminationWatcher.
if err := p.Start(); err != nil {
p.Disconnect(errors.New("unable to start peer: %v"))
return
}
// Otherwise, signal to the peerTerminationWatcher that the peer startup
// was successful, and to begin watching the peer's wait group.
close(ready)
switch { switch {
// If the remote peer knows of the new gossip queries feature, then // If the remote peer knows of the new gossip queries feature, then
@ -2490,6 +2526,11 @@ func (s *server) addPeer(p *peer) {
go s.authGossiper.SynchronizeNode(p) go s.authGossiper.SynchronizeNode(p)
} }
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
s.mu.Lock()
defer s.mu.Unlock()
// Check if there are listeners waiting for this peer to come online. // Check if there are listeners waiting for this peer to come online.
for _, peerChan := range s.peerConnectedListeners[pubStr] { for _, peerChan := range s.peerConnectedListeners[pubStr] {
select { select {