diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 426147c0..ecff2b8c 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -155,6 +155,11 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error { + info, ok := r.infos[chanID.ToUint64()] + if !ok { + return errors.New("channel does not exist") + } + info.AuthProof = proof return nil } @@ -341,7 +346,8 @@ func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement, } signer := mockSigner{priv} - if a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a); err != nil { + a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a) + if err != nil { return nil, err } @@ -498,7 +504,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create node announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -521,7 +531,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create channel announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -544,7 +558,11 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create update announcement: %v", err) } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID): + case <-time.After(2 * time.Second): + t.Fatal("remote announcement not processed") + } if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -641,8 +659,8 @@ func TestPrematureAnnouncement(t *testing.T) { } } -// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper properly -// processes partial and fully announcement signatures message. +// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper +// properly processes partial and fully announcement signatures message. func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Parallel() @@ -674,7 +692,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -684,10 +707,16 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -706,7 +735,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Fatal("gossiper did not send channel update to peer") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -718,7 +752,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -743,7 +782,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Fatal("wrong number of objects in storage") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process :%v", err) } @@ -783,7 +827,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { defer cleanup() // Set up a channel that we can use to inspect the messages - // sent directly fromn the gossiper. + // sent directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { @@ -805,8 +849,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process, in // this case the announcement should be added in the orphan batch - // because we haven't announced the channel yet. - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + // because we haven't announce the channel yet. + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to proceed announcement: %v", err) } @@ -827,7 +876,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { t.Fatalf("unable to process: %v", err) } @@ -838,10 +893,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } + select { case <-ctx.broadcastedMessage: t.Fatal("channel update announcement was broadcast") @@ -860,7 +921,12 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { t.Fatal("gossiper did not send channel update to peer") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey) + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } @@ -872,11 +938,209 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // After that we process local announcement, and waiting to receive // the channel announcement. - err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } if err != nil { t.Fatalf("unable to process: %v", err) } + // The local proof should be sent to the remote peer. + select { + case msg := <-sentMsgs: + if msg != batch.localProofAnn { + t.Fatalf("expected local proof to be sent, got %v", msg) + } + case <-time.After(2 * time.Second): + t.Fatalf("local proof was not sent to peer") + } + + // And since both remote and local announcements are processed, we + // should be broadcasting the final channel announcements. + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(p *channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatalf("wrong number of objects in storage: %v", number) + } +} + +// Test that sending AnnounceSignatures to remote peer will continue +// to be tried until the peer comes online. +func TestSignatureAnnouncementRetry(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Make the SendToPeer fail, simulating the peer being offline. + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + + // We expect the gossiper to register for a notification when the peer + // comes back online, so keep track of the channel it wants to get + // notified on. + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // Since sending this local announcement proof to the remote will fail, + // the gossiper should register for a notification when the remote is + // online again. + var conChan chan<- struct{} + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + // Since both proofs are not yet exchanged, no message should be + // broadcasted yet. + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcast") + case <-time.After(2 * trickleDelay): + } + + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 1 { + t.Fatal("wrong number of objects in storage") + } + + // When the peer comes online, the gossiper gets notified, and should + // retry sending the AnnnounceSignatures. We make the SendToPeer + // method work again. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + sentToPeer <- msg[0] + return nil + } + + // Notify that peer is now online. THis should trigger a new call + // to SendToPeer. + close(conChan) + + select { + case <-sentToPeer: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer came online") + } + + // Now give the gossiper the remote proof. This should trigger a + // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + for i := 0; i < 3; i++ { select { case <-ctx.broadcastedMessage: @@ -891,13 +1155,397 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { number++ return nil }, - ); err != nil { + ); err != nil && err != channeldb.ErrWaitingProofNotFound { t.Fatalf("unable to retrieve objects from store: %v", err) } if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } +} + +// Test that if we restart the gossiper, it will retry sending the +// AnnounceSignatures to the peer if it did not succeed before +// shutting down, and the full channel proof is not yet assembled. +func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + // Make the SendToPeerFail, simulating the peer being offline. + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // Since sending to the remote peer will fail, the gossiper should + // register for a notification when it comes back online. + var conChan chan<- struct{} + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcements were broadcast") + case <-time.After(2 * trickleDelay): + } + + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 1 { t.Fatal("wrong number of objects in storage") } + + // Shut down gossiper, and restart. This should trigger a new attempt + // to send the message to the peer. + ctx.gossiper.Stop() + gossiper, err := New(Config{ + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + SendToPeer: func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + }, + NotifyWhenOnline: func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + }, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + DB: ctx.gossiper.cfg.DB, + }, ctx.gossiper.selfKey) + if err != nil { + t.Fatalf("unable to recreate gossiper: %v", err) + } + if err := gossiper.Start(); err != nil { + t.Fatalf("unable to start recreated gossiper: %v", err) + } + defer gossiper.Stop() + + ctx.gossiper = gossiper + + // After starting up, the gossiper will see that it has a waitingproof + // in the database, and will retry sending its part to the remote. Since + // SendToPeer will fail again, it should register for a notification + // when the peer comes online. + select { + case conChan = <-notifyPeers: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not ask to get notified when " + + "peer is online") + } + + // Fix the SendToPeer method. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + select { + case sentToPeer <- msg[0]: + case <-ctx.gossiper.quit: + return fmt.Errorf("shutting down") + } + + return nil + } + // Notify that peer is now online. This should trigger a new call + // to SendToPeer. + close(conChan) + + select { + case <-sentToPeer: + case <-time.After(2 * time.Second): + t.Fatalf("gossiper did not send message when peer came online") + } + + // Now exchanging the remote channel proof, the channel annoncement + // broadcast should continue as normal. + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } +} + +// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a +// remote proof is received when we already have the full proof, +// the gossiper will send the full proof (ChannelAnnouncement) to +// the remote peer. +func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta)) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + batch, err := createAnnouncements(0) + if err != nil { + t.Fatalf("can't generate announcements: %v", err) + } + + localKey := batch.nodeAnn1.NodeID + remoteKey := batch.nodeAnn2.NodeID + + // Recreate lightning network topology. Initialize router with channel + // between two nodes. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process remote announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + select { + case <-ctx.broadcastedMessage: + t.Fatal("channel update announcement was broadcast") + case <-time.After(2 * trickleDelay): + } + // Set up a channel we can use to inspect messages sent by the + // gossiper to the remote peer. + sentToPeer := make(chan lnwire.Message, 1) + ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, + msg ...lnwire.Message) error { + select { + case <-ctx.gossiper.quit: + return fmt.Errorf("gossiper shutting down") + case sentToPeer <- msg[0]: + } + return nil + } + + notifyPeers := make(chan chan<- struct{}, 1) + ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + notifyPeers <- connectedChan + } + + // Pretending that we receive local channel announcement from funding + // manager, thereby kick off the announcement exchange process. + select { + case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, + localKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // We expect the gossiper to send this message to the remote peer. + select { + case msg := <-sentToPeer: + if msg != batch.localProofAnn { + t.Fatalf("wrong message sent to peer: %v", msg) + } + case <-time.After(2 * time.Second): + t.Fatal("did not send local proof to peer") + } + + // And all channel announcements should be broadcast. + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't broadcast") + } + } + + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } + + // Now give the gossiper the remote proof yet again. This should + // trigger a send of the full ChannelAnnouncement. + select { + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, + remoteKey): + case <-time.After(2 * time.Second): + t.Fatal("did not process local announcement") + } + if err != nil { + t.Fatalf("unable to process :%v", err) + } + + // We expect the gossiper to send this message to the remote peer. + select { + case msg := <-sentToPeer: + _, ok := msg.(*lnwire.ChannelAnnouncement) + if !ok { + t.Fatalf("expected ChannelAnnouncement, intead got %T", msg) + } + case <-time.After(2 * time.Second): + t.Fatal("did not send local proof to peer") + } + } // TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct