package discovery import ( "sync" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" ) // reliableSenderCfg contains all of necessary items for the reliableSender to // carry out its duties. type reliableSenderCfg struct { // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer) // NotifyWhenOffline is a function that allows the gossiper to be // notified when a certain peer disconnects, allowing it to request a // notification for when it reconnects. NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} // MessageStore is a persistent storage of gossip messages which we will // use to determine which messages need to be resent for a given peer. MessageStore GossipMessageStore // IsMsgStale determines whether a message retrieved from the backing // MessageStore is seen as stale by the current graph. IsMsgStale func(lnwire.Message) bool } // peerManager contains the set of channels required for the peerHandler to // properly carry out its duties. type peerManager struct { // msgs is the channel through which messages will be streamed to the // handler in order to send the message to the peer while they're // online. msgs chan lnwire.Message // done is a channel that will be closed to signal that the handler for // the given peer has been torn down for whatever reason. done chan struct{} } // reliableSender is a small subsystem of the gossiper used to reliably send // gossip messages to peers. type reliableSender struct { start sync.Once stop sync.Once cfg reliableSenderCfg // activePeers keeps track of whether a peerHandler exists for a given // peer. A peerHandler is tasked with handling requests for messages // that should be reliably sent to peers while also taking into account // the peer's connection lifecycle. activePeers map[[33]byte]peerManager activePeersMtx sync.Mutex wg sync.WaitGroup quit chan struct{} } // newReliableSender returns a new reliableSender backed by the given config. func newReliableSender(cfg *reliableSenderCfg) *reliableSender { return &reliableSender{ cfg: *cfg, activePeers: make(map[[33]byte]peerManager), quit: make(chan struct{}), } } // Start spawns message handlers for any peers with pending messages. func (s *reliableSender) Start() error { var err error s.start.Do(func() { err = s.resendPendingMsgs() }) return err } // Stop halts the reliable sender from sending messages to peers. func (s *reliableSender) Stop() { s.stop.Do(func() { close(s.quit) s.wg.Wait() }) } // sendMessage constructs a request to send a message reliably to a peer. In the // event that the peer is currently offline, this will only write the message to // disk. Once the peer reconnects, this message, along with any others pending, // will be sent to the peer. func (s *reliableSender) sendMessage(msg lnwire.Message, peerPubKey [33]byte) error { // We'll start by persisting the message to disk. This allows us to // resend the message upon restarts and peer reconnections. if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil { return err } // Then, we'll spawn a peerHandler for this peer to handle resending its // pending messages while taking into account its connection lifecycle. spawnHandler: msgHandler, ok := s.spawnPeerHandler(peerPubKey) // If the handler wasn't previously active, we can exit now as we know // that the message will be sent once the peer online notification is // received. This prevents us from potentially sending the message // twice. if !ok { return nil } // Otherwise, we'll attempt to stream the message to the handler. // There's a subtle race condition where the handler can be torn down // due to all of the messages sent being stale, so we'll handle this // gracefully by spawning another one to prevent blocking. select { case msgHandler.msgs <- msg: case <-msgHandler.done: goto spawnHandler case <-s.quit: return ErrGossiperShuttingDown } return nil } // spawnPeerMsgHandler spawns a peerHandler for the given peer if there isn't // one already active. The boolean returned signals whether there was already // one active or not. func (s *reliableSender) spawnPeerHandler(peerPubKey [33]byte) (peerManager, bool) { s.activePeersMtx.Lock() defer s.activePeersMtx.Unlock() msgHandler, ok := s.activePeers[peerPubKey] if !ok { msgHandler = peerManager{ msgs: make(chan lnwire.Message), done: make(chan struct{}), } s.activePeers[peerPubKey] = msgHandler s.wg.Add(1) go s.peerHandler(msgHandler, peerPubKey) } return msgHandler, ok } // peerHandler is responsible for handling our reliable message send requests // for a given peer while also taking into account the peer's connection // lifecycle. Any messages that are attempted to be sent while the peer is // offline will be queued and sent once the peer reconnects. // // NOTE: This must be run as a goroutine. func (s *reliableSender) peerHandler(peerMgr peerManager, peerPubKey [33]byte) { defer s.wg.Done() // We'll start by requesting a notification for when the peer // reconnects. peerChan := make(chan lnpeer.Peer, 1) waitUntilOnline: log.Debugf("Requesting online notification for peer=%x", peerPubKey) s.cfg.NotifyWhenOnline(peerPubKey, peerChan) var peer lnpeer.Peer out: for { select { // While we're waiting, we'll also consume any messages that // must be sent to prevent blocking the caller. These can be // ignored for now since the peer is currently offline. Once // they reconnect, the messages will be sent since they should // have been persisted to disk. case msg := <-peerMgr.msgs: // Retrieve the short channel ID for which this message // applies for logging purposes. The error can be // ignored as the store can only contain messages which // have a ShortChannelID field. shortChanID, _ := msgShortChanID(msg) log.Debugf("Received request to send %v message for "+ "channel=%v while peer=%x is offline", msg.MsgType(), shortChanID, peerPubKey) case peer = <-peerChan: break out case <-s.quit: return } } log.Debugf("Peer=%x is now online, proceeding to send pending messages", peerPubKey) // Once we detect the peer has reconnected, we'll also request a // notification for when they disconnect. We'll use this to make sure // they haven't disconnected (in the case of a flappy peer, etc.) by the // time we attempt to send them the pending messages. log.Debugf("Requesting offline notification for peer=%x", peerPubKey) offlineChan := s.cfg.NotifyWhenOffline(peerPubKey) pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey) if err != nil { log.Errorf("Unable to retrieve pending messages for peer %x: %v", peerPubKey, err) return } // With the peer online, we can now proceed to send our pending messages // for them. for _, msg := range pendingMsgs { // Retrieve the short channel ID for which this message applies // for logging purposes. The error can be ignored as the store // can only contain messages which have a ShortChannelID field. shortChanID, _ := msgShortChanID(msg) // Ensure the peer is still online right before sending the // message. select { case <-offlineChan: goto waitUntilOnline default: } if err := peer.SendMessage(false, msg); err != nil { log.Errorf("Unable to send %v message for channel=%v "+ "to %x: %v", msg.MsgType(), shortChanID, peerPubKey, err) goto waitUntilOnline } log.Debugf("Successfully sent %v message for channel=%v with "+ "peer=%x upon reconnection", msg.MsgType(), shortChanID, peerPubKey) // Now that the message has at least been sent once, we can // check whether it's stale. This guarantees that // AnnounceSignatures are sent at least once if we happen to // already have signatures for both parties. if s.cfg.IsMsgStale(msg) { err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey) if err != nil { log.Errorf("Unable to remove stale %v message "+ "for channel=%v with peer %x: %v", msg.MsgType(), shortChanID, peerPubKey, err) continue } log.Debugf("Removed stale %v message for channel=%v "+ "with peer=%x", msg.MsgType(), shortChanID, peerPubKey) } } // If all of our messages were stale, then there's no need for this // handler to continue running, so we can exit now. pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey) if err != nil { log.Errorf("Unable to retrieve pending messages for peer %x: %v", peerPubKey, err) return } if len(pendingMsgs) == 0 { log.Debugf("No pending messages left for peer=%x", peerPubKey) s.activePeersMtx.Lock() delete(s.activePeers, peerPubKey) s.activePeersMtx.Unlock() close(peerMgr.done) return } // Once the pending messages are sent, we can continue to send any // future messages while the peer remains connected. for { select { case msg := <-peerMgr.msgs: // Retrieve the short channel ID for which this message // applies for logging purposes. The error can be // ignored as the store can only contain messages which // have a ShortChannelID field. shortChanID, _ := msgShortChanID(msg) if err := peer.SendMessage(false, msg); err != nil { log.Errorf("Unable to send %v message for "+ "channel=%v to %x: %v", msg.MsgType(), shortChanID, peerPubKey, err) } log.Debugf("Successfully sent %v message for "+ "channel=%v with peer=%x", msg.MsgType(), shortChanID, peerPubKey) case <-offlineChan: goto waitUntilOnline case <-s.quit: return } } } // resendPendingMsgs retrieves and sends all of the messages within the message // store that should be reliably sent to their respective peers. func (s *reliableSender) resendPendingMsgs() error { // Fetch all of the peers for which we have pending messages for and // spawn a peerMsgHandler for each. Once the peer is seen as online, all // of the pending messages will be sent. peers, err := s.cfg.MessageStore.Peers() if err != nil { return err } for peer := range peers { s.spawnPeerHandler(peer) } return nil }