diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 8372772f..ac85a129 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -913,6 +913,12 @@ func (d *AuthenticatedGossiper) networkHandler() { policyUpdate.errResp <- nil case announcement := <-d.networkMsgs: + // We should only broadcast this message forward if it + // originated from us or it wasn't received as part of + // our initial historical sync. + shouldBroadcast := !announcement.isRemote || + d.syncMgr.IsGraphSynced() + switch announcement.msg.(type) { // Channel announcement signatures are amongst the only // messages that we'll process serially. @@ -981,12 +987,16 @@ func (d *AuthenticatedGossiper) networkHandler() { // the emitted announcements to our announce // batch to be broadcast once the trickle timer // ticks gain. - if emittedAnnouncements != nil { + if emittedAnnouncements != nil && shouldBroadcast { // TODO(roasbeef): exclude peer that // sent. announcements.AddMsgs( emittedAnnouncements..., ) + } else if emittedAnnouncements != nil { + log.Trace("Skipping broadcast of " + + "announcements received " + + "during initial graph sync") } }() diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index a727870e..be27e9e7 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -763,6 +763,10 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil, nil, fmt.Errorf("unable to start router: %v", err) } + // Mark the graph as synced in order to allow the announcements to be + // broadcast. + gossiper.syncMgr.markGraphSynced() + cleanUp := func() { gossiper.Stop() cleanUpDb() @@ -1512,6 +1516,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } defer gossiper.Stop() + // Mark the graph as synced in order to allow the announcements to be + // broadcast. + gossiper.syncMgr.markGraphSynced() + ctx.gossiper = gossiper remotePeer.quit = ctx.gossiper.quit @@ -3649,3 +3657,86 @@ func TestCalculateCorrectSubBatchSizesDifferentDelay(t *testing.T) { assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize) } } + +// TestBroadcastAnnsAfterGraphSynced ensures that we only broadcast +// announcements after the graph has been considered as synced, i.e., after our +// initial historical sync has completed. +func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { + t.Parallel() + + ctx, cleanup, err := createTestCtx(10) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + // We'll mark the graph as not synced. This should prevent us from + // broadcasting any messages we've received as part of our initial + // historical sync. + ctx.gossiper.syncMgr.markGraphSyncing() + + assertBroadcast := func(msg lnwire.Message, isRemote bool, + shouldBroadcast bool) { + + t.Helper() + + nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} + var errChan chan error + if isRemote { + errChan = ctx.gossiper.ProcessRemoteAnnouncement( + msg, nodePeer, + ) + } else { + errChan = ctx.gossiper.ProcessLocalAnnouncement( + msg, nodePeer.pk, + ) + } + + select { + case err := <-errChan: + if err != nil { + t.Fatalf("unable to process gossip message: %v", + err) + } + case <-time.After(2 * time.Second): + t.Fatal("gossip message not processed") + } + + select { + case <-ctx.broadcastedMessage: + if !shouldBroadcast { + t.Fatal("gossip message was broadcast") + } + case <-time.After(2 * trickleDelay): + if shouldBroadcast { + t.Fatal("gossip message wasn't broadcast") + } + } + } + + // A remote channel announcement should not be broadcast since the graph + // has not yet been synced. + chanAnn1, err := createRemoteChannelAnnouncement(0) + if err != nil { + t.Fatalf("unable to create channel announcement: %v", err) + } + assertBroadcast(chanAnn1, true, false) + + // A local channel announcement should be broadcast though, regardless + // of whether we've synced our graph or not. + chanUpd, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, 1) + if err != nil { + t.Fatalf("unable to create channel announcement: %v", err) + } + assertBroadcast(chanUpd, false, true) + + // Mark the graph as synced, which should allow the channel announcement + // should to be broadcast. + ctx.gossiper.syncMgr.markGraphSynced() + + chanAnn2, err := createRemoteChannelAnnouncement(1) + if err != nil { + t.Fatalf("unable to create channel announcement: %v", err) + } + assertBroadcast(chanAnn2, true, true) +}