diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index ca1e4685..337c8410 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -306,12 +306,11 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
         channelMtx:        multimutex.NewMutex(),
         recentRejects:     make(map[uint64]struct{}),
         syncMgr: newSyncManager(&SyncManagerCfg{
-            ChainHash:                 cfg.ChainHash,
-            ChanSeries:                cfg.ChanSeries,
-            RotateTicker:              cfg.RotateTicker,
-            HistoricalSyncTicker:      cfg.HistoricalSyncTicker,
-            ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker,
-            NumActiveSyncers:          cfg.NumActiveSyncers,
+            ChainHash:            cfg.ChainHash,
+            ChanSeries:           cfg.ChanSeries,
+            RotateTicker:         cfg.RotateTicker,
+            HistoricalSyncTicker: cfg.HistoricalSyncTicker,
+            NumActiveSyncers:     cfg.NumActiveSyncers,
         }),
     }
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index 51d1aad4..ed99ec27 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -741,17 +741,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
             c := make(chan struct{})
             return c
         },
-        Router:                    router,
-        TrickleDelay:              trickleDelay,
-        RetransmitDelay:           retransmitDelay,
-        ProofMatureDelta:          proofMatureDelta,
-        WaitingProofStore:         waitingProofStore,
-        MessageStore:              newMockMessageStore(),
-        RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-        HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-        ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-        NumActiveSyncers:          3,
-        AnnSigner:                 &mockSigner{nodeKeyPriv1},
+        Router:               router,
+        TrickleDelay:         trickleDelay,
+        RetransmitDelay:      retransmitDelay,
+        ProofMatureDelta:     proofMatureDelta,
+        WaitingProofStore:    waitingProofStore,
+        MessageStore:         newMockMessageStore(),
+        RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+        HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+        NumActiveSyncers:     3,
+        AnnSigner:            &mockSigner{nodeKeyPriv1},
     }, nodeKeyPub1)
 
     if err := gossiper.Start(); err != nil {
@@ -1480,20 +1479,19 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
     // the message to the peer.
     ctx.gossiper.Stop()
     gossiper := New(Config{
-        Notifier:                  ctx.gossiper.cfg.Notifier,
-        Broadcast:                 ctx.gossiper.cfg.Broadcast,
-        NotifyWhenOnline:          ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
-        NotifyWhenOffline:         ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
-        Router:                    ctx.gossiper.cfg.Router,
-        TrickleDelay:              trickleDelay,
-        RetransmitDelay:           retransmitDelay,
-        ProofMatureDelta:          proofMatureDelta,
-        WaitingProofStore:         ctx.gossiper.cfg.WaitingProofStore,
-        MessageStore:              ctx.gossiper.cfg.MessageStore,
-        RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-        HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-        ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-        NumActiveSyncers:          3,
+        Notifier:             ctx.gossiper.cfg.Notifier,
+        Broadcast:            ctx.gossiper.cfg.Broadcast,
+        NotifyWhenOnline:     ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
+        NotifyWhenOffline:    ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
+        Router:               ctx.gossiper.cfg.Router,
+        TrickleDelay:         trickleDelay,
+        RetransmitDelay:      retransmitDelay,
+        ProofMatureDelta:     proofMatureDelta,
+        WaitingProofStore:    ctx.gossiper.cfg.WaitingProofStore,
+        MessageStore:         ctx.gossiper.cfg.MessageStore,
+        RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+        HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+        NumActiveSyncers:     3,
     }, ctx.gossiper.selfKey)
     if err != nil {
         t.Fatalf("unable to recreate gossiper: %v", err)
diff --git a/discovery/mock_test.go b/discovery/mock_test.go
index 9e945193..84ba8636 100644
--- a/discovery/mock_test.go
+++ b/discovery/mock_test.go
@@ -1,6 +1,7 @@
 package discovery
 
 import (
+    "errors"
     "net"
     "sync"
 
@@ -30,6 +31,7 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
         select {
         case p.sentMsgs <- msg:
         case <-p.quit:
+            return errors.New("peer disconnected")
         }
     }
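A note on the mock_test.go hunk above: sentMsgs is unbuffered, so before this change a SendMessage racing a peer teardown could block forever once nothing was draining the channel; returning an error on quit makes the mock fail fast instead. A minimal sketch of exercising that path inside a test (the keyed mockPeer literal is an assumption about this test file's struct, not shown in the hunk):

    peer := &mockPeer{
        sentMsgs: make(chan lnwire.Message), // unbuffered, like the mock above
        quit:     make(chan struct{}),
    }
    close(peer.quit) // simulate the peer disconnecting

    // With the change above, this returns promptly instead of blocking.
    if err := peer.SendMessage(false, &lnwire.GossipTimestampRange{}); err == nil {
        t.Fatal("expected send to a disconnected peer to fail")
    }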
diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index 7cf11316..06022518 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -1,7 +1,6 @@
 package discovery
 
 import (
-    "container/list"
     "errors"
     "sync"
     "time"
@@ -22,11 +21,6 @@ const (
     // force a historical sync to ensure we have as much of the public
     // network as possible.
     DefaultHistoricalSyncInterval = time.Hour
-
-    // DefaultActiveSyncerTimeout is the default timeout interval in which
-    // we'll wait until an active syncer has completed its state machine and
-    // reached its final chansSynced state.
-    DefaultActiveSyncerTimeout = 5 * time.Minute
 )
 
 var (
@@ -36,21 +30,28 @@
     ErrSyncManagerExiting = errors.New("sync manager exiting")
 )
 
-// staleActiveSyncer is an internal message the SyncManager will use in order to
-// handle a peer corresponding to an active syncer being disconnected.
-type staleActiveSyncer struct {
-    // syncer is the active syncer to be removed.
-    syncer *GossipSyncer
+// newSyncer is an internal message we'll use within the SyncManager to signal
+// that we should create a GossipSyncer for a newly connected peer.
+type newSyncer struct {
+    // peer is the newly connected peer.
+    peer lnpeer.Peer
 
-    // transitioned, if true, signals that the active GossipSyncer is stale
-    // due to being transitioned to a PassiveSync state.
-    transitioned bool
+    // doneChan serves as a signal to the caller that the SyncManager's
+    // internal state correctly reflects the newly created syncer.
+    doneChan chan struct{}
+}
 
-    // done serves as a signal to the caller that the SyncManager's internal
-    // state correctly reflects the stale active syncer. This is needed to
-    // ensure we always create a new syncer for a flappy peer after they
-    // disconnect if they happened to be an active syncer.
-    done chan struct{}
+// staleSyncer is an internal message we'll use within the SyncManager to signal
+// that a peer has disconnected and its GossipSyncer should be removed.
+type staleSyncer struct {
+    // peer is the peer that has disconnected.
+    peer routing.Vertex
+
+    // doneChan serves as a signal to the caller that the SyncManager's
+    // internal state correctly reflects the stale active syncer. This is
+    // needed to ensure we always create a new syncer for a flappy peer
+    // after they disconnect if they happened to be an active syncer.
+    doneChan chan struct{}
 }
 
 // SyncManagerCfg contains all of the dependencies required for the SyncManager
@@ -81,12 +82,6 @@ type SyncManagerCfg struct {
     // SyncManager when it should attempt a historical sync with a gossip
     // sync peer.
     HistoricalSyncTicker ticker.Ticker
-
-    // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
-    // SyncManager when it should attempt to start the next pending
-    // activeSyncer due to the current one not completing its state machine
-    // within the timeout.
-    ActiveSyncerTimeoutTicker ticker.Ticker
 }
 
 // SyncManager is a subsystem of the gossiper that manages the gossip syncers
@@ -108,6 +103,18 @@ type SyncManager struct {
     // _once_ with a peer during the SyncManager's startup.
     historicalSync sync.Once
 
+    // newSyncers is a channel we'll use to process requests to create
+    // GossipSyncers for newly connected peers.
+    newSyncers chan *newSyncer
+
+    // staleSyncers is a channel we'll use to process requests to tear down
+    // GossipSyncers for disconnected peers.
+    staleSyncers chan *staleSyncer
+
+    // syncersMu guards the read and write access to the activeSyncers and
+    // inactiveSyncers maps below.
+    syncersMu sync.Mutex
+
     // activeSyncers is the set of all syncers for which we are currently
     // receiving graph updates from. The number of possible active syncers
     // is bounded by NumActiveSyncers.
@@ -117,26 +124,6 @@
     // currently receiving new graph updates from.
     inactiveSyncers map[routing.Vertex]*GossipSyncer
 
-    // pendingActiveSyncers is a map that tracks our set of pending active
-    // syncers. This map will be queried when choosing the next pending
-    // active syncer in the queue to ensure it is not stale.
-    pendingActiveSyncers map[routing.Vertex]*GossipSyncer
-
-    // pendingActiveSyncerQueue is the list of active syncers which are
-    // pending to be started. Syncers will be added to this list through the
-    // newActiveSyncers and staleActiveSyncers channels.
-    pendingActiveSyncerQueue *list.List
-
-    // newActiveSyncers is a channel that will serve as a signal to the
-    // roundRobinHandler to allow it to transition the next pending active
-    // syncer in the queue.
-    newActiveSyncers chan struct{}
-
-    // staleActiveSyncers is a channel through which we'll send any stale
-    // active syncers that should be removed from the round-robin.
-    staleActiveSyncers chan *staleActiveSyncer
-
-    sync.Mutex
     wg   sync.WaitGroup
     quit chan struct{}
 }
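Both message types implement the same request/acknowledge idiom: a caller builds a request carrying a doneChan, hands it to the event loop, and blocks until the loop closes the channel (or the manager quits). The sketch below uses only the names from this diff and mirrors PruneSyncState further down rather than adding anything new:

    req := &staleSyncer{
        peer:     peerPub, // routing.Vertex of the disconnected peer
        doneChan: make(chan struct{}),
    }

    // Hand the request to syncerHandler, bailing out on shutdown.
    select {
    case m.staleSyncers <- req:
    case <-m.quit:
        return
    }

    // Block until the handler has updated its internal maps.
    select {
    case <-req.doneChan:
    case <-m.quit:
    }

This synchronous hand-off is what makes flappy peers safe to handle: a reconnect cannot race ahead of the teardown of the previous syncer.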
@@ -144,25 +131,22 @@
 // newSyncManager constructs a new SyncManager backed by the given config.
 func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
     return &SyncManager{
-        cfg: *cfg,
+        cfg:          *cfg,
+        newSyncers:   make(chan *newSyncer),
+        staleSyncers: make(chan *staleSyncer),
         activeSyncers: make(
             map[routing.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
         ),
-        inactiveSyncers:          make(map[routing.Vertex]*GossipSyncer),
-        pendingActiveSyncers:     make(map[routing.Vertex]*GossipSyncer),
-        pendingActiveSyncerQueue: list.New(),
-        newActiveSyncers:         make(chan struct{}),
-        staleActiveSyncers:       make(chan *staleActiveSyncer),
-        quit:                     make(chan struct{}),
+        inactiveSyncers: make(map[routing.Vertex]*GossipSyncer),
+        quit:            make(chan struct{}),
     }
 }
 
 // Start starts the SyncManager in order to properly carry out its duties.
 func (m *SyncManager) Start() {
     m.start.Do(func() {
-        m.wg.Add(2)
+        m.wg.Add(1)
         go m.syncerHandler()
-        go m.roundRobinHandler()
     })
 }
 
@@ -172,15 +156,9 @@
 func (m *SyncManager) Stop() {
     close(m.quit)
     m.wg.Wait()
 
-    m.Lock()
-    defer m.Unlock()
-
     for _, syncer := range m.inactiveSyncers {
         syncer.Stop()
     }
-    for _, syncer := range m.pendingActiveSyncers {
-        syncer.Stop()
-    }
     for _, syncer := range m.activeSyncers {
         syncer.Stop()
     }
@@ -189,11 +167,13 @@
 // syncerHandler is the SyncManager's main event loop responsible for:
 //
-// 1. Finding new peers to receive graph updates from to ensure we don't only
-// receive them from the same set of peers.
-//
-// 2. Finding new peers to force a historical sync with to ensure we have as
-// much of the public network as possible.
+// 1. Creating and tearing down GossipSyncers for connected/disconnected peers.
+//
+// 2. Finding new peers to receive graph updates from to ensure we don't only
+// receive them from the same set of peers.
+//
+// 3. Finding new peers to force a historical sync with to ensure we have as
+// much of the public network as possible.
 //
 // NOTE: This must be run as a goroutine.
 func (m *SyncManager) syncerHandler() {
@@ -205,8 +185,175 @@ func (m *SyncManager) syncerHandler() {
     m.cfg.HistoricalSyncTicker.Resume()
     defer m.cfg.HistoricalSyncTicker.Stop()
 
+    var (
+        // attemptInitialHistoricalSync determines whether we should
+        // attempt an initial historical sync when a new peer connects.
+        attemptInitialHistoricalSync = true
+
+        // initialHistoricalSyncCompleted serves as a barrier when
+        // initializing new active GossipSyncers. If false, the initial
+        // historical sync has not completed, so we'll defer
+        // initializing any active GossipSyncers. If true, then we can
+        // transition the GossipSyncer immediately. We set up this
+        // barrier to ensure we have most of the graph before attempting
+        // to accept new updates at tip.
+        initialHistoricalSyncCompleted = false
+
+        // initialHistoricalSyncer is the syncer we are currently
+        // performing an initial historical sync with.
+        initialHistoricalSyncer *GossipSyncer
+
+        // initialHistoricalSyncSignal is a signal that will fire once
+        // the initial historical sync has been completed. This is
+        // crucial to ensure that another historical sync isn't
+        // attempted just because the initialHistoricalSyncer was
+        // disconnected.
+        initialHistoricalSyncSignal chan struct{}
+    )
+
     for {
         select {
+        // A new peer has been connected, so we'll create its
+        // accompanying GossipSyncer.
+        case newSyncer := <-m.newSyncers:
+            // If we already have a syncer, then we'll exit early as
+            // we don't want to override it.
+            if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
+                close(newSyncer.doneChan)
+                continue
+            }
+
+            s := m.createGossipSyncer(newSyncer.peer)
+
+            m.syncersMu.Lock()
+            switch {
+            // If we've exceeded our total number of active syncers,
+            // we'll initialize this GossipSyncer as passive.
+            case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
+                fallthrough
+
+            // Otherwise, it should be initialized as active. If the
+            // initial historical sync has yet to complete, then
+            // we'll declare it as passive and attempt to transition
+            // it when the initial historical sync completes.
+            case !initialHistoricalSyncCompleted:
+                s.setSyncType(PassiveSync)
+                m.inactiveSyncers[s.cfg.peerPub] = s
+
+            // The initial historical sync has completed, so we can
+            // immediately start the GossipSyncer as active.
+            default:
+                s.setSyncType(ActiveSync)
+                m.activeSyncers[s.cfg.peerPub] = s
+            }
+            m.syncersMu.Unlock()
+
+            s.Start()
+
+            // Once we create the GossipSyncer, we'll signal to the
+            // caller that they can proceed since the SyncManager's
+            // internal state has been updated.
+            close(newSyncer.doneChan)
+
+            // We'll force a historical sync with the first peer we
+            // connect to, to ensure we get as much of the graph as
+            // possible.
+            if !attemptInitialHistoricalSync {
+                continue
+            }
+
+            log.Debugf("Attempting initial historical sync with "+
+                "GossipSyncer(%x)", s.cfg.peerPub)
+
+            if err := s.historicalSync(); err != nil {
+                log.Errorf("Unable to attempt initial "+
+                    "historical sync with "+
+                    "GossipSyncer(%x): %v", s.cfg.peerPub,
+                    err)
+                continue
+            }
+
+            // Once the historical sync has started, we'll keep
+            // track of the corresponding syncer to properly handle
+            // disconnects. We'll also use a signal to know when the
+            // historical sync completed.
+            attemptInitialHistoricalSync = false
+            initialHistoricalSyncer = s
+            initialHistoricalSyncSignal = s.ResetSyncedSignal()
+
+        // An existing peer has disconnected, so we'll tear down its
+        // corresponding GossipSyncer.
+        case staleSyncer := <-m.staleSyncers:
+            // Once the corresponding GossipSyncer has been stopped
+            // and removed, we'll signal to the caller that they can
+            // proceed since the SyncManager's internal state has
+            // been updated.
+            m.removeGossipSyncer(staleSyncer.peer)
+            close(staleSyncer.doneChan)
+
+            // If we don't have an initialHistoricalSyncer, or we do
+            // but it is not the peer being disconnected, then we
+            // have nothing left to do and can proceed.
+            switch {
+            case initialHistoricalSyncer == nil:
+                fallthrough
+            case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
+                continue
+            }
+
+            // Otherwise, our initialHistoricalSyncer corresponds to
+            // the peer being disconnected, so we'll have to find a
+            // replacement.
+            log.Debug("Finding replacement for initial " +
                "historical sync")
+
+            s := m.forceHistoricalSync()
+            if s == nil {
+                log.Debug("No eligible replacement found " +
                    "for initial historical sync")
+                attemptInitialHistoricalSync = true
+                continue
+            }
+
+            log.Debugf("Replaced initial historical "+
                "GossipSyncer(%v) with GossipSyncer(%x)",
+                staleSyncer.peer, s.cfg.peerPub)
+
+            initialHistoricalSyncer = s
+            initialHistoricalSyncSignal = s.ResetSyncedSignal()
+
+        // Our initial historical sync has completed, so we'll nil out
+        // all of the relevant fields as they're no longer needed.
+        case <-initialHistoricalSyncSignal:
+            initialHistoricalSyncer = nil
+            initialHistoricalSyncSignal = nil
+            initialHistoricalSyncCompleted = true
+
+            log.Debug("Initial historical sync completed")
+
+            // With the initial historical sync complete, we can
+            // begin receiving new graph updates at tip. We'll
+            // determine whether we can have any more active
+            // GossipSyncers. If we do, we'll randomly select some
+            // that are currently passive to transition.
+            m.syncersMu.Lock()
+            numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
+            if numActiveLeft <= 0 {
+                m.syncersMu.Unlock()
+                continue
+            }
+
+            log.Debugf("Attempting to transition %v passive "+
                "GossipSyncers to active", numActiveLeft)
+
+            for i := 0; i < numActiveLeft; i++ {
+                chooseRandomSyncer(
+                    m.inactiveSyncers, m.transitionPassiveSyncer,
+                )
+            }
+
+            m.syncersMu.Unlock()
+
         // Our RotateTicker has ticked, so we'll attempt to rotate a
         // single active syncer with a passive one.
         case <-m.cfg.RotateTicker.Ticks():
@@ -223,402 +370,9 @@
     }
 }
 
-// signalNewActiveSyncer sends a signal to the roundRobinHandler to ensure it
-// transitions any pending active syncers.
-func (m *SyncManager) signalNewActiveSyncer() {
-    select {
-    case m.newActiveSyncers <- struct{}{}:
-    case <-m.quit:
-    }
-}
-
-// signalStaleActiveSyncer removes the syncer for the given peer from the
-// round-robin queue.
-func (m *SyncManager) signalStaleActiveSyncer(s *GossipSyncer, transitioned bool) {
-    done := make(chan struct{})
-
-    select {
-    case m.staleActiveSyncers <- &staleActiveSyncer{
-        syncer:       s,
-        transitioned: transitioned,
-        done:         done,
-    }:
-    case <-m.quit:
-    }
-
-    // Before returning to the caller, we'll wait for the roundRobinHandler
-    // to signal us that the SyncManager has correctly updated its internal
-    // state after handling the stale active syncer.
-    select {
-    case <-done:
-    case <-m.quit:
-    }
-}
-
-// roundRobinHandler is the SyncManager's event loop responsible for managing
-// the round-robin queue of our active syncers to ensure they don't overlap and
-// request the same set of channels, which significantly reduces bandwidth
-// usage.
-//
-// NOTE: This must be run as a goroutine.
-func (m *SyncManager) roundRobinHandler() {
-    defer m.wg.Done()
-
-    defer m.cfg.ActiveSyncerTimeoutTicker.Stop()
-
-    var (
-        // current will hold the current active syncer we're waiting for
-        // to complete its state machine.
-        current *GossipSyncer
-
-        // transitionNext will be responsible for containing the signal
-        // of when the current active syncer has completed its state
-        // machine. This signal allows us to transition the next pending
-        // active syncer, if any.
-        transitionNext chan struct{}
-    )
-
-    // transitionNextSyncer is a helper closure that we'll use to transition
-    // the next syncer queued up. If there aren't any, this will act as a
-    // NOP.
-    transitionNextSyncer := func() {
-        m.Lock()
-        current = m.nextPendingActiveSyncer()
-        m.Unlock()
-        for current != nil {
-            // Ensure we properly handle a shutdown signal.
-            select {
-            case <-m.quit:
-                return
-            default:
-            }
-
-            // We'll avoid performing the transition with the lock
-            // as it can potentially stall the SyncManager due to
-            // the syncTransitionTimeout.
-            err := m.transitionPassiveSyncer(current)
-            // If we timed out attempting to transition the syncer,
-            // we'll re-queue it to retry at a later time and move
-            // on to the next.
-            if err == ErrSyncTransitionTimeout {
-                log.Debugf("Timed out attempting to "+
                    "transition pending active "+
                    "GossipSyncer(%x)", current.cfg.peerPub)
-
-                m.Lock()
-                m.queueActiveSyncer(current)
-                current = m.nextPendingActiveSyncer()
-                m.Unlock()
-                continue
-            }
-            if err != nil {
-                log.Errorf("Unable to transition pending "+
                    "active GossipSyncer(%x): %v",
-                    current.cfg.peerPub, err)
-
-                m.Lock()
-                current = m.nextPendingActiveSyncer()
-                m.Unlock()
-                continue
-            }
-
-            // The transition succeeded, so we'll set our signal to
-            // know when we should attempt to transition the next
-            // pending active syncer in our queue.
-            transitionNext = current.ResetSyncedSignal()
-            m.cfg.ActiveSyncerTimeoutTicker.Resume()
-            return
-        }
-
-        transitionNext = nil
-        m.cfg.ActiveSyncerTimeoutTicker.Pause()
-    }
-
-    for {
-        select {
-        // A new active syncer signal has been received, which indicates
-        // a new pending active syncer has been added to our queue.
-        // We'll only attempt to transition it now if we're not already
-        // in the middle of transitioning another one. We do this to
-        // ensure we don't overlap when requesting channels from
-        // different peers.
-        case <-m.newActiveSyncers:
-            if current == nil {
-                transitionNextSyncer()
-            }
-
-        // A stale active syncer has been received, so we'll need to
-        // remove them from our queue. If we are currently waiting for
-        // its state machine to complete, we'll move on to the next
-        // active syncer in the queue.
-        case staleActiveSyncer := <-m.staleActiveSyncers:
-            s := staleActiveSyncer.syncer
-
-            m.Lock()
-            // If the syncer has transitioned from an ActiveSync
-            // type, rather than disconnecting, we'll include it in
-            // the set of inactive syncers.
-            if staleActiveSyncer.transitioned {
-                m.inactiveSyncers[s.cfg.peerPub] = s
-            } else {
-                // Otherwise, since the peer is disconnecting,
-                // we'll attempt to find a passive syncer that
-                // can replace it.
-                newActiveSyncer := m.chooseRandomSyncer(nil, false)
-                if newActiveSyncer != nil {
-                    m.queueActiveSyncer(newActiveSyncer)
-                }
-            }
-
-            // Remove the internal active syncer references for this
-            // peer.
-            delete(m.pendingActiveSyncers, s.cfg.peerPub)
-            delete(m.activeSyncers, s.cfg.peerPub)
-            m.Unlock()
-
-            // Signal to the caller that they can now proceed since
-            // the SyncManager's state correctly reflects the
-            // stale active syncer.
-            close(staleActiveSyncer.done)
-
-            // If we're not currently waiting for an active syncer
-            // to reach its terminal state, or if we are but we are
-            // currently waiting for the peer being
-            // disconnected/transitioned, then we'll move on to the
-            // next active syncer in our queue.
-            if current == nil || (current != nil &&
                current.cfg.peerPub == s.cfg.peerPub) {
-                transitionNextSyncer()
-            }
-
-        // Our current active syncer has reached its terminal
-        // chansSynced state, so we'll proceed to transitioning the next
-        // pending active syncer if there is one.
-        case <-transitionNext:
-            transitionNextSyncer()
-
-        // We've timed out waiting for the current active syncer to
-        // reach its terminal chansSynced state, so we'll just
-        // move on to the next and avoid retrying as its already been
-        // transitioned.
-        case <-m.cfg.ActiveSyncerTimeoutTicker.Ticks():
-            log.Warnf("Timed out waiting for GossipSyncer(%x) to "+
                "be fully synced", current.cfg.peerPub)
-            transitionNextSyncer()
-
-        case <-m.quit:
-            return
-        }
-    }
-}
-
-// queueActiveSyncer queues the given pending active gossip syncer to the end of
-// the round-robin queue.
-func (m *SyncManager) queueActiveSyncer(s *GossipSyncer) {
-    log.Debugf("Queueing next pending active GossipSyncer(%x)",
        s.cfg.peerPub)
-
-    delete(m.inactiveSyncers, s.cfg.peerPub)
-    m.pendingActiveSyncers[s.cfg.peerPub] = s
-    m.pendingActiveSyncerQueue.PushBack(s)
-}
-
-// nextPendingActiveSyncer returns the next active syncer pending to be
-// transitioned. If there aren't any, then `nil` is returned.
-func (m *SyncManager) nextPendingActiveSyncer() *GossipSyncer {
-    next := m.pendingActiveSyncerQueue.Front()
-    for next != nil {
-        s := m.pendingActiveSyncerQueue.Remove(next).(*GossipSyncer)
-
-        // If the next pending active syncer is no longer in our lookup
-        // map, then the corresponding peer has disconnected, so we'll
-        // skip them.
-        if _, ok := m.pendingActiveSyncers[s.cfg.peerPub]; !ok {
-            next = m.pendingActiveSyncerQueue.Front()
-            continue
-        }
-
-        return s
-    }
-
-    return nil
-}
-
-// rotateActiveSyncerCandidate rotates a single active syncer. In order to
-// achieve this, the active syncer must be in a chansSynced state in order to
-// process the sync transition.
-func (m *SyncManager) rotateActiveSyncerCandidate() {
-    // If we don't have a candidate to rotate with, we can return early.
-    m.Lock()
-    candidate := m.chooseRandomSyncer(nil, false)
-    if candidate == nil {
-        m.Unlock()
-        log.Debug("No eligible candidate to rotate active syncer")
-        return
-    }
-
-    // We'll choose an active syncer at random that's within a chansSynced
-    // state to rotate.
-    var activeSyncer *GossipSyncer
-    for _, s := range m.activeSyncers {
-        // The active syncer must be in a chansSynced state in order to
-        // process sync transitions.
-        if s.syncState() != chansSynced {
-            continue
-        }
-
-        activeSyncer = s
-        break
-    }
-    m.Unlock()
-
-    // If we couldn't find an eligible one, we can return early.
-    if activeSyncer == nil {
-        log.Debug("No eligible active syncer to rotate")
-        return
-    }
-
-    // Otherwise, we'll attempt to transition each syncer to their
-    // respective new sync type. We'll avoid performing the transition with
-    // the lock as it can potentially stall the SyncManager due to the
-    // syncTransitionTimeout.
-    if err := m.transitionActiveSyncer(activeSyncer); err != nil {
-        log.Errorf("Unable to transition active "+
            "GossipSyncer(%x): %v", activeSyncer.cfg.peerPub, err)
-        return
-    }
-
-    m.Lock()
-    m.queueActiveSyncer(candidate)
-    m.Unlock()
-
-    m.signalNewActiveSyncer()
-}
-
-// transitionActiveSyncer transitions an active syncer to a passive one.
-func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error {
-    log.Debugf("Transitioning active GossipSyncer(%x) to passive",
        s.cfg.peerPub)
-
-    if err := s.ProcessSyncTransition(PassiveSync); err != nil {
-        return err
-    }
-
-    m.signalStaleActiveSyncer(s, true)
-
-    return nil
-}
-
-// transitionPassiveSyncer transitions a passive syncer to an active one.
-func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
-    log.Debugf("Transitioning passive GossipSyncer(%x) to active",
        s.cfg.peerPub)
-
-    if err := s.ProcessSyncTransition(ActiveSync); err != nil {
-        return err
-    }
-
-    m.Lock()
-    m.activeSyncers[s.cfg.peerPub] = s
-    delete(m.pendingActiveSyncers, s.cfg.peerPub)
-    m.Unlock()
-
-    return nil
-}
-
-// forceHistoricalSync chooses a syncer with a remote peer at random and forces
-// a historical sync with it.
-func (m *SyncManager) forceHistoricalSync() {
-    m.Lock()
-    defer m.Unlock()
-
-    // We'll choose a random peer with whom we can perform a historical sync
-    // with. We'll set useActive to true to make sure we can still do one if
-    // we don't happen to have any non-active syncers.
-    candidatesChosen := make(map[routing.Vertex]struct{})
-    s := m.chooseRandomSyncer(candidatesChosen, true)
-    for s != nil {
-        // Ensure we properly handle a shutdown signal.
-        select {
-        case <-m.quit:
-            return
-        default:
-        }
-
-        // Blacklist the candidate to ensure it's not chosen again.
-        candidatesChosen[s.cfg.peerPub] = struct{}{}
-
-        err := s.historicalSync()
-        if err == nil {
-            return
-        }
-
-        log.Errorf("Unable to perform historical sync with "+
            "GossipSyncer(%x): %v", s.cfg.peerPub, err)
-
-        s = m.chooseRandomSyncer(candidatesChosen, true)
-    }
-}
-
-// chooseRandomSyncer returns a random non-active syncer that's eligible for a
-// sync transition. A blacklist can be used to skip any previously chosen
-// candidates. The useActive boolean can be used to also filter active syncers.
-//
-// NOTE: It's possible for a nil value to be returned if there are no eligible
-// candidate syncers.
-//
-// NOTE: This method must be called with the syncersMtx lock held.
-func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{},
    useActive bool) *GossipSyncer {
-
-    eligible := func(s *GossipSyncer) bool {
-        // Skip any syncers that exist within the blacklist.
-        if blacklist != nil {
-            if _, ok := blacklist[s.cfg.peerPub]; ok {
-                return false
-            }
-        }
-
-        // Only syncers in a chansSynced state are viable for sync
-        // transitions, so skip any that aren't.
-        return s.syncState() == chansSynced
-    }
-
-    for _, s := range m.inactiveSyncers {
-        if !eligible(s) {
-            continue
-        }
-        return s
-    }
-
-    if useActive {
-        for _, s := range m.activeSyncers {
-            if !eligible(s) {
-                continue
-            }
-            return s
-        }
-    }
-
-    return nil
-}
-
-// InitSyncState is called by outside sub-systems when a connection is
-// established to a new peer that understands how to perform channel range
-// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
-// needed to handle new queries. The first GossipSyncer registered with the
-// SyncManager will attempt a historical sync to ensure we have as much of the
-// public channel graph as possible.
-//
-// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
-func (m *SyncManager) InitSyncState(peer lnpeer.Peer) {
-    // If we already have a syncer, then we'll exit early as we don't want
-    // to override it.
+// createGossipSyncer creates the GossipSyncer for a newly connected peer.
+func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
     nodeID := routing.Vertex(peer.PubKey())
-    if _, ok := m.GossipSyncer(nodeID); ok {
-        return
-    }
-
     log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])
 
     encoding := lnwire.EncodingSortedPlain
@@ -634,79 +388,226 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) {
         },
     })
 
-    // Gossip syncers are initialized by default as passive and in a
-    // chansSynced state so that they can reply to any peer queries or
+    // Gossip syncers are initialized by default in a PassiveSync type
+    // and chansSynced state so that they can reply to any peer queries or
     // handle any sync transitions.
-    s.setSyncType(PassiveSync)
     s.setSyncState(chansSynced)
-    s.Start()
-
-    m.Lock()
-    m.inactiveSyncers[nodeID] = s
-
-    // We'll force a historical sync with the first peer we connect to
-    // ensure we get as much of the graph as possible.
-    var err error
-    m.historicalSync.Do(func() {
-        log.Infof("Attempting historical sync with GossipSyncer(%x)",
            s.cfg.peerPub)
-
-        err = s.historicalSync()
-    })
-    if err != nil {
-        log.Errorf("Unable to perform historical sync with "+
            "GossipSyncer(%x): %v", s.cfg.peerPub, err)
-
-        // Reset historicalSync to ensure it is tried again with a
-        // different peer.
-        m.historicalSync = sync.Once{}
-    }
-
-    // If we've yet to reach our desired number of active syncers, then
-    // we'll use this one.
-    numActiveSyncers := len(m.activeSyncers) + len(m.pendingActiveSyncers)
-    if numActiveSyncers < m.cfg.NumActiveSyncers {
-        m.queueActiveSyncer(s)
-        m.Unlock()
-        m.signalNewActiveSyncer()
-        return
-    }
-    m.Unlock()
+    s.setSyncType(PassiveSync)
+    return s
 }
 
-// PruneSyncState is called by outside sub-systems once a peer that we were
-// previously connected to has been disconnected. In this case we can stop the
-// existing GossipSyncer assigned to the peer and free up resources.
-func (m *SyncManager) PruneSyncState(peer routing.Vertex) {
-    s, ok := m.GossipSyncer(peer)
+// removeGossipSyncer removes all internal references to the disconnected peer's
+// GossipSyncer and stops it. In the event of an active GossipSyncer being
+// disconnected, a passive GossipSyncer, if any, will take its place.
+func (m *SyncManager) removeGossipSyncer(peer routing.Vertex) {
+    m.syncersMu.Lock()
+    defer m.syncersMu.Unlock()
+
+    s, ok := m.gossipSyncer(peer)
     if !ok {
         return
     }
 
     log.Infof("Removing GossipSyncer for peer=%v", peer)
 
-    // We'll start by stopping the GossipSyncer for the disconnected peer.
-    s.Stop()
+    // We'll stop the GossipSyncer for the disconnected peer in a goroutine
+    // to prevent blocking the SyncManager.
+    go s.Stop()
 
     // If it's a non-active syncer, then we can just exit now.
-    m.Lock()
-    if _, ok := m.inactiveSyncers[s.cfg.peerPub]; ok {
-        delete(m.inactiveSyncers, s.cfg.peerPub)
-        m.Unlock()
+    if _, ok := m.inactiveSyncers[peer]; ok {
+        delete(m.inactiveSyncers, peer)
         return
     }
-    m.Unlock()
 
-    // Otherwise, we'll need to dequeue it from our pending active syncers
-    // queue and find a new one to replace it, if any.
-    m.signalStaleActiveSyncer(s, false)
+    // Otherwise, we'll need to find a new one to replace it, if any.
+    delete(m.activeSyncers, peer)
+    newActiveSyncer := chooseRandomSyncer(
+        m.inactiveSyncers, m.transitionPassiveSyncer,
+    )
+    if newActiveSyncer == nil {
+        return
+    }
+
+    log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
+        peer, newActiveSyncer.cfg.peerPub)
+}
+
+// rotateActiveSyncerCandidate rotates a single active syncer. In order to
+// achieve this, the active syncer must be in a chansSynced state in order to
+// process the sync transition.
+func (m *SyncManager) rotateActiveSyncerCandidate() {
+    m.syncersMu.Lock()
+    defer m.syncersMu.Unlock()
+
+    // If we couldn't find an eligible active syncer to rotate, we can
+    // return early.
+    activeSyncer := chooseRandomSyncer(m.activeSyncers, nil)
+    if activeSyncer == nil {
+        log.Debug("No eligible active syncer to rotate")
+        return
+    }
+
+    // Similarly, if we don't have a candidate to rotate with, we can return
+    // early as well.
+    candidate := chooseRandomSyncer(m.inactiveSyncers, nil)
+    if candidate == nil {
+        log.Debug("No eligible candidate to rotate active syncer")
+        return
+    }
+
+    // Otherwise, we'll attempt to transition each syncer to their
+    // respective new sync type.
+ log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)", + activeSyncer.cfg.peerPub, candidate.cfg.peerPub) + + if err := m.transitionActiveSyncer(activeSyncer); err != nil { + log.Errorf("Unable to transition active GossipSyncer(%x): %v", + activeSyncer.cfg.peerPub, err) + return + } + + if err := m.transitionPassiveSyncer(candidate); err != nil { + log.Errorf("Unable to transition passive GossipSyncer(%x): %v", + activeSyncer.cfg.peerPub, err) + return + } +} + +// transitionActiveSyncer transitions an active syncer to a passive one. +// +// NOTE: This must be called with the syncersMu lock held. +func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { + log.Debugf("Transitioning active GossipSyncer(%x) to passive", + s.cfg.peerPub) + + if err := s.ProcessSyncTransition(PassiveSync); err != nil { + return err + } + + delete(m.activeSyncers, s.cfg.peerPub) + m.inactiveSyncers[s.cfg.peerPub] = s + + return nil +} + +// transitionPassiveSyncer transitions a passive syncer to an active one. +// +// NOTE: This must be called with the syncersMu lock held. +func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { + log.Debugf("Transitioning passive GossipSyncer(%x) to active", + s.cfg.peerPub) + + if err := s.ProcessSyncTransition(ActiveSync); err != nil { + return err + } + + delete(m.inactiveSyncers, s.cfg.peerPub) + m.activeSyncers[s.cfg.peerPub] = s + + return nil +} + +// forceHistoricalSync chooses a syncer with a remote peer at random and forces +// a historical sync with it. +func (m *SyncManager) forceHistoricalSync() *GossipSyncer { + m.syncersMu.Lock() + defer m.syncersMu.Unlock() + + // We'll sample from both sets of active and inactive syncers in the + // event that we don't have any inactive syncers. + return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error { + return s.historicalSync() + }) +} + +// chooseRandomSyncer iterates through the set of syncers given and returns the +// first one which was able to successfully perform the action enclosed in the +// function closure. +// +// NOTE: It's possible for a nil value to be returned if there are no eligible +// candidate syncers. +func chooseRandomSyncer(syncers map[routing.Vertex]*GossipSyncer, + action func(*GossipSyncer) error) *GossipSyncer { + + for _, s := range syncers { + // Only syncers in a chansSynced state are viable for sync + // transitions, so skip any that aren't. + if s.syncState() != chansSynced { + continue + } + + if action != nil { + if err := action(s); err != nil { + log.Debugf("Skipping eligible candidate "+ + "GossipSyncer(%x): %v", s.cfg.peerPub, + err) + continue + } + } + + return s + } + + return nil +} + +// InitSyncState is called by outside sub-systems when a connection is +// established to a new peer that understands how to perform channel range +// queries. We'll allocate a new GossipSyncer for it, and start any goroutines +// needed to handle new queries. The first GossipSyncer registered with the +// SyncManager will attempt a historical sync to ensure we have as much of the +// public channel graph as possible. +// +// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer. 
+// InitSyncState is called by outside sub-systems when a connection is
+// established to a new peer that understands how to perform channel range
+// queries. We'll allocate a new GossipSyncer for it, and start any goroutines
+// needed to handle new queries. The first GossipSyncer registered with the
+// SyncManager will attempt a historical sync to ensure we have as much of the
+// public channel graph as possible.
+//
+// TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer.
+func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error {
+    done := make(chan struct{})
+
+    select {
+    case m.newSyncers <- &newSyncer{
+        peer:     peer,
+        doneChan: done,
+    }:
+    case <-m.quit:
+        return ErrSyncManagerExiting
+    }
+
+    select {
+    case <-done:
+        return nil
+    case <-m.quit:
+        return ErrSyncManagerExiting
+    }
+}
+
+// PruneSyncState is called by outside sub-systems once a peer that we were
+// previously connected to has been disconnected. In this case we can stop the
+// existing GossipSyncer assigned to the peer and free up resources.
+func (m *SyncManager) PruneSyncState(peer routing.Vertex) {
+    done := make(chan struct{})
+
+    // We avoid returning an error when the SyncManager is stopped since the
+    // GossipSyncer will be stopped then anyway.
+    select {
+    case m.staleSyncers <- &staleSyncer{
+        peer:     peer,
+        doneChan: done,
+    }:
+    case <-m.quit:
+        return
+    }
+
+    select {
+    case <-done:
+    case <-m.quit:
+    }
 }
 
 // GossipSyncer returns the associated gossip syncer of a peer. The boolean
 // returned signals whether there exists a gossip syncer for the peer.
 func (m *SyncManager) GossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) {
-    m.Lock()
-    defer m.Unlock()
+    m.syncersMu.Lock()
+    defer m.syncersMu.Unlock()
 
     return m.gossipSyncer(peer)
 }
@@ -717,10 +618,6 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) {
     if ok {
         return syncer, true
     }
-    syncer, ok = m.pendingActiveSyncers[peer]
-    if ok {
-        return syncer, true
-    }
     syncer, ok = m.activeSyncers[peer]
     if ok {
         return syncer, true
@@ -730,19 +627,19 @@
 
 // GossipSyncers returns all of the currently initialized gossip syncers.
 func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer {
-    m.Lock()
-    defer m.Unlock()
+    m.syncersMu.Lock()
+    defer m.syncersMu.Unlock()
+    return m.gossipSyncers()
+}
 
-    numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) +
        len(m.inactiveSyncers)
+// gossipSyncers returns all of the currently initialized gossip syncers.
+func (m *SyncManager) gossipSyncers() map[routing.Vertex]*GossipSyncer {
+    numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
     syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers)
 
     for _, syncer := range m.inactiveSyncers {
         syncers[syncer.cfg.peerPub] = syncer
     }
-    for _, syncer := range m.pendingActiveSyncers {
-        syncers[syncer.cfg.peerPub] = syncer
-    }
     for _, syncer := range m.activeSyncers {
         syncers[syncer.cfg.peerPub] = syncer
     }
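That closes out sync_manager.go: the exported surface is now the blocking pair InitSyncState/PruneSyncState plus the GossipSyncer lookups. A hypothetical caller, e.g. a peer lifecycle hook in the server (the surrounding code is assumed, not part of this diff), would drive it roughly like:

    // On a new gossip-query-capable connection:
    if err := syncMgr.InitSyncState(peer); err != nil {
        return err // ErrSyncManagerExiting during shutdown
    }

    // On disconnect; blocks until the event loop has torn the syncer down:
    syncMgr.PruneSyncState(peer.PubKey())

    // Lookups remain available to other subsystems:
    if s, ok := syncMgr.GossipSyncer(peer.PubKey()); ok {
        log.Debugf("Sync type for peer: %v", s.SyncType())
    }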
diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go
index 7fee2b3e..b61253de 100644
--- a/discovery/sync_manager_test.go
+++ b/discovery/sync_manager_test.go
@@ -30,11 +30,10 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
 func newTestSyncManager(numActiveSyncers int) *SyncManager {
     hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight}
     return newSyncManager(&SyncManagerCfg{
-        ChanSeries:                newMockChannelGraphTimeSeries(hID),
-        RotateTicker:              ticker.NewForce(DefaultSyncerRotationInterval),
-        HistoricalSyncTicker:      ticker.NewForce(DefaultHistoricalSyncInterval),
-        ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout),
-        NumActiveSyncers:          numActiveSyncers,
+        ChanSeries:           newMockChannelGraphTimeSeries(hID),
+        RotateTicker:         ticker.NewForce(DefaultSyncerRotationInterval),
+        HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
+        NumActiveSyncers:     numActiveSyncers,
     })
 }
 
@@ -57,21 +56,22 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
     for i := 0; i < numActiveSyncers; i++ {
         peer := randPeer(t, syncMgr.quit)
         syncMgr.InitSyncState(peer)
+        s := assertSyncerExistence(t, syncMgr, peer)
 
         // The first syncer registered always attempts a historical
         // sync.
         if i == 0 {
-            assertTransitionToChansSynced(t, syncMgr, peer, true)
+            assertTransitionToChansSynced(t, s, peer)
         }
-
-        assertPassiveSyncerTransition(t, syncMgr, peer)
-        assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync)
+        assertActiveGossipTimestampRange(t, peer)
+        assertSyncerStatus(t, s, chansSynced, ActiveSync)
     }
 
     for i := 0; i < numSyncers-numActiveSyncers; i++ {
         peer := randPeer(t, syncMgr.quit)
         syncMgr.InitSyncState(peer)
-        assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync)
+        s := assertSyncerExistence(t, syncMgr, peer)
+        assertSyncerStatus(t, s, chansSynced, PassiveSync)
     }
 }
 
@@ -80,39 +80,52 @@
 func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
     t.Parallel()
 
-    // We'll create our test sync manager to only have one active syncer.
-    syncMgr := newTestSyncManager(1)
+    // We'll create our test sync manager to have two active syncers.
+    syncMgr := newTestSyncManager(2)
     syncMgr.Start()
     defer syncMgr.Stop()
 
-    // peer1 will represent an active syncer that performs a historical
-    // sync since it is the first registered peer with the SyncManager.
-    peer1 := randPeer(t, syncMgr.quit)
-    syncMgr.InitSyncState(peer1)
-    assertTransitionToChansSynced(t, syncMgr, peer1, true)
-    assertPassiveSyncerTransition(t, syncMgr, peer1)
+    // The first will be an active syncer that performs a historical sync
+    // since it is the first one registered with the SyncManager.
+    historicalSyncPeer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(historicalSyncPeer)
+    historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer)
+    assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer)
+    assertActiveGossipTimestampRange(t, historicalSyncPeer)
+    assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync)
+
+    // Then, we'll create the second active syncer, which is the one we'll
+    // disconnect.
+    activeSyncPeer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(activeSyncPeer)
+    activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
+    assertActiveGossipTimestampRange(t, activeSyncPeer)
+    assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
 
     // It will then be torn down to simulate a disconnection. Since there
     // are no other candidate syncers available, the active syncer won't be
     // replaced.
-    syncMgr.PruneSyncState(peer1.PubKey())
+    syncMgr.PruneSyncState(activeSyncPeer.PubKey())
 
     // Then, we'll start our active syncer again, but this time we'll also
     // have a passive syncer available to replace the active syncer after
     // the peer disconnects.
-    syncMgr.InitSyncState(peer1)
-    assertPassiveSyncerTransition(t, syncMgr, peer1)
+    syncMgr.InitSyncState(activeSyncPeer)
+    activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer)
+    assertActiveGossipTimestampRange(t, activeSyncPeer)
+    assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
 
     // Create our second peer, which should be initialized as a passive
     // syncer.
-    peer2 := randPeer(t, syncMgr.quit)
-    syncMgr.InitSyncState(peer2)
-    assertSyncerStatus(t, syncMgr, peer2, chansSynced, PassiveSync)
+    newActiveSyncPeer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(newActiveSyncPeer)
+    newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer)
+    assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync)
 
     // Disconnect our active syncer, which should trigger the SyncManager to
     // replace it with our passive syncer.
-    syncMgr.PruneSyncState(peer1.PubKey())
-    assertPassiveSyncerTransition(t, syncMgr, peer2)
+    go syncMgr.PruneSyncState(activeSyncPeer.PubKey())
+    assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer)
 }
 
 // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully
@@ -128,19 +141,22 @@
     // The first syncer registered always performs a historical sync.
     activeSyncPeer := randPeer(t, syncMgr.quit)
     syncMgr.InitSyncState(activeSyncPeer)
-    assertTransitionToChansSynced(t, syncMgr, activeSyncPeer, true)
-    assertPassiveSyncerTransition(t, syncMgr, activeSyncPeer)
+    activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer)
+    assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer)
+    assertActiveGossipTimestampRange(t, activeSyncPeer)
+    assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
 
     // We'll send a tick to force a rotation. Since there aren't any
     // candidates, none of the active syncers will be rotated.
     syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{}
     assertNoMsgSent(t, activeSyncPeer)
-    assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync)
+    assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync)
 
     // We'll then go ahead and add a passive syncer.
     passiveSyncPeer := randPeer(t, syncMgr.quit)
     syncMgr.InitSyncState(passiveSyncPeer)
-    assertSyncerStatus(t, syncMgr, passiveSyncPeer, chansSynced, PassiveSync)
+    passiveSyncer := assertSyncerExistence(t, syncMgr, passiveSyncPeer)
+    assertSyncerStatus(t, passiveSyncer, chansSynced, PassiveSync)
 
     // We'll force another rotation - this time, since we have a passive
     // syncer available, they should be rotated.
@@ -149,7 +165,7 @@
     // The transition from an active syncer to a passive syncer causes the
     // peer to send out a new GossipTimestampRange in the past so that they
     // don't receive new graph updates.
-    assertActiveSyncerTransition(t, syncMgr, activeSyncPeer)
+    assertActiveSyncerTransition(t, activeSyncer, activeSyncPeer)
 
     // The transition from a passive syncer to an active syncer causes the
     // peer to send a new GossipTimestampRange with the current timestamp to
@@ -158,13 +174,54 @@
     // machine, starting from its initial syncingChans state. We'll then
     // need to transition it to its final chansSynced state to ensure the
     // next syncer is properly started in the round-robin.
-    assertPassiveSyncerTransition(t, syncMgr, passiveSyncPeer)
+    assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer)
 }
 
-// TestSyncManagerHistoricalSync ensures that we only attempt a single
-// historical sync during the SyncManager's startup, and that we can routinely
-// force historical syncs whenever the HistoricalSyncTicker fires.
-func TestSyncManagerHistoricalSync(t *testing.T) {
+// TestSyncManagerInitialHistoricalSync ensures that we only attempt a single
+// historical sync during the SyncManager's startup. If the peer corresponding
+// to the initial historical syncer disconnects, we should attempt to find a
+// replacement.
+func TestSyncManagerInitialHistoricalSync(t *testing.T) {
+    t.Parallel()
+
+    syncMgr := newTestSyncManager(0)
+    syncMgr.Start()
+    defer syncMgr.Stop()
+
+    // We should expect to see a QueryChannelRange message with a
+    // FirstBlockHeight of the genesis block, signaling that an initial
+    // historical sync is being attempted.
+    peer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(peer)
+    assertMsgSent(t, peer, &lnwire.QueryChannelRange{
+        FirstBlockHeight: 0,
+        NumBlocks:        math.MaxUint32,
+    })
+
+    // If an additional peer connects, then another historical sync should
+    // not be attempted.
+    finalHistoricalPeer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(finalHistoricalPeer)
+    finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
+    assertNoMsgSent(t, finalHistoricalPeer)
+
+    // If we disconnect the peer performing the initial historical sync, a
+    // new one should be chosen.
+    syncMgr.PruneSyncState(peer.PubKey())
+    assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
+
+    // Once the initial historical sync has succeeded, another one should
+    // not be attempted when the peer who performed it disconnects.
+    extraPeer := randPeer(t, syncMgr.quit)
+    syncMgr.InitSyncState(extraPeer)
+    assertNoMsgSent(t, extraPeer)
+    syncMgr.PruneSyncState(finalHistoricalPeer.PubKey())
+    assertNoMsgSent(t, extraPeer)
+}
+
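The tests in this file drive time by hand through lnd's ticker.Force rather than sleeping through real intervals. The pattern, as wired up by newTestSyncManager above and used in the rotation test (sketch only):

    // A forceable ticker handed to the SyncManagerCfg under test...
    rotateTicker := ticker.NewForce(DefaultSyncerRotationInterval)

    // ...lets a test fire "an interval elapsed" on demand:
    rotateTicker.Force <- time.Time{}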
+// TestSyncManagerForceHistoricalSync ensures that we can perform routine
+// historical syncs whenever the HistoricalSyncTicker fires.
+func TestSyncManagerForceHistoricalSync(t *testing.T) {
     t.Parallel()
 
     syncMgr := newTestSyncManager(0)
@@ -197,190 +254,64 @@
     })
 }
 
-// TestSyncManagerRoundRobinQueue ensures that any subsequent active syncers can
-// only be started after the previous one has completed its state machine.
-func TestSyncManagerRoundRobinQueue(t *testing.T) {
+// TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers
+// are initialized as ActiveSync until the initial historical sync has been
+// completed. Once it does, the pending GossipSyncers should be transitioned to
+// ActiveSync.
+func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
     t.Parallel()
 
-    const numActiveSyncers = 3
+    const numActiveSyncers = 2
 
-    // We'll start by creating our sync manager with support for three
-    // active syncers.
+    // We'll start by creating our test sync manager which will hold up to
+    // 2 active syncers.
     syncMgr := newTestSyncManager(numActiveSyncers)
     syncMgr.Start()
     defer syncMgr.Stop()
 
+    // We'll go ahead and create our syncers.
     peers := make([]*mockPeer, 0, numActiveSyncers)
+    syncers := make([]*GossipSyncer, 0, numActiveSyncers)
+    for i := 0; i < numActiveSyncers; i++ {
-
-    // The first syncer registered always attempts a historical sync.
-    firstPeer := randPeer(t, syncMgr.quit)
-    syncMgr.InitSyncState(firstPeer)
-    peers = append(peers, firstPeer)
-    assertTransitionToChansSynced(t, syncMgr, firstPeer, true)
-
-    // After completing the historical sync, a sync transition to ActiveSync
-    // should happen. It should transition immediately since it has no
-    // dependents.
-    assertActiveGossipTimestampRange(t, firstPeer)
-
-    // We'll create the remaining numActiveSyncers. These will be queued in
-    // the round robin since the first syncer has yet to reach chansSynced.
-    queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1)
-    for i := 0; i < numActiveSyncers-1; i++ {
         peer := randPeer(t, syncMgr.quit)
-        syncMgr.InitSyncState(peer)
         peers = append(peers, peer)
-        queuedPeers = append(queuedPeers, peer)
-    }
 
-    // Ensure they cannot transition without sending a GossipTimestampRange
-    // message first.
-    for _, peer := range queuedPeers {
-        assertNoMsgSent(t, peer)
-    }
-
-    // Transition the first syncer to chansSynced, which should allow the
-    // second to transition next.
-    assertTransitionToChansSynced(t, syncMgr, firstPeer, false)
-
-    // assertSyncerTransitioned ensures the target peer's syncer is the only
-    // that has transitioned.
-    assertSyncerTransitioned := func(target *mockPeer) {
-        t.Helper()
-
-        for _, peer := range peers {
-            if peer.PubKey() != target.PubKey() {
-                assertNoMsgSent(t, peer)
-                continue
-            }
-
-            assertActiveGossipTimestampRange(t, target)
-        }
-    }
-
-    // For each queued syncer, we'll ensure they have transitioned to an
-    // ActiveSync type and reached their final chansSynced state to allow
-    // the next one to transition.
-    for _, peer := range queuedPeers {
-        assertSyncerTransitioned(peer)
-        assertTransitionToChansSynced(t, syncMgr, peer, false)
-    }
-}
-
-// TestSyncManagerRoundRobinTimeout ensures that if we timeout while waiting for
-// an active syncer to reach its final chansSynced state, then we will go on to
-// start the next.
-func TestSyncManagerRoundRobinTimeout(t *testing.T) {
-    t.Parallel()
-
-    // Create our sync manager with support for two active syncers.
-    syncMgr := newTestSyncManager(2)
-    syncMgr.Start()
-    defer syncMgr.Stop()
-
-    // peer1 will be the first peer we start, which will time out and cause
-    // peer2 to start.
-    peer1 := randPeer(t, syncMgr.quit)
-    peer2 := randPeer(t, syncMgr.quit)
-
-    // The first syncer registered always attempts a historical sync.
-    syncMgr.InitSyncState(peer1)
-    assertTransitionToChansSynced(t, syncMgr, peer1, true)
-
-    // We assume the syncer for peer1 has transitioned once we see it send a
-    // lnwire.GossipTimestampRange message.
-    assertActiveGossipTimestampRange(t, peer1)
-
-    // We'll then create the syncer for peer2. This should cause it to be
-    // queued so that it starts once the syncer for peer1 is done.
-    syncMgr.InitSyncState(peer2)
-    assertNoMsgSent(t, peer2)
-
-    // Send a force tick to pretend the sync manager has timed out waiting
-    // for peer1's syncer to reach chansSynced.
-    syncMgr.cfg.ActiveSyncerTimeoutTicker.(*ticker.Force).Force <- time.Time{}
-
-    // Finally, ensure that the syncer for peer2 has transitioned.
-    assertActiveGossipTimestampRange(t, peer2)
-}
-
-// TestSyncManagerRoundRobinStaleSyncer ensures that any stale active syncers we
-// are currently waiting for or are queued up to start are properly removed and
-// stopped.
-func TestSyncManagerRoundRobinStaleSyncer(t *testing.T) {
-    t.Parallel()
-
-    const numActiveSyncers = 4
-
-    // We'll create and start our sync manager with some active syncers.
-    syncMgr := newTestSyncManager(numActiveSyncers)
-    syncMgr.Start()
-    defer syncMgr.Stop()
-
-    peers := make([]*mockPeer, 0, numActiveSyncers)
-
-    // The first syncer registered always attempts a historical sync.
-    firstPeer := randPeer(t, syncMgr.quit)
-    syncMgr.InitSyncState(firstPeer)
-    peers = append(peers, firstPeer)
-    assertTransitionToChansSynced(t, syncMgr, firstPeer, true)
-
-    // After completing the historical sync, a sync transition to ActiveSync
-    // should happen. It should transition immediately since it has no
-    // dependents.
-    assertActiveGossipTimestampRange(t, firstPeer)
-    assertMsgSent(t, firstPeer, &lnwire.QueryChannelRange{
-        FirstBlockHeight: startHeight,
-        NumBlocks:        math.MaxUint32 - startHeight,
-    })
-
-    // We'll create the remaining numActiveSyncers. These will be queued in
-    // the round robin since the first syncer has yet to reach chansSynced.
-    queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1)
-    for i := 0; i < numActiveSyncers-1; i++ {
-        peer := randPeer(t, syncMgr.quit)
         syncMgr.InitSyncState(peer)
-        peers = append(peers, peer)
-        queuedPeers = append(queuedPeers, peer)
-    }
+        s := assertSyncerExistence(t, syncMgr, peer)
+        syncers = append(syncers, s)
 
-    // Ensure they cannot transition without sending a GossipTimestampRange
-    // message first.
-    for _, peer := range queuedPeers {
-        assertNoMsgSent(t, peer)
-    }
-
-    // assertSyncerTransitioned ensures the target peer's syncer is the only
-    // that has transitioned.
-    assertSyncerTransitioned := func(target *mockPeer) {
-        t.Helper()
-
-        for _, peer := range peers {
-            if peer.PubKey() != target.PubKey() {
-                assertNoMsgSent(t, peer)
-                continue
-            }
-
-            assertPassiveSyncerTransition(t, syncMgr, target)
-        }
-    }
-
-    // We'll then remove the syncers in the middle to cover the case where
-    // they are queued up in the sync manager's pending list.
-    for i, peer := range peers {
-        if i == 0 || i == len(peers)-1 {
+        // The first one always attempts a historical sync. We won't
+        // transition it to chansSynced to ensure the remaining syncers
+        // aren't started as active.
+        if i == 0 {
+            assertSyncerStatus(t, s, syncingChans, PassiveSync)
             continue
         }
 
-        syncMgr.PruneSyncState(peer.PubKey())
+        // The rest should remain in a passive and chansSynced state,
+        // and they should be queued to transition to active once the
+        // initial historical sync is completed.
+        assertNoMsgSent(t, peer)
+        assertSyncerStatus(t, s, chansSynced, PassiveSync)
     }
 
-    // We'll then remove the syncer we are currently waiting for. This
-    // should prompt the last syncer to start since it is the only one left
-    // pending. We'll do this in a goroutine since the peer behind the new
-    // active syncer will need to send out its new GossipTimestampRange.
-    go syncMgr.PruneSyncState(peers[0].PubKey())
-    assertSyncerTransitioned(peers[len(peers)-1])
+    // To ensure we don't transition any pending active syncers that have
+    // previously disconnected, we'll disconnect the last one.
+    stalePeer := peers[numActiveSyncers-1]
+    syncMgr.PruneSyncState(stalePeer.PubKey())
+
+    // Then, we'll complete the initial historical sync by transitioning the
+    // historical syncer to its final chansSynced state. This should trigger
+    // all of the pending active syncers to transition, except for the one
+    // we disconnected.
+    assertTransitionToChansSynced(t, syncers[0], peers[0])
+    for i, s := range syncers {
+        if i == numActiveSyncers-1 {
+            assertNoMsgSent(t, peers[i])
+            continue
+        }
+        assertPassiveSyncerTransition(t, s, peers[i])
+    }
 }
 
 // assertNoMsgSent is a helper function that ensures a peer hasn't sent any
@@ -423,7 +354,7 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) {
     var msgSent lnwire.Message
     select {
     case msgSent = <-peer.sentMsgs:
-    case <-time.After(time.Second):
+    case <-time.After(2 * time.Second):
         t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+
            "message", peer.PubKey())
     }
@@ -443,10 +374,9 @@
     }
 }
 
-// assertSyncerStatus asserts that the gossip syncer for the given peer matches
-// the expected sync state and type.
-func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer,
    syncState syncerState, syncType SyncerType) {
+// assertSyncerExistence asserts that a GossipSyncer exists for the given peer.
+func assertSyncerExistence(t *testing.T, syncMgr *SyncManager,
    peer *mockPeer) *GossipSyncer {
 
     t.Helper()
 
@@ -455,19 +385,29 @@
         t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
     }
 
+    return s
+}
+
+// assertSyncerStatus asserts that the gossip syncer for the given peer matches
+// the expected sync state and type.
+func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState,
    syncType SyncerType) {
+
+    t.Helper()
+
     // We'll check the status of our syncer within a WaitPredicate as some
     // sync transitions might cause this to be racy.
 err := lntest.WaitNoError(func() error {
 state := s.syncState()
 if state != syncState {
 return fmt.Errorf("expected syncState %v for peer "+
- "%x, got %v", syncState, peer.PubKey(), state)
+ "%x, got %v", syncState, s.cfg.peerPub, state)
 }

 typ := s.SyncType()
 if typ != syncType {
 return fmt.Errorf("expected syncType %v for peer "+
- "%x, got %v", syncType, peer.PubKey(), typ)
+ "%x, got %v", syncType, s.cfg.peerPub, typ)
 }

 return nil
@@ -479,28 +419,17 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer,

 // assertTransitionToChansSynced asserts the transition of an ActiveSync
 // GossipSyncer to its final chansSynced state.
-func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
- peer *mockPeer, historicalSync bool) {
-
+func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
 t.Helper()

- s, ok := syncMgr.GossipSyncer(peer.PubKey())
- if !ok {
- t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
- }
-
- firstBlockHeight := uint32(startHeight)
- if historicalSync {
- firstBlockHeight = 0
- }
 assertMsgSent(t, peer, &lnwire.QueryChannelRange{
- FirstBlockHeight: firstBlockHeight,
- NumBlocks: math.MaxUint32 - firstBlockHeight,
+ FirstBlockHeight: 0,
+ NumBlocks: math.MaxUint32,
 })

 s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil)

- chanSeries := syncMgr.cfg.ChanSeries.(*mockChannelGraphTimeSeries)
+ chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)

 select {
 case <-chanSeries.filterReq:
@@ -525,25 +454,22 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,

 // assertPassiveSyncerTransition asserts that a gossip syncer goes through all
 // of its expected steps when transitioning from passive to active.
-func assertPassiveSyncerTransition(t *testing.T, syncMgr *SyncManager,
- peer *mockPeer) {
+func assertPassiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {

 t.Helper()

 assertActiveGossipTimestampRange(t, peer)
- assertTransitionToChansSynced(t, syncMgr, peer, false)
+ assertSyncerStatus(t, s, chansSynced, ActiveSync)
 }

 // assertActiveSyncerTransition asserts that a gossip syncer goes through all of
 // its expected steps when transitioning from active to passive.
-func assertActiveSyncerTransition(t *testing.T, syncMgr *SyncManager,
- peer *mockPeer) {
-
+func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
 t.Helper()

 assertMsgSent(t, peer, &lnwire.GossipTimestampRange{
 FirstTimestamp: uint32(zeroTimestamp.Unix()),
 TimestampRange: 0,
 })
- assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync)
+ assertSyncerStatus(t, s, chansSynced, PassiveSync)
 }
diff --git a/discovery/syncer.go b/discovery/syncer.go
index 9e45abbc..09f3ee3f 100644
--- a/discovery/syncer.go
+++ b/discovery/syncer.go
@@ -18,16 +18,23 @@ import (
 type SyncerType uint8

 const (
- // ActiveSync denotes that a gossip syncer should exercise its default
- // behavior. This includes reconciling the set of missing graph updates
- // with the remote peer _and_ receiving new updates from them.
+ // ActiveSync denotes that a gossip syncer:
+ //
+ // 1. Should not attempt to synchronize with the remote peer for
+ //    missing channels.
+ // 2. Should respond to queries from the remote peer.
+ // 3. Should receive new updates from the remote peer.
+ //
+ // They are started in a chansSynced state in order to accomplish their
+ // responsibilities above.
 ActiveSync SyncerType = iota

 // PassiveSync denotes that a gossip syncer:
 //
- // 1. Should not attempt to query the remote peer for graph updates.
- // 2. Should respond to queries from the remote peer.
- // 3. Should not receive new updates from the remote peer.
+ // 1. Should not attempt to synchronize with the remote peer for
+ //    missing channels.
+ // 2. Should respond to queries from the remote peer.
+ // 3. Should not receive new updates from the remote peer.
 //
 // They are started in a chansSynced state in order to accomplish their
 // responsibilities above.
@@ -161,6 +168,14 @@ type syncTransitionReq struct {
 errChan chan error
 }

+// historicalSyncReq encapsulates a request for a gossip syncer to perform a
+// historical sync.
+type historicalSyncReq struct {
+ // doneChan is closed once the request has been handled, guaranteeing
+ // the caller that the historical sync has been attempted by the time
+ // we return.
+ doneChan chan struct{}
+}
+
 // gossipSyncerCfg is a struct that packages all the information a GossipSyncer
 // needs to carry out its duties.
 type gossipSyncerCfg struct {
@@ -246,7 +261,7 @@ type GossipSyncer struct {
 // gossip syncer to perform a historical sync. These can only be done
 // once the gossip syncer is in a chansSynced state to ensure its state
 // machine behaves as expected.
- historicalSyncReqs chan struct{}
+ historicalSyncReqs chan *historicalSyncReq

 // genHistoricalChanRangeQuery when true signals to the gossip syncer
 // that it should request the remote peer for all of its known channel
@@ -315,7 +330,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
 cfg: cfg,
 rateLimiter: rateLimiter,
 syncTransitionReqs: make(chan *syncTransitionReq),
- historicalSyncReqs: make(chan struct{}),
+ historicalSyncReqs: make(chan *historicalSyncReq),
 gossipMsgs: make(chan lnwire.Message, 100),
 quit: make(chan struct{}),
 }
@@ -515,8 +530,8 @@ func (g *GossipSyncer) channelGraphSyncer() {
 case req := <-g.syncTransitionReqs:
 req.errChan <- g.handleSyncTransition(req)

- case <-g.historicalSyncReqs:
- g.handleHistoricalSync()
+ case req := <-g.historicalSyncReqs:
+ g.handleHistoricalSync(req)

 case <-g.quit:
 return
@@ -1128,7 +1143,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
 var (
 firstTimestamp time.Time
 timestampRange uint32
- newState syncerState
 )

 switch req.newSyncType {
@@ -1137,11 +1151,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
 case ActiveSync:
 firstTimestamp = time.Now()
 timestampRange = math.MaxUint32
- newState = syncingChans
-
- // We'll set genHistoricalChanRangeQuery to false since in order
- // to not perform another historical sync if we previously have.
- g.genHistoricalChanRangeQuery = false

 // If a PassiveSync transition has been requested, then we should no
 // longer receive any new updates from the remote peer. We can do this
@@ -1150,7 +1159,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
 case PassiveSync:
 firstTimestamp = zeroTimestamp
 timestampRange = 0
- newState = chansSynced

 default:
 return fmt.Errorf("unhandled sync transition %v",
@@ -1162,7 +1170,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
 return fmt.Errorf("unable to send local update horizon: %v", err)
 }

- g.setSyncState(newState)
 g.setSyncType(req.newSyncType)

 return nil
@@ -1184,22 +1191,33 @@ func (g *GossipSyncer) SyncType() SyncerType {
 // NOTE: This can only be done once the gossip syncer has reached its final
 // chansSynced state.
func (g *GossipSyncer) historicalSync() error { + done := make(chan struct{}) + select { - case g.historicalSyncReqs <- struct{}{}: - return nil + case g.historicalSyncReqs <- &historicalSyncReq{ + doneChan: done, + }: case <-time.After(syncTransitionTimeout): return ErrSyncTransitionTimeout case <-g.quit: return ErrGossiperShuttingDown } + + select { + case <-done: + return nil + case <-g.quit: + return ErrGossiperShuttingDown + } } // handleHistoricalSync handles a request to the gossip syncer to perform a // historical sync. -func (g *GossipSyncer) handleHistoricalSync() { +func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) { // We'll go back to our initial syncingChans state in order to request // the remote peer to give us all of the channel IDs they know of // starting from the genesis block. g.genHistoricalChanRangeQuery = true g.setSyncState(syncingChans) + close(req.doneChan) } diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 7d570704..5a5eb725 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -2014,8 +2014,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { syncState := g.syncState() if syncState != chansSynced { t.Fatalf("expected syncerState %v, "+ - "got %v", chansSynced, - syncState) + "got %v", chansSynced, syncState) } }, }, @@ -2037,21 +2036,10 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { TimestampRange: math.MaxUint32, }) - // The local update horizon should be followed - // by a QueryChannelRange message sent to the - // remote peer requesting all channels it - // knows of from the highest height the syncer - // knows of. - assertMsgSent(t, msgChan, &lnwire.QueryChannelRange{ - FirstBlockHeight: startHeight, - NumBlocks: math.MaxUint32 - startHeight, - }) - syncState := g.syncState() - if syncState != waitingQueryRangeReply { + if syncState != chansSynced { t.Fatalf("expected syncerState %v, "+ - "got %v", waitingQueryRangeReply, - syncState) + "got %v", chansSynced, syncState) } }, }, diff --git a/server.go b/server.go index 083ae07c..64523001 100644 --- a/server.go +++ b/server.go @@ -673,23 +673,22 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - NotifyWhenOnline: s.NotifyWhenOnline, - NotifyWhenOffline: s.NotifyWhenOffline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - WaitingProofStore: waitingProofStore, - MessageStore: gossipMessageStore, - AnnSigner: s.nodeSigner, - RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), - ActiveSyncerTimeoutTicker: ticker.New(discovery.DefaultActiveSyncerTimeout), - NumActiveSyncers: cfg.NumGraphSyncPeers, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: chanSeries, + NotifyWhenOnline: s.NotifyWhenOnline, + NotifyWhenOffline: s.NotifyWhenOffline, + ProofMatureDelta: 0, + TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), + RetransmitDelay: time.Minute * 30, + WaitingProofStore: waitingProofStore, + MessageStore: gossipMessageStore, + AnnSigner: s.nodeSigner, + RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), + 
HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), + NumActiveSyncers: cfg.NumGraphSyncPeers, }, s.identityPriv.PubKey(), )
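
The central change in syncer.go above replaces the fire-and-forget historical sync signal with a request/acknowledgement handshake: historicalSync() now hands the event loop a historicalSyncReq carrying a done channel and blocks until channelGraphSyncer closes it. The standalone sketch below distills that pattern; the names histSyncer, histReq, and errShutdown are illustrative stand-ins rather than the actual lnd types, and the one-second send timeout is arbitrary.

package main

import (
	"errors"
	"fmt"
	"time"
)

// histReq mirrors historicalSyncReq above: doneChan is closed by the event
// loop once the request has actually been handled.
type histReq struct {
	doneChan chan struct{}
}

// histSyncer is a hypothetical stand-in for the GossipSyncer's state machine.
type histSyncer struct {
	reqs chan *histReq
	quit chan struct{}
}

var errShutdown = errors.New("syncer shutting down")

// historicalSync sends a request to the event loop, then waits for the ack,
// so the sync is guaranteed to have been attempted by the time we return.
func (s *histSyncer) historicalSync() error {
	done := make(chan struct{})

	select {
	case s.reqs <- &histReq{doneChan: done}:
	case <-time.After(time.Second):
		return errors.New("timed out sending historical sync request")
	case <-s.quit:
		return errShutdown
	}

	select {
	case <-done:
		return nil
	case <-s.quit:
		return errShutdown
	}
}

// loop stands in for channelGraphSyncer: it owns the state machine and acks
// each request only after applying the transition.
func (s *histSyncer) loop() {
	for {
		select {
		case req := <-s.reqs:
			// Reset state for a historical sync here, then ack.
			close(req.doneChan)
		case <-s.quit:
			return
		}
	}
}

func main() {
	s := &histSyncer{
		reqs: make(chan *histReq),
		quit: make(chan struct{}),
	}
	go s.loop()

	fmt.Println(s.historicalSync()) // <nil>
	close(s.quit)
}

The second select is what the old struct{}-typed channel could not provide: without it, historicalSync could return before the state machine had processed the request, which is exactly the race the done channel removes for callers such as the SyncManager's initial historical sync.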
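
The reworked assertSyncerStatus helper also leans on lntest.WaitNoError to absorb racy sync transitions by polling until the syncer settles. As a rough mental model only, and not the real lntest implementation, which may differ in its polling interval and internals, such a helper can be sketched as:

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitNoError polls pred until it returns nil or the timeout elapses, in
// which case the last observed error is returned.
func waitNoError(pred func() error, timeout time.Duration) error {
	deadline := time.After(timeout)
	lastErr := errors.New("predicate never evaluated")

	for {
		select {
		case <-deadline:
			return lastErr
		default:
		}

		if lastErr = pred(); lastErr == nil {
			return nil
		}

		time.Sleep(50 * time.Millisecond)
	}
}

func main() {
	start := time.Now()
	err := waitNoError(func() error {
		// Stand-in for "syncer not yet in chansSynced".
		if time.Since(start) < 200*time.Millisecond {
			return errors.New("syncer not yet in chansSynced")
		}
		return nil
	}, time.Second)
	fmt.Println(err) // <nil>
}

Polling rather than asserting once is what makes checks like assertSyncerStatus robust against the in-flight transitions the comment in the diff warns about.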