diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go
new file mode 100644
index 00000000..c336b01d
--- /dev/null
+++ b/discovery/reliable_sender.go
@@ -0,0 +1,316 @@
+package discovery
+
+import (
+	"sync"
+
+	"github.com/btcsuite/btcd/btcec"
+	"github.com/lightningnetwork/lnd/lnpeer"
+	"github.com/lightningnetwork/lnd/lnwire"
+)
+
+// reliableSenderCfg contains all of the necessary items for the
+// reliableSender to carry out its duties.
+type reliableSenderCfg struct {
+	// NotifyWhenOnline is a function that allows the gossiper to be
+	// notified when a certain peer comes online, allowing it to
+	// retry sending a message to them.
+	//
+	// NOTE: The peerChan channel must be buffered.
+	//
+	// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
+	NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
+
+	// NotifyWhenOffline is a function that allows the gossiper to be
+	// notified when a certain peer disconnects, allowing it to request a
+	// notification for when it reconnects.
+	NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
+
+	// MessageStore is a persistent storage of gossip messages which we
+	// will use to determine which messages need to be resent for a given
+	// peer.
+	MessageStore GossipMessageStore
+
+	// IsMsgStale determines whether a message retrieved from the backing
+	// MessageStore is seen as stale by the current graph.
+	IsMsgStale func(lnwire.Message) bool
+}
+
+// peerManager contains the set of channels required for the peerHandler to
+// properly carry out its duties.
+type peerManager struct {
+	// msgs is the channel through which messages will be streamed to the
+	// handler in order to send the message to the peer while they're
+	// online.
+	msgs chan lnwire.Message
+
+	// done is a channel that will be closed to signal that the handler
+	// for the given peer has been torn down for whatever reason.
+	done chan struct{}
+}
+
+// reliableSender is a small subsystem of the gossiper used to reliably send
+// gossip messages to peers.
+type reliableSender struct {
+	start sync.Once
+	stop  sync.Once
+
+	cfg reliableSenderCfg
+
+	// activePeers keeps track of whether a peerHandler exists for a given
+	// peer. A peerHandler is tasked with handling requests for messages
+	// that should be reliably sent to peers while also taking into
+	// account the peer's connection lifecycle.
+	activePeers    map[[33]byte]peerManager
+	activePeersMtx sync.Mutex
+
+	wg   sync.WaitGroup
+	quit chan struct{}
+}
+
+// newReliableSender returns a new reliableSender backed by the given config.
+func newReliableSender(cfg *reliableSenderCfg) *reliableSender {
+	return &reliableSender{
+		cfg:         *cfg,
+		activePeers: make(map[[33]byte]peerManager),
+		quit:        make(chan struct{}),
+	}
+}
+
+// Start spawns message handlers for any peers with pending messages.
+func (s *reliableSender) Start() error {
+	var err error
+	s.start.Do(func() {
+		err = s.resendPendingMsgs()
+	})
+	return err
+}
+
+// Stop halts the reliable sender from sending messages to peers.
+func (s *reliableSender) Stop() {
+	s.stop.Do(func() {
+		close(s.quit)
+		s.wg.Wait()
+	})
+}
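For orientation, here is a minimal sketch of how a caller might wire up the sender. The srv hooks and msgStore value are hypothetical stand-ins; in lnd the gossiper supplies its own connection-notification callbacks and staleness predicate.

	// Hypothetical wiring, assuming srv exposes the two connection
	// notification hooks and msgStore satisfies GossipMessageStore.
	sender := newReliableSender(&reliableSenderCfg{
		NotifyWhenOnline:  srv.NotifyWhenOnline,
		NotifyWhenOffline: srv.NotifyWhenOffline,
		MessageStore:      msgStore,
		// Stand-in predicate: never stale, so every persisted
		// message is retried until it's successfully delivered.
		IsMsgStale: func(lnwire.Message) bool { return false },
	})

	// Start resends anything already persisted in the store; Stop
	// tears down all peerHandler goroutines via the quit channel.
	if err := sender.Start(); err != nil {
		return err
	}
	defer sender.Stop()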
+// sendMessage constructs a request to send a message reliably to a peer. In
+// the event that the peer is currently offline, this will only write the
+// message to disk. Once the peer reconnects, this message, along with any
+// others pending, will be sent to the peer.
+func (s *reliableSender) sendMessage(msg lnwire.Message,
+	peerPubKey [33]byte) error {
+
+	// We'll start by persisting the message to disk. This allows us to
+	// resend the message upon restarts and peer reconnections.
+	if err := s.cfg.MessageStore.AddMessage(msg, peerPubKey); err != nil {
+		return err
+	}
+
+	// Then, we'll spawn a peerHandler for this peer to handle resending
+	// its pending messages while taking into account its connection
+	// lifecycle.
+spawnHandler:
+	msgHandler, ok := s.spawnPeerHandler(peerPubKey)
+
+	// If the handler wasn't previously active, we can exit now as we know
+	// that the message will be sent once the peer online notification is
+	// received. This prevents us from potentially sending the message
+	// twice.
+	if !ok {
+		return nil
+	}
+
+	// Otherwise, we'll attempt to stream the message to the handler.
+	// There's a subtle race condition where the handler can be torn down
+	// due to all of the messages sent being stale, so we'll handle this
+	// gracefully by spawning another one to prevent blocking.
+	select {
+	case msgHandler.msgs <- msg:
+	case <-msgHandler.done:
+		goto spawnHandler
+	case <-s.quit:
+		return ErrGossiperShuttingDown
+	}
+
+	return nil
+}
+
+// spawnPeerHandler spawns a peerHandler for the given peer if there isn't one
+// already active. The boolean returned signals whether there was already one
+// active or not.
+func (s *reliableSender) spawnPeerHandler(
+	peerPubKey [33]byte) (peerManager, bool) {
+
+	s.activePeersMtx.Lock()
+	defer s.activePeersMtx.Unlock()
+
+	msgHandler, ok := s.activePeers[peerPubKey]
+	if !ok {
+		msgHandler = peerManager{
+			msgs: make(chan lnwire.Message),
+			done: make(chan struct{}),
+		}
+		s.activePeers[peerPubKey] = msgHandler
+
+		s.wg.Add(1)
+		go s.peerHandler(msgHandler, peerPubKey)
+	}
+
+	return msgHandler, ok
+}
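From the rest of the gossiper's point of view, sendMessage is the entire API surface. A hedged usage sketch follows; remotePeer, its IdentityKey accessor, and chanUpdate are illustrative assumptions rather than names taken from this diff.

	// Queue a ChannelUpdate for reliable delivery. If the peer is
	// offline, the only immediate effect is the write to
	// MessageStore; the peerHandler delivers it upon reconnection.
	var peerPubKey [33]byte
	copy(peerPubKey[:], remotePeer.IdentityKey().SerializeCompressed())

	if err := sender.sendMessage(chanUpdate, peerPubKey); err != nil {
		// Either the store write failed or the gossiper is
		// shutting down (ErrGossiperShuttingDown).
		log.Errorf("Unable to reliably send message: %v", err)
	}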
+ log.Debugf("Requesting offline notification for peer=%x", peerPubKey) + + offlineChan := s.cfg.NotifyWhenOffline(peerPubKey) + + pendingMsgs, err := s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + // With the peer online, we can now proceed to send our pending messages + // for them. + for _, msg := range pendingMsgs { + // Retrieve the short channel ID for which this message applies + // for logging purposes. The error can be ignored as the store + // can only contain messages which have a ShortChannelID field. + shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for channel=%v "+ + "to %x: %v", msg.MsgType(), shortChanID, + peerPubKey, err) + goto waitUntilOnline + } + + log.Debugf("Successfully sent %v message for channel=%v with "+ + "peer=%x upon reconnection", msg.MsgType(), shortChanID, + peerPubKey) + + // Now that the message has at least been sent once, we can + // check whether it's stale. This guarantees that + // AnnounceSignatures are sent at least once if we happen to + // already have signatures for both parties. + if s.cfg.IsMsgStale(msg) { + err := s.cfg.MessageStore.DeleteMessage(msg, peerPubKey) + if err != nil { + log.Errorf("Unable to remove stale %v message "+ + "for channel=%v with peer %x: %v", + msg.MsgType(), shortChanID, peerPubKey, + err) + continue + } + + log.Debugf("Removed stale %v message for channel=%v "+ + "with peer=%x", msg.MsgType(), shortChanID, + peerPubKey) + } + } + + // If all of our messages were stale, then there's no need for this + // handler to continue running, so we can exit now. + pendingMsgs, err = s.cfg.MessageStore.MessagesForPeer(peerPubKey) + if err != nil { + log.Errorf("Unable to retrieve pending messages for peer %x: %v", + peerPubKey, err) + return + } + + if len(pendingMsgs) == 0 { + log.Debugf("No pending messages left for peer=%x", peerPubKey) + + s.activePeersMtx.Lock() + delete(s.activePeers, peerPubKey) + s.activePeersMtx.Unlock() + + close(peerMgr.done) + + return + } + + // Once the pending messages are sent, we can continue to send any + // future messages while the peer remains connected. + for { + select { + case msg := <-peerMgr.msgs: + // Retrieve the short channel ID for which this message + // applies for logging purposes. The error can be + // ignored as the store can only contain messages which + // have a ShortChannelID field. + shortChanID, _ := msgShortChanID(msg) + + if err := peer.SendMessage(false, msg); err != nil { + log.Errorf("Unable to send %v message for "+ + "channel=%v to %x: %v", msg.MsgType(), + shortChanID, peerPubKey, err) + } + + log.Debugf("Successfully sent %v message for "+ + "channel=%v with peer=%x", msg.MsgType(), + shortChanID, peerPubKey) + + case <-offlineChan: + goto waitUntilOnline + + case <-s.quit: + return + } + } +} + +// resendPendingMsgs retrieves and sends all of the messages within the message +// store that should be reliably sent to their respective peers. +func (s *reliableSender) resendPendingMsgs() error { + // Fetch all of the peers for which we have pending messages for and + // spawn a peerMsgHandler for each. Once the peer is seen as online, all + // of the pending messages will be sent. 
+// resendPendingMsgs retrieves and sends all of the messages within the
+// message store that should be reliably sent to their respective peers.
+func (s *reliableSender) resendPendingMsgs() error {
+	// Fetch all of the peers for which we have pending messages and spawn
+	// a peerHandler for each. Once the peer is seen as online, all of the
+	// pending messages will be sent.
+	peers, err := s.cfg.MessageStore.Peers()
+	if err != nil {
+		return err
+	}
+
+	for peer := range peers {
+		s.spawnPeerHandler(peer)
+	}
+
+	return nil
+}
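The sender exercises four store operations across this file: AddMessage in sendMessage, MessagesForPeer and DeleteMessage in peerHandler, and Peers in resendPendingMsgs. Pieced together from those call sites, the backing interface is presumably shaped like the following sketch; the authoritative definition is GossipMessageStore elsewhere in the package.

	// Inferred from the call sites above; Peers returning a map
	// matches the range loop in resendPendingMsgs.
	type GossipMessageStore interface {
		// AddMessage persists a message destined for the given peer.
		AddMessage(lnwire.Message, [33]byte) error

		// DeleteMessage removes a previously persisted message.
		DeleteMessage(lnwire.Message, [33]byte) error

		// MessagesForPeer returns all pending messages for a peer.
		MessagesForPeer([33]byte) ([]lnwire.Message, error)

		// Peers returns the set of peers with pending messages.
		Peers() (map[[33]byte]struct{}, error)
	}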
diff --git a/discovery/reliable_sender_test.go b/discovery/reliable_sender_test.go
new file mode 100644
index 00000000..8633b7d4
--- /dev/null
+++ b/discovery/reliable_sender_test.go
@@ -0,0 +1,312 @@
+package discovery
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/btcsuite/btcd/btcec"
+	"github.com/davecgh/go-spew/spew"
+	"github.com/lightningnetwork/lnd/lnpeer"
+	"github.com/lightningnetwork/lnd/lnwire"
+)
+
+// newTestReliableSender creates a new reliable sender instance used for
+// testing.
+func newTestReliableSender(t *testing.T) *reliableSender {
+	t.Helper()
+
+	cfg := &reliableSenderCfg{
+		NotifyWhenOnline: func(pubKey *btcec.PublicKey,
+			peerChan chan<- lnpeer.Peer) {
+			peerChan <- &mockPeer{pk: pubKey}
+		},
+		NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
+			c := make(chan struct{}, 1)
+			return c
+		},
+		MessageStore: newMockMessageStore(),
+		IsMsgStale: func(lnwire.Message) bool {
+			return false
+		},
+	}
+
+	return newReliableSender(cfg)
+}
+
+// assertMsgsSent ensures that the given messages can be read from a mock
+// peer's msgChan.
+func assertMsgsSent(t *testing.T, msgChan chan lnwire.Message,
+	msgs ...lnwire.Message) {
+
+	t.Helper()
+
+	m := make(map[lnwire.Message]struct{}, len(msgs))
+	for _, msg := range msgs {
+		m[msg] = struct{}{}
+	}
+
+	for i := 0; i < len(msgs); i++ {
+		select {
+		case msg := <-msgChan:
+			if _, ok := m[msg]; !ok {
+				t.Fatalf("found unexpected message sent: %v",
+					spew.Sdump(msg))
+			}
+		case <-time.After(time.Second):
+			t.Fatal("reliable sender did not send message to peer")
+		}
+	}
+}
+
+// waitPredicate is a helper test function that will wait for a timeout period
+// of time until the passed predicate returns true.
+func waitPredicate(t *testing.T, timeout time.Duration, pred func() bool) {
+	t.Helper()
+
+	const pollInterval = 20 * time.Millisecond
+	exitTimer := time.After(timeout)
+
+	for {
+		<-time.After(pollInterval)
+
+		select {
+		case <-exitTimer:
+			t.Fatalf("predicate not satisfied after timeout")
+		default:
+		}
+
+		if pred() {
+			return
+		}
+	}
+}
+
+// TestReliableSenderFlow ensures that the flow for sending messages reliably
+// to a peer while taking into account its connection lifecycle works as
+// expected.
+func TestReliableSenderFlow(t *testing.T) {
+	t.Parallel()
+
+	reliableSender := newTestReliableSender(t)
+
+	// Create a mock peer to send the messages to.
+	pubKey := randPubKey(t)
+	msgsSent := make(chan lnwire.Message)
+	peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
+
+	// Override NotifyWhenOnline and NotifyWhenOffline to provide the
+	// notification channels so that we can control when notifications get
+	// dispatched.
+	notifyOnline := make(chan chan<- lnpeer.Peer, 2)
+	notifyOffline := make(chan chan struct{}, 1)
+
+	reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
+		peerChan chan<- lnpeer.Peer) {
+		notifyOnline <- peerChan
+	}
+	reliableSender.cfg.NotifyWhenOffline = func(_ [33]byte) <-chan struct{} {
+		c := make(chan struct{}, 1)
+		notifyOffline <- c
+		return c
+	}
+
+	// We'll start by creating our first message which we should reliably
+	// send to our peer.
+	msg1 := randChannelUpdate()
+	var peerPubKey [33]byte
+	copy(peerPubKey[:], pubKey.SerializeCompressed())
+	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
+		t.Fatalf("unable to reliably send message: %v", err)
+	}
+
+	// Since there isn't a peerHandler for this peer currently active due
+	// to this being the first message being sent reliably, we should
+	// expect to see a notification request for when the peer is online.
+	var peerChan chan<- lnpeer.Peer
+	select {
+	case peerChan = <-notifyOnline:
+	case <-time.After(time.Second):
+		t.Fatal("reliable sender did not request online notification")
+	}
+
+	// We'll then attempt to send another additional message reliably.
+	msg2 := randAnnounceSignatures()
+	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
+		t.Fatalf("unable to reliably send message: %v", err)
+	}
+
+	// This should not however request another peer online notification as
+	// the peerHandler has already been started and is waiting for the
+	// notification to be dispatched.
+	select {
+	case <-notifyOnline:
+		t.Fatal("reliable sender should not request online notification")
+	case <-time.After(time.Second):
+	}
+
+	// We'll go ahead and notify the peer.
+	peerChan <- peer
+
+	// By doing so, we should expect to see a notification request for
+	// when the peer is offline.
+	var offlineChan chan struct{}
+	select {
+	case offlineChan = <-notifyOffline:
+	case <-time.After(time.Second):
+		t.Fatal("reliable sender did not request offline notification")
+	}
+
+	// We should also see the messages arrive at the peer since they are
+	// now seen as online.
+	assertMsgsSent(t, peer.sentMsgs, msg1, msg2)
+
+	// Then, we'll send one more message reliably.
+	msg3 := randChannelUpdate()
+	if err := reliableSender.sendMessage(msg3, peerPubKey); err != nil {
+		t.Fatalf("unable to reliably send message: %v", err)
+	}
+
+	// Again, this should not trigger another peer online notification
+	// request since we are currently waiting for the peer to be offline.
+	select {
+	case <-notifyOnline:
+		t.Fatal("reliable sender should not request online notification")
+	case <-time.After(time.Second):
+	}
+
+	// The expected message should be sent to the peer.
+	assertMsgsSent(t, peer.sentMsgs, msg3)
+
+	// We'll then notify that the peer is offline.
+	close(offlineChan)
+
+	// This should cause an online notification to be requested.
+	select {
+	case peerChan = <-notifyOnline:
+	case <-time.After(time.Second):
+		t.Fatal("reliable sender did not request online notification")
+	}
+
+	// Once we dispatch it, we should expect to see the messages be resent
+	// to the peer as they are not stale.
+	peerChan <- peer
+
+	select {
+	case <-notifyOffline:
+	case <-time.After(5 * time.Second):
+		t.Fatal("reliable sender did not request offline notification")
+	}
+
+	assertMsgsSent(t, peer.sentMsgs, msg1, msg2, msg3)
+}
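These tests lean on helpers defined in the package's other test files (mockPeer, randPubKey, randChannelUpdate, randAnnounceSignatures, newMockMessageStore). Judging from the composite literal &mockPeer{pubKey, msgsSent, reliableSender.quit} and the assertions above, the assumed shape is roughly the sketch below; the real type also implements the rest of lnpeer.Peer.

	// Sketch of the assumed mockPeer; only the behavior these tests
	// rely on is shown.
	type mockPeer struct {
		pk       *btcec.PublicKey
		sentMsgs chan lnwire.Message
		quit     chan struct{}
	}

	func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
		for _, msg := range msgs {
			select {
			case p.sentMsgs <- msg:
			case <-p.quit:
				return ErrGossiperShuttingDown
			}
		}
		return nil
	}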
+// TestReliableSenderStaleMessages ensures that the reliable sender tears down
+// the peerHandler for a peer once all of its pending messages have been sent
+// and deemed stale.
+func TestReliableSenderStaleMessages(t *testing.T) {
+	t.Parallel()
+
+	reliableSender := newTestReliableSender(t)
+
+	// Create a mock peer to send the messages to.
+	pubKey := randPubKey(t)
+	msgsSent := make(chan lnwire.Message)
+	peer := &mockPeer{pubKey, msgsSent, reliableSender.quit}
+
+	// Override NotifyWhenOnline to provide the notification channel so
+	// that we can control when notifications get dispatched.
+	notifyOnline := make(chan chan<- lnpeer.Peer, 1)
+	reliableSender.cfg.NotifyWhenOnline = func(_ *btcec.PublicKey,
+		peerChan chan<- lnpeer.Peer) {
+		notifyOnline <- peerChan
+	}
+
+	// We'll also override IsMsgStale to mark all messages as stale as
+	// we're interested in testing the stale message behavior.
+	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+		return true
+	}
+
+	// We'll start by creating our first message which we should reliably
+	// send to our peer, but will be seen as stale.
+	msg1 := randAnnounceSignatures()
+	var peerPubKey [33]byte
+	copy(peerPubKey[:], pubKey.SerializeCompressed())
+	if err := reliableSender.sendMessage(msg1, peerPubKey); err != nil {
+		t.Fatalf("unable to reliably send message: %v", err)
+	}
+
+	// Since there isn't a peerHandler for this peer currently active due
+	// to this being the first message being sent reliably, we should
+	// expect to see a notification request for when the peer is online.
+	var peerChan chan<- lnpeer.Peer
+	select {
+	case peerChan = <-notifyOnline:
+	case <-time.After(time.Second):
+		t.Fatal("reliable sender did not request online notification")
+	}
+
+	// We'll go ahead and notify the peer.
+	peerChan <- peer
+
+	// This should cause the message to be sent to the peer since they are
+	// now seen as online. The message will be sent at least once to
+	// ensure it can propagate before deciding whether it is stale or not.
+	assertMsgsSent(t, peer.sentMsgs, msg1)
+
+	// We'll create another message which we'll send reliably later. This
+	// one won't be seen as stale.
+	msg2 := randChannelUpdate()
+
+	// We'll then wait for the message to be removed from the backing
+	// message store since it is seen as stale and has been sent at least
+	// once. Once the message is removed, the peerHandler should be torn
+	// down as there are no longer any pending messages within the store.
+	var predErr error
+	waitPredicate(t, time.Second, func() bool {
+		msgs, err := reliableSender.cfg.MessageStore.MessagesForPeer(
+			peerPubKey,
+		)
+		if err != nil {
+			predErr = fmt.Errorf("unable to retrieve messages for "+
+				"peer: %v", err)
+			return false
+		}
+		if len(msgs) != 0 {
+			predErr = fmt.Errorf("expected to not find any "+
+				"messages for peer, found %d", len(msgs))
+			return false
+		}
+
+		predErr = nil
+		return true
+	})
+	if predErr != nil {
+		t.Fatal(predErr)
+	}
+
+	// Override IsMsgStale to no longer mark messages as stale.
+	reliableSender.cfg.IsMsgStale = func(_ lnwire.Message) bool {
+		return false
+	}
+
+	// We'll request the message to be sent reliably.
+	if err := reliableSender.sendMessage(msg2, peerPubKey); err != nil {
+		t.Fatalf("unable to reliably send message: %v", err)
+	}
+
+	// We should see an online notification request indicating that a new
+	// peerHandler has been spawned since it was previously torn down.
+	select {
+	case peerChan = <-notifyOnline:
+	case <-time.After(time.Second):
+		t.Fatal("reliable sender did not request online notification")
+	}
+
+	// Finally, notifying the peer is online should prompt the message to
+	// be sent. Only the ChannelUpdate will be sent in this case since the
+	// AnnounceSignatures message above was seen as stale.
+	peerChan <- peer
+
+	assertMsgsSent(t, peer.sentMsgs, msg2)
+}