discovery: allow the SyncManager to report whether the graph is synced
This commit is contained in:
parent
9f0cfe2bee
commit
af4234f680
@ -3,6 +3,7 @@ package discovery
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
@ -100,6 +101,16 @@ type SyncManagerCfg struct {
|
|||||||
// attempt a historical sync to ensure we have as much of the public channel
|
// attempt a historical sync to ensure we have as much of the public channel
|
||||||
// graph as possible.
|
// graph as possible.
|
||||||
type SyncManager struct {
|
type SyncManager struct {
|
||||||
|
// initialHistoricalSyncCompleted serves as a barrier when initializing
|
||||||
|
// new active GossipSyncers. If 0, the initial historical sync has not
|
||||||
|
// completed, so we'll defer initializing any active GossipSyncers. If
|
||||||
|
// 1, then we can transition the GossipSyncer immediately. We set up
|
||||||
|
// this barrier to ensure we have most of the graph before attempting to
|
||||||
|
// accept new updates at tip.
|
||||||
|
//
|
||||||
|
// NOTE: This must be used atomically.
|
||||||
|
initialHistoricalSyncCompleted int32
|
||||||
|
|
||||||
start sync.Once
|
start sync.Once
|
||||||
stop sync.Once
|
stop sync.Once
|
||||||
|
|
||||||
@ -192,15 +203,6 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
defer m.cfg.HistoricalSyncTicker.Stop()
|
defer m.cfg.HistoricalSyncTicker.Stop()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// initialHistoricalSyncCompleted serves as a barrier when
|
|
||||||
// initializing new active GossipSyncers. If false, the initial
|
|
||||||
// historical sync has not completed, so we'll defer
|
|
||||||
// initializing any active GossipSyncers. If true, then we can
|
|
||||||
// transition the GossipSyncer immediately. We set up this
|
|
||||||
// barrier to ensure we have most of the graph before attempting
|
|
||||||
// to accept new updates at tip.
|
|
||||||
initialHistoricalSyncCompleted = false
|
|
||||||
|
|
||||||
// initialHistoricalSyncer is the syncer we are currently
|
// initialHistoricalSyncer is the syncer we are currently
|
||||||
// performing an initial historical sync with.
|
// performing an initial historical sync with.
|
||||||
initialHistoricalSyncer *GossipSyncer
|
initialHistoricalSyncer *GossipSyncer
|
||||||
@ -251,10 +253,10 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
fallthrough
|
fallthrough
|
||||||
|
|
||||||
// If the initial historical sync has yet to complete,
|
// If the initial historical sync has yet to complete,
|
||||||
// then we'll declare is as passive and attempt to
|
// then we'll declare it as passive and attempt to
|
||||||
// transition it when the initial historical sync
|
// transition it when the initial historical sync
|
||||||
// completes.
|
// completes.
|
||||||
case !initialHistoricalSyncCompleted:
|
case !m.IsGraphSynced():
|
||||||
s.setSyncType(PassiveSync)
|
s.setSyncType(PassiveSync)
|
||||||
m.inactiveSyncers[s.cfg.peerPub] = s
|
m.inactiveSyncers[s.cfg.peerPub] = s
|
||||||
|
|
||||||
@ -279,7 +281,7 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
if !attemptHistoricalSync {
|
if !attemptHistoricalSync {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
initialHistoricalSyncCompleted = false
|
m.markGraphSyncing()
|
||||||
|
|
||||||
log.Debugf("Attempting initial historical sync with "+
|
log.Debugf("Attempting initial historical sync with "+
|
||||||
"GossipSyncer(%x)", s.cfg.peerPub)
|
"GossipSyncer(%x)", s.cfg.peerPub)
|
||||||
@ -344,7 +346,7 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
case <-initialHistoricalSyncSignal:
|
case <-initialHistoricalSyncSignal:
|
||||||
initialHistoricalSyncer = nil
|
initialHistoricalSyncer = nil
|
||||||
initialHistoricalSyncSignal = nil
|
initialHistoricalSyncSignal = nil
|
||||||
initialHistoricalSyncCompleted = true
|
m.markGraphSynced()
|
||||||
|
|
||||||
log.Debug("Initial historical sync completed")
|
log.Debug("Initial historical sync completed")
|
||||||
|
|
||||||
@ -667,3 +669,22 @@ func (m *SyncManager) gossipSyncers() map[route.Vertex]*GossipSyncer {
|
|||||||
|
|
||||||
return syncers
|
return syncers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// markGraphSynced allows us to report that the initial historical sync has
|
||||||
|
// completed.
|
||||||
|
func (m *SyncManager) markGraphSynced() {
|
||||||
|
atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// markGraphSyncing allows us to report that the initial historical sync is
|
||||||
|
// still undergoing.
|
||||||
|
func (m *SyncManager) markGraphSyncing() {
|
||||||
|
atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsGraphSynced determines whether we've completed our initial historical sync.
|
||||||
|
// The initial historical sync is done to ensure we've ingested as much of the
|
||||||
|
// public graph as possible.
|
||||||
|
func (m *SyncManager) IsGraphSynced() bool {
|
||||||
|
return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
|
||||||
|
}
|
||||||
|
@ -185,6 +185,13 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
syncMgr := newTestSyncManager(0)
|
syncMgr := newTestSyncManager(0)
|
||||||
|
|
||||||
|
// The graph should not be considered as synced since the sync manager
|
||||||
|
// has yet to start.
|
||||||
|
if syncMgr.IsGraphSynced() {
|
||||||
|
t.Fatal("expected graph to not be considered as synced")
|
||||||
|
}
|
||||||
|
|
||||||
syncMgr.Start()
|
syncMgr.Start()
|
||||||
defer syncMgr.Stop()
|
defer syncMgr.Stop()
|
||||||
|
|
||||||
@ -198,6 +205,12 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
|
|||||||
NumBlocks: math.MaxUint32,
|
NumBlocks: math.MaxUint32,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// The graph should not be considered as synced since the initial
|
||||||
|
// historical sync has not finished.
|
||||||
|
if syncMgr.IsGraphSynced() {
|
||||||
|
t.Fatal("expected graph to not be considered as synced")
|
||||||
|
}
|
||||||
|
|
||||||
// If an additional peer connects, then another historical sync should
|
// If an additional peer connects, then another historical sync should
|
||||||
// not be attempted.
|
// not be attempted.
|
||||||
finalHistoricalPeer := randPeer(t, syncMgr.quit)
|
finalHistoricalPeer := randPeer(t, syncMgr.quit)
|
||||||
@ -208,7 +221,14 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
|
|||||||
// If we disconnect the peer performing the initial historical sync, a
|
// If we disconnect the peer performing the initial historical sync, a
|
||||||
// new one should be chosen.
|
// new one should be chosen.
|
||||||
syncMgr.PruneSyncState(peer.PubKey())
|
syncMgr.PruneSyncState(peer.PubKey())
|
||||||
|
|
||||||
|
// Complete the initial historical sync by transitionining the syncer to
|
||||||
|
// its final chansSynced state. The graph should be considered as synced
|
||||||
|
// after the fact.
|
||||||
assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
|
assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
|
||||||
|
if !syncMgr.IsGraphSynced() {
|
||||||
|
t.Fatal("expected graph to be considered as synced")
|
||||||
|
}
|
||||||
|
|
||||||
// Once the initial historical sync has succeeded, another one should
|
// Once the initial historical sync has succeeded, another one should
|
||||||
// not be attempted by disconnecting the peer who performed it.
|
// not be attempted by disconnecting the peer who performed it.
|
||||||
|
Loading…
Reference in New Issue
Block a user