server: cleanup persistent connection retries

This commit changes the behavior of our connection
reestablishment, and resolves some minor issues that
could lead to uncancelled requests or an infinite
connection loop.

 - No longer attempts to Remove connection requests with
   an ID of 0. This can happen for reconnect attempts
   that have been scheduled, but have not started by the
   time the server cancels the connection requests.

 - Adds a per-peer cancellation channel that is
   closed upon a successful inbound or outbound
   connection. The goroutine spawned by the
   peerTerminationWatcher to handle the reconnect now
   selects on this channel, and skips reconnecting
   if it is closed before the backoff matures (a
   sketch of this pattern follows the list).

 - Properly computes the backoff when no entry in
   persistentPeersBackoff is found. Previously, a
   value of 0 would be returned, causing all subsequent
   backoff attempts to use a backoff of 0.

 - Cancels a peer's retries and removes its connection
   requests immediately after receiving an inbound
   connection, to mimic the structure of
   OutboundPeerConnected.

 - Cancels all persistent connection requests when
   DisconnectPeer is called.

 - Allows additional connection attempts to a peer, even
   if a pending connection attempt already exists.
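
To make the retry-canceller pattern concrete, here is a minimal,
self-contained Go sketch of the mechanism described above. It is
illustrative only: retryManager, scheduleRetry, cancelRetries, and the
exact jitter math are invented for this sketch and are not lnd's actual
types or its computeNextBackoff; only the select-on-cancel/backoff/quit
shape mirrors the diff below.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	defaultBackoff = time.Second
	maximumBackoff = time.Hour
)

// retryManager is a hypothetical stand-in for the server's per-peer
// retry state: a cancellation channel and the last backoff used.
type retryManager struct {
	mu      sync.Mutex
	cancels map[string]chan struct{}
	backoff map[string]time.Duration
	quit    chan struct{}
}

// nextBackoff returns the default when no prior entry exists, otherwise
// grows the previous value. Returning 0 for unknown peers was the bug:
// every subsequent doubling of 0 is still 0, so retries never back off.
func (r *retryManager) nextBackoff(peer string) time.Duration {
	prev, ok := r.backoff[peer]
	if !ok {
		return defaultBackoff
	}

	// Randomized exponential growth, capped at the maximum. The exact
	// jitter used by lnd's computeNextBackoff may differ.
	next := 2*prev + time.Duration(rand.Int63n(int64(prev)))
	if next > maximumBackoff {
		next = maximumBackoff
	}
	return next
}

// scheduleRetry spawns a goroutine that waits out the backoff, but skips
// the reconnect if the peer's canceller closes before the timer matures.
func (r *retryManager) scheduleRetry(peer string, connect func()) {
	r.mu.Lock()
	backoff := r.nextBackoff(peer)
	r.backoff[peer] = backoff

	cancel, ok := r.cancels[peer]
	if !ok {
		cancel = make(chan struct{})
		r.cancels[peer] = cancel
	}
	r.mu.Unlock()

	go func() {
		select {
		case <-time.After(backoff):
			connect()
		case <-cancel:
			// A connection was established by some other path;
			// this pending retry is abandoned.
		case <-r.quit:
		}
	}()
}

// cancelRetries is invoked on any successful inbound or outbound
// connection, releasing every goroutine still waiting on its backoff.
func (r *retryManager) cancelRetries(peer string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if cancel, ok := r.cancels[peer]; ok {
		close(cancel)
		delete(r.cancels, peer)
	}
}

func main() {
	r := &retryManager{
		cancels: make(map[string]chan struct{}),
		backoff: make(map[string]time.Duration),
		quit:    make(chan struct{}),
	}

	r.scheduleRetry("peerA", func() { fmt.Println("reconnecting") })

	// An inbound connection wins the race: the pending retry is skipped.
	r.cancelRetries("peerA")
	time.Sleep(2 * time.Second)
}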
Author: Conner Fromknecht, 2018-03-30 16:19:52 -07:00
parent 4480052cd6
commit 2490b9b6e3

server.go (143 lines changed)

@@ -51,7 +51,7 @@ var (
 
 	// maximumBackoff is the largest backoff we will permit when
 	// reattempting connections to persistent peers.
-	maximumBackoff = time.Minute
+	maximumBackoff = time.Hour
 )
 
 // server is the main server of the Lightning Network Daemon. The server houses
@@ -85,6 +85,7 @@ type server struct {
 	persistentPeers        map[string]struct{}
 	persistentPeersBackoff map[string]time.Duration
 	persistentConnReqs     map[string][]*connmgr.ConnReq
+	persistentRetryCancels map[string]chan struct{}
 
 	// ignorePeerTermination tracks peers for which the server has initiated
 	// a disconnect. Adding a peer to this map causes the peer termination
@@ -179,6 +180,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 		persistentPeers:        make(map[string]struct{}),
 		persistentPeersBackoff: make(map[string]time.Duration),
 		persistentConnReqs:     make(map[string][]*connmgr.ConnReq),
+		persistentRetryCancels: make(map[string]chan struct{}),
 		ignorePeerTermination:  make(map[*peer]struct{}),
 
 		peersByPub:             make(map[string]*peer),
@@ -1292,19 +1294,18 @@ func (s *server) peerTerminationWatcher(p *peer) {
 		s.persistentConnReqs[pubStr] = append(
 			s.persistentConnReqs[pubStr], connReq)
 
-		// Now, determine the appropriate backoff to use for the retry.
-		backoff, ok := s.persistentPeersBackoff[pubStr]
-		if !ok {
-			// If an existing backoff was unknown, use the default.
-			backoff = defaultBackoff
-		} else {
-			// Otherwise, use a previous backoff to compute the
-			// subsequent randomized exponential backoff duration.
-			backoff = computeNextBackoff(backoff)
-		}
+		// Record the computed backoff in the backoff map.
+		backoff := s.nextPeerBackoff(pubStr)
 		s.persistentPeersBackoff[pubStr] = backoff
 
+		// Initialize a retry canceller for this peer if one does not
+		// exist.
+		cancelChan, ok := s.persistentRetryCancels[pubStr]
+		if !ok {
+			cancelChan = make(chan struct{})
+			s.persistentRetryCancels[pubStr] = cancelChan
+		}
+
 		// We choose not to wait group this go routine since the Connect
 		// call can stall for arbitrarily long if we shutdown while an
 		// outbound connection attempt is being made.
@@ -1314,6 +1315,8 @@ func (s *server) peerTerminationWatcher(p *peer) {
 			select {
 			case <-time.After(backoff):
+			case <-cancelChan:
+				return
 			case <-s.quit:
 				return
 			}
 
@@ -1326,6 +1329,22 @@ func (s *server) peerTerminationWatcher(p *peer) {
 	}
 }
 
+// nextPeerBackoff computes the next backoff duration for a peer's pubkey
+// using exponential backoff. If no previous backoff was known, the default
+// is returned.
+func (s *server) nextPeerBackoff(pubStr string) time.Duration {
+	// Now, determine the appropriate backoff to use for the retry.
+	backoff, ok := s.persistentPeersBackoff[pubStr]
+	if !ok {
+		// If an existing backoff was unknown, use the default.
+		return defaultBackoff
+	}
+
+	// Otherwise, use a previous backoff to compute the
+	// subsequent randomized exponential backoff duration.
+	return computeNextBackoff(backoff)
+}
+
 // shouldRequestGraphSync returns true if the servers deems it necessary that
 // we sync channel graph state with the remote peer. This method is used to
 // avoid _always_ syncing channel graph state with each peer that connects.
@@ -1434,8 +1453,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 
 	srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
 
-	localPub := s.identityPriv.PubKey()
-
 	// Check to see if we already have a connection with this peer. If so,
 	// we may need to drop our existing connection. This prevents us from
 	// having duplicate connections to the same peer. We forgo adding a
@@ -1452,6 +1469,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		// connection we've already established should be kept, then
 		// we'll close out this connection s.t there's only a single
 		// connection between us.
+		localPub := s.identityPriv.PubKey()
 		if !shouldDropLocalConnection(localPub, nodePub) {
 			srvrLog.Warnf("Received inbound connection from "+
 				"peer %x, but already connected, dropping conn",
@@ -1472,15 +1490,9 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		s.ignorePeerTermination[connectedPeer] = struct{}{}
 	}
 
-	// Next, check to see if we have any outstanding persistent connection
-	// requests to this peer. If so, then we'll remove all of these
-	// connection requests, and also delete the entry from the map.
-	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
-		for _, connReq := range connReqs {
-			s.connMgr.Remove(connReq.ID())
-		}
-		delete(s.persistentConnReqs, pubStr)
-	}
+	// Lastly, cancel all pending requests. The incoming connection will not
+	// have an associated connection request.
+	s.cancelConnReqs(pubStr, nil)
 
 	s.peerConnected(conn, nil, false)
 }
@@ -1495,7 +1507,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		return
 	}
 
-	localPub := s.identityPriv.PubKey()
 	nodePub := conn.(*brontide.Conn).RemotePub()
 	pubStr := string(nodePub.SerializeCompressed())
 
@@ -1506,29 +1517,31 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	// this new connection.
 	if _, ok := s.outboundPeers[pubStr]; ok {
 		srvrLog.Debugf("Ignoring duplicate outbound connection")
+		if connReq != nil {
+			s.connMgr.Remove(connReq.ID())
+		}
 		conn.Close()
 		return
 	}
 	if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil {
 		srvrLog.Debugf("Ignoring cancelled outbound connection")
+		s.connMgr.Remove(connReq.ID())
 		conn.Close()
 		return
 	}
 
 	srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
 
-	// As we've just established an outbound connection to this peer, we'll
-	// cancel all other persistent connection requests and eliminate the
-	// entry for this peer from the map.
-	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
-		for _, pConnReq := range connReqs {
-			if connReq != nil &&
-				pConnReq.ID() != connReq.ID() {
-
-				s.connMgr.Remove(pConnReq.ID())
-			}
-		}
-		delete(s.persistentConnReqs, pubStr)
-	}
+	if connReq != nil {
+		// A successful connection was returned by the connmgr.
+		// Immediately cancel all pending requests, excluding the
+		// outbound connection we just established.
+		ignore := connReq.ID()
+		s.cancelConnReqs(pubStr, &ignore)
+	} else {
+		// This was a successful connection made by some other
+		// subsystem. Remove all requests being managed by the connmgr.
+		s.cancelConnReqs(pubStr, nil)
+	}
 
 	// If we already have a connection with this peer, decide whether or not
@@ -1546,6 +1559,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 		// If our (this) connection should be dropped, then we'll do
 		// so, in order to ensure we don't have any duplicate
 		// connections.
+		localPub := s.identityPriv.PubKey()
 		if shouldDropLocalConnection(localPub, nodePub) {
 			srvrLog.Warnf("Established outbound connection to "+
 				"peer %x, but already connected, dropping conn",
@@ -1573,6 +1587,55 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.peerConnected(conn, connReq, true)
 }
 
+// UnassignedConnID is the default connection ID that a request can have before
+// it actually is submitted to the connmgr.
+// TODO(conner): move into connmgr package, or better, add connmgr method for
+// generating atomic IDs
+const UnassignedConnID uint64 = 0
+
+// cancelConnReqs stops all persistent connection requests for a given pubkey.
+// Any attempts initiated by the peerTerminationWatcher are canceled first.
+// Afterwards, each connection request is removed from the connmgr. The caller
+// can optionally specify a connection ID to ignore, which prevents us from
+// canceling a successful request. All persistent connreqs for the provided
+// pubkey are discarded after the operation.
+func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
+	// First, cancel any lingering persistent retry attempts, which will
+	// prevent retries for any with backoffs that are still maturing.
+	if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok {
+		close(cancelChan)
+		delete(s.persistentRetryCancels, pubStr)
+	}
+
+	// Next, check to see if we have any outstanding persistent connection
+	// requests to this peer. If so, then we'll remove all of these
+	// connection requests, and also delete the entry from the map.
+	connReqs, ok := s.persistentConnReqs[pubStr]
+	if !ok {
+		return
+	}
+
+	for _, connReq := range connReqs {
+		// Atomically capture the current request identifier.
+		connID := connReq.ID()
+
+		// Skip any zero IDs, as this indicates the request has not
+		// yet been scheduled.
+		if connID == UnassignedConnID {
+			continue
+		}
+
+		// Skip a particular connection ID if instructed.
+		if skip != nil && connID == *skip {
+			continue
+		}
+
+		s.connMgr.Remove(connID)
+	}
+
+	delete(s.persistentConnReqs, pubStr)
+}
+
 // addPeer adds the passed peer to the server's global state of all active
 // peers.
 func (s *server) addPeer(p *peer) {
@@ -1710,9 +1773,9 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
 	// If there's already a pending connection request for this pubkey,
 	// then we ignore this request to ensure we don't create a redundant
 	// connection.
-	if _, ok := s.persistentConnReqs[targetPub]; ok {
-		s.mu.Unlock()
-		return fmt.Errorf("connection attempt to %v is pending", addr)
+	if reqs, ok := s.persistentConnReqs[targetPub]; ok {
+		srvrLog.Warnf("Already have %d persistent connection "+
+			"requests for %v, connecting anyway.", len(reqs), addr)
 	}
 
 	// If there's not already a pending or active connection to this node,
@@ -1777,6 +1840,8 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
 
 	srvrLog.Infof("Disconnecting from %v", peer)
 
+	s.cancelConnReqs(pubStr, nil)
+
 	// If this peer was formerly a persistent connection, then we'll remove
 	// them from this map so we don't attempt to re-connect after we
 	// disconnect.