From cbce6e03fc916860ee7546a04d0708f24f1552db Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 9 Jan 2019 20:16:27 -0800 Subject: [PATCH] server: implement notifications for disconnected peers --- server.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/server.go b/server.go index a55f2f60..f0c34ac4 100644 --- a/server.go +++ b/server.go @@ -113,7 +113,8 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer - peerConnectedListeners map[string][]chan<- lnpeer.Peer + peerConnectedListeners map[string][]chan<- lnpeer.Peer + peerDisconnectedListeners map[string][]chan<- struct{} persistentPeers map[string]struct{} persistentPeersBackoff map[string]time.Duration @@ -282,11 +283,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ignorePeerTermination: make(map[*peer]struct{}), scheduledPeerConnection: make(map[string]func()), - peersByPub: make(map[string]*peer), - inboundPeers: make(map[string]*peer), - outboundPeers: make(map[string]*peer), - peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), - sentDisabled: make(map[wire.OutPoint]bool), + peersByPub: make(map[string]*peer), + inboundPeers: make(map[string]*peer), + outboundPeers: make(map[string]*peer), + peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer), + peerDisconnectedListeners: make(map[string][]chan<- struct{}), + sentDisabled: make(map[wire.OutPoint]bool), globalFeatures: lnwire.NewFeatureVector(globalFeatures, lnwire.GlobalFeatures), @@ -1922,6 +1924,34 @@ func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey, ) } +// NotifyWhenOffline delivers a notification to the caller of when the peer with +// the given public key has been disconnected. The notification is signaled by +// closing the channel returned. +func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} { + s.mu.Lock() + defer s.mu.Unlock() + + c := make(chan struct{}) + + // If the peer is already offline, we can immediately trigger the + // notification. + peerPubKeyStr := string(peerPubKey[:]) + if _, ok := s.peersByPub[peerPubKeyStr]; !ok { + srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey) + close(c) + return c + } + + // Otherwise, the peer is online, so we'll keep track of the channel to + // trigger the notification once the server detects the peer + // disconnects. + s.peerDisconnectedListeners[peerPubKeyStr] = append( + s.peerDisconnectedListeners[peerPubKeyStr], c, + ) + + return c +} + // sendPeerMessages enqueues a list of messages into the outgoingQueue of the // `targetPeer`. This method supports additional broadcast-level // synchronization by using the additional `wg` to coordinate a particular @@ -2438,6 +2468,7 @@ func (s *server) peerInitializer(p *peer) { defer s.mu.Unlock() // Check if there are listeners waiting for this peer to come online. + srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey()) for _, peerChan := range s.peerConnectedListeners[pubStr] { select { case peerChan <- p: @@ -2502,6 +2533,15 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { s.mu.Lock() defer s.mu.Unlock() + // If there were any notification requests for when this peer + // disconnected, we can trigger them now. + srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey()) + pubStr := string(pubKey.SerializeCompressed()) + for _, offlineChan := range s.peerDisconnectedListeners[pubStr] { + close(offlineChan) + } + delete(s.peerDisconnectedListeners, pubStr) + // If the server has already removed this peer, we can short circuit the // peer termination watcher and skip cleanup. if _, ok := s.ignorePeerTermination[p]; ok { @@ -2528,7 +2568,6 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) { s.removePeer(p) // Next, check to see if this is a persistent peer or not. - pubStr := string(pubKey.SerializeCompressed()) _, ok := s.persistentPeers[pubStr] if ok { // We'll only need to re-launch a connection request if one