Merge pull request #2463 from wpaulino/notify-when-offline
server: implement notifications for disconnected peers
This commit is contained in:
commit
b700b969e8
53
server.go
53
server.go
@ -113,7 +113,8 @@ type server struct {
|
|||||||
inboundPeers map[string]*peer
|
inboundPeers map[string]*peer
|
||||||
outboundPeers map[string]*peer
|
outboundPeers map[string]*peer
|
||||||
|
|
||||||
peerConnectedListeners map[string][]chan<- lnpeer.Peer
|
peerConnectedListeners map[string][]chan<- lnpeer.Peer
|
||||||
|
peerDisconnectedListeners map[string][]chan<- struct{}
|
||||||
|
|
||||||
persistentPeers map[string]struct{}
|
persistentPeers map[string]struct{}
|
||||||
persistentPeersBackoff map[string]time.Duration
|
persistentPeersBackoff map[string]time.Duration
|
||||||
@ -282,11 +283,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
|||||||
ignorePeerTermination: make(map[*peer]struct{}),
|
ignorePeerTermination: make(map[*peer]struct{}),
|
||||||
scheduledPeerConnection: make(map[string]func()),
|
scheduledPeerConnection: make(map[string]func()),
|
||||||
|
|
||||||
peersByPub: make(map[string]*peer),
|
peersByPub: make(map[string]*peer),
|
||||||
inboundPeers: make(map[string]*peer),
|
inboundPeers: make(map[string]*peer),
|
||||||
outboundPeers: make(map[string]*peer),
|
outboundPeers: make(map[string]*peer),
|
||||||
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
|
peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
|
||||||
sentDisabled: make(map[wire.OutPoint]bool),
|
peerDisconnectedListeners: make(map[string][]chan<- struct{}),
|
||||||
|
sentDisabled: make(map[wire.OutPoint]bool),
|
||||||
|
|
||||||
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
|
globalFeatures: lnwire.NewFeatureVector(globalFeatures,
|
||||||
lnwire.GlobalFeatures),
|
lnwire.GlobalFeatures),
|
||||||
@ -1922,6 +1924,34 @@ func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey,
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NotifyWhenOffline delivers a notification to the caller of when the peer with
|
||||||
|
// the given public key has been disconnected. The notification is signaled by
|
||||||
|
// closing the channel returned.
|
||||||
|
func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
c := make(chan struct{})
|
||||||
|
|
||||||
|
// If the peer is already offline, we can immediately trigger the
|
||||||
|
// notification.
|
||||||
|
peerPubKeyStr := string(peerPubKey[:])
|
||||||
|
if _, ok := s.peersByPub[peerPubKeyStr]; !ok {
|
||||||
|
srvrLog.Debugf("Notifying that peer %x is offline", peerPubKey)
|
||||||
|
close(c)
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, the peer is online, so we'll keep track of the channel to
|
||||||
|
// trigger the notification once the server detects the peer
|
||||||
|
// disconnects.
|
||||||
|
s.peerDisconnectedListeners[peerPubKeyStr] = append(
|
||||||
|
s.peerDisconnectedListeners[peerPubKeyStr], c,
|
||||||
|
)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
||||||
// `targetPeer`. This method supports additional broadcast-level
|
// `targetPeer`. This method supports additional broadcast-level
|
||||||
// synchronization by using the additional `wg` to coordinate a particular
|
// synchronization by using the additional `wg` to coordinate a particular
|
||||||
@ -2438,6 +2468,7 @@ func (s *server) peerInitializer(p *peer) {
|
|||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
// Check if there are listeners waiting for this peer to come online.
|
// Check if there are listeners waiting for this peer to come online.
|
||||||
|
srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey())
|
||||||
for _, peerChan := range s.peerConnectedListeners[pubStr] {
|
for _, peerChan := range s.peerConnectedListeners[pubStr] {
|
||||||
select {
|
select {
|
||||||
case peerChan <- p:
|
case peerChan <- p:
|
||||||
@ -2502,6 +2533,15 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
// If there were any notification requests for when this peer
|
||||||
|
// disconnected, we can trigger them now.
|
||||||
|
srvrLog.Debugf("Notifying that peer %x is offline", p.PubKey())
|
||||||
|
pubStr := string(pubKey.SerializeCompressed())
|
||||||
|
for _, offlineChan := range s.peerDisconnectedListeners[pubStr] {
|
||||||
|
close(offlineChan)
|
||||||
|
}
|
||||||
|
delete(s.peerDisconnectedListeners, pubStr)
|
||||||
|
|
||||||
// If the server has already removed this peer, we can short circuit the
|
// If the server has already removed this peer, we can short circuit the
|
||||||
// peer termination watcher and skip cleanup.
|
// peer termination watcher and skip cleanup.
|
||||||
if _, ok := s.ignorePeerTermination[p]; ok {
|
if _, ok := s.ignorePeerTermination[p]; ok {
|
||||||
@ -2528,7 +2568,6 @@ func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
|
|||||||
s.removePeer(p)
|
s.removePeer(p)
|
||||||
|
|
||||||
// Next, check to see if this is a persistent peer or not.
|
// Next, check to see if this is a persistent peer or not.
|
||||||
pubStr := string(pubKey.SerializeCompressed())
|
|
||||||
_, ok := s.persistentPeers[pubStr]
|
_, ok := s.persistentPeers[pubStr]
|
||||||
if ok {
|
if ok {
|
||||||
// We'll only need to re-launch a connection request if one
|
// We'll only need to re-launch a connection request if one
|
||||||
|
Loading…
Reference in New Issue
Block a user