lntest: call markGraphSynced from gossipSyncer

Rather than performing this call in the SyncManager, we give each
gossipSyncer the ability to mark the first sync completed. This permits
pinned syncers to contribute towards the rpc-level synced_to_graph
value, allowing the value to be true after the first pinned syncer or
regular syncer complets. Unlinke regular syncers, pinned syncers can
proceed in parallel possibly decreasing the waiting time if consumers
rely on this field before proceeding to load their application.
This commit is contained in:
Conner Fromknecht 2021-01-29 00:14:50 -08:00
parent 920eda26fc
commit e42301dee2
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
4 changed files with 34 additions and 1 deletions

@ -381,7 +381,6 @@ func (m *SyncManager) syncerHandler() {
case <-initialHistoricalSyncSignal: case <-initialHistoricalSyncSignal:
initialHistoricalSyncer = nil initialHistoricalSyncer = nil
initialHistoricalSyncSignal = nil initialHistoricalSyncSignal = nil
m.markGraphSynced()
log.Debug("Initial historical sync completed") log.Debug("Initial historical sync completed")
@ -472,6 +471,7 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies, maxUndelayedQueryReplies: DefaultMaxUndelayedQueryReplies,
delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval, delayedQueryReplyInterval: DefaultDelayedQueryReplyInterval,
bestHeight: m.cfg.BestHeight, bestHeight: m.cfg.BestHeight,
markGraphSynced: m.markGraphSynced,
maxQueryChanRangeReplies: maxQueryChanRangeReplies, maxQueryChanRangeReplies: maxQueryChanRangeReplies,
}) })

@ -280,6 +280,10 @@ type gossipSyncerCfg struct {
// bestHeight returns the latest height known of the chain. // bestHeight returns the latest height known of the chain.
bestHeight func() uint32 bestHeight func() uint32
// markGraphSynced updates the SyncManager's perception of whether we
// have completed at least one historical sync.
markGraphSynced func()
// maxQueryChanRangeReplies is the maximum number of replies we'll allow // maxQueryChanRangeReplies is the maximum number of replies we'll allow
// for a single QueryChannelRange request. // for a single QueryChannelRange request.
maxQueryChanRangeReplies uint32 maxQueryChanRangeReplies uint32
@ -550,6 +554,11 @@ func (g *GossipSyncer) channelGraphSyncer() {
// to our terminal state. // to our terminal state.
g.setSyncState(chansSynced) g.setSyncState(chansSynced)
// Ensure that the sync manager becomes aware that the
// historical sync completed so synced_to_graph is
// updated over rpc.
g.cfg.markGraphSynced()
// In this state, we've just sent off a new query for channels // In this state, we've just sent off a new query for channels
// that we don't yet know of. We'll remain in this state until // that we don't yet know of. We'll remain in this state until
// the remote party signals they've responded to our query in // the remote party signals they've responded to our query in
@ -862,6 +871,11 @@ func (g *GossipSyncer) processChanRangeReply(msg *lnwire.ReplyChannelRange) erro
g.cfg.peerPub[:]) g.cfg.peerPub[:])
g.setSyncState(chansSynced) g.setSyncState(chansSynced)
// Ensure that the sync manager becomes aware that the
// historical sync completed so synced_to_graph is updated over
// rpc.
g.cfg.markGraphSynced()
return nil return nil
} }

@ -189,6 +189,7 @@ func newTestSyncer(hID lnwire.ShortChannelID,
bestHeight: func() uint32 { bestHeight: func() uint32 {
return latestKnownHeight return latestKnownHeight
}, },
markGraphSynced: func() {},
maxQueryChanRangeReplies: maxQueryChanRangeReplies, maxQueryChanRangeReplies: maxQueryChanRangeReplies,
} }
syncer := newGossipSyncer(cfg) syncer := newGossipSyncer(cfg)

@ -10352,6 +10352,20 @@ func assertSyncType(t *harnessTest, node *lntest.HarnessNode,
t.t.Fatalf("unable to find peer: %s", peer) t.t.Fatalf("unable to find peer: %s", peer)
} }
func waitForGraphSync(t *harnessTest, node *lntest.HarnessNode) {
t.t.Helper()
err := wait.Predicate(func() bool {
ctxb := context.Background()
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
resp, err := node.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
require.NoError(t.t, err)
return resp.SyncedToGraph
}, defaultTimeout)
require.NoError(t.t, err)
}
func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) {
t.t.Run("pinned", func(t *testing.T) { t.t.Run("pinned", func(t *testing.T) {
ht := newHarnessTest(t, net) ht := newHarnessTest(t, net)
@ -10415,6 +10429,10 @@ func testGraphTopologyNtfns(net *lntest.NetworkHarness, t *harnessTest, pinned b
assertSyncType(t, alice, bobPubkey, lnrpc.Peer_ACTIVE_SYNC) assertSyncType(t, alice, bobPubkey, lnrpc.Peer_ACTIVE_SYNC)
} }
// Regardless of syncer type, ensure that both peers report having
// completed their initial sync before continuing to make a channel.
waitForGraphSync(t, alice)
// Let Alice subscribe to graph notifications. // Let Alice subscribe to graph notifications.
graphSub := subscribeGraphNotifications( graphSub := subscribeGraphNotifications(
t, ctxb, alice, t, ctxb, alice,