From af4234f680b15944b1a926df1fc768366e1e1b04 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 26 Jul 2019 18:22:37 -0700 Subject: [PATCH] discovery: allow the SyncManager to report whether the graph is synced --- discovery/sync_manager.go | 47 ++++++++++++++++++++++++---------- discovery/sync_manager_test.go | 20 +++++++++++++++ 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 49bce8c1..6175e44a 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -3,6 +3,7 @@ package discovery import ( "errors" "sync" + "sync/atomic" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -100,6 +101,16 @@ type SyncManagerCfg struct { // attempt a historical sync to ensure we have as much of the public channel // graph as possible. type SyncManager struct { + // initialHistoricalSyncCompleted serves as a barrier when initializing + // new active GossipSyncers. If 0, the initial historical sync has not + // completed, so we'll defer initializing any active GossipSyncers. If + // 1, then we can transition the GossipSyncer immediately. We set up + // this barrier to ensure we have most of the graph before attempting to + // accept new updates at tip. + // + // NOTE: This must be used atomically. + initialHistoricalSyncCompleted int32 + start sync.Once stop sync.Once @@ -192,15 +203,6 @@ func (m *SyncManager) syncerHandler() { defer m.cfg.HistoricalSyncTicker.Stop() var ( - // initialHistoricalSyncCompleted serves as a barrier when - // initializing new active GossipSyncers. If false, the initial - // historical sync has not completed, so we'll defer - // initializing any active GossipSyncers. If true, then we can - // transition the GossipSyncer immediately. We set up this - // barrier to ensure we have most of the graph before attempting - // to accept new updates at tip. - initialHistoricalSyncCompleted = false - // initialHistoricalSyncer is the syncer we are currently // performing an initial historical sync with. initialHistoricalSyncer *GossipSyncer @@ -251,10 +253,10 @@ func (m *SyncManager) syncerHandler() { fallthrough // If the initial historical sync has yet to complete, - // then we'll declare is as passive and attempt to + // then we'll declare it as passive and attempt to // transition it when the initial historical sync // completes. - case !initialHistoricalSyncCompleted: + case !m.IsGraphSynced(): s.setSyncType(PassiveSync) m.inactiveSyncers[s.cfg.peerPub] = s @@ -279,7 +281,7 @@ func (m *SyncManager) syncerHandler() { if !attemptHistoricalSync { continue } - initialHistoricalSyncCompleted = false + m.markGraphSyncing() log.Debugf("Attempting initial historical sync with "+ "GossipSyncer(%x)", s.cfg.peerPub) @@ -344,7 +346,7 @@ func (m *SyncManager) syncerHandler() { case <-initialHistoricalSyncSignal: initialHistoricalSyncer = nil initialHistoricalSyncSignal = nil - initialHistoricalSyncCompleted = true + m.markGraphSynced() log.Debug("Initial historical sync completed") @@ -667,3 +669,22 @@ func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer { return syncers } + +// markGraphSynced allows us to report that the initial historical sync has +// completed. +func (m *SyncManager) markGraphSynced() { + atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1) +} + +// markGraphSyncing allows us to report that the initial historical sync is +// still undergoing. +func (m *SyncManager) markGraphSyncing() { + atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 0) +} + +// IsGraphSynced determines whether we've completed our initial historical sync. +// The initial historical sync is done to ensure we've ingested as much of the +// public graph as possible. +func (m *SyncManager) IsGraphSynced() bool { + return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1 +} diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index 56d7181e..7df7d75f 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -185,6 +185,13 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(0) + + // The graph should not be considered as synced since the sync manager + // has yet to start. + if syncMgr.IsGraphSynced() { + t.Fatal("expected graph to not be considered as synced") + } + syncMgr.Start() defer syncMgr.Stop() @@ -198,6 +205,12 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) { NumBlocks: math.MaxUint32, }) + // The graph should not be considered as synced since the initial + // historical sync has not finished. + if syncMgr.IsGraphSynced() { + t.Fatal("expected graph to not be considered as synced") + } + // If an additional peer connects, then another historical sync should // not be attempted. finalHistoricalPeer := randPeer(t, syncMgr.quit) @@ -208,7 +221,14 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) { // If we disconnect the peer performing the initial historical sync, a // new one should be chosen. syncMgr.PruneSyncState(peer.PubKey()) + + // Complete the initial historical sync by transitionining the syncer to + // its final chansSynced state. The graph should be considered as synced + // after the fact. assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer) + if !syncMgr.IsGraphSynced() { + t.Fatal("expected graph to be considered as synced") + } // Once the initial historical sync has succeeded, another one should // not be attempted by disconnecting the peer who performed it.