server: modify NotifyWhenOnline to return the peer once connected

This commit is contained in:
Wilmer Paulino 2018-07-05 18:03:04 -07:00
parent 6504a9cfa8
commit 04c5eba194
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -110,7 +110,7 @@ type server struct {
inboundPeers map[string]*peer inboundPeers map[string]*peer
outboundPeers map[string]*peer outboundPeers map[string]*peer
peerConnectedListeners map[string][]chan<- struct{} peerConnectedListeners map[string][]chan<- lnpeer.Peer
persistentPeers map[string]struct{} persistentPeers map[string]struct{}
persistentPeersBackoff map[string]time.Duration persistentPeersBackoff map[string]time.Duration
@ -264,7 +264,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
peersByPub: make(map[string]*peer), peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer), inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer),
peerConnectedListeners: make(map[string][]chan<- struct{}), peerConnectedListeners: make(map[string][]chan<- lnpeer.Peer),
globalFeatures: lnwire.NewFeatureVector(globalFeatures, globalFeatures: lnwire.NewFeatureVector(globalFeatures,
lnwire.GlobalFeatures), lnwire.GlobalFeatures),
@ -1568,31 +1568,38 @@ func (s *server) SendToPeer(target *btcec.PublicKey,
} }
// NotifyWhenOnline can be called by other subsystems to get notified when a // NotifyWhenOnline can be called by other subsystems to get notified when a
// particular peer comes online. // particular peer comes online. The peer itself is sent across the peerChan.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, func (s *server) NotifyWhenOnline(peerKey *btcec.PublicKey,
connectedChan chan<- struct{}) { peerChan chan<- lnpeer.Peer) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// Compute the target peer's identifier. // Compute the target peer's identifier.
pubStr := string(peer.SerializeCompressed()) pubStr := string(peerKey.SerializeCompressed())
// Check if peer is connected. // Check if peer is connected.
_, ok := s.peersByPub[pubStr] peer, ok := s.peersByPub[pubStr]
if ok { if ok {
// Connected, can return early. // Connected, can return early.
srvrLog.Debugf("Notifying that peer %x is online", srvrLog.Debugf("Notifying that peer %x is online",
peer.SerializeCompressed()) peerKey.SerializeCompressed())
close(connectedChan)
select {
case peerChan <- peer:
case <-s.quit:
}
return return
} }
// Not connected, store this listener such that it can be notified when // Not connected, store this listener such that it can be notified when
// the peer comes online. // the peer comes online.
s.peerConnectedListeners[pubStr] = append( s.peerConnectedListeners[pubStr] = append(
s.peerConnectedListeners[pubStr], connectedChan) s.peerConnectedListeners[pubStr], peerChan,
)
} }
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the // sendPeerMessages enqueues a list of messages into the outgoingQueue of the
@ -2240,8 +2247,12 @@ func (s *server) addPeer(p *peer) {
} }
// Check if there are listeners waiting for this peer to come online. // Check if there are listeners waiting for this peer to come online.
for _, con := range s.peerConnectedListeners[pubStr] { for _, peerChan := range s.peerConnectedListeners[pubStr] {
close(con) select {
case peerChan <- p:
case <-s.quit:
return
}
} }
delete(s.peerConnectedListeners, pubStr) delete(s.peerConnectedListeners, pubStr)
} }