From e42301dee22a3ed0a174c0480fc5340eac6b6339 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 29 Jan 2021 00:14:50 -0800 Subject: [PATCH] lntest: call markGraphSynced from gossipSyncer Rather than performing this call in the SyncManager, we give each gossipSyncer the ability to mark the first sync completed. This permits pinned syncers to contribute towards the rpc-level synced_to_graph value, allowing the value to be true after the first pinned syncer or regular syncer complets. Unlinke regular syncers, pinned syncers can proceed in parallel possibly decreasing the waiting time if consumers rely on this field before proceeding to load their application. --- discovery/sync_manager.go | 2 +- discovery/syncer.go | 14 ++++++++++++++ discovery/syncer_test.go | 1 + lntest/itest/lnd_test.go | 18 ++++++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index ca54e614..fb004390 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -381,7 +381,6 @@ func (m *SyncManager) syncerHandler() { case <-initialHistoricalSyncSignal: initialHistoricalSyncer = nil initialHistoricalSyncSignal = nil - m.markGraphSynced() log.Debug("Initial historical sync completed") @@ -472,6 +471,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies, delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval, bestHeight: m.cfg.BestHeight, + markGraphSynced: m.markGraphSynced, maxQueryChanRangeReplies: maxQueryChanRangeReplies, }) diff --git a/discovery/syncer.go b/discovery/syncer.go index 388be680..36031f31 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -280,6 +280,10 @@ type gossipSyncerCfg struct { // bestHeight returns the latest height known of the chain. bestHeight func() uint32 + // markGraphSynced updates the SyncManager's perception of whether we + // have completed at least one historical sync. + markGraphSynced func() + // maxQueryChanRangeReplies is the maximum number of replies we'll allow // for a single QueryChannelRange request. maxQueryChanRangeReplies uint32 @@ -550,6 +554,11 @@ func (g *GossipSyncer) channelGraphSyncer() { // to our terminal state. g.setSyncState(chansSynced) + // Ensure that the sync manager becomes aware that the + // historical sync completed so synced_to_graph is + // updated over rpc. + g.cfg.markGraphSynced() + // In this state, we've just sent off a new query for channels // that we don't yet know of. We'll remain in this state until // the remote party signals they've responded to our query in @@ -862,6 +871,11 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro g.cfg.peerPub[:]) g.setSyncState(chansSynced) + + // Ensure that the sync manager becomes aware that the + // historical sync completed so synced_to_graph is updated over + // rpc. + g.cfg.markGraphSynced() return nil } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index d9da9382..c3ad04f5 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -189,6 +189,7 @@ func newTestSyncer(hID lnwire.ShortChannelID, bestHeight: func() uint32 { return latestKnownHeight }, + markGraphSynced: func() {}, maxQueryChanRangeReplies: maxQueryChanRangeReplies, } syncer := newGossipSyncer(cfg) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index f476e2bd..8fb4b3fc 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -10352,6 +10352,20 @@ func assertSyncType(t *harnessTest, node *lntest.HarnessNode, t.t.Fatalf("unable to find peer: %s", peer) } +func waitForGraphSync(t *harnessTest, node *lntest.HarnessNode) { + t.t.Helper() + + err := wait.Predicate(func() bool { + ctxb := context.Background() + ctxt, _ := context.WithTimeout(ctxb, defaultTimeout) + resp, err := node.GetInfo(ctxt, &lnrpc.GetInfoRequest{}) + require.NoError(t.t, err) + + return resp.SyncedToGraph + }, defaultTimeout) + require.NoError(t.t, err) +} + func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { t.t.Run("pinned", func(t *testing.T) { ht := newHarnessTest(t, net) @@ -10415,6 +10429,10 @@ func testGraphTopologyNtfns(net *lntest.NetworkHarness, t *harnessTest, pinned b assertSyncType(t, alice, bobPubkey, lnrpc.Peer_ACTIVE_SYNC) } + // Regardless of syncer type, ensure that both peers report having + // completed their initial sync before continuing to make a channel. + waitForGraphSync(t, alice) + // Let Alice subscribe to graph notifications. graphSub := subscribeGraphNotifications( t, ctxb, alice,