discovery: queue active syncers until initial historical sync signal

In this commit, we begin to queue any active syncers until the initial
historical sync has completed. We do this to ensure we can properly
handle any new channel updates at tip. This is required for fresh nodes
that are syncing the channel graph for the first time. If we begin
accepting updates at tip while the initial historical sync is still
ongoing, then we risk not processing certain updates since we've yet to
learn of the channels themselves.
This commit is contained in:
Wilmer Paulino 2019-04-10 19:27:12 -07:00
parent 07136a5bc2
commit d68842ee9e
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 110 additions and 5 deletions

@ -1,6 +1,7 @@
package discovery package discovery
import ( import (
"errors"
"net" "net"
"sync" "sync"
@ -30,6 +31,7 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
select { select {
case p.sentMsgs <- msg: case p.sentMsgs <- msg:
case <-p.quit: case <-p.quit:
return errors.New("peer disconnected")
} }
} }

@ -190,6 +190,15 @@ func (m *SyncManager) syncerHandler() {
// attempt an initial historical sync when a new peer connects. // attempt an initial historical sync when a new peer connects.
attemptInitialHistoricalSync = true attemptInitialHistoricalSync = true
// initialHistoricalSyncCompleted serves as a barrier when
// initializing new active GossipSyncers. If false, the initial
// historical sync has not completed, so we'll defer
// initializing any active GossipSyncers. If true, then we can
// transition the GossipSyncer immediately. We set up this
// barrier to ensure we have most of the graph before attempting
// to accept new updates at tip.
initialHistoricalSyncCompleted = false
// initialHistoricalSyncer is the syncer we are currently // initialHistoricalSyncer is the syncer we are currently
// performing an initial historical sync with. // performing an initial historical sync with.
initialHistoricalSyncer *GossipSyncer initialHistoricalSyncer *GossipSyncer
@ -221,10 +230,18 @@ func (m *SyncManager) syncerHandler() {
// If we've exceeded our total number of active syncers, // If we've exceeded our total number of active syncers,
// we'll initialize this GossipSyncer as passive. // we'll initialize this GossipSyncer as passive.
case len(m.activeSyncers) >= m.cfg.NumActiveSyncers: case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
fallthrough
// Otherwise, it should be initialized as active. If the
// initial historical sync has yet to complete, then
// we'll declare is as passive and attempt to transition
// it when the initial historical sync completes.
case !initialHistoricalSyncCompleted:
s.setSyncType(PassiveSync) s.setSyncType(PassiveSync)
m.inactiveSyncers[s.cfg.peerPub] = s m.inactiveSyncers[s.cfg.peerPub] = s
// Otherwise, it should be initialized as active. // The initial historical sync has completed, so we can
// immediately start the GossipSyncer as active.
default: default:
s.setSyncType(ActiveSync) s.setSyncType(ActiveSync)
m.activeSyncers[s.cfg.peerPub] = s m.activeSyncers[s.cfg.peerPub] = s
@ -310,6 +327,32 @@ func (m *SyncManager) syncerHandler() {
case <-initialHistoricalSyncSignal: case <-initialHistoricalSyncSignal:
initialHistoricalSyncer = nil initialHistoricalSyncer = nil
initialHistoricalSyncSignal = nil initialHistoricalSyncSignal = nil
initialHistoricalSyncCompleted = true
log.Debug("Initial historical sync completed")
// With the initial historical sync complete, we can
// begin receiving new graph updates at tip. We'll
// determine whether we can have any more active
// GossipSyncers. If we do, we'll randomly select some
// that are currently passive to transition.
m.syncersMu.Lock()
numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
if numActiveLeft <= 0 {
m.syncersMu.Unlock()
continue
}
log.Debugf("Attempting to transition %v passive "+
"GossipSyncers to active", numActiveLeft)
for i := 0; i < numActiveLeft; i++ {
chooseRandomSyncer(
m.inactiveSyncers, m.transitionPassiveSyncer,
)
}
m.syncersMu.Unlock()
// Our RotateTicker has ticked, so we'll attempt to rotate a // Our RotateTicker has ticked, so we'll attempt to rotate a
// single active syncer with a passive one. // single active syncer with a passive one.

@ -60,10 +60,10 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
// The first syncer registered always attempts a historical // The first syncer registered always attempts a historical
// sync. // sync.
assertActiveGossipTimestampRange(t, peer)
if i == 0 { if i == 0 {
assertTransitionToChansSynced(t, s, peer) assertTransitionToChansSynced(t, s, peer)
} }
assertActiveGossipTimestampRange(t, peer)
assertSyncerStatus(t, s, chansSynced, ActiveSync) assertSyncerStatus(t, s, chansSynced, ActiveSync)
} }
@ -90,8 +90,8 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
historicalSyncPeer := randPeer(t, syncMgr.quit) historicalSyncPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(historicalSyncPeer) syncMgr.InitSyncState(historicalSyncPeer)
historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer) historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer)
assertActiveGossipTimestampRange(t, historicalSyncPeer)
assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer) assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer)
assertActiveGossipTimestampRange(t, historicalSyncPeer)
assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync) assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync)
// Then, we'll create the second active syncer, which is the one we'll // Then, we'll create the second active syncer, which is the one we'll
@ -142,8 +142,8 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {
activeSyncPeer := randPeer(t, syncMgr.quit) activeSyncPeer := randPeer(t, syncMgr.quit)
syncMgr.InitSyncState(activeSyncPeer) syncMgr.InitSyncState(activeSyncPeer)
activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
assertActiveGossipTimestampRange(t, activeSyncPeer)
assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer) assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer)
assertActiveGossipTimestampRange(t, activeSyncPeer)
assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
// We'll send a tick to force a rotation. Since there aren't any // We'll send a tick to force a rotation. Since there aren't any
@ -254,6 +254,66 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) {
}) })
} }
// TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers
// are initialized as ActiveSync until the initial historical sync has been
// completed. Once it does, the pending GossipSyncers should be transitioned to
// ActiveSync.
func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
t.Parallel()
const numActiveSyncers = 2
// We'll start by creating our test sync manager which will hold up to
// 2 active syncers.
syncMgr := newTestSyncManager(numActiveSyncers)
syncMgr.Start()
defer syncMgr.Stop()
// We'll go ahead and create our syncers.
peers := make([]*mockPeer, 0, numActiveSyncers)
syncers := make([]*GossipSyncer, 0, numActiveSyncers)
for i := 0; i < numActiveSyncers; i++ {
peer := randPeer(t, syncMgr.quit)
peers = append(peers, peer)
syncMgr.InitSyncState(peer)
s := assertSyncerExistence(t, syncMgr, peer)
syncers = append(syncers, s)
// The first one always attempts a historical sync. We won't
// transition it to chansSynced to ensure the remaining syncers
// aren't started as active.
if i == 0 {
assertSyncerStatus(t, s, syncingChans, PassiveSync)
continue
}
// The rest should remain in a passive and chansSynced state,
// and they should be queued to transition to active once the
// initial historical sync is completed.
assertNoMsgSent(t, peer)
assertSyncerStatus(t, s, chansSynced, PassiveSync)
}
// To ensure we don't transition any pending active syncers that have
// previously disconnected, we'll disconnect the last one.
stalePeer := peers[numActiveSyncers-1]
syncMgr.PruneSyncState(stalePeer.PubKey())
// Then, we'll complete the initial historical sync by transitioning the
// historical syncer to its final chansSynced state. This should trigger
// all of the pending active syncers to transition, except for the one
// we disconnected.
assertTransitionToChansSynced(t, syncers[0], peers[0])
for i, s := range syncers {
if i == numActiveSyncers-1 {
assertNoMsgSent(t, peers[i])
continue
}
assertPassiveSyncerTransition(t, s, peers[i])
}
}
// assertNoMsgSent is a helper function that ensures a peer hasn't sent any // assertNoMsgSent is a helper function that ensures a peer hasn't sent any
// messages. // messages.
func assertNoMsgSent(t *testing.T, peer *mockPeer) { func assertNoMsgSent(t *testing.T, peer *mockPeer) {
@ -294,7 +354,7 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) {
var msgSent lnwire.Message var msgSent lnwire.Message
select { select {
case msgSent = <-peer.sentMsgs: case msgSent = <-peer.sentMsgs:
case <-time.After(time.Second): case <-time.After(2 * time.Second):
t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+ t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+
"message", peer.PubKey()) "message", peer.PubKey())
} }