From 07136a5bc28e75ce6eb0eac3ecd8ac59a54ebbb9 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 10 Apr 2019 19:26:56 -0700 Subject: [PATCH] discovery: handle initial historical sync disconnection In this commit, we add logic to handle a peer with whom we're performing an initial historical sync disconnecting. This is required to ensure we get as much of the graph as possible when starting a fresh node. It will also serve useful to ensure we do not get stalled once we prevent active GossipSyncers from starting until the initial historical sync has completed. --- discovery/sync_manager.go | 93 ++++++++++++++++++++++++++------ discovery/sync_manager_test.go | 99 +++++++++++++++++++++++++--------- 2 files changed, 151 insertions(+), 41 deletions(-) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 72de0d65..f95a784a 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -185,6 +185,23 @@ func (m *SyncManager) syncerHandler() { m.cfg.HistoricalSyncTicker.Resume() defer m.cfg.HistoricalSyncTicker.Stop() + var ( + // attemptInitialHistoricalSync determines whether we should + // attempt an initial historical sync when a new peer connects. + attemptInitialHistoricalSync = true + + // initialHistoricalSyncer is the syncer we are currently + // performing an initial historical sync with. + initialHistoricalSyncer *GossipSyncer + + // initialHistoricalSyncSignal is a signal that will fire once + // the initial historical sync has been completed. This is + // crucial to ensure that another historical sync isn't + // attempted just because the initialHistoricalSyncer was + // disconnected. + initialHistoricalSyncSignal chan struct{} + ) + for { select { // A new peer has been connected, so we'll create its @@ -224,22 +241,29 @@ func (m *SyncManager) syncerHandler() { // We'll force a historical sync with the first peer we // connect to, to ensure we get as much of the graph as // possible. 
- var err error - m.historicalSync.Do(func() { - log.Infof("Attempting historical sync with "+ - "GossipSyncer(%x)", s.cfg.peerPub) - err = s.historicalSync() - }) - if err != nil { - log.Errorf("Unable to perform historical sync "+ - "with GossipSyncer(%x): %v", - s.cfg.peerPub, err) - - // Reset historicalSync to ensure it is tried - // again with a different peer. - m.historicalSync = sync.Once{} + if !attemptInitialHistoricalSync { + continue } + log.Debugf("Attempting initial historical sync with "+ + "GossipSyncer(%x)", s.cfg.peerPub) + + if err := s.historicalSync(); err != nil { + log.Errorf("Unable to attempt initial "+ + "historical sync with "+ + "GossipSyncer(%x): %v", s.cfg.peerPub, + err) + continue + } + + // Once the historical sync has started, we'll + // keep track of the corresponding syncer to properly + // handle disconnects. We'll also use a signal to know + // when the historical sync completed. + attemptInitialHistoricalSync = false + initialHistoricalSyncer = s + initialHistoricalSyncSignal = s.ResetSyncedSignal() + // An existing peer has disconnected, so we'll tear down its // corresponding GossipSyncer. case staleSyncer := <-m.staleSyncers: @@ -250,6 +274,43 @@ func (m *SyncManager) syncerHandler() { m.removeGossipSyncer(staleSyncer.peer) close(staleSyncer.doneChan) + // If we don't have an initialHistoricalSyncer, or we do + // but it is not the peer being disconnected, then we + // have nothing left to do and can proceed. + switch { + case initialHistoricalSyncer == nil: + fallthrough + case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub: + continue + } + + // Otherwise, our initialHistoricalSyncer corresponds to + // the peer being disconnected, so we'll have to find a + // replacement. 
+ log.Debug("Finding replacement for initial " + "historical sync")

 + s := m.forceHistoricalSync() + if s == nil { + log.Debug("No eligible replacement found " + "for initial historical sync") + attemptInitialHistoricalSync = true + continue + } + + log.Debugf("Replaced initial historical "+ + "GossipSyncer(%v) with GossipSyncer(%x)", + staleSyncer.peer, s.cfg.peerPub) + + initialHistoricalSyncer = s + initialHistoricalSyncSignal = s.ResetSyncedSignal() + + // Our initial historical sync signal has completed, so we'll + // nil all of the relevant fields as they're no longer needed. + case <-initialHistoricalSyncSignal: + initialHistoricalSyncer = nil + initialHistoricalSyncSignal = nil + // Our RotateTicker has ticked, so we'll attempt to rotate a // single active syncer with a passive one. case <-m.cfg.RotateTicker.Ticks(): @@ -406,13 +467,13 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { // forceHistoricalSync chooses a syncer with a remote peer at random and forces // a historical sync with it. -func (m *SyncManager) forceHistoricalSync() { +func (m *SyncManager) forceHistoricalSync() *GossipSyncer { m.syncersMu.Lock() defer m.syncersMu.Unlock() // We'll sample from both sets of active and inactive syncers in the // event that we don't have any inactive syncers. - _ = chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error { + return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error { return s.historicalSync() }) } diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index e965af3c..3c7fd6c7 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -80,44 +80,52 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { t.Parallel() - // We'll create our test sync manager to only have one active syncer. 
- syncMgr := newTestSyncManager(1) + // We'll create our test sync manager to have two active syncers. + syncMgr := newTestSyncManager(2) syncMgr.Start() defer syncMgr.Stop() - // peer1 will represent an active syncer that performs a historical - // sync since it is the first registered peer with the SyncManager. - peer1 := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer1) - syncer1 := assertSyncerExistence(t, syncMgr, peer1) - assertActiveGossipTimestampRange(t, peer1) - assertTransitionToChansSynced(t, syncer1, peer1) - assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) + // The first will be an active syncer that performs a historical sync + // since it is the first one registered with the SyncManager. + historicalSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(historicalSyncPeer) + historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer) + assertActiveGossipTimestampRange(t, historicalSyncPeer) + assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer) + assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync) + + // Then, we'll create the second active syncer, which is the one we'll + // disconnect. + activeSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // It will then be torn down to simulate a disconnection. Since there // are no other candidate syncers available, the active syncer won't be // replaced. - syncMgr.PruneSyncState(peer1.PubKey()) + syncMgr.PruneSyncState(activeSyncPeer.PubKey()) // Then, we'll start our active syncer again, but this time we'll also // have a passive syncer available to replace the active syncer after // the peer disconnects. 
- syncMgr.InitSyncState(peer1) - syncer1 = assertSyncerExistence(t, syncMgr, peer1) - assertActiveGossipTimestampRange(t, peer1) - assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // Create our second peer, which should be initialized as a passive // syncer. - peer2 := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer2) - syncer2 := assertSyncerExistence(t, syncMgr, peer2) - assertSyncerStatus(t, syncer2, chansSynced, PassiveSync) + newActiveSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(newActiveSyncPeer) + newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer) + assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync) // Disconnect our active syncer, which should trigger the SyncManager to // replace it with our passive syncer. - go syncMgr.PruneSyncState(peer1.PubKey()) - assertPassiveSyncerTransition(t, syncer2, peer2) + go syncMgr.PruneSyncState(activeSyncPeer.PubKey()) + assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer) } // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully @@ -169,10 +177,51 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer) } -// TestSyncManagerHistoricalSync ensures that we only attempt a single -// historical sync during the SyncManager's startup, and that we can routinely -// force historical syncs whenever the HistoricalSyncTicker fires. -func TestSyncManagerHistoricalSync(t *testing.T) { +// TestSyncManagerInitialHistoricalSync ensures that we only attempt a single +// historical sync during the SyncManager's startup. If the peer corresponding +// to the initial historical syncer disconnects, we should attempt to find a +// replacement. 
+func TestSyncManagerInitialHistoricalSync(t *testing.T) { + t.Parallel() + + syncMgr := newTestSyncManager(0) + syncMgr.Start() + defer syncMgr.Stop() + + // We should expect to see a QueryChannelRange message with a + // FirstBlockHeight of the genesis block, signaling that an initial + // historical sync is being attempted. + peer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(peer) + assertMsgSent(t, peer, &lnwire.QueryChannelRange{ + FirstBlockHeight: 0, + NumBlocks: math.MaxUint32, + }) + + // If an additional peer connects, then another historical sync should + // not be attempted. + finalHistoricalPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(finalHistoricalPeer) + finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer) + assertNoMsgSent(t, finalHistoricalPeer) + + // If we disconnect the peer performing the initial historical sync, a + // new one should be chosen. + syncMgr.PruneSyncState(peer.PubKey()) + assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer) + + // Once the initial historical sync has succeeded, another one should + // not be attempted by disconnecting the peer who performed it. + extraPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(extraPeer) + assertNoMsgSent(t, extraPeer) + syncMgr.PruneSyncState(finalHistoricalPeer.PubKey()) + assertNoMsgSent(t, extraPeer) +} + +// TestSyncManagerForceHistoricalSync ensures that we can perform routine +// historical syncs whenever the HistoricalSyncTicker fires. +func TestSyncManagerForceHistoricalSync(t *testing.T) { t.Parallel() syncMgr := newTestSyncManager(0)