discovery test: update gossiper test for new database structure
This commit is contained in:
parent
e3ae204fcb
commit
c3aa23f061
@ -155,6 +155,11 @@ func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) {
|
||||
|
||||
func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID,
|
||||
proof *channeldb.ChannelAuthProof) error {
|
||||
info, ok := r.infos[chanID.ToUint64()]
|
||||
if !ok {
|
||||
return errors.New("channel does not exist")
|
||||
}
|
||||
info.AuthProof = proof
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -341,7 +346,8 @@ func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement,
|
||||
}
|
||||
|
||||
signer := mockSigner{priv}
|
||||
if a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a); err != nil {
|
||||
a.Signature, err = SignAnnouncement(&signer, priv.PubKey(), a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -498,7 +504,11 @@ func TestProcessAnnouncement(t *testing.T) {
|
||||
t.Fatalf("can't create node announcement: %v", err)
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("remote announcement not processed")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("can't process remote announcement: %v", err)
|
||||
}
|
||||
@ -521,7 +531,11 @@ func TestProcessAnnouncement(t *testing.T) {
|
||||
t.Fatalf("can't create channel announcement: %v", err)
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("remote announcement not processed")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("can't process remote announcement: %v", err)
|
||||
}
|
||||
@ -544,7 +558,11 @@ func TestProcessAnnouncement(t *testing.T) {
|
||||
t.Fatalf("can't create update announcement: %v", err)
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("remote announcement not processed")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("can't process remote announcement: %v", err)
|
||||
}
|
||||
@ -641,8 +659,8 @@ func TestPrematureAnnouncement(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper properly
|
||||
// processes partial and fully announcement signatures message.
|
||||
// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper
|
||||
// properly processes partial and fully announcement signatures message.
|
||||
func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@ -674,7 +692,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
@ -684,10 +707,16 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
@ -706,7 +735,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
t.Fatal("gossiper did not send channel update to peer")
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
@ -718,7 +752,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
@ -743,7 +782,12 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
|
||||
t.Fatal("wrong number of objects in storage")
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
@ -783,7 +827,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
defer cleanup()
|
||||
|
||||
// Set up a channel that we can use to inspect the messages
|
||||
// sent directly fromn the gossiper.
|
||||
// sent directly from the gossiper.
|
||||
sentMsgs := make(chan lnwire.Message, 10)
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey, msg ...lnwire.Message) error {
|
||||
select {
|
||||
@ -805,8 +849,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process, in
|
||||
// this case the announcement should be added in the orphan batch
|
||||
// because we haven't announced the channel yet.
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
|
||||
// because we haven't announce the channel yet.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to proceed announcement: %v", err)
|
||||
}
|
||||
@ -827,7 +876,13 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process: %v", err)
|
||||
}
|
||||
@ -838,10 +893,16 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
@ -860,7 +921,12 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
t.Fatal("gossiper did not send channel update to peer")
|
||||
}
|
||||
|
||||
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2, remoteKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process: %v", err)
|
||||
}
|
||||
@ -872,11 +938,209 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
|
||||
// After that we process local announcement, and waiting to receive
|
||||
// the channel announcement.
|
||||
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process: %v", err)
|
||||
}
|
||||
|
||||
// The local proof should be sent to the remote peer.
|
||||
select {
|
||||
case msg := <-sentMsgs:
|
||||
if msg != batch.localProofAnn {
|
||||
t.Fatalf("expected local proof to be sent, got %v", msg)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("local proof was not sent to peer")
|
||||
}
|
||||
|
||||
// And since both remote and local announcements are processed, we
|
||||
// should be broadcasting the final channel announcements.
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("announcement wasn't broadcast")
|
||||
}
|
||||
}
|
||||
|
||||
number = 0
|
||||
if err := ctx.gossiper.waitingProofs.ForAll(
|
||||
func(p *channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 0 {
|
||||
t.Fatalf("wrong number of objects in storage: %v", number)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that sending AnnounceSignatures to remote peer will continue
|
||||
// to be tried until the peer comes online.
|
||||
func TestSignatureAnnouncementRetry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
|
||||
if err != nil {
|
||||
t.Fatalf("can't create context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("can't generate announcements: %v", err)
|
||||
}
|
||||
|
||||
localKey := batch.nodeAnn1.NodeID
|
||||
remoteKey := batch.nodeAnn2.NodeID
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// Make the SendToPeer fail, simulating the peer being offline.
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
return fmt.Errorf("intentional error in SendToPeer")
|
||||
}
|
||||
|
||||
// We expect the gossiper to register for a notification when the peer
|
||||
// comes back online, so keep track of the channel it wants to get
|
||||
// notified on.
|
||||
notifyPeers := make(chan chan<- struct{}, 1)
|
||||
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
|
||||
connectedChan chan<- struct{}) {
|
||||
notifyPeers <- connectedChan
|
||||
}
|
||||
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
// Since sending this local announcement proof to the remote will fail,
|
||||
// the gossiper should register for a notification when the remote is
|
||||
// online again.
|
||||
var conChan chan<- struct{}
|
||||
select {
|
||||
case conChan = <-notifyPeers:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("gossiper did not ask to get notified when " +
|
||||
"peer is online")
|
||||
}
|
||||
|
||||
// Since both proofs are not yet exchanged, no message should be
|
||||
// broadcasted yet.
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("announcements were broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
number := 0
|
||||
if err := ctx.gossiper.waitingProofs.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 1 {
|
||||
t.Fatal("wrong number of objects in storage")
|
||||
}
|
||||
|
||||
// When the peer comes online, the gossiper gets notified, and should
|
||||
// retry sending the AnnnounceSignatures. We make the SendToPeer
|
||||
// method work again.
|
||||
sentToPeer := make(chan lnwire.Message, 1)
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
sentToPeer <- msg[0]
|
||||
return nil
|
||||
}
|
||||
|
||||
// Notify that peer is now online. THis should trigger a new call
|
||||
// to SendToPeer.
|
||||
close(conChan)
|
||||
|
||||
select {
|
||||
case <-sentToPeer:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("gossiper did not send message when peer came online")
|
||||
}
|
||||
|
||||
// Now give the gossiper the remote proof. This should trigger a
|
||||
// broadcast of 3 messages (ChannelAnnouncement + 2 ChannelUpdate).
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
@ -891,13 +1155,397 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
); err != nil && err != channeldb.ErrWaitingProofNotFound {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 0 {
|
||||
t.Fatal("waiting proof should be removed from storage")
|
||||
}
|
||||
}
|
||||
|
||||
// Test that if we restart the gossiper, it will retry sending the
|
||||
// AnnounceSignatures to the peer if it did not succeed before
|
||||
// shutting down, and the full channel proof is not yet assembled.
|
||||
func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
|
||||
if err != nil {
|
||||
t.Fatalf("can't create context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("can't generate announcements: %v", err)
|
||||
}
|
||||
|
||||
localKey := batch.nodeAnn1.NodeID
|
||||
remoteKey := batch.nodeAnn2.NodeID
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
// Make the SendToPeerFail, simulating the peer being offline.
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
return fmt.Errorf("intentional error in SendToPeer")
|
||||
}
|
||||
notifyPeers := make(chan chan<- struct{}, 1)
|
||||
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
|
||||
connectedChan chan<- struct{}) {
|
||||
notifyPeers <- connectedChan
|
||||
}
|
||||
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
// Since sending to the remote peer will fail, the gossiper should
|
||||
// register for a notification when it comes back online.
|
||||
var conChan chan<- struct{}
|
||||
select {
|
||||
case conChan = <-notifyPeers:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("gossiper did not ask to get notified when " +
|
||||
"peer is online")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("announcements were broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
number := 0
|
||||
if err := ctx.gossiper.waitingProofs.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 1 {
|
||||
t.Fatal("wrong number of objects in storage")
|
||||
}
|
||||
|
||||
// Shut down gossiper, and restart. This should trigger a new attempt
|
||||
// to send the message to the peer.
|
||||
ctx.gossiper.Stop()
|
||||
gossiper, err := New(Config{
|
||||
Notifier: ctx.gossiper.cfg.Notifier,
|
||||
Broadcast: ctx.gossiper.cfg.Broadcast,
|
||||
SendToPeer: func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
return fmt.Errorf("intentional error in SendToPeer")
|
||||
},
|
||||
NotifyWhenOnline: func(peer *btcec.PublicKey,
|
||||
connectedChan chan<- struct{}) {
|
||||
notifyPeers <- connectedChan
|
||||
},
|
||||
Router: ctx.gossiper.cfg.Router,
|
||||
TrickleDelay: trickleDelay,
|
||||
RetransmitDelay: retransmitDelay,
|
||||
ProofMatureDelta: proofMatureDelta,
|
||||
DB: ctx.gossiper.cfg.DB,
|
||||
}, ctx.gossiper.selfKey)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to recreate gossiper: %v", err)
|
||||
}
|
||||
if err := gossiper.Start(); err != nil {
|
||||
t.Fatalf("unable to start recreated gossiper: %v", err)
|
||||
}
|
||||
defer gossiper.Stop()
|
||||
|
||||
ctx.gossiper = gossiper
|
||||
|
||||
// After starting up, the gossiper will see that it has a waitingproof
|
||||
// in the database, and will retry sending its part to the remote. Since
|
||||
// SendToPeer will fail again, it should register for a notification
|
||||
// when the peer comes online.
|
||||
select {
|
||||
case conChan = <-notifyPeers:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("gossiper did not ask to get notified when " +
|
||||
"peer is online")
|
||||
}
|
||||
|
||||
// Fix the SendToPeer method.
|
||||
sentToPeer := make(chan lnwire.Message, 1)
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
select {
|
||||
case sentToPeer <- msg[0]:
|
||||
case <-ctx.gossiper.quit:
|
||||
return fmt.Errorf("shutting down")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
// Notify that peer is now online. This should trigger a new call
|
||||
// to SendToPeer.
|
||||
close(conChan)
|
||||
|
||||
select {
|
||||
case <-sentToPeer:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("gossiper did not send message when peer came online")
|
||||
}
|
||||
|
||||
// Now exchanging the remote channel proof, the channel annoncement
|
||||
// broadcast should continue as normal.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("announcement wasn't broadcast")
|
||||
}
|
||||
}
|
||||
|
||||
number = 0
|
||||
if err := ctx.gossiper.waitingProofs.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil && err != channeldb.ErrWaitingProofNotFound {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 0 {
|
||||
t.Fatal("waiting proof should be removed from storage")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a
|
||||
// remote proof is received when we already have the full proof,
|
||||
// the gossiper will send the full proof (ChannelAnnouncement) to
|
||||
// the remote peer.
|
||||
func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
|
||||
if err != nil {
|
||||
t.Fatalf("can't create context: %v", err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
batch, err := createAnnouncements(0)
|
||||
if err != nil {
|
||||
t.Fatalf("can't generate announcements: %v", err)
|
||||
}
|
||||
|
||||
localKey := batch.nodeAnn1.NodeID
|
||||
remoteKey := batch.nodeAnn2.NodeID
|
||||
|
||||
// Recreate lightning network topology. Initialize router with channel
|
||||
// between two nodes.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process remote announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
t.Fatal("channel update announcement was broadcast")
|
||||
case <-time.After(2 * trickleDelay):
|
||||
}
|
||||
// Set up a channel we can use to inspect messages sent by the
|
||||
// gossiper to the remote peer.
|
||||
sentToPeer := make(chan lnwire.Message, 1)
|
||||
ctx.gossiper.cfg.SendToPeer = func(target *btcec.PublicKey,
|
||||
msg ...lnwire.Message) error {
|
||||
select {
|
||||
case <-ctx.gossiper.quit:
|
||||
return fmt.Errorf("gossiper shutting down")
|
||||
case sentToPeer <- msg[0]:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
notifyPeers := make(chan chan<- struct{}, 1)
|
||||
ctx.gossiper.cfg.NotifyWhenOnline = func(peer *btcec.PublicKey,
|
||||
connectedChan chan<- struct{}) {
|
||||
notifyPeers <- connectedChan
|
||||
}
|
||||
|
||||
// Pretending that we receive local channel announcement from funding
|
||||
// manager, thereby kick off the announcement exchange process.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
|
||||
localKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
// We expect the gossiper to send this message to the remote peer.
|
||||
select {
|
||||
case msg := <-sentToPeer:
|
||||
if msg != batch.localProofAnn {
|
||||
t.Fatalf("wrong message sent to peer: %v", msg)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not send local proof to peer")
|
||||
}
|
||||
|
||||
// And all channel announcements should be broadcast.
|
||||
for i := 0; i < 3; i++ {
|
||||
select {
|
||||
case <-ctx.broadcastedMessage:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("announcement wasn't broadcast")
|
||||
}
|
||||
}
|
||||
|
||||
number := 0
|
||||
if err := ctx.gossiper.waitingProofs.ForAll(
|
||||
func(*channeldb.WaitingProof) error {
|
||||
number++
|
||||
return nil
|
||||
},
|
||||
); err != nil && err != channeldb.ErrWaitingProofNotFound {
|
||||
t.Fatalf("unable to retrieve objects from store: %v", err)
|
||||
}
|
||||
|
||||
if number != 0 {
|
||||
t.Fatal("waiting proof should be removed from storage")
|
||||
}
|
||||
|
||||
// Now give the gossiper the remote proof yet again. This should
|
||||
// trigger a send of the full ChannelAnnouncement.
|
||||
select {
|
||||
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
|
||||
remoteKey):
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not process local announcement")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unable to process :%v", err)
|
||||
}
|
||||
|
||||
// We expect the gossiper to send this message to the remote peer.
|
||||
select {
|
||||
case msg := <-sentToPeer:
|
||||
_, ok := msg.(*lnwire.ChannelAnnouncement)
|
||||
if !ok {
|
||||
t.Fatalf("expected ChannelAnnouncement, intead got %T", msg)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not send local proof to peer")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct
|
||||
|
Loading…
Reference in New Issue
Block a user