Merge pull request #3103 from halseth/syncmanager-resync-historical

discovery/sync_manager: restart historical sync on first connected peer
Johan T. Halseth 2019-05-24 12:17:02 +02:00 committed by GitHub
commit af3b04e53f
2 changed files with 57 additions and 11 deletions

discovery/sync_manager.go

@@ -186,10 +186,6 @@ func (m *SyncManager) syncerHandler() {
 	defer m.cfg.HistoricalSyncTicker.Stop()
 
 	var (
-		// attemptInitialHistoricalSync determines whether we should
-		// attempt an initial historical sync when a new peer connects.
-		attemptInitialHistoricalSync = true
-
 		// initialHistoricalSyncCompleted serves as a barrier when
 		// initializing new active GossipSyncers. If false, the initial
 		// historical sync has not completed, so we'll defer
@@ -225,17 +221,33 @@ func (m *SyncManager) syncerHandler() {
 			s := m.createGossipSyncer(newSyncer.peer)
 
+			// attemptHistoricalSync determines whether we should
+			// attempt an initial historical sync when a new peer
+			// connects.
+			attemptHistoricalSync := false
+
 			m.syncersMu.Lock()
 			switch {
+			// Regardless of whether the initial historical sync
+			// has completed, we'll re-trigger a historical sync if
+			// we no longer have any syncers. This might be
+			// necessary if we lost all our peers at one point, and
+			// now we finally have one again.
+			case len(m.activeSyncers) == 0 &&
+				len(m.inactiveSyncers) == 0:
+
+				attemptHistoricalSync = true
+				fallthrough
+
 			// If we've exceeded our total number of active syncers,
 			// we'll initialize this GossipSyncer as passive.
 			case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
 				fallthrough
 
-			// Otherwise, it should be initialized as active. If the
-			// initial historical sync has yet to complete, then
-			// we'll declare is as passive and attempt to transition
-			// it when the initial historical sync completes.
+			// If the initial historical sync has yet to complete,
+			// then we'll declare it as passive and attempt to
+			// transition it when the initial historical sync
+			// completes.
 			case !initialHistoricalSyncCompleted:
 				s.setSyncType(PassiveSync)
 				m.inactiveSyncers[s.cfg.peerPub] = s
@@ -258,9 +270,10 @@ func (m *SyncManager) syncerHandler() {
 			// We'll force a historical sync with the first peer we
 			// connect to, to ensure we get as much of the graph as
 			// possible.
-			if !attemptInitialHistoricalSync {
+			if !attemptHistoricalSync {
 				continue
 			}
+			initialHistoricalSyncCompleted = false
 
 			log.Debugf("Attempting initial historical sync with "+
 				"GossipSyncer(%x)", s.cfg.peerPub)
@@ -277,7 +290,6 @@ func (m *SyncManager) syncerHandler() {
 			// keep track of the corresponding syncer to properly
 			// handle disconnects. We'll also use a signal to know
 			// when the historical sync completed.
-			attemptInitialHistoricalSync = false
 			initialHistoricalSyncer = s
 			initialHistoricalSyncSignal = s.ResetSyncedSignal()
@@ -311,7 +323,6 @@ func (m *SyncManager) syncerHandler() {
 			if s == nil {
 				log.Debug("No eligible replacement found " +
 					"for initial historical sync")
-				attemptInitialHistoricalSync = true
 				continue
 			}

discovery/sync_manager_test.go

@@ -219,6 +219,41 @@ func TestSyncManagerInitialHistoricalSync(t *testing.T) {
 	assertNoMsgSent(t, extraPeer)
 }
 
+// TestSyncManagerHistoricalSyncOnReconnect tests that the sync manager will
+// re-trigger a historical sync when a new peer connects after a historical
+// sync has completed, but we have lost all peers.
+func TestSyncManagerHistoricalSyncOnReconnect(t *testing.T) {
+	t.Parallel()
+
+	syncMgr := newTestSyncManager(2)
+	syncMgr.Start()
+	defer syncMgr.Stop()
+
+	// We should expect to see a QueryChannelRange message with a
+	// FirstBlockHeight of the genesis block, signaling that an initial
+	// historical sync is being attempted.
+	peer := randPeer(t, syncMgr.quit)
+	syncMgr.InitSyncState(peer)
+	s := assertSyncerExistence(t, syncMgr, peer)
+	assertTransitionToChansSynced(t, s, peer)
+	assertActiveGossipTimestampRange(t, peer)
+	assertSyncerStatus(t, s, chansSynced, ActiveSync)
+
+	// Now that the historical sync is completed, we prune the syncer,
+	// simulating all peers having disconnected.
+	syncMgr.PruneSyncState(peer.PubKey())
+
+	// If a new peer now connects, then another historical sync should
+	// be attempted. This is to ensure we get an up-to-date graph if we
+	// haven't had any peers for a time.
+	nextPeer := randPeer(t, syncMgr.quit)
+	syncMgr.InitSyncState(nextPeer)
+	s1 := assertSyncerExistence(t, syncMgr, nextPeer)
+	assertTransitionToChansSynced(t, s1, nextPeer)
+	assertActiveGossipTimestampRange(t, nextPeer)
+	assertSyncerStatus(t, s1, chansSynced, ActiveSync)
+}
+
 // TestSyncManagerForceHistoricalSync ensures that we can perform routine
 // historical syncs whenever the HistoricalSyncTicker fires.
 func TestSyncManagerForceHistoricalSync(t *testing.T) {