discovery: simplify chooseRandomSyncer helper

This commit is contained in:
Wilmer Paulino 2019-04-10 19:26:24 -07:00
parent 29baa12254
commit 72e9674cff
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -318,17 +318,13 @@ func (m *SyncManager) removeGossipSyncer(peer routing.Vertex) {
// Otherwise, we'll need find a new one to replace it, if any.
delete(m.activeSyncers, peer)
newActiveSyncer := m.chooseRandomSyncer(nil, false)
newActiveSyncer := chooseRandomSyncer(
m.inactiveSyncers, m.transitionPassiveSyncer,
)
if newActiveSyncer == nil {
return
}
if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil {
log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
newActiveSyncer.cfg.peerPub, err)
return
}
log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
peer, newActiveSyncer.cfg.peerPub)
}
@ -340,30 +336,19 @@ func (m *SyncManager) rotateActiveSyncerCandidate() {
m.syncersMu.Lock()
defer m.syncersMu.Unlock()
// If we don't have a candidate to rotate with, we can return early.
candidate := m.chooseRandomSyncer(nil, false)
if candidate == nil {
log.Debug("No eligible candidate to rotate active syncer")
// If we couldn't find an eligible active syncer to rotate, we can
// return early.
activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
if activeSyncer == nil {
log.Debug("No eligible active syncer to rotate")
return
}
// We'll choose an active syncer at random that's within a chansSynced
// state to rotate.
var activeSyncer *GossipSyncer
for _, s := range m.activeSyncers {
// The active syncer must be in a chansSynced state in order to
// process sync transitions.
if s.syncState() != chansSynced {
continue
}
activeSyncer = s
break
}
// If we couldn't find an eligible one, we can return early.
if activeSyncer == nil {
log.Debug("No eligible active syncer to rotate")
// Similarly, if we don't have a candidate to rotate with, we can return
// early as well.
candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
if candidate == nil {
log.Debug("No eligible candidate to rotate active syncer")
return
}
@ -425,72 +410,39 @@ func (m *SyncManager) forceHistoricalSync() {
m.syncersMu.Lock()
defer m.syncersMu.Unlock()
// We'll choose a random peer with whom we can perform a historical sync
// with. We'll set useActive to true to make sure we can still do one if
// we don't happen to have any non-active syncers.
candidatesChosen := make(map[routing.Vertex]struct{})
s := m.chooseRandomSyncer(candidatesChosen, true)
for s != nil {
// Ensure we properly handle a shutdown signal.
select {
case <-m.quit:
return
default:
}
// Blacklist the candidate to ensure it's not chosen again.
candidatesChosen[s.cfg.peerPub] = struct{}{}
err := s.historicalSync()
if err == nil {
return
}
log.Errorf("Unable to perform historical sync with "+
"GossipSyncer(%x): %v", s.cfg.peerPub, err)
s = m.chooseRandomSyncer(candidatesChosen, true)
}
// We'll sample from both sets of active and inactive syncers in the
// event that we don't have any inactive syncers.
_ = chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
return s.historicalSync()
})
}
// chooseRandomSyncer returns a random non-active syncer that's eligible for a
// sync transition. A blacklist can be used to skip any previously chosen
// candidates. The useActive boolean can be used to also filter active syncers.
// chooseRandomSyncer iterates through the set of syncers given and returns the
// first one which was able to successfully perform the action enclosed in the
// function closure.
//
// NOTE: It's possible for a nil value to be returned if there are no eligible
// candidate syncers.
//
// NOTE: This method must be called with the syncersMtx lock held.
func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{},
useActive bool) *GossipSyncer {
eligible := func(s *GossipSyncer) bool {
// Skip any syncers that exist within the blacklist.
if blacklist != nil {
if _, ok := blacklist[s.cfg.peerPub]; ok {
return false
}
}
func chooseRandomSyncer(syncers map[routing.Vertex]*GossipSyncer,
action func(*GossipSyncer) error) *GossipSyncer {
for _, s := range syncers {
// Only syncers in a chansSynced state are viable for sync
// transitions, so skip any that aren't.
return s.syncState() == chansSynced
}
for _, s := range m.inactiveSyncers {
if !eligible(s) {
if s.syncState() != chansSynced {
continue
}
return s
}
if useActive {
for _, s := range m.activeSyncers {
if !eligible(s) {
if action != nil {
if err := action(s); err != nil {
log.Debugf("Skipping eligible candidate "+
"GossipSyncer(%x): %v", s.cfg.peerPub,
err)
continue
}
return s
}
return s
}
return nil
@ -573,7 +525,11 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) {
func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer {
m.syncersMu.Lock()
defer m.syncersMu.Unlock()
return m.gossipSyncers()
}
// gossipSyncers returns all of the currently initialized gossip syncers.
func (m *SyncManager) gossipSyncers() map[routing.Vertex]*GossipSyncer {
numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers)