server: add peer connection callback scheduling
This commit adds a simple scheduling mechanism for resolving potential deadlocks when dropping a stale connection (via pubkey inspection). Ideally, we'd like to wait to activate a new peer until the previous one has exited entirely. However, the current logic attempts to disconnect (and wait) until the peer has been cleaned up fully, which can result in deadlocks with other portions of the codebase, since other blocking methods may also need to acquire the mutex before the peer can exit. When existing connections are replaced, they now schedule a callback that is executed inside the peerTerminationWatcher. Since the peer now waits for the clean exit of the prior peer, this callback is executed with a clean slate; it adds the peer to the server's maps and initiates the peer's Start() method.
This commit is contained in:
parent
27ca61aedf
commit
8c04dd0030
72
server.go
72
server.go
@ -93,6 +93,13 @@ type server struct {
|
||||
// disconnected.
|
||||
ignorePeerTermination map[*peer]struct{}
|
||||
|
||||
// scheduledPeerConnection maps a pubkey string to a callback that
|
||||
// should be executed in the peerTerminationWatcher once the prior peer with
|
||||
// the same pubkey exits. This allows the server to wait until the
|
||||
// prior peer has cleaned up successfully, before adding the new peer
|
||||
// intended to replace it.
|
||||
scheduledPeerConnection map[string]func()
|
||||
|
||||
cc *chainControl
|
||||
|
||||
fundingMgr *fundingManager
|
||||
@ -177,11 +184,12 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
|
||||
sphinx: htlcswitch.NewOnionProcessor(sphinxRouter),
|
||||
lightningID: sha256.Sum256(serializedPubKey),
|
||||
|
||||
persistentPeers: make(map[string]struct{}),
|
||||
persistentPeersBackoff: make(map[string]time.Duration),
|
||||
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
|
||||
persistentRetryCancels: make(map[string]chan struct{}),
|
||||
ignorePeerTermination: make(map[*peer]struct{}),
|
||||
persistentPeers: make(map[string]struct{}),
|
||||
persistentPeersBackoff: make(map[string]time.Duration),
|
||||
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
|
||||
persistentRetryCancels: make(map[string]chan struct{}),
|
||||
ignorePeerTermination: make(map[*peer]struct{}),
|
||||
scheduledPeerConnection: make(map[string]func()),
|
||||
|
||||
peersByPub: make(map[string]*peer),
|
||||
inboundPeers: make(map[string]*peer),
|
||||
@ -1290,6 +1298,20 @@ func (s *server) peerTerminationWatcher(p *peer) {
|
||||
// peer termination watcher and skip cleanup.
|
||||
if _, ok := s.ignorePeerTermination[p]; ok {
|
||||
delete(s.ignorePeerTermination, p)
|
||||
|
||||
pubKey := p.PubKey()
|
||||
pubStr := string(pubKey[:])
|
||||
|
||||
// If a connection callback is present, we'll go ahead and
|
||||
// execute it now that previous peer has fully disconnected. If
|
||||
// the callback is not present, this likely implies the peer was
|
||||
// purposefully disconnected via RPC, and that no reconnect
|
||||
// should be attempted.
|
||||
connCallback, ok := s.scheduledPeerConnection[pubStr]
|
||||
if ok {
|
||||
delete(s.scheduledPeerConnection, pubStr)
|
||||
connCallback()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -1480,8 +1502,23 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
// If we already have a valid connection that is scheduled to take
|
||||
// precedence once the prior peer has finished disconnecting, we'll
|
||||
// ignore this connection.
|
||||
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
|
||||
srvrLog.Debugf("Ignoring connection, peer already scheduled")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
|
||||
|
||||
// Cancel all pending connection requests, we either already have an
|
||||
// outbound connection, or this incoming connection will become our
|
||||
// primary connection. The incoming connection will not have an
|
||||
// associated connection request, so we pass nil.
|
||||
s.cancelConnReqs(pubStr, nil)
|
||||
|
||||
// Check to see if we already have a connection with this peer. If so,
|
||||
// we may need to drop our existing connection. This prevents us from
|
||||
// having duplicate connections to the same peer. We forgo adding a
|
||||
@ -1492,6 +1529,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
||||
case ErrPeerNotConnected:
|
||||
// We were unable to locate an existing connection with the
|
||||
// target peer, proceed to connect.
|
||||
s.peerConnected(conn, nil, false)
|
||||
|
||||
case nil:
|
||||
// We already have a connection with the incoming peer. If the
|
||||
@ -1517,13 +1555,10 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
|
||||
// execute for this peer.
|
||||
s.removePeer(connectedPeer)
|
||||
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
||||
s.scheduledPeerConnection[pubStr] = func() {
|
||||
s.peerConnected(conn, nil, false)
|
||||
}
|
||||
}
|
||||
|
||||
// Lastly, cancel all pending requests. The incoming connection will not
|
||||
// have an associated connection request.
|
||||
s.cancelConnReqs(pubStr, nil)
|
||||
|
||||
s.peerConnected(conn, nil, false)
|
||||
}
|
||||
|
||||
// OutboundPeerConnected initializes a new peer in response to a new outbound
|
||||
@ -1559,6 +1594,15 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
return
|
||||
}
|
||||
|
||||
// If we already have a valid connection that is scheduled to take
|
||||
// precedence once the prior peer has finished disconnecting, we'll
|
||||
// ignore this connection.
|
||||
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
|
||||
srvrLog.Debugf("Ignoring connection, peer already scheduled")
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
srvrLog.Infof("Established connection to: %v", conn.RemoteAddr())
|
||||
|
||||
if connReq != nil {
|
||||
@ -1582,6 +1626,7 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
case ErrPeerNotConnected:
|
||||
// We were unable to locate an existing connection with the
|
||||
// target peer, proceed to connect.
|
||||
s.peerConnected(conn, connReq, true)
|
||||
|
||||
case nil:
|
||||
// We already have a connection open with the target peer.
|
||||
@ -1611,9 +1656,10 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
||||
// execute for this peer.
|
||||
s.removePeer(connectedPeer)
|
||||
s.ignorePeerTermination[connectedPeer] = struct{}{}
|
||||
s.scheduledPeerConnection[pubStr] = func() {
|
||||
s.peerConnected(conn, connReq, true)
|
||||
}
|
||||
}
|
||||
|
||||
s.peerConnected(conn, connReq, true)
|
||||
}
|
||||
|
||||
// UnassignedConnID is the default connection ID that a request can have before
|
||||
|
Loading…
Reference in New Issue
Block a user