server: implement notifications for disconnected peers

This commit is contained in:
Wilmer Paulino 2019-01-09 20:16:27 -08:00
parent 9c59ac4383
commit cbce6e03fc
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -113,7 +113,8 @@ type server struct {
inboundPeers map[string]*peer
outboundPeers map[string]*peer
peerConnectedListeners map[string][]chan<- lnpeer.Peer
peerConnectedListeners map[string][]chan<- lnpeer.Peer
peerDisconnectedListeners map[string][]chan<- struct{}
persistentPeers map[string]struct{}
persistentPeersBackoff map[string]time.Duration
@ -282,11 +283,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
ignorePeerTermination: make(map[*peer]struct{}),
scheduledPeerConnection: make(map[string]func()),
peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
sentDisabled: make(map[wire.OutPoint]bool),
peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
sentDisabled: make(map[wire.OutPoint]bool),
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
lnwire.GlobalFeatures),
@ -1922,6 +1924,34 @@ func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey,
)
}
// NotifyWhenOffline delivers a notification to the caller of when the peer with
// the given public key has been disconnected. The notification is signaled by
// closing the channel returned.
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
s.mu.Lock()
defer s.mu.Unlock()
c := make(chan struct{})
// If the peer is already offline, we can immediately trigger the
// notification.
peerPubKeyStr := string(peerPubKey[:])
if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
close(c)
return c
}
// Otherwise, the peer is online, so we'll keep track of the channel to
// trigger the notification once the server detects the peer
// disconnects.
s.peerDisconnectedListeners[peerPubKeyStr] = append(
s.peerDisconnectedListeners[peerPubKeyStr], c,
)
return c
}
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
// `targetPeer`. This method supports additional broadcast-level
// synchronization by using the additional `wg` to coordinate a particular
@ -2438,6 +2468,7 @@ func (s *server) peerInitializer(p *peer) {
defer s.mu.Unlock()
// Check if there are listeners waiting for this peer to come online.
srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey())
for _, peerChan := range s.peerConnectedListeners[pubStr] {
select {
case peerChan <- p:
@ -2502,6 +2533,15 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
s.mu.Lock()
defer s.mu.Unlock()
// If there were any notification requests for when this peer
// disconnected, we can trigger them now.
srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey())
pubStr := string(pubKey.SerializeCompressed())
for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
close(offlineChan)
}
delete(s.peerDisconnectedListeners, pubStr)
// If the server has already removed this peer, we can short circuit the
// peer termination watcher and skip cleanup.
if _, ok := s.ignorePeerTermination[p]; ok {
@ -2528,7 +2568,6 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
s.removePeer(p)
// Next, check to see if this is a persistent peer or not.
pubStr := string(pubKey.SerializeCompressed())
_, ok := s.persistentPeers[pubStr]
if ok {
// We'll only need to re-launch a connection request if one