diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index ca54e614..fb004390 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -381,7 +381,6 @@ func (m *SyncManager) syncerHandler() { case <-initialHistoricalSyncSignal: initialHistoricalSyncer = nil initialHistoricalSyncSignal = nil - m.markGraphSynced() log.Debug("Initial historical sync completed") @@ -472,6 +471,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies, delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval, bestHeight: m.cfg.BestHeight, + markGraphSynced: m.markGraphSynced, maxQueryChanRangeReplies: maxQueryChanRangeReplies, }) diff --git a/discovery/syncer.go b/discovery/syncer.go index 388be680..36031f31 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -280,6 +280,10 @@ type gossipSyncerCfg struct { // bestHeight returns the latest height known of the chain. bestHeight func() uint32 + // markGraphSynced updates the SyncManager's perception of whether we + // have completed at least one historical sync. + markGraphSynced func() + // maxQueryChanRangeReplies is the maximum number of replies we'll allow // for a single QueryChannelRange request. maxQueryChanRangeReplies uint32 @@ -550,6 +554,11 @@ func (g *GossipSyncer) channelGraphSyncer() { // to our terminal state. g.setSyncState(chansSynced) + // Ensure that the sync manager becomes aware that the + // historical sync completed so synced_to_graph is + // updated over rpc. + g.cfg.markGraphSynced() + // In this state, we've just sent off a new query for channels // that we don't yet know of. We'll remain in this state until // the remote party signals they've responded to our query in @@ -862,6 +871,11 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro g.cfg.peerPub[:]) g.setSyncState(chansSynced) + + // Ensure that the sync manager becomes aware that the + // historical sync completed so synced_to_graph is updated over + // rpc. + g.cfg.markGraphSynced() return nil } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index d9da9382..c3ad04f5 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -189,6 +189,7 @@ func newTestSyncer(hID lnwire.ShortChannelID, bestHeight: func() uint32 { return latestKnownHeight }, + markGraphSynced: func() {}, maxQueryChanRangeReplies: maxQueryChanRangeReplies, } syncer := newGossipSyncer(cfg) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index f476e2bd..8fb4b3fc 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -10352,6 +10352,20 @@ func assertSyncType(t *harnessTest, node *lntest.HarnessNode, t.t.Fatalf("unable to find peer: %s", peer) } +func waitForGraphSync(t *harnessTest, node *lntest.HarnessNode) { + t.t.Helper() + + err := wait.Predicate(func() bool { + ctxb := context.Background() + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + resp, err := node.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) + require.NoError(t.t, err) + + return resp.SyncedToGraph + }, defaultTimeout) + require.NoError(t.t, err) +} + func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { t.t.Run("pinned", func(t *testing.T) { ht := newHarnessTest(t, net) @@ -10415,6 +10429,10 @@ func testGraphTopologyNtfns(net *lntest.NetworkHarness, t *harnessTest, pinned b assertSyncType(t, alice, bobPubkey, lnrpc.Peer_ACTIVE_SYNC) } + // Regardless of syncer type, ensure that both peers report having + // completed their initial sync before continuing to make a channel. + waitForGraphSync(t, alice) + // Let Alice subscribe to graph notifications. graphSub := subscribeGraphNotifications( t, ctxb, alice,