diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 004e6a97..2360012f 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -22,6 +22,7 @@ import ( "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/btcec" @@ -535,6 +536,9 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error { return nil }, + FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return &mockPeer{target, nil, nil}, nil + }, Router: router, TrickleDelay: trickleDelay, RetransmitDelay: retransmitDelay, @@ -583,7 +587,7 @@ func TestProcessAnnouncement(t *testing.T) { } } - // Create node valid, signed announcement, process it with + // Create node valid, signed announcement, process it with // gossiper service, check that valid announcement have been // propagated farther into the lightning network, and check that we // added new node into router. @@ -592,10 +596,10 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create node announcement: %v", err) } - nodePub := nodeKeyPriv1.PubKey() + nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePub): + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePeer): case <-time.After(2 * time.Second): t.Fatal("remote announcement not processed") } @@ -605,7 +609,7 @@ func TestProcessAnnouncement(t *testing.T) { select { case msg := <-ctx.broadcastedMessage: - assertSenderExistence(nodePub, msg) + assertSenderExistence(nodePeer.IdentityKey(), msg) case <-time.After(2 * trickleDelay): t.Fatal("announcement wasn't proceeded") } @@ -623,7 +627,7 @@ func TestProcessAnnouncement(t *testing.T) { } select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePub): + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): case <-time.After(2 * time.Second): t.Fatal("remote announcement not processed") } @@ -633,7 +637,7 @@ func TestProcessAnnouncement(t *testing.T) { select { case msg := <-ctx.broadcastedMessage: - assertSenderExistence(nodePub, msg) + assertSenderExistence(nodePeer.IdentityKey(), msg) case <-time.After(2 * trickleDelay): t.Fatal("announcement wasn't proceeded") } @@ -651,7 +655,7 @@ func TestProcessAnnouncement(t *testing.T) { } select { - case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePub): + case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): case <-time.After(2 * time.Second): t.Fatal("remote announcement not processed") } @@ -661,7 +665,7 @@ func TestProcessAnnouncement(t *testing.T) { select { case msg := <-ctx.broadcastedMessage: - assertSenderExistence(nodePub, msg) + assertSenderExistence(nodePeer.IdentityKey(), msg) case <-time.After(2 * trickleDelay): t.Fatal("announcement wasn't proceeded") } @@ -690,7 +694,7 @@ func TestPrematureAnnouncement(t *testing.T) { t.Fatalf("can't create node announcement: %v", err) } - nodePub := nodeKeyPriv1.PubKey() + nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} // Pretending that we receive the valid channel announcement from // remote side, but block height of this announcement is greater than @@ -702,7 +706,7 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePub): + case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer): t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } @@ -721,7 +725,7 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePub): + case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer): t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } @@ -770,6 +774,9 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages // sent directly fromn the gossiper. sentMsgs := make(chan lnwire.Message, 10) + ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil + } ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { case sentMsgs <- msg[0]: @@ -792,6 +799,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -840,7 +848,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -887,7 +895,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -932,6 +940,9 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // Set up a channel that we can use to inspect the messages // sent directly from the gossiper. sentMsgs := make(chan lnwire.Message, 10) + ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil + } ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { case sentMsgs <- msg[0]: @@ -954,6 +965,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit} // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process, in @@ -961,7 +973,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // because we haven't announce the channel yet. select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1032,7 +1044,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1116,6 +1128,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, nil, nil} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1151,7 +1164,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1248,7 +1261,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { // broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate). select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process local announcement") } @@ -1304,6 +1317,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, nil, nil} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1339,7 +1353,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1472,7 +1486,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // broadcast should continue as normal. select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1529,6 +1543,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, nil, nil} // Recreate lightning network topology. Initialize router with channel // between two nodes. @@ -1564,7 +1579,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process remote announcement") } @@ -1579,6 +1594,8 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // Set up a channel we can use to inspect messages sent by the // gossiper to the remote peer. sentToPeer := make(chan lnwire.Message, 1) + remotePeer.sentMsgs = sentToPeer + remotePeer.quit = ctx.gossiper.quit ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { @@ -1609,7 +1626,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process local announcement") } @@ -1654,7 +1671,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { // trigger a send of the full ChannelAnnouncement. select { case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, - remoteKey): + remotePeer): case <-time.After(2 * time.Second): t.Fatal("did not process local announcement") } @@ -1702,7 +1719,13 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create remote channel announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ca, peer: bitcoinKeyPub2}) + + nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil} + announcements.AddMsgs(networkMsg{ + msg: ca, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelAnnouncements) != 1 { t.Fatal("new channel announcement not stored in batch") } @@ -1715,7 +1738,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create remote channel announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ca2, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: ca2, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelAnnouncements) != 1 { t.Fatal("channel announcement not replaced in batch") } @@ -1727,7 +1754,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create update announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ua, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: ua, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelUpdates) != 1 { t.Fatal("new channel update not stored in batch") } @@ -1738,7 +1769,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create update announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ua2, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: ua2, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelUpdates) != 1 { t.Fatal("channel update not replaced in batch") } @@ -1749,7 +1784,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create update announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ua3, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: ua3, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelUpdates) != 1 { t.Fatal("channel update not replaced in batch") } @@ -1779,7 +1818,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create update announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: ua4, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: ua4, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.channelUpdates) != 1 { t.Fatal("channel update not in batch") } @@ -1791,7 +1834,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create node announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: na, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: na, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.nodeAnnouncements) != 1 { t.Fatal("new node announcement not stored in batch") } @@ -1801,7 +1848,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create node announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: na2, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: na2, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.nodeAnnouncements) != 2 { t.Fatal("second node announcement not stored in batch") } @@ -1812,7 +1863,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create node announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: na3, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: na3, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.nodeAnnouncements) != 2 { t.Fatal("second node announcement not replaced in batch") } @@ -1824,7 +1879,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create node announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: na4, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: na4, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.nodeAnnouncements) != 2 { t.Fatal("second node announcement not replaced again in batch") } @@ -1835,7 +1894,11 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { if err != nil { t.Fatalf("can't create node announcement: %v", err) } - announcements.AddMsgs(networkMsg{msg: na5, peer: bitcoinKeyPub2}) + announcements.AddMsgs(networkMsg{ + msg: na5, + peer: nodePeer, + source: nodePeer.IdentityKey(), + }) if len(announcements.nodeAnnouncements) != 2 { t.Fatal("node announcement not replaced in batch") } @@ -1910,6 +1973,9 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { // Set up a channel that we can use to inspect the messages // sent directly fromn the gossiper. sentMsgs := make(chan lnwire.Message, 10) + ctx.gossiper.cfg.FindPeer = func(target *btcec.PublicKey) (lnpeer.Peer, error) { + return &mockPeer{target, sentMsgs, ctx.gossiper.quit}, nil + } ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error { select { case sentMsgs <- msg[0]: @@ -1932,11 +1998,12 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { if err != nil { t.Fatalf("unable to parse pubkey: %v", err) } + remotePeer := &mockPeer{remoteKey, nil, nil} // Recreate the case where the remote node is sending us its ChannelUpdate // before we have been able to process our own ChannelAnnouncement and // ChannelUpdate. - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remotePeer) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -2042,7 +2109,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Fatal("wrong number of objects in storage") } - err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remotePeer) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -2069,3 +2136,33 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { t.Fatal("waiting proof should be removed from storage") } } + +// mockPeer implements the lnpeer.Peer interface and is used to test the +// gossiper's interaction with peers. +type mockPeer struct { + pk *btcec.PublicKey + sentMsgs chan lnwire.Message + quit chan struct{} +} + +func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error { + if p.sentMsgs == nil && p.quit == nil { + return nil + } + + for _, msg := range msgs { + select { + case p.sentMsgs <- msg: + case <-p.quit: + } + } + + return nil +} +func (p *mockPeer) WipeChannel(_ *wire.OutPoint) error { return nil } +func (p *mockPeer) IdentityKey() *btcec.PublicKey { return p.pk } +func (p *mockPeer) PubKey() [33]byte { + var pubkey [33]byte + copy(pubkey[:], p.pk.SerializeCompressed()) + return pubkey +}