discovery+server: remove roundRobinHandler and related code
Since ActiveSync GossipSyncers no longer synchronize our state with the remote peers, none of the logic surrounding the round-robin is required within the SyncManager.
parent 9a6e8ecb9e
commit 5db2cf6273
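Before the per-hunk changes, it helps to see the shape of what remains. With the round-robin gone, the SyncManager simply assigns each new peer a sync type up front in InitSyncState, as the sync_manager.go hunks below show. The following is a minimal, self-contained sketch of that flow, using simplified stand-in types (gossipSyncer, syncManager, string peer keys); it illustrates the new control flow and is not the verbatim lnd source.

package main

import "fmt"

// gossipSyncer is a stand-in for lnd's GossipSyncer; only the peer key is
// kept here.
type gossipSyncer struct{ peerPub string }

// syncManager is a stand-in for lnd's SyncManager after this commit: just
// two maps, with no pending queue, round-robin list, or timeout ticker.
type syncManager struct {
	numActiveSyncers int
	activeSyncers    map[string]*gossipSyncer // receive new graph updates
	inactiveSyncers  map[string]*gossipSyncer // reply to queries only
}

// initSyncState mirrors the simplified InitSyncState in the diff below:
// the first NumActiveSyncers peers become active, every later peer passive,
// decided immediately at registration time.
func (m *syncManager) initSyncState(s *gossipSyncer) {
	if len(m.activeSyncers) < m.numActiveSyncers {
		m.activeSyncers[s.peerPub] = s // ActiveSync
		return
	}
	m.inactiveSyncers[s.peerPub] = s // PassiveSync
}

func main() {
	m := &syncManager{
		numActiveSyncers: 1,
		activeSyncers:    make(map[string]*gossipSyncer),
		inactiveSyncers:  make(map[string]*gossipSyncer),
	}
	m.initSyncState(&gossipSyncer{peerPub: "alice"})
	m.initSyncState(&gossipSyncer{peerPub: "bob"})
	fmt.Printf("active=%d passive=%d\n",
		len(m.activeSyncers), len(m.inactiveSyncers)) // active=1 passive=1
}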
discovery/gossiper.go
@@ -306,12 +306,11 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
 		channelMtx:     multimutex.NewMutex(),
 		recentRejects:  make(map[uint64]struct{}),
 		syncMgr: newSyncManager(&SyncManagerCfg{
-			ChainHash:                 cfg.ChainHash,
-			ChanSeries:                cfg.ChanSeries,
-			RotateTicker:              cfg.RotateTicker,
-			HistoricalSyncTicker:      cfg.HistoricalSyncTicker,
-			ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker,
-			NumActiveSyncers:          cfg.NumActiveSyncers,
+			ChainHash:            cfg.ChainHash,
+			ChanSeries:           cfg.ChanSeries,
+			RotateTicker:         cfg.RotateTicker,
+			HistoricalSyncTicker: cfg.HistoricalSyncTicker,
+			NumActiveSyncers:     cfg.NumActiveSyncers,
 		}),
 	}
discovery/gossiper_test.go
@@ -741,17 +741,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
 			c := make(chan struct{})
 			return c
 		},
-		Router:                    router,
-		TrickleDelay:              trickleDelay,
-		RetransmitDelay:           retransmitDelay,
-		ProofMatureDelta:          proofMatureDelta,
-		WaitingProofStore:         waitingProofStore,
-		MessageStore:              newMockMessageStore(),
-		RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-		HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-		ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-		NumActiveSyncers:          3,
-		AnnSigner:                 &mockSigner{nodeKeyPriv1},
+		Router:               router,
+		TrickleDelay:         trickleDelay,
+		RetransmitDelay:      retransmitDelay,
+		ProofMatureDelta:     proofMatureDelta,
+		WaitingProofStore:    waitingProofStore,
+		MessageStore:         newMockMessageStore(),
+		RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+		HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+		NumActiveSyncers:     3,
+		AnnSigner:            &mockSigner{nodeKeyPriv1},
 	}, nodeKeyPub1)

 	if err := gossiper.Start(); err != nil {
@@ -1480,20 +1479,19 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
 	// the message to the peer.
 	ctx.gossiper.Stop()
 	gossiper := New(Config{
-		Notifier:                  ctx.gossiper.cfg.Notifier,
-		Broadcast:                 ctx.gossiper.cfg.Broadcast,
-		NotifyWhenOnline:          ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
-		NotifyWhenOffline:         ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
-		Router:                    ctx.gossiper.cfg.Router,
-		TrickleDelay:              trickleDelay,
-		RetransmitDelay:           retransmitDelay,
-		ProofMatureDelta:          proofMatureDelta,
-		WaitingProofStore:         ctx.gossiper.cfg.WaitingProofStore,
-		MessageStore:              ctx.gossiper.cfg.MessageStore,
-		RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-		HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-		ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-		NumActiveSyncers:          3,
+		Notifier:             ctx.gossiper.cfg.Notifier,
+		Broadcast:            ctx.gossiper.cfg.Broadcast,
+		NotifyWhenOnline:     ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
+		NotifyWhenOffline:    ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
+		Router:               ctx.gossiper.cfg.Router,
+		TrickleDelay:         trickleDelay,
+		RetransmitDelay:      retransmitDelay,
+		ProofMatureDelta:     proofMatureDelta,
+		WaitingProofStore:    ctx.gossiper.cfg.WaitingProofStore,
+		MessageStore:         ctx.gossiper.cfg.MessageStore,
+		RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+		HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+		NumActiveSyncers:     3,
 	}, ctx.gossiper.selfKey)
 	if err != nil {
 		t.Fatalf("unable to recreate gossiper: %v", err)
discovery/sync_manager.go
@@ -1,7 +1,6 @@
 package discovery

 import (
-	"container/list"
 	"errors"
 	"sync"
 	"time"
@@ -22,11 +21,6 @@ const (
 	// force a historical sync to ensure we have as much of the public
 	// network as possible.
 	DefaultHistoricalSyncInterval = time.Hour
-
-	// DefaultActiveSyncerTimeout is the default timeout interval in which
-	// we'll wait until an active syncer has completed its state machine and
-	// reached its final chansSynced state.
-	DefaultActiveSyncerTimeout = 5 * time.Minute
 )

 var (
@@ -36,23 +30,6 @@ var (
 	ErrSyncManagerExiting = errors.New("sync manager exiting")
 )

-// staleActiveSyncer is an internal message the SyncManager will use in order to
-// handle a peer corresponding to an active syncer being disconnected.
-type staleActiveSyncer struct {
-	// syncer is the active syncer to be removed.
-	syncer *GossipSyncer
-
-	// transitioned, if true, signals that the active GossipSyncer is stale
-	// due to being transitioned to a PassiveSync state.
-	transitioned bool
-
-	// done serves as a signal to the caller that the SyncManager's internal
-	// state correctly reflects the stale active syncer. This is needed to
-	// ensure we always create a new syncer for a flappy peer after they
-	// disconnect if they happened to be an active syncer.
-	done chan struct{}
-}
-
 // SyncManagerCfg contains all of the dependencies required for the SyncManager
 // to carry out its duties.
 type SyncManagerCfg struct {
@@ -81,12 +58,6 @@ type SyncManagerCfg struct {
 	// SyncManager when it should attempt a historical sync with a gossip
 	// sync peer.
 	HistoricalSyncTicker ticker.Ticker
-
-	// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
-	// SyncManager when it should attempt to start the next pending
-	// activeSyncer due to the current one not completing its state machine
-	// within the timeout.
-	ActiveSyncerTimeoutTicker ticker.Ticker
 }

 // SyncManager is a subsystem of the gossiper that manages the gossip syncers
@@ -117,25 +88,6 @@ type SyncManager struct {
 	// currently receiving new graph updates from.
 	inactiveSyncers map[routing.Vertex]*GossipSyncer

-	// pendingActiveSyncers is a map that tracks our set of pending active
-	// syncers. This map will be queried when choosing the next pending
-	// active syncer in the queue to ensure it is not stale.
-	pendingActiveSyncers map[routing.Vertex]*GossipSyncer
-
-	// pendingActiveSyncerQueue is the list of active syncers which are
-	// pending to be started. Syncers will be added to this list through the
-	// newActiveSyncers and staleActiveSyncers channels.
-	pendingActiveSyncerQueue *list.List
-
-	// newActiveSyncers is a channel that will serve as a signal to the
-	// roundRobinHandler to allow it to transition the next pending active
-	// syncer in the queue.
-	newActiveSyncers chan struct{}
-
-	// staleActiveSyncers is a channel through which we'll send any stale
-	// active syncers that should be removed from the round-robin.
-	staleActiveSyncers chan *staleActiveSyncer
-
 	sync.Mutex
 	wg   sync.WaitGroup
 	quit chan struct{}
@@ -148,21 +100,16 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
 		activeSyncers: make(
 			map[routing.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
 		),
-		inactiveSyncers:          make(map[routing.Vertex]*GossipSyncer),
-		pendingActiveSyncers:     make(map[routing.Vertex]*GossipSyncer),
-		pendingActiveSyncerQueue: list.New(),
-		newActiveSyncers:         make(chan struct{}),
-		staleActiveSyncers:       make(chan *staleActiveSyncer),
-		quit:                     make(chan struct{}),
+		inactiveSyncers: make(map[routing.Vertex]*GossipSyncer),
+		quit:            make(chan struct{}),
 	}
 }

 // Start starts the SyncManager in order to properly carry out its duties.
 func (m *SyncManager) Start() {
 	m.start.Do(func() {
-		m.wg.Add(2)
+		m.wg.Add(1)
 		go m.syncerHandler()
-		go m.roundRobinHandler()
 	})
 }

@@ -178,9 +125,6 @@ func (m *SyncManager) Stop() {
 	for _, syncer := range m.inactiveSyncers {
 		syncer.Stop()
 	}
-	for _, syncer := range m.pendingActiveSyncers {
-		syncer.Stop()
-	}
 	for _, syncer := range m.activeSyncers {
 		syncer.Stop()
 	}
@@ -223,227 +167,6 @@ func (m *SyncManager) syncerHandler() {
 	}
 }

-// signalNewActiveSyncer sends a signal to the roundRobinHandler to ensure it
-// transitions any pending active syncers.
-func (m *SyncManager) signalNewActiveSyncer() {
-	select {
-	case m.newActiveSyncers <- struct{}{}:
-	case <-m.quit:
-	}
-}
-
-// signalStaleActiveSyncer removes the syncer for the given peer from the
-// round-robin queue.
-func (m *SyncManager) signalStaleActiveSyncer(s *GossipSyncer, transitioned bool) {
-	done := make(chan struct{})
-
-	select {
-	case m.staleActiveSyncers <- &staleActiveSyncer{
-		syncer:       s,
-		transitioned: transitioned,
-		done:         done,
-	}:
-	case <-m.quit:
-	}
-
-	// Before returning to the caller, we'll wait for the roundRobinHandler
-	// to signal us that the SyncManager has correctly updated its internal
-	// state after handling the stale active syncer.
-	select {
-	case <-done:
-	case <-m.quit:
-	}
-}
-
-// roundRobinHandler is the SyncManager's event loop responsible for managing
-// the round-robin queue of our active syncers to ensure they don't overlap and
-// request the same set of channels, which significantly reduces bandwidth
-// usage.
-//
-// NOTE: This must be run as a goroutine.
-func (m *SyncManager) roundRobinHandler() {
-	defer m.wg.Done()
-
-	defer m.cfg.ActiveSyncerTimeoutTicker.Stop()
-
-	var (
-		// current will hold the current active syncer we're waiting for
-		// to complete its state machine.
-		current *GossipSyncer
-
-		// transitionNext will be responsible for containing the signal
-		// of when the current active syncer has completed its state
-		// machine. This signal allows us to transition the next pending
-		// active syncer, if any.
-		transitionNext chan struct{}
-	)
-
-	// transitionNextSyncer is a helper closure that we'll use to transition
-	// the next syncer queued up. If there aren't any, this will act as a
-	// NOP.
-	transitionNextSyncer := func() {
-		m.Lock()
-		current = m.nextPendingActiveSyncer()
-		m.Unlock()
-		for current != nil {
-			// Ensure we properly handle a shutdown signal.
-			select {
-			case <-m.quit:
-				return
-			default:
-			}
-
-			// We'll avoid performing the transition with the lock
-			// as it can potentially stall the SyncManager due to
-			// the syncTransitionTimeout.
-			err := m.transitionPassiveSyncer(current)
-			// If we timed out attempting to transition the syncer,
-			// we'll re-queue it to retry at a later time and move
-			// on to the next.
-			if err == ErrSyncTransitionTimeout {
-				log.Debugf("Timed out attempting to "+
-					"transition pending active "+
-					"GossipSyncer(%x)", current.cfg.peerPub)
-
-				m.Lock()
-				m.queueActiveSyncer(current)
-				current = m.nextPendingActiveSyncer()
-				m.Unlock()
-				continue
-			}
-			if err != nil {
-				log.Errorf("Unable to transition pending "+
-					"active GossipSyncer(%x): %v",
-					current.cfg.peerPub, err)
-
-				m.Lock()
-				current = m.nextPendingActiveSyncer()
-				m.Unlock()
-				continue
-			}
-
-			// The transition succeeded, so we'll set our signal to
-			// know when we should attempt to transition the next
-			// pending active syncer in our queue.
-			transitionNext = current.ResetSyncedSignal()
-			m.cfg.ActiveSyncerTimeoutTicker.Resume()
-			return
-		}
-
-		transitionNext = nil
-		m.cfg.ActiveSyncerTimeoutTicker.Pause()
-	}
-
-	for {
-		select {
-		// A new active syncer signal has been received, which indicates
-		// a new pending active syncer has been added to our queue.
-		// We'll only attempt to transition it now if we're not already
-		// in the middle of transitioning another one. We do this to
-		// ensure we don't overlap when requesting channels from
-		// different peers.
-		case <-m.newActiveSyncers:
-			if current == nil {
-				transitionNextSyncer()
-			}
-
-		// A stale active syncer has been received, so we'll need to
-		// remove them from our queue. If we are currently waiting for
-		// its state machine to complete, we'll move on to the next
-		// active syncer in the queue.
-		case staleActiveSyncer := <-m.staleActiveSyncers:
-			s := staleActiveSyncer.syncer
-
-			m.Lock()
-			// If the syncer has transitioned from an ActiveSync
-			// type, rather than disconnecting, we'll include it in
-			// the set of inactive syncers.
-			if staleActiveSyncer.transitioned {
-				m.inactiveSyncers[s.cfg.peerPub] = s
-			} else {
-				// Otherwise, since the peer is disconnecting,
-				// we'll attempt to find a passive syncer that
-				// can replace it.
-				newActiveSyncer := m.chooseRandomSyncer(nil, false)
-				if newActiveSyncer != nil {
-					m.queueActiveSyncer(newActiveSyncer)
-				}
-			}
-
-			// Remove the internal active syncer references for this
-			// peer.
-			delete(m.pendingActiveSyncers, s.cfg.peerPub)
-			delete(m.activeSyncers, s.cfg.peerPub)
-			m.Unlock()
-
-			// Signal to the caller that they can now proceed since
-			// the SyncManager's state correctly reflects the
-			// stale active syncer.
-			close(staleActiveSyncer.done)
-
-			// If we're not currently waiting for an active syncer
-			// to reach its terminal state, or if we are but we are
-			// currently waiting for the peer being
-			// disconnected/transitioned, then we'll move on to the
-			// next active syncer in our queue.
-			if current == nil || (current != nil &&
-				current.cfg.peerPub == s.cfg.peerPub) {
-				transitionNextSyncer()
-			}
-
-		// Our current active syncer has reached its terminal
-		// chansSynced state, so we'll proceed to transitioning the next
-		// pending active syncer if there is one.
-		case <-transitionNext:
-			transitionNextSyncer()
-
-		// We've timed out waiting for the current active syncer to
-		// reach its terminal chansSynced state, so we'll just
-		// move on to the next and avoid retrying as its already been
-		// transitioned.
-		case <-m.cfg.ActiveSyncerTimeoutTicker.Ticks():
-			log.Warnf("Timed out waiting for GossipSyncer(%x) to "+
-				"be fully synced", current.cfg.peerPub)
-			transitionNextSyncer()
-
-		case <-m.quit:
-			return
-		}
-	}
-}
-
-// queueActiveSyncer queues the given pending active gossip syncer to the end of
-// the round-robin queue.
-func (m *SyncManager) queueActiveSyncer(s *GossipSyncer) {
-	log.Debugf("Queueing next pending active GossipSyncer(%x)",
-		s.cfg.peerPub)
-
-	delete(m.inactiveSyncers, s.cfg.peerPub)
-	m.pendingActiveSyncers[s.cfg.peerPub] = s
-	m.pendingActiveSyncerQueue.PushBack(s)
-}
-
-// nextPendingActiveSyncer returns the next active syncer pending to be
-// transitioned. If there aren't any, then `nil` is returned.
-func (m *SyncManager) nextPendingActiveSyncer() *GossipSyncer {
-	next := m.pendingActiveSyncerQueue.Front()
-	for next != nil {
-		s := m.pendingActiveSyncerQueue.Remove(next).(*GossipSyncer)
-
-		// If the next pending active syncer is no longer in our lookup
-		// map, then the corresponding peer has disconnected, so we'll
-		// skip them.
-		if _, ok := m.pendingActiveSyncers[s.cfg.peerPub]; !ok {
-			next = m.pendingActiveSyncerQueue.Front()
-			continue
-		}
-
-		return s
-	}
-
-	return nil
-}
-
 // rotateActiveSyncerCandidate rotates a single active syncer. In order to
 // achieve this, the active syncer must be in a chansSynced state in order to
 // process the sync transition.
@@ -478,21 +201,24 @@ func (m *SyncManager) rotateActiveSyncerCandidate() {
 		return
 	}

 	log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
 		activeSyncer.cfg.peerPub, candidate.cfg.peerPub)

 	// Otherwise, we'll attempt to transition each syncer to their
 	// respective new sync type. We'll avoid performing the transition with
 	// the lock as it can potentially stall the SyncManager due to the
 	// syncTransitionTimeout.
 	if err := m.transitionActiveSyncer(activeSyncer); err != nil {
-		log.Errorf("Unable to transition active "+
-			"GossipSyncer(%x): %v", activeSyncer.cfg.peerPub, err)
+		log.Errorf("Unable to transition active GossipSyncer(%x): %v",
+			activeSyncer.cfg.peerPub, err)
 		return
 	}

-	m.Lock()
-	m.queueActiveSyncer(candidate)
-	m.Unlock()
-
-	m.signalNewActiveSyncer()
+	if err := m.transitionPassiveSyncer(candidate); err != nil {
+		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
+			activeSyncer.cfg.peerPub, err)
+		return
+	}
 }

 // transitionActiveSyncer transitions an active syncer to a passive one.
@@ -504,7 +230,10 @@ func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
 		return err
 	}

-	m.signalStaleActiveSyncer(s, true)
+	m.Lock()
+	delete(m.activeSyncers, s.cfg.peerPub)
+	m.inactiveSyncers[s.cfg.peerPub] = s
+	m.Unlock()

 	return nil
 }
@@ -519,8 +248,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
 	}

 	m.Lock()
+	delete(m.inactiveSyncers, s.cfg.peerPub)
 	m.activeSyncers[s.cfg.peerPub] = s
-	delete(m.pendingActiveSyncers, s.cfg.peerPub)
 	m.Unlock()

 	return nil
@@ -612,10 +341,13 @@ func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{},
 //
 // TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
 func (m *SyncManager) InitSyncState(peer lnpeer.Peer) {
+	m.Lock()
+	defer m.Unlock()
+
 	// If we already have a syncer, then we'll exit early as we don't want
 	// to override it.
 	nodeID := routing.Vertex(peer.PubKey())
-	if _, ok := m.GossipSyncer(nodeID); ok {
+	if _, ok := m.gossipSyncer(nodeID); ok {
 		return
 	}

@@ -634,16 +366,22 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) {
 		},
 	})

-	// Gossip syncers are initialized by default as passive and in a
-	// chansSynced state so that they can reply to any peer queries or
+	// If we've yet to reach our desired number of active syncers, then
+	// we'll use this one.
+	if len(m.activeSyncers) < m.cfg.NumActiveSyncers {
+		s.setSyncType(ActiveSync)
+		m.activeSyncers[s.cfg.peerPub] = s
+	} else {
+		s.setSyncType(PassiveSync)
+		m.inactiveSyncers[s.cfg.peerPub] = s
+	}
+
+	// Gossip syncers are initialized by default in a chansSynced state so
+	// that they can reply to any peer queries or
 	// handle any sync transitions.
-	s.setSyncType(PassiveSync)
 	s.setSyncState(chansSynced)
 	s.Start()

-	m.Lock()
-	m.inactiveSyncers[nodeID] = s
-
 	// We'll force a historical sync with the first peer we connect to
 	// ensure we get as much of the graph as possible.
 	var err error
@@ -661,17 +399,6 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) {
 		// different peer.
 		m.historicalSync = sync.Once{}
 	}
-
-	// If we've yet to reach our desired number of active syncers, then
-	// we'll use this one.
-	numActiveSyncers := len(m.activeSyncers) + len(m.pendingActiveSyncers)
-	if numActiveSyncers < m.cfg.NumActiveSyncers {
-		m.queueActiveSyncer(s)
-		m.Unlock()
-		m.signalNewActiveSyncer()
-		return
-	}
-	m.Unlock()
 }

 // PruneSyncState is called by outside sub-systems once a peer that we were
@@ -695,11 +422,24 @@ func (m *SyncManager) PruneSyncState(peer routing.Vertex) {
 		m.Unlock()
 		return
 	}
-	m.Unlock()
-
-	// Otherwise, we'll need to dequeue it from our pending active syncers
-	// queue and find a new one to replace it, if any.
-	m.signalStaleActiveSyncer(s, false)
+	delete(m.activeSyncers, s.cfg.peerPub)
+	newActiveSyncer := m.chooseRandomSyncer(nil, false)
+	m.Unlock()
+	if newActiveSyncer == nil {
+		return
+	}
+
+	if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil {
+		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
+			newActiveSyncer.cfg.peerPub, err)
+		return
+	}
+
+	log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)",
+		peer, newActiveSyncer.cfg.peerPub)
 }

 // GossipSyncer returns the associated gossip syncer of a peer. The boolean
@@ -717,10 +457,6 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) {
 	if ok {
 		return syncer, true
 	}
-	syncer, ok = m.pendingActiveSyncers[peer]
-	if ok {
-		return syncer, true
-	}
 	syncer, ok = m.activeSyncers[peer]
 	if ok {
 		return syncer, true
@@ -733,16 +469,12 @@ func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer {
 	m.Lock()
 	defer m.Unlock()

-	numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) +
-		len(m.inactiveSyncers)
+	numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
 	syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers)

 	for _, syncer := range m.inactiveSyncers {
 		syncers[syncer.cfg.peerPub] = syncer
 	}
-	for _, syncer := range m.pendingActiveSyncers {
-		syncers[syncer.cfg.peerPub] = syncer
-	}
 	for _, syncer := range m.activeSyncers {
 		syncers[syncer.cfg.peerPub] = syncer
 	}
discovery/sync_manager_test.go
@@ -30,11 +30,10 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
 func newTestSyncManager(numActiveSyncers int) *SyncManager {
 	hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight}
 	return newSyncManager(&SyncManagerCfg{
-		ChanSeries:                newMockChannelGraphTimeSeries(hID),
-		RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-		HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-		ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-		NumActiveSyncers:          numActiveSyncers,
+		ChanSeries:           newMockChannelGraphTimeSeries(hID),
+		RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+		HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+		NumActiveSyncers:     numActiveSyncers,
 	})
 }

@@ -56,15 +55,14 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
 	// should be active and passive to check them later on.
 	for i := 0; i < numActiveSyncers; i++ {
 		peer := randPeer(t, syncMgr.quit)
-		syncMgr.InitSyncState(peer)
+		go syncMgr.InitSyncState(peer)

 		// The first syncer registered always attempts a historical
 		// sync.
+		assertActiveGossipTimestampRange(t, peer)
 		if i == 0 {
-			assertTransitionToChansSynced(t, syncMgr, peer, true)
+			assertTransitionToChansSynced(t, syncMgr, peer)
 		}

-		assertPassiveSyncerTransition(t, syncMgr, peer)
+		assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync)
 	}

@@ -88,9 +86,10 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
 	// peer1 will represent an active syncer that performs a historical
 	// sync since it is the first registered peer with the SyncManager.
 	peer1 := randPeer(t, syncMgr.quit)
-	syncMgr.InitSyncState(peer1)
-	assertTransitionToChansSynced(t, syncMgr, peer1, true)
-	assertPassiveSyncerTransition(t, syncMgr, peer1)
+	go syncMgr.InitSyncState(peer1)
+	assertActiveGossipTimestampRange(t, peer1)
+	assertTransitionToChansSynced(t, syncMgr, peer1)
+	assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync)

 	// It will then be torn down to simulate a disconnection. Since there
 	// are no other candidate syncers available, the active syncer won't be
@@ -100,8 +99,9 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
 	// Then, we'll start our active syncer again, but this time we'll also
 	// have a passive syncer available to replace the active syncer after
 	// the peer disconnects.
-	syncMgr.InitSyncState(peer1)
-	assertPassiveSyncerTransition(t, syncMgr, peer1)
+	go syncMgr.InitSyncState(peer1)
+	assertActiveGossipTimestampRange(t, peer1)
+	assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync)

 	// Create our second peer, which should be initialized as a passive
 	// syncer.
@@ -111,7 +111,7 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {

 	// Disconnect our active syncer, which should trigger the SyncManager to
 	// replace it with our passive syncer.
-	syncMgr.PruneSyncState(peer1.PubKey())
+	go syncMgr.PruneSyncState(peer1.PubKey())
 	assertPassiveSyncerTransition(t, syncMgr, peer2)
 }

@@ -127,9 +127,10 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) {

 	// The first syncer registered always performs a historical sync.
 	activeSyncPeer := randPeer(t, syncMgr.quit)
-	syncMgr.InitSyncState(activeSyncPeer)
-	assertTransitionToChansSynced(t, syncMgr, activeSyncPeer, true)
-	assertPassiveSyncerTransition(t, syncMgr, activeSyncPeer)
+	go syncMgr.InitSyncState(activeSyncPeer)
+	assertActiveGossipTimestampRange(t, activeSyncPeer)
+	assertTransitionToChansSynced(t, syncMgr, activeSyncPeer)
+	assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync)

 	// We'll send a tick to force a rotation. Since there aren't any
 	// candidates, none of the active syncers will be rotated.
@@ -197,192 +198,6 @@ func TestSyncManagerHistoricalSync(t *testing.T) {
 	})
 }

-// TestSyncManagerRoundRobinQueue ensures that any subsequent active syncers can
-// only be started after the previous one has completed its state machine.
-func TestSyncManagerRoundRobinQueue(t *testing.T) {
-	t.Parallel()
-
-	const numActiveSyncers = 3
-
-	// We'll start by creating our sync manager with support for three
-	// active syncers.
-	syncMgr := newTestSyncManager(numActiveSyncers)
-	syncMgr.Start()
-	defer syncMgr.Stop()
-
-	peers := make([]*mockPeer, 0, numActiveSyncers)
-
-	// The first syncer registered always attempts a historical sync.
-	firstPeer := randPeer(t, syncMgr.quit)
-	syncMgr.InitSyncState(firstPeer)
-	peers = append(peers, firstPeer)
-	assertTransitionToChansSynced(t, syncMgr, firstPeer, true)
-
-	// After completing the historical sync, a sync transition to ActiveSync
-	// should happen. It should transition immediately since it has no
-	// dependents.
-	assertActiveGossipTimestampRange(t, firstPeer)
-
-	// We'll create the remaining numActiveSyncers. These will be queued in
-	// the round robin since the first syncer has yet to reach chansSynced.
-	queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1)
-	for i := 0; i < numActiveSyncers-1; i++ {
-		peer := randPeer(t, syncMgr.quit)
-		syncMgr.InitSyncState(peer)
-		peers = append(peers, peer)
-		queuedPeers = append(queuedPeers, peer)
-	}
-
-	// Ensure they cannot transition without sending a GossipTimestampRange
-	// message first.
-	for _, peer := range queuedPeers {
-		assertNoMsgSent(t, peer)
-	}
-
-	// Transition the first syncer to chansSynced, which should allow the
-	// second to transition next.
-	assertTransitionToChansSynced(t, syncMgr, firstPeer, false)
-
-	// assertSyncerTransitioned ensures the target peer's syncer is the only
-	// that has transitioned.
-	assertSyncerTransitioned := func(target *mockPeer) {
-		t.Helper()
-
-		for _, peer := range peers {
-			if peer.PubKey() != target.PubKey() {
-				assertNoMsgSent(t, peer)
-				continue
-			}
-
-			assertActiveGossipTimestampRange(t, target)
-		}
-	}
-
-	// For each queued syncer, we'll ensure they have transitioned to an
-	// ActiveSync type and reached their final chansSynced state to allow
-	// the next one to transition.
-	for _, peer := range queuedPeers {
-		assertSyncerTransitioned(peer)
-		assertTransitionToChansSynced(t, syncMgr, peer, false)
-	}
-}
-
-// TestSyncManagerRoundRobinTimeout ensures that if we timeout while waiting for
-// an active syncer to reach its final chansSynced state, then we will go on to
-// start the next.
-func TestSyncManagerRoundRobinTimeout(t *testing.T) {
-	t.Parallel()
-
-	// Create our sync manager with support for two active syncers.
-	syncMgr := newTestSyncManager(2)
-	syncMgr.Start()
-	defer syncMgr.Stop()
-
-	// peer1 will be the first peer we start, which will time out and cause
-	// peer2 to start.
-	peer1 := randPeer(t, syncMgr.quit)
-	peer2 := randPeer(t, syncMgr.quit)
-
-	// The first syncer registered always attempts a historical sync.
-	syncMgr.InitSyncState(peer1)
-	assertTransitionToChansSynced(t, syncMgr, peer1, true)
-
-	// We assume the syncer for peer1 has transitioned once we see it send a
-	// lnwire.GossipTimestampRange message.
-	assertActiveGossipTimestampRange(t, peer1)
-
-	// We'll then create the syncer for peer2. This should cause it to be
-	// queued so that it starts once the syncer for peer1 is done.
-	syncMgr.InitSyncState(peer2)
-	assertNoMsgSent(t, peer2)
-
-	// Send a force tick to pretend the sync manager has timed out waiting
-	// for peer1's syncer to reach chansSynced.
-	syncMgr.cfg.ActiveSyncerTimeoutTicker.(*ticker.Force).Force <- time.Time{}
-
-	// Finally, ensure that the syncer for peer2 has transitioned.
-	assertActiveGossipTimestampRange(t, peer2)
-}
-
-// TestSyncManagerRoundRobinStaleSyncer ensures that any stale active syncers we
-// are currently waiting for or are queued up to start are properly removed and
-// stopped.
-func TestSyncManagerRoundRobinStaleSyncer(t *testing.T) {
-	t.Parallel()
-
-	const numActiveSyncers = 4
-
-	// We'll create and start our sync manager with some active syncers.
-	syncMgr := newTestSyncManager(numActiveSyncers)
-	syncMgr.Start()
-	defer syncMgr.Stop()
-
-	peers := make([]*mockPeer, 0, numActiveSyncers)
-
-	// The first syncer registered always attempts a historical sync.
-	firstPeer := randPeer(t, syncMgr.quit)
-	syncMgr.InitSyncState(firstPeer)
-	peers = append(peers, firstPeer)
-	assertTransitionToChansSynced(t, syncMgr, firstPeer, true)
-
-	// After completing the historical sync, a sync transition to ActiveSync
-	// should happen. It should transition immediately since it has no
-	// dependents.
-	assertActiveGossipTimestampRange(t, firstPeer)
-	assertMsgSent(t, firstPeer, &lnwire.QueryChannelRange{
-		FirstBlockHeight: startHeight,
-		NumBlocks:        math.MaxUint32 - startHeight,
-	})
-
-	// We'll create the remaining numActiveSyncers. These will be queued in
-	// the round robin since the first syncer has yet to reach chansSynced.
-	queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1)
-	for i := 0; i < numActiveSyncers-1; i++ {
-		peer := randPeer(t, syncMgr.quit)
-		syncMgr.InitSyncState(peer)
-		peers = append(peers, peer)
-		queuedPeers = append(queuedPeers, peer)
-	}
-
-	// Ensure they cannot transition without sending a GossipTimestampRange
-	// message first.
-	for _, peer := range queuedPeers {
-		assertNoMsgSent(t, peer)
-	}
-
-	// assertSyncerTransitioned ensures the target peer's syncer is the only
-	// that has transitioned.
-	assertSyncerTransitioned := func(target *mockPeer) {
-		t.Helper()
-
-		for _, peer := range peers {
-			if peer.PubKey() != target.PubKey() {
-				assertNoMsgSent(t, peer)
-				continue
-			}
-
-			assertPassiveSyncerTransition(t, syncMgr, target)
-		}
-	}
-
-	// We'll then remove the syncers in the middle to cover the case where
-	// they are queued up in the sync manager's pending list.
-	for i, peer := range peers {
-		if i == 0 || i == len(peers)-1 {
-			continue
-		}
-
-		syncMgr.PruneSyncState(peer.PubKey())
-	}
-
-	// We'll then remove the syncer we are currently waiting for. This
-	// should prompt the last syncer to start since it is the only one left
-	// pending. We'll do this in a goroutine since the peer behind the new
-	// active syncer will need to send out its new GossipTimestampRange.
-	go syncMgr.PruneSyncState(peers[0].PubKey())
-	assertSyncerTransitioned(peers[len(peers)-1])
-}
-
 // assertNoMsgSent is a helper function that ensures a peer hasn't sent any
 // messages.
 func assertNoMsgSent(t *testing.T, peer *mockPeer) {
@@ -480,7 +295,7 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer,
 // assertTransitionToChansSynced asserts the transition of an ActiveSync
 // GossipSyncer to its final chansSynced state.
 func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
-	peer *mockPeer, historicalSync bool) {
+	peer *mockPeer) {

 	t.Helper()

@@ -489,13 +304,9 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
 		t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
 	}

-	firstBlockHeight := uint32(startHeight)
-	if historicalSync {
-		firstBlockHeight = 0
-	}
 	assertMsgSent(t, peer, &lnwire.QueryChannelRange{
-		FirstBlockHeight: firstBlockHeight,
-		NumBlocks:        math.MaxUint32 - firstBlockHeight,
+		FirstBlockHeight: 0,
+		NumBlocks:        math.MaxUint32,
 	})

 	s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil)
@@ -531,7 +342,7 @@ func assertPassiveSyncerTransition(t *testing.T, syncMgr *SyncManager,
 	t.Helper()

 	assertActiveGossipTimestampRange(t, peer)
-	assertTransitionToChansSynced(t, syncMgr, peer, false)
+	assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync)
 }

 // assertActiveSyncerTransition asserts that a gossip syncer goes through all of
server.go
@@ -673,23 +673,22 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
 	}

 	s.authGossiper = discovery.New(discovery.Config{
-		Router:                    s.chanRouter,
-		Notifier:                  s.cc.chainNotifier,
-		ChainHash:                 *activeNetParams.GenesisHash,
-		Broadcast:                 s.BroadcastMessage,
-		ChanSeries:                chanSeries,
-		NotifyWhenOnline:          s.NotifyWhenOnline,
-		NotifyWhenOffline:         s.NotifyWhenOffline,
-		ProofMatureDelta:          0,
-		TrickleDelay:              time.Millisecond * time.Duration(cfg.TrickleDelay),
-		RetransmitDelay:           time.Minute * 30,
-		WaitingProofStore:         waitingProofStore,
-		MessageStore:              gossipMessageStore,
-		AnnSigner:                 s.nodeSigner,
-		RotateTicker:              ticker.New(discovery.DefaultSyncerRotationInterval),
-		HistoricalSyncTicker:      ticker.New(cfg.HistoricalSyncInterval),
-		ActiveSyncerTimeoutTicker: ticker.New(discovery.DefaultActiveSyncerTimeout),
-		NumActiveSyncers:          cfg.NumGraphSyncPeers,
+		Router:               s.chanRouter,
+		Notifier:             s.cc.chainNotifier,
+		ChainHash:            *activeNetParams.GenesisHash,
+		Broadcast:            s.BroadcastMessage,
+		ChanSeries:           chanSeries,
+		NotifyWhenOnline:     s.NotifyWhenOnline,
+		NotifyWhenOffline:    s.NotifyWhenOffline,
+		ProofMatureDelta:     0,
+		TrickleDelay:         time.Millisecond * time.Duration(cfg.TrickleDelay),
+		RetransmitDelay:      time.Minute * 30,
+		WaitingProofStore:    waitingProofStore,
+		MessageStore:         gossipMessageStore,
+		AnnSigner:            s.nodeSigner,
+		RotateTicker:         ticker.New(discovery.DefaultSyncerRotationInterval),
+		HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
+		NumActiveSyncers:     cfg.NumGraphSyncPeers,
 	},
 		s.identityPriv.PubKey(),
 	)