discovery/sync_manager: Pause/Resume HistoricalSyncTicker
This gives each initial historical syncer an equal amount of time before being rotated, even if some fail.
This commit is contained in:
parent
ef0cd82c1f
commit
9e932f2a64
@ -198,7 +198,6 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
m.cfg.RotateTicker.Resume()
|
m.cfg.RotateTicker.Resume()
|
||||||
defer m.cfg.RotateTicker.Stop()
|
defer m.cfg.RotateTicker.Stop()
|
||||||
|
|
||||||
m.cfg.HistoricalSyncTicker.Resume()
|
|
||||||
defer m.cfg.HistoricalSyncTicker.Stop()
|
defer m.cfg.HistoricalSyncTicker.Stop()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -214,9 +213,17 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
initialHistoricalSyncSignal chan struct{}
|
initialHistoricalSyncSignal chan struct{}
|
||||||
)
|
)
|
||||||
|
|
||||||
setHistoricalSyncer := func(s *GossipSyncer) {
|
setInitialHistoricalSyncer := func(s *GossipSyncer) {
|
||||||
initialHistoricalSyncer = s
|
initialHistoricalSyncer = s
|
||||||
initialHistoricalSyncSignal = s.ResetSyncedSignal()
|
initialHistoricalSyncSignal = s.ResetSyncedSignal()
|
||||||
|
|
||||||
|
// Restart the timer for our new historical sync peer. This will
|
||||||
|
// ensure that all initial syncers recevie an equivalent
|
||||||
|
// duration before attempting the next sync. Without doing so we
|
||||||
|
// might attempt two historical sync back to back if a peer
|
||||||
|
// disconnects just before the ticker fires.
|
||||||
|
m.cfg.HistoricalSyncTicker.Pause()
|
||||||
|
m.cfg.HistoricalSyncTicker.Resume()
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -302,7 +309,7 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
// keep track of the corresponding syncer to properly
|
// keep track of the corresponding syncer to properly
|
||||||
// handle disconnects. We'll also use a signal to know
|
// handle disconnects. We'll also use a signal to know
|
||||||
// when the historical sync completed.
|
// when the historical sync completed.
|
||||||
setHistoricalSyncer(s)
|
setInitialHistoricalSyncer(s)
|
||||||
|
|
||||||
// An existing peer has disconnected, so we'll tear down its
|
// An existing peer has disconnected, so we'll tear down its
|
||||||
// corresponding GossipSyncer.
|
// corresponding GossipSyncer.
|
||||||
@ -341,7 +348,7 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
"GossipSyncer(%v) with GossipSyncer(%x)",
|
"GossipSyncer(%v) with GossipSyncer(%x)",
|
||||||
staleSyncer.peer, s.cfg.peerPub)
|
staleSyncer.peer, s.cfg.peerPub)
|
||||||
|
|
||||||
setHistoricalSyncer(s)
|
setInitialHistoricalSyncer(s)
|
||||||
|
|
||||||
// Our initial historical sync signal has completed, so we'll
|
// Our initial historical sync signal has completed, so we'll
|
||||||
// nil all of the relevant fields as they're no longer needed.
|
// nil all of the relevant fields as they're no longer needed.
|
||||||
@ -401,7 +408,7 @@ func (m *SyncManager) syncerHandler() {
|
|||||||
// where our previous historical sync peer did not
|
// where our previous historical sync peer did not
|
||||||
// respond to our queries and we haven't ingested as
|
// respond to our queries and we haven't ingested as
|
||||||
// much of the graph as we should.
|
// much of the graph as we should.
|
||||||
setHistoricalSyncer(s)
|
setInitialHistoricalSyncer(s)
|
||||||
|
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user