discovery: handle initial historical sync disconnection

In this commit, we add logic to handle a peer with whom we're performing
an initial historical sync disconnecting. This is required to ensure we
get as much of the graph as possible when starting a fresh node. It will
also serve useful to ensure we do not get stalled once we prevent active
GossipSyncers from starting until the initial historical sync has
completed.
This commit is contained in:
Wilmer Paulino 2019-04-10 19:26:56 -07:00
parent 227e492ccf
commit 07136a5bc2
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 151 additions and 41 deletions

@ -185,6 +185,23 @@ func (m *SyncManager) syncerHandler() {
m.cfg.HistoricalSyncTicker.Resume()
defer m.cfg.HistoricalSyncTicker.Stop()
var (
// attemptInitialHistoricalSync determines whether we should
// attempt an initial historical sync when a new peer connects.
attemptInitialHistoricalSync = true
// initialHistoricalSyncer is the syncer we are currently
// performing an initial historical sync with.
initialHistoricalSyncer *GossipSyncer
// initialHistoricalSyncSignal is a signal that will fire once
// the intiial historical sync has been completed. This is
// crucial to ensure that another historical sync isn't
// attempted just because the initialHistoricalSyncer was
// disconnected.
initialHistoricalSyncSignal chan struct{}
)
for {
select {
// A new peer has been connected, so we'll create its
@ -224,22 +241,29 @@ func (m *SyncManager) syncerHandler() {
// We'll force a historical sync with the first peer we
// connect to, to ensure we get as much of the graph as
// possible.
var err error
m.historicalSync.Do(func() {
log.Infof("Attempting historical sync with "+
"GossipSyncer(%x)", s.cfg.peerPub)
err = s.historicalSync()
})
if err != nil {
log.Errorf("Unable to perform historical sync "+
"with GossipSyncer(%x): %v",
s.cfg.peerPub, err)
// Reset historicalSync to ensure it is tried
// again with a different peer.
m.historicalSync = sync.Once{}
if !attemptInitialHistoricalSync {
continue
}
log.Debugf("Attempting initial historical sync with "+
"GossipSyncer(%x)", s.cfg.peerPub)
if err := s.historicalSync(); err != nil {
log.Errorf("Unable to attempt initial "+
"historical sync with "+
"GossipSyncer(%x): %v", s.cfg.peerPub,
err)
continue
}
// Once the historical sync has started, we'll get a
// keep track of the corresponding syncer to properly
// handle disconnects. We'll also use a signal to know
// when the historical sync completed.
attemptInitialHistoricalSync = false
initialHistoricalSyncer = s
initialHistoricalSyncSignal = s.ResetSyncedSignal()
// An existing peer has disconnected, so we'll tear down its
// corresponding GossipSyncer.
case staleSyncer := <-m.staleSyncers:
@ -250,6 +274,43 @@ func (m *SyncManager) syncerHandler() {
m.removeGossipSyncer(staleSyncer.peer)
close(staleSyncer.doneChan)
// If we don't have an initialHistoricalSyncer, or we do
// but it is not the peer being disconnected, then we
// have nothing left to do and can proceed.
switch {
case initialHistoricalSyncer == nil:
fallthrough
case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
continue
}
// Otherwise, our initialHistoricalSyncer corresponds to
// the peer being disconnected, so we'll have to find a
// replacement.
log.Debug("Finding replacement for intitial " +
"historical sync")
s := m.forceHistoricalSync()
if s == nil {
log.Debug("No eligible replacement found " +
"for initial historical sync")
attemptInitialHistoricalSync = true
continue
}
log.Debugf("Replaced initial historical "+
"GossipSyncer(%v) with GossipSyncer(%x)",
staleSyncer.peer, s.cfg.peerPub)
initialHistoricalSyncer = s
initialHistoricalSyncSignal = s.ResetSyncedSignal()
// Our initial historical sync signal has completed, so we'll
// nil all of the relevant fields as they're no longer needed.
case <-initialHistoricalSyncSignal:
initialHistoricalSyncer = nil
initialHistoricalSyncSignal = nil
// Our RotateTicker has ticked, so we'll attempt to rotate a
// single active syncer with a passive one.
case <-m.cfg.RotateTicker.Ticks():
@ -406,13 +467,13 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
// forceHistoricalSync chooses a syncer with a remote peer at random and forces
// a historical sync with it.
func (m *SyncManager) forceHistoricalSync() {
func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
m.syncersMu.Lock()
defer m.syncersMu.Unlock()
// We'll sample from both sets of active and inactive syncers in the
// event that we don't have any inactive syncers.
_ = chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
return s.historicalSync()
})
}

@ -80,44 +80,52 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
t.Parallel()
// We'll create our test sync manager to only have one active syncer.
syncMgr := newTestSyncManager(1)
// We'll create our test sync manager to have two active syncers.
syncMgr := newTestSyncManager(2)
syncMgr.Start()
defer syncMgr.Stop()
// peer1 will represent an active syncer that performs a historical
// sync since it is the first registered peer with the SyncManager.
peer1 := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(peer1)
syncer1 := assertSyncerExistence(t, syncMgr, peer1)
assertActiveGossipTimestampRange(t, peer1)
assertTransitionToChansSynced(t, syncer1, peer1)
assertSyncerStatus(t, syncer1, chansSynced, ActiveSync)
// The first will be an active syncer that performs a historical sync
// since it is the first one registered with the SyncManager.
historicalSyncPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(historicalSyncPeer)
historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer)
assertActiveGossipTimestampRange(t, historicalSyncPeer)
assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer)
assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync)
// Then, we'll create the second active syncer, which is the one we'll
// disconnect.
activeSyncPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(activeSyncPeer)
activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
assertActiveGossipTimestampRange(t, activeSyncPeer)
assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
// It will then be torn down to simulate a disconnection. Since there
// are no other candidate syncers available, the active syncer won't be
// replaced.
syncMgr.PruneSyncState(peer1.PubKey())
syncMgr.PruneSyncState(activeSyncPeer.PubKey())
// Then, we'll start our active syncer again, but this time we'll also
// have a passive syncer available to replace the active syncer after
// the peer disconnects.
syncMgr.InitSyncState(peer1)
syncer1 = assertSyncerExistence(t, syncMgr, peer1)
assertActiveGossipTimestampRange(t, peer1)
assertSyncerStatus(t, syncer1, chansSynced, ActiveSync)
syncMgr.InitSyncState(activeSyncPeer)
activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer)
assertActiveGossipTimestampRange(t, activeSyncPeer)
assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
// Create our second peer, which should be initialized as a passive
// syncer.
peer2 := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(peer2)
syncer2 := assertSyncerExistence(t, syncMgr, peer2)
assertSyncerStatus(t, syncer2, chansSynced, PassiveSync)
newActiveSyncPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(newActiveSyncPeer)
newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer)
assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync)
// Disconnect our active syncer, which should trigger the SyncManager to
// replace it with our passive syncer.
go syncMgr.PruneSyncState(peer1.PubKey())
assertPassiveSyncerTransition(t, syncer2, peer2)
go syncMgr.PruneSyncState(activeSyncPeer.PubKey())
assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer)
}
// TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully
@ -169,10 +177,51 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {
assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer)
}
// TestSyncManagerHistoricalSync ensures that we only attempt a single
// historical sync during the SyncManager's startup, and that we can routinely
// force historical syncs whenever the HistoricalSyncTicker fires.
func TestSyncManagerHistoricalSync(t *testing.T) {
// TestSyncManagerInitialHistoricalSync ensures that we only attempt a single
// historical sync during the SyncManager's startup. If the peer corresponding
// to the initial historical syncer disconnects, we should attempt to find a
// replacement.
func TestSyncManagerInitialHistoricalSync(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(0)
syncMgr.Start()
defer syncMgr.Stop()
// We should expect to see a QueryChannelRange message with a
// FirstBlockHeight of the genesis block, signaling that an initial
// historical sync is being attempted.
peer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(peer)
assertMsgSent(t, peer, &lnwire.QueryChannelRange{
FirstBlockHeight: 0,
NumBlocks: math.MaxUint32,
})
// If an additional peer connects, then another historical sync should
// not be attempted.
finalHistoricalPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(finalHistoricalPeer)
finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
assertNoMsgSent(t, finalHistoricalPeer)
// If we disconnect the peer performing the initial historical sync, a
// new one should be chosen.
syncMgr.PruneSyncState(peer.PubKey())
assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
// Once the initial historical sync has succeeded, another one should
// not be attempted by disconnecting the peer who performed it.
extraPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(extraPeer)
assertNoMsgSent(t, extraPeer)
syncMgr.PruneSyncState(finalHistoricalPeer.PubKey())
assertNoMsgSent(t, extraPeer)
}
// TestSyncManagerForceHistoricalSync ensures that we can perform routine
// historical syncs whenever the HistoricalSyncTicker fires.
func TestSyncManagerForceHistoricalSync(t *testing.T) {
t.Parallel()
syncMgr := newTestSyncManager(0)