diff --git a/discovery/gossiper.go b/discovery/gossiper.go index d11f5299..90c2cb0e 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -95,10 +95,6 @@ type Config struct { Broadcast func(skips map[routing.Vertex]struct{}, msg ...lnwire.Message) error - // SendToPeer is a function which allows the service to send a set of - // messages to a particular peer identified by the target public key. - SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error - // FindPeer returns the actively registered peer for a given remote // public key. An error is returned if the peer was not found or a // shutdown has been requested. @@ -109,8 +105,15 @@ type Config struct { // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. + // + // TODO(wilmer): use [33]byte to avoid unnecessary serializations. NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer) + // NotifyWhenOffline is a function that allows the gossiper to be + // notified when a certain peer disconnects, allowing it to request a + // notification for when it reconnects. + NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} + // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 @@ -222,13 +225,17 @@ type AuthenticatedGossiper struct { syncerMtx sync.RWMutex peerSyncers map[routing.Vertex]*gossipSyncer + // reliableSender is a subsystem responsible for handling reliable + // message send requests to peers. + reliableSender *reliableSender + sync.Mutex } // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { - return &AuthenticatedGossiper{ + gossiper := &AuthenticatedGossiper{ selfKey: selfKey, cfg: &cfg, networkMsgs: make(chan *networkMsg), @@ -240,6 +247,15 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { recentRejects: make(map[uint64]struct{}), peerSyncers: make(map[routing.Vertex]*gossipSyncer), } + + gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ + NotifyWhenOnline: cfg.NotifyWhenOnline, + NotifyWhenOffline: cfg.NotifyWhenOffline, + MessageStore: cfg.MessageStore, + IsMsgStale: gossiper.isMsgStale, + }) + + return gossiper } // SynchronizeNode sends a message to the service indicating it should @@ -398,11 +414,10 @@ func (d *AuthenticatedGossiper) Start() error { } d.bestHeight = height - // In case we had an AnnounceSignatures ready to be sent when the - // gossiper was last shut down, we must continue on our quest to - // deliver this message to our peer such that they can craft the - // full channel proof. - if err := d.resendAnnounceSignatures(); err != nil { + // Start the reliable sender. In case we had any pending messages ready + // to be sent when the gossiper was last shut down, we must continue on + // our quest to deliver them to their respective peers. + if err := d.reliableSender.Start(); err != nil { return err } @@ -430,6 +445,10 @@ func (d *AuthenticatedGossiper) Stop() { close(d.quit) d.wg.Wait() + + // We'll stop our reliable sender after all of the gossiper's goroutines + // have exited to ensure nothing can cause it to continue executing. + d.reliableSender.Stop() } // TODO(roasbeef): need method to get current gossip timestamp? @@ -795,81 +814,6 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// resendAnnounceSignatures will inspect the messageStore database bucket for -// AnnounceSignatures messages that we recently tried to send to a peer. If the -// associated channels still not have the full channel proofs assembled, we -// will try to resend them. If we have the full proof, we can safely delete the -// message from the messageStore. -func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { - peerMsgsToResend, err := d.cfg.MessageStore.Messages() - if err != nil { - return err - } - - // We now iterate over these messages, resending those that we don't - // have the full proof for, deleting the rest. - for peer, msgsToResend := range peerMsgsToResend { - pubKey, err := btcec.ParsePubKey(peer[:], btcec.S256()) - if err != nil { - return err - } - - for _, msg := range msgsToResend { - msg := msg.(*lnwire.AnnounceSignatures) - - // Check if the full channel proof exists in our graph. - chanInfo, _, _, err := d.cfg.Router.GetChannelByID( - msg.ShortChannelID) - if err != nil { - // If the channel cannot be found, it is most likely a - // leftover message for a channel that was closed. In - // this case we delete it from the message store. - log.Warnf("unable to fetch channel info for "+ - "chanID=%v from graph: %v. Will delete local"+ - "proof from database", - msg.ChannelID, err) - err = d.cfg.MessageStore.DeleteMessage(msg, peer) - if err != nil { - return err - } - continue - } - - // 1. If the full proof does not exist in the graph, it means - // that we haven't received the remote proof yet (or that we - // crashed before able to assemble the full proof). Since the - // remote node might think they have delivered their proof to - // us, we will resend _our_ proof to trigger a resend on their - // part: they will then be able to assemble and send us the - // full proof. - if chanInfo.AuthProof == nil { - err := d.sendAnnSigReliably(msg, pubKey) - if err != nil { - return err - } - continue - } - - // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled the - // full proof. In this case we can safely delete the local - // proof from the database. In case the remote hasn't been able - // to assemble the full proof yet (maybe because of a crash), - // we will send them the full proof if we notice that they - // retry sending their half proof. - if chanInfo.AuthProof != nil { - log.Debugf("Deleting message for chanID=%v from "+ - "messageStore", msg.ChannelID) - err := d.cfg.MessageStore.DeleteMessage(msg, peer) - if err != nil { - return err - } - } - } - } - return nil -} - // findGossipSyncer is a utility method used by the gossiper to locate the // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created @@ -2113,21 +2057,21 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // so they can also reconstruct the full channel // announcement. if !nMsg.isRemote { - var remotePeer *btcec.PublicKey + var remotePubKey [33]byte if isFirstNode { - remotePeer, _ = chanInfo.NodeKey2() + remotePubKey = chanInfo.NodeKey2Bytes } else { - remotePeer, _ = chanInfo.NodeKey1() + remotePubKey = chanInfo.NodeKey1Bytes } // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. - err := d.sendAnnSigReliably(msg, remotePeer) + err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { - err := fmt.Errorf("unable to send reliably "+ - "to remote for short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) + err := fmt.Errorf("unable to reliably send %v "+ + "for channel=%v to peer=%x: %v", + msg.MsgType(), msg.ShortChannelID, + remotePubKey, err) nMsg.err <- err return nil } @@ -2359,70 +2303,49 @@ func (d *AuthenticatedGossiper) fetchNodeAnn( return node.NodeAnnouncement(true) } -// sendAnnSigReliably will try to send the provided local AnnounceSignatures -// to the remote peer, waiting for it to come online if necessary. This -// method returns after adding the message to persistent storage, such -// that the caller knows that the message will be delivered at one point. -func (d *AuthenticatedGossiper) sendAnnSigReliably( - msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { +// isMsgStale determines whether a message retrieved from the backing +// MessageStore is seen as stale by the current graph. +func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { + switch msg := msg.(type) { + case *lnwire.AnnounceSignatures: + chanInfo, _, _, err := d.cfg.Router.GetChannelByID( + msg.ShortChannelID, + ) - // We first add this message to the database, such that in case - // we do not succeed in sending it to the peer, we'll fetch it - // from the DB next time we start, and retry. - var remotePubKey [33]byte - copy(remotePubKey[:], remotePeer.SerializeCompressed()) - if err := d.cfg.MessageStore.AddMessage(msg, remotePubKey); err != nil { - return err - } - - // We have succeeded adding the message to the database. We now launch - // a goroutine that will keep on trying sending the message to the - // remote peer until it succeeds, or the gossiper shuts down. In case - // of success, the message will be removed from the database. - d.wg.Add(1) - go func() { - defer d.wg.Done() - for { - log.Debugf("Sending AnnounceSignatures for channel "+ - "%v to remote peer %x", msg.ChannelID, - remotePeer.SerializeCompressed()) - err := d.cfg.SendToPeer(remotePeer, msg) - if err == nil { - // Sending succeeded, we can - // continue the flow. - break - } - - log.Errorf("unable to send AnnounceSignatures message "+ - "to peer(%x): %v. Will retry when online.", - remotePeer.SerializeCompressed(), err) - - peerChan := make(chan lnpeer.Peer, 1) - d.cfg.NotifyWhenOnline(remotePeer, peerChan) - - select { - case <-peerChan: - // Retry sending. - log.Infof("Peer %x reconnected. Retry sending"+ - " AnnounceSignatures.", - remotePeer.SerializeCompressed()) - - case <-d.quit: - log.Infof("Gossiper shutting down, did not " + - "send AnnounceSignatures.") - return - } + // If the channel cannot be found, it is most likely a leftover + // message for a channel that was closed, so we can consider it + // stale. + if err == channeldb.ErrEdgeNotFound { + return true + } + if err != nil { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) + return false } - log.Infof("Sent channel announcement proof to remote peer: %x", - remotePeer.SerializeCompressed()) - }() + // If the proof exists in the graph, then we have successfully + // received the remote proof and assembled the full proof, so we + // can safely delete the local proof from the database. + return chanInfo.AuthProof != nil - // This method returns after the message has been added to the database, - // such that the caller don't have to wait until the message is actually - // delivered, but can be assured that it will be delivered eventually - // when this method returns. - return nil + case *lnwire.ChannelUpdate: + // The MessageStore will always store the latest ChannelUpdate + // as it is not aware of its timestamp (by design), so it will + // never be stale. We should still however check if the channel + // is part of our graph. If it's not, we can mark it as stale. + _, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + if err != nil && err != channeldb.ErrEdgeNotFound { + log.Debugf("Unable to retrieve channel=%v from graph: "+ + "%v", err) + } + return err == channeldb.ErrEdgeNotFound + + default: + // We'll make sure to not mark any unsupported messages as stale + // to ensure they are not removed. + return false + } } // updateChannel creates a new fully signed update for the channel, and updates diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 1decc1e3..043007bb 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -644,12 +644,17 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil }, - SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error { - return nil - }, FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, nil, nil}, nil }, + NotifyWhenOnline: func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + peerChan <- &mockPeer{target, nil, nil} + }, + NotifyWhenOffline: func(_ [33]byte) <-chan struct{} { + c := make(chan struct{}) + return c + }, Router: router, TrickleDelay: trickleDelay, RetransmitDelay: retransmitDelay, @@ -880,21 +885,19 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -1084,19 +1087,19 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly from the gossiper. + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + select { - case sentMsgs <- msg[0]: + case peerChan <- &mockPeer{target, sentMsgs, ctx.gossiper.quit}: case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") } - return nil } batch, err := createAnnouncements(0) @@ -1251,9 +1254,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // The local proof should be sent to the remote peer. select { case msg := <-sentMsgs: - if msg != batch.localProofAnn { - t.Fatalf("expected local proof to be sent, got %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatalf("local proof was not sent to peer") } @@ -1283,234 +1284,10 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } } -// Test that sending AnnounceSignatures to remote peer will continue -// to be tried until the peer comes online. -func TestSignatureAnnouncementRetry(t *testing.T) { - t.Parallel() - - ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) - if err != nil { - t.Fatalf("can't create context: %v", err) - } - defer cleanup() - - batch, err := createAnnouncements(0) - if err != nil { - t.Fatalf("can't generate announcements: %v", err) - } - - localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256()) - if err != nil { - t.Fatalf("unable to parse pubkey: %v", err) - } - remotePeer := &mockPeer{remoteKey, nil, nil} - - // Recreate lightning network topology. Initialize router with channel - // between two nodes. - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localChanAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.chanUpdAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.nodeAnn1, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.chanUpdAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process channel update: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("channel update announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.nodeAnn2, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process remote announcement") - } - if err != nil { - t.Fatalf("unable to process node ann: %v", err) - } - select { - case <-ctx.broadcastedMessage: - t.Fatal("node announcement was broadcast") - case <-time.After(2 * trickleDelay): - } - - // Make the SendToPeer fail, simulating the peer being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } - - // We expect the gossiper to register for a notification when the peer - // comes back online, so keep track of the channel it wants to get - // notified on. - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - - // Pretending that we receive local channel announcement from funding - // manager, thereby kick off the announcement exchange process. - select { - case err = <-ctx.gossiper.ProcessLocalAnnouncement( - batch.localProofAnn, localKey, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process local proof: %v", err) - } - - // Since sending this local announcement proof to the remote will fail, - // the gossiper should register for a notification when the remote is - // online again. - var conChan chan<- lnpeer.Peer - select { - case conChan = <-notifyPeers: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not ask to get notified when " + - "peer is online") - } - - // Since both proofs are not yet exchanged, no message should be - // broadcasted yet. - select { - case <-ctx.broadcastedMessage: - t.Fatal("announcements were broadcast") - case <-time.After(2 * trickleDelay): - } - - number := 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 1 { - t.Fatal("wrong number of objects in storage") - } - - // When the peer comes online, the gossiper gets notified, and should - // retry sending the AnnounceSignatures. We make the SendToPeer - // method work again. - sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - sentToPeer <- msg[0] - return nil - } - - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) - - select { - case <-sentToPeer: - case <-time.After(2 * time.Second): - t.Fatalf("gossiper did not send message when peer came online") - } - - // Now give the gossiper the remote proof. This should trigger a - // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). - select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement( - batch.remoteProofAnn, remotePeer, - ): - case <-time.After(2 * time.Second): - t.Fatal("did not process local announcement") - } - if err != nil { - t.Fatalf("unable to process remote proof: %v", err) - } - - for i := 0; i < 5; i++ { - select { - case <-ctx.broadcastedMessage: - case <-time.After(time.Second): - t.Fatal("announcement wasn't broadcast") - } - } - - number = 0 - if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( - func(*channeldb.WaitingProof) error { - number++ - return nil - }, - ); err != nil && err != channeldb.ErrWaitingProofNotFound { - t.Fatalf("unable to retrieve objects from store: %v", err) - } - - if number != 0 { - t.Fatal("waiting proof should be removed from storage") - } -} - -// Test that if we restart the gossiper, it will retry sending the -// AnnounceSignatures to the peer if it did not succeed before -// shutting down, and the full channel proof is not yet assembled. +// TestSignatureAnnouncementRetryAtStartup tests that if we restart the +// gossiper, it will retry sending the AnnounceSignatures to the peer if it did +// not succeed before shutting down, and the full channel proof is not yet +// assembled. func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Parallel() @@ -1533,7 +1310,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel to intercept the messages sent to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1617,13 +1397,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Make the SendToPeerFail, simulating the peer being offline. - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - } + // Since the reliable send to the remote peer of the local channel proof + // requires a notification when the peer comes online, we'll capture the + // channel through which it gets sent to control exactly when to + // dispatch it. notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan } @@ -1640,11 +1419,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { t.Fatalf("unable to process :%v", err) } - // Since sending to the remote peer will fail, the gossiper should - // register for a notification when it comes back online. - var conChan chan<- lnpeer.Peer + // The gossiper should register for a notification for when the peer is + // online. select { - case conChan = <-notifyPeers: + case <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") @@ -1674,16 +1452,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // to send the message to the peer. ctx.gossiper.Stop() gossiper := New(Config{ - Notifier: ctx.gossiper.cfg.Notifier, - Broadcast: ctx.gossiper.cfg.Broadcast, - SendToPeer: func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - return fmt.Errorf("intentional error in SendToPeer") - }, - NotifyWhenOnline: func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - }, + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, + NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, Router: ctx.gossiper.cfg.Router, TrickleDelay: trickleDelay, RetransmitDelay: retransmitDelay, @@ -1700,36 +1472,26 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { defer gossiper.Stop() ctx.gossiper = gossiper + remotePeer.quit = ctx.gossiper.quit - // After starting up, the gossiper will see that it has a waitingproof - // in the database, and will retry sending its part to the remote. Since - // SendToPeer will fail again, it should register for a notification - // when the peer comes online. + // After starting up, the gossiper will see that it has a proof in the + // WaitingProofStore, and will retry sending its part to the remote. + // It should register for a notification for when the peer is online. + var peerChan chan<- lnpeer.Peer select { - case conChan = <-notifyPeers: + case peerChan = <-notifyPeers: case <-time.After(2 * time.Second): t.Fatalf("gossiper did not ask to get notified when " + "peer is online") } - // Fix the SendToPeer method. - sentToPeer := make(chan lnwire.Message, 1) - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - select { - case sentToPeer <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") - } - - return nil - } - // Notify that peer is now online. This should trigger a new call - // to SendToPeer. - close(conChan) + // Notify that peer is now online. This should allow the proof to be + // sent. + peerChan <- remotePeer select { - case <-sentToPeer: + case msg := <-sentToPeer: + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatalf("gossiper did not send message when peer came online") } @@ -1770,10 +1532,9 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } } -// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a -// remote proof is received when we already have the full proof, -// the gossiper will send the full proof (ChannelAnnouncement) to -// the remote peer. +// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a remote +// proof is received when we already have the full proof, the gossiper will send +// the full proof (ChannelAnnouncement) to the remote peer. func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { t.Parallel() @@ -1796,7 +1557,19 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit} + + // Override NotifyWhenOnline to return the remote peer which we expect + // meesages to be sent to. + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1880,27 +1653,6 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { case <-time.After(2 * trickleDelay): } - // Set up a channel we can use to inspect messages sent by the - // gossiper to the remote peer. - sentToPeer := make(chan lnwire.Message, 1) - remotePeer.sentMsgs = sentToPeer - remotePeer.quit = ctx.gossiper.quit - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, - msg ...lnwire.Message) error { - select { - case <-ctx.gossiper.quit: - return fmt.Errorf("gossiper shutting down") - case sentToPeer <- msg[0]: - } - return nil - } - - notifyPeers := make(chan chan<- lnpeer.Peer, 1) - ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, - connectedChan chan<- lnpeer.Peer) { - notifyPeers <- connectedChan - } - // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. select { @@ -1928,9 +1680,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // We expect the gossiper to send this message to the remote peer. select { case msg := <-sentToPeer: - if msg != batch.localProofAnn { - t.Fatalf("wrong message sent to peer: %v", msg) - } + assertMessage(t, batch.localProofAnn, msg) case <-time.After(2 * time.Second): t.Fatal("did not send local proof to peer") } @@ -2356,10 +2106,9 @@ func TestForwardPrivateNodeAnnouncement(t *testing.T) { } } -// TestReceiveRemoteChannelUpdateFirst tests that if we receive a -// ChannelUpdate from the remote before we have processed our -// own ChannelAnnouncement, it will be reprocessed later, after -// our ChannelAnnouncement. +// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate +// from the remote before we have processed our own ChannelAnnouncement, it will +// be reprocessed later, after our ChannelAnnouncement. func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Parallel() @@ -2369,21 +2118,6 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } defer cleanup() - // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. - sentMsgs := make(chan lnwire.Message, 10) - ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { - return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil - } - ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { - select { - case sentMsgs <- msg[0]: - case <-ctx.gossiper.quit: - return fmt.Errorf("shutting down") - } - return nil - } - batch, err := createAnnouncements(0) if err != nil { t.Fatalf("can't generate announcements: %v", err) @@ -2397,7 +2131,22 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } - remotePeer := &mockPeer{remoteKey, nil, nil} + + // Set up a channel that we can use to inspect the messages sent + // directly from the gossiper. + sentMsgs := make(chan lnwire.Message, 10) + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} + + // Override NotifyWhenOnline and FindPeer to return the remote peer + // which we expect meesages to be sent to. + ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return remotePeer, nil + } + ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + peerChan chan<- lnpeer.Peer) { + + peerChan <- remotePeer + } // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and @@ -2896,3 +2645,12 @@ func TestOptionalFieldsChannelUpdateValidation(t *testing.T) { t.Fatalf("unable to process announcement: %v", err) } } + +func assertMessage(t *testing.T, expected, got lnwire.Message) { + t.Helper() + + if !reflect.DeepEqual(expected, got) { + t.Fatalf("expected: %v\ngot: %v", spew.Sdump(expected), + spew.Sdump(got)) + } +} diff --git a/server.go b/server.go index 89b8d39c..32cf6fe1 100644 --- a/server.go +++ b/server.go @@ -598,11 +598,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ChainHash: *activeNetParams.GenesisHash, Broadcast: s.BroadcastMessage, ChanSeries: chanSeries, - SendToPeer: s.SendToPeer, FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { return s.FindPeer(pub) }, NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, ProofMatureDelta: 0, TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), RetransmitDelay: time.Minute * 30,