From 9a6e8ecb9e45a3a5d2f7ae012b9f970c2efe931d Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 10 Apr 2019 19:25:38 -0700 Subject: [PATCH 1/7] discovery: remove channel synchronization from ActiveSync GossipSyncers In this commit, we remove the ability for ActiveSync GossipSyncers to synchronize our graph with our remote peers. This serves as a starting point towards allowing the daemon to only synchronize our graph through historical syncs, which will be routinely done by the SyncManager. --- discovery/syncer.go | 27 +++++++++++++-------------- discovery/syncer_test.go | 18 +++--------------- 2 files changed, 16 insertions(+), 29 deletions(-) diff --git a/discovery/syncer.go b/discovery/syncer.go index 9e45abbc..4a421461 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -18,16 +18,23 @@ import ( type SyncerType uint8 const ( - // ActiveSync denotes that a gossip syncer should exercise its default - // behavior. This includes reconciling the set of missing graph updates - // with the remote peer _and_ receiving new updates from them. + // ActiveSync denotes that a gossip syncer: + // + // 1. Should not attempt to synchronize with the remote peer for + // missing channels. + // 2. Should respond to queries from the remote peer. + // 3. Should receive new updates from the remote peer. + // + // They are started in a chansSynced state in order to accomplish their + // responsibilities above. ActiveSync SyncerType = iota // PassiveSync denotes that a gossip syncer: // - // 1. Should not attempt to query the remote peer for graph updates. - // 2. Should respond to queries from the remote peer. - // 3. Should not receive new updates from the remote peer. + // 1. Should not attempt to synchronize with the remote peer for + // missing channels. + // 2. Should respond to queries from the remote peer. + // 3. Should not receive new updates from the remote peer. // // They are started in a chansSynced state in order to accomplish their // responsibilities above. @@ -1128,7 +1135,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { var ( firstTimestamp time.Time timestampRange uint32 - newState syncerState ) switch req.newSyncType { @@ -1137,11 +1143,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { case ActiveSync: firstTimestamp = time.Now() timestampRange = math.MaxUint32 - newState = syncingChans - - // We'll set genHistoricalChanRangeQuery to false since in order - // to not perform another historical sync if we previously have. - g.genHistoricalChanRangeQuery = false // If a PassiveSync transition has been requested, then we should no // longer receive any new updates from the remote peer. 
We can do this @@ -1150,7 +1151,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { case PassiveSync: firstTimestamp = zeroTimestamp timestampRange = 0 - newState = chansSynced default: return fmt.Errorf("unhandled sync transition %v", @@ -1162,7 +1162,6 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { return fmt.Errorf("unable to send local update horizon: %v", err) } - g.setSyncState(newState) g.setSyncType(req.newSyncType) return nil diff --git a/discovery/syncer_test.go b/discovery/syncer_test.go index 7d570704..5a5eb725 100644 --- a/discovery/syncer_test.go +++ b/discovery/syncer_test.go @@ -2014,8 +2014,7 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { syncState := g.syncState() if syncState != chansSynced { t.Fatalf("expected syncerState %v, "+ - "got %v", chansSynced, - syncState) + "got %v", chansSynced, syncState) } }, }, @@ -2037,21 +2036,10 @@ func TestGossipSyncerSyncTransitions(t *testing.T) { TimestampRange: math.MaxUint32, }) - // The local update horizon should be followed - // by a QueryChannelRange message sent to the - // remote peer requesting all channels it - // knows of from the highest height the syncer - // knows of. - assertMsgSent(t, msgChan, &lnwire.QueryChannelRange{ - FirstBlockHeight: startHeight, - NumBlocks: math.MaxUint32 - startHeight, - }) - syncState := g.syncState() - if syncState != waitingQueryRangeReply { + if syncState != chansSynced { t.Fatalf("expected syncerState %v, "+ - "got %v", waitingQueryRangeReply, - syncState) + "got %v", chansSynced, syncState) } }, }, From 5db2cf627384317cee2a3629b5ca0a0dd099aa0a Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Wed, 10 Apr 2019 19:25:53 -0700 Subject: [PATCH 2/7] discovery+server: remove roundRobinHandler and related code Since ActiveSync GossipSyncers no longer synchronize our state with the remote peers, none of the logic surrounding the round-robin is required within the SyncManager. 
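
With the round-robin gone, the only per-syncer behavior that
distinguishes the two sync types is the update horizon advertised to
the remote peer via gossip_timestamp_range; reconciling missing
channels is left entirely to historical syncs. The standalone sketch
below is illustrative only and not part of this patch (names are
invented for the example, and the patch's zeroTimestamp is
approximated by the Unix epoch); it mirrors the horizon mapping from
the handleSyncTransition hunk in the previous patch:

    package main

    import (
    	"fmt"
    	"math"
    	"time"
    )

    // SyncerType mirrors discovery.SyncerType as documented above.
    type SyncerType uint8

    const (
    	ActiveSync SyncerType = iota
    	PassiveSync
    )

    // horizonFor returns the gossip_timestamp_range horizon a syncer
    // advertises for the given sync type. ActiveSync opens the horizon
    // so the remote peer forwards all new gossip; PassiveSync picks an
    // unmatchable horizon in the past, muting the peer. Neither type
    // queries the peer for missing channels anymore.
    func horizonFor(t SyncerType) (first time.Time, rng uint32) {
    	switch t {
    	case ActiveSync:
    		return time.Now(), math.MaxUint32
    	case PassiveSync:
    		return time.Unix(0, 0), 0
    	}
    	return
    }

    func main() {
    	for _, t := range []SyncerType{ActiveSync, PassiveSync} {
    		first, rng := horizonFor(t)
    		fmt.Printf("type=%d first=%d range=%d\n", t, first.Unix(), rng)
    	}
    }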
--- discovery/gossiper.go | 11 +- discovery/gossiper_test.go | 48 +++-- discovery/sync_manager.go | 368 +++++---------------------------- discovery/sync_manager_test.go | 235 +++------------------ server.go | 33 ++- 5 files changed, 117 insertions(+), 578 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ca1e4685..337c8410 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -306,12 +306,11 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), syncMgr: newSyncManager(&SyncManagerCfg{ - ChainHash: cfg.ChainHash, - ChanSeries: cfg.ChanSeries, - RotateTicker: cfg.RotateTicker, - HistoricalSyncTicker: cfg.HistoricalSyncTicker, - ActiveSyncerTimeoutTicker: cfg.ActiveSyncerTimeoutTicker, - NumActiveSyncers: cfg.NumActiveSyncers, + ChainHash: cfg.ChainHash, + ChanSeries: cfg.ChanSeries, + RotateTicker: cfg.RotateTicker, + HistoricalSyncTicker: cfg.HistoricalSyncTicker, + NumActiveSyncers: cfg.NumActiveSyncers, }), } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 51d1aad4..ed99ec27 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -741,17 +741,16 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { c := make(chan struct{}) return c }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: waitingProofStore, - MessageStore: newMockMessageStore(), - RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), - ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), - NumActiveSyncers: 3, - AnnSigner: &mockSigner{nodeKeyPriv1}, + Router: router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + NumActiveSyncers: 3, + AnnSigner: &mockSigner{nodeKeyPriv1}, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -1480,20 +1479,19 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // the message to the peer. 
ctx.gossiper.Stop() gossiper := New(Config{ - Notifier: ctx.gossiper.cfg.Notifier, - Broadcast: ctx.gossiper.cfg.Broadcast, - NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, - NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, - Router: ctx.gossiper.cfg.Router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, - MessageStore: ctx.gossiper.cfg.MessageStore, - RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), - ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), - NumActiveSyncers: 3, + Notifier: ctx.gossiper.cfg.Notifier, + Broadcast: ctx.gossiper.cfg.Broadcast, + NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline, + NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, + MessageStore: ctx.gossiper.cfg.MessageStore, + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + NumActiveSyncers: 3, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 7cf11316..524db4e9 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -1,7 +1,6 @@ package discovery import ( - "container/list" "errors" "sync" "time" @@ -22,11 +21,6 @@ const ( // force a historical sync to ensure we have as much of the public // network as possible. DefaultHistoricalSyncInterval = time.Hour - - // DefaultActiveSyncerTimeout is the default timeout interval in which - // we'll wait until an active syncer has completed its state machine and - // reached its final chansSynced state. - DefaultActiveSyncerTimeout = 5 * time.Minute ) var ( @@ -36,23 +30,6 @@ var ( ErrSyncManagerExiting = errors.New("sync manager exiting") ) -// staleActiveSyncer is an internal message the SyncManager will use in order to -// handle a peer corresponding to an active syncer being disconnected. -type staleActiveSyncer struct { - // syncer is the active syncer to be removed. - syncer *GossipSyncer - - // transitioned, if true, signals that the active GossipSyncer is stale - // due to being transitioned to a PassiveSync state. - transitioned bool - - // done serves as a signal to the caller that the SyncManager's internal - // state correctly reflects the stale active syncer. This is needed to - // ensure we always create a new syncer for a flappy peer after they - // disconnect if they happened to be an active syncer. - done chan struct{} -} - // SyncManagerCfg contains all of the dependencies required for the SyncManager // to carry out its duties. type SyncManagerCfg struct { @@ -81,12 +58,6 @@ type SyncManagerCfg struct { // SyncManager when it should attempt a historical sync with a gossip // sync peer. HistoricalSyncTicker ticker.Ticker - - // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the - // SyncManager when it should attempt to start the next pending - // activeSyncer due to the current one not completing its state machine - // within the timeout. 
- ActiveSyncerTimeoutTicker ticker.Ticker } // SyncManager is a subsystem of the gossiper that manages the gossip syncers @@ -117,25 +88,6 @@ type SyncManager struct { // currently receiving new graph updates from. inactiveSyncers map[routing.Vertex]*GossipSyncer - // pendingActiveSyncers is a map that tracks our set of pending active - // syncers. This map will be queried when choosing the next pending - // active syncer in the queue to ensure it is not stale. - pendingActiveSyncers map[routing.Vertex]*GossipSyncer - - // pendingActiveSyncerQueue is the list of active syncers which are - // pending to be started. Syncers will be added to this list through the - // newActiveSyncers and staleActiveSyncers channels. - pendingActiveSyncerQueue *list.List - - // newActiveSyncers is a channel that will serve as a signal to the - // roundRobinHandler to allow it to transition the next pending active - // syncer in the queue. - newActiveSyncers chan struct{} - - // staleActiveSyncers is a channel through which we'll send any stale - // active syncers that should be removed from the round-robin. - staleActiveSyncers chan *staleActiveSyncer - sync.Mutex wg sync.WaitGroup quit chan struct{} @@ -148,21 +100,16 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager { activeSyncers: make( map[routing.Vertex]*GossipSyncer, cfg.NumActiveSyncers, ), - inactiveSyncers: make(map[routing.Vertex]*GossipSyncer), - pendingActiveSyncers: make(map[routing.Vertex]*GossipSyncer), - pendingActiveSyncerQueue: list.New(), - newActiveSyncers: make(chan struct{}), - staleActiveSyncers: make(chan *staleActiveSyncer), - quit: make(chan struct{}), + inactiveSyncers: make(map[routing.Vertex]*GossipSyncer), + quit: make(chan struct{}), } } // Start starts the SyncManager in order to properly carry out its duties. func (m *SyncManager) Start() { m.start.Do(func() { - m.wg.Add(2) + m.wg.Add(1) go m.syncerHandler() - go m.roundRobinHandler() }) } @@ -178,9 +125,6 @@ func (m *SyncManager) Stop() { for _, syncer := range m.inactiveSyncers { syncer.Stop() } - for _, syncer := range m.pendingActiveSyncers { - syncer.Stop() - } for _, syncer := range m.activeSyncers { syncer.Stop() } @@ -223,227 +167,6 @@ func (m *SyncManager) syncerHandler() { } } -// signalNewActiveSyncer sends a signal to the roundRobinHandler to ensure it -// transitions any pending active syncers. -func (m *SyncManager) signalNewActiveSyncer() { - select { - case m.newActiveSyncers <- struct{}{}: - case <-m.quit: - } -} - -// signalStaleActiveSyncer removes the syncer for the given peer from the -// round-robin queue. -func (m *SyncManager) signalStaleActiveSyncer(s *GossipSyncer, transitioned bool) { - done := make(chan struct{}) - - select { - case m.staleActiveSyncers <- &staleActiveSyncer{ - syncer: s, - transitioned: transitioned, - done: done, - }: - case <-m.quit: - } - - // Before returning to the caller, we'll wait for the roundRobinHandler - // to signal us that the SyncManager has correctly updated its internal - // state after handling the stale active syncer. - select { - case <-done: - case <-m.quit: - } -} - -// roundRobinHandler is the SyncManager's event loop responsible for managing -// the round-robin queue of our active syncers to ensure they don't overlap and -// request the same set of channels, which significantly reduces bandwidth -// usage. -// -// NOTE: This must be run as a goroutine. 
-func (m *SyncManager) roundRobinHandler() { - defer m.wg.Done() - - defer m.cfg.ActiveSyncerTimeoutTicker.Stop() - - var ( - // current will hold the current active syncer we're waiting for - // to complete its state machine. - current *GossipSyncer - - // transitionNext will be responsible for containing the signal - // of when the current active syncer has completed its state - // machine. This signal allows us to transition the next pending - // active syncer, if any. - transitionNext chan struct{} - ) - - // transitionNextSyncer is a helper closure that we'll use to transition - // the next syncer queued up. If there aren't any, this will act as a - // NOP. - transitionNextSyncer := func() { - m.Lock() - current = m.nextPendingActiveSyncer() - m.Unlock() - for current != nil { - // Ensure we properly handle a shutdown signal. - select { - case <-m.quit: - return - default: - } - - // We'll avoid performing the transition with the lock - // as it can potentially stall the SyncManager due to - // the syncTransitionTimeout. - err := m.transitionPassiveSyncer(current) - // If we timed out attempting to transition the syncer, - // we'll re-queue it to retry at a later time and move - // on to the next. - if err == ErrSyncTransitionTimeout { - log.Debugf("Timed out attempting to "+ - "transition pending active "+ - "GossipSyncer(%x)", current.cfg.peerPub) - - m.Lock() - m.queueActiveSyncer(current) - current = m.nextPendingActiveSyncer() - m.Unlock() - continue - } - if err != nil { - log.Errorf("Unable to transition pending "+ - "active GossipSyncer(%x): %v", - current.cfg.peerPub, err) - - m.Lock() - current = m.nextPendingActiveSyncer() - m.Unlock() - continue - } - - // The transition succeeded, so we'll set our signal to - // know when we should attempt to transition the next - // pending active syncer in our queue. - transitionNext = current.ResetSyncedSignal() - m.cfg.ActiveSyncerTimeoutTicker.Resume() - return - } - - transitionNext = nil - m.cfg.ActiveSyncerTimeoutTicker.Pause() - } - - for { - select { - // A new active syncer signal has been received, which indicates - // a new pending active syncer has been added to our queue. - // We'll only attempt to transition it now if we're not already - // in the middle of transitioning another one. We do this to - // ensure we don't overlap when requesting channels from - // different peers. - case <-m.newActiveSyncers: - if current == nil { - transitionNextSyncer() - } - - // A stale active syncer has been received, so we'll need to - // remove them from our queue. If we are currently waiting for - // its state machine to complete, we'll move on to the next - // active syncer in the queue. - case staleActiveSyncer := <-m.staleActiveSyncers: - s := staleActiveSyncer.syncer - - m.Lock() - // If the syncer has transitioned from an ActiveSync - // type, rather than disconnecting, we'll include it in - // the set of inactive syncers. - if staleActiveSyncer.transitioned { - m.inactiveSyncers[s.cfg.peerPub] = s - } else { - // Otherwise, since the peer is disconnecting, - // we'll attempt to find a passive syncer that - // can replace it. - newActiveSyncer := m.chooseRandomSyncer(nil, false) - if newActiveSyncer != nil { - m.queueActiveSyncer(newActiveSyncer) - } - } - - // Remove the internal active syncer references for this - // peer. 
-			delete(m.pendingActiveSyncers, s.cfg.peerPub)
-			delete(m.activeSyncers, s.cfg.peerPub)
-			m.Unlock()
-
-			// Signal to the caller that they can now proceed since
-			// the SyncManager's state correctly reflects the
-			// stale active syncer.
-			close(staleActiveSyncer.done)
-
-			// If we're not currently waiting for an active syncer
-			// to reach its terminal state, or if we are but we are
-			// currently waiting for the peer being
-			// disconnected/transitioned, then we'll move on to the
-			// next active syncer in our queue.
-			if current == nil || (current != nil &&
-				current.cfg.peerPub == s.cfg.peerPub) {
-				transitionNextSyncer()
-			}
-
-		// Our current active syncer has reached its terminal
-		// chansSynced state, so we'll proceed to transitioning the next
-		// pending active syncer if there is one.
-		case <-transitionNext:
-			transitionNextSyncer()
-
-		// We've timed out waiting for the current active syncer to
-		// reach its terminal chansSynced state, so we'll just
-		// move on to the next and avoid retrying as its already been
-		// transitioned.
-		case <-m.cfg.ActiveSyncerTimeoutTicker.Ticks():
-			log.Warnf("Timed out waiting for GossipSyncer(%x) to "+
-				"be fully synced", current.cfg.peerPub)
-			transitionNextSyncer()
-
-		case <-m.quit:
-			return
-		}
-	}
-}
-
-// queueActiveSyncer queues the given pending active gossip syncer to the end of
-// the round-robin queue.
-func (m *SyncManager) queueActiveSyncer(s *GossipSyncer) {
-	log.Debugf("Queueing next pending active GossipSyncer(%x)",
-		s.cfg.peerPub)
-
-	delete(m.inactiveSyncers, s.cfg.peerPub)
-	m.pendingActiveSyncers[s.cfg.peerPub] = s
-	m.pendingActiveSyncerQueue.PushBack(s)
-}
-
-// nextPendingActiveSyncer returns the next active syncer pending to be
-// transitioned. If there aren't any, then `nil` is returned.
-func (m *SyncManager) nextPendingActiveSyncer() *GossipSyncer {
-	next := m.pendingActiveSyncerQueue.Front()
-	for next != nil {
-		s := m.pendingActiveSyncerQueue.Remove(next).(*GossipSyncer)
-
-		// If the next pending active syncer is no longer in our lookup
-		// map, then the corresponding peer has disconnected, so we'll
-		// skip them.
-		if _, ok := m.pendingActiveSyncers[s.cfg.peerPub]; !ok {
-			next = m.pendingActiveSyncerQueue.Front()
-			continue
-		}
-
-		return s
-	}
-
-	return nil
-}
-
 // rotateActiveSyncerCandidate rotates a single active syncer. In order to
 // achieve this, the active syncer must be in a chansSynced state in order to
 // process the sync transition.
@@ -478,21 +201,24 @@ func (m *SyncManager) rotateActiveSyncerCandidate() {
 		return
 	}
 
+	log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
+		activeSyncer.cfg.peerPub, candidate.cfg.peerPub)
+
 	// Otherwise, we'll attempt to transition each syncer to their
 	// respective new sync type. We'll avoid performing the transition with
 	// the lock as it can potentially stall the SyncManager due to the
 	// syncTransitionTimeout.
 	if err := m.transitionActiveSyncer(activeSyncer); err != nil {
-		log.Errorf("Unable to transition active "+
-			"GossipSyncer(%x): %v", activeSyncer.cfg.peerPub, err)
+		log.Errorf("Unable to transition active GossipSyncer(%x): %v",
+			activeSyncer.cfg.peerPub, err)
 		return
 	}
 
-	m.Lock()
-	m.queueActiveSyncer(candidate)
-	m.Unlock()
-
-	m.signalNewActiveSyncer()
+	if err := m.transitionPassiveSyncer(candidate); err != nil {
+		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
+			candidate.cfg.peerPub, err)
+		return
+	}
 }
 
 // transitionActiveSyncer transitions an active syncer to a passive one.
@@ -504,7 +230,10 @@ func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { return err } - m.signalStaleActiveSyncer(s, true) + m.Lock() + delete(m.activeSyncers, s.cfg.peerPub) + m.inactiveSyncers[s.cfg.peerPub] = s + m.Unlock() return nil } @@ -519,8 +248,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { } m.Lock() + delete(m.inactiveSyncers, s.cfg.peerPub) m.activeSyncers[s.cfg.peerPub] = s - delete(m.pendingActiveSyncers, s.cfg.peerPub) m.Unlock() return nil @@ -612,10 +341,13 @@ func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{}, // // TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer. func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { + m.Lock() + defer m.Unlock() + // If we already have a syncer, then we'll exit early as we don't want // to override it. nodeID := routing.Vertex(peer.PubKey()) - if _, ok := m.GossipSyncer(nodeID); ok { + if _, ok := m.gossipSyncer(nodeID); ok { return } @@ -634,16 +366,22 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { }, }) - // Gossip syncers are initialized by default as passive and in a - // chansSynced state so that they can reply to any peer queries or + // If we've yet to reach our desired number of active syncers, then + // we'll use this one. + if len(m.activeSyncers) < m.cfg.NumActiveSyncers { + s.setSyncType(ActiveSync) + m.activeSyncers[s.cfg.peerPub] = s + } else { + s.setSyncType(PassiveSync) + m.inactiveSyncers[s.cfg.peerPub] = s + } + + // Gossip syncers are initialized by default in a chansSynced state so + // that they can reply to any peer queries or // handle any sync transitions. - s.setSyncType(PassiveSync) s.setSyncState(chansSynced) s.Start() - m.Lock() - m.inactiveSyncers[nodeID] = s - // We'll force a historical sync with the first peer we connect to // ensure we get as much of the graph as possible. var err error @@ -661,17 +399,6 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { // different peer. m.historicalSync = sync.Once{} } - - // If we've yet to reach our desired number of active syncers, then - // we'll use this one. - numActiveSyncers := len(m.activeSyncers) + len(m.pendingActiveSyncers) - if numActiveSyncers < m.cfg.NumActiveSyncers { - m.queueActiveSyncer(s) - m.Unlock() - m.signalNewActiveSyncer() - return - } - m.Unlock() } // PruneSyncState is called by outside sub-systems once a peer that we were @@ -695,11 +422,24 @@ func (m *SyncManager) PruneSyncState(peer routing.Vertex) { m.Unlock() return } - m.Unlock() // Otherwise, we'll need to dequeue it from our pending active syncers // queue and find a new one to replace it, if any. - m.signalStaleActiveSyncer(s, false) + delete(m.activeSyncers, s.cfg.peerPub) + newActiveSyncer := m.chooseRandomSyncer(nil, false) + m.Unlock() + if newActiveSyncer == nil { + return + } + + if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil { + log.Errorf("Unable to transition passive GossipSyncer(%x): %v", + newActiveSyncer.cfg.peerPub, err) + return + } + + log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)", + peer, newActiveSyncer.cfg.peerPub) } // GossipSyncer returns the associated gossip syncer of a peer. 
The boolean @@ -717,10 +457,6 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) { if ok { return syncer, true } - syncer, ok = m.pendingActiveSyncers[peer] - if ok { - return syncer, true - } syncer, ok = m.activeSyncers[peer] if ok { return syncer, true @@ -733,16 +469,12 @@ func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer { m.Lock() defer m.Unlock() - numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) + - len(m.inactiveSyncers) + numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers) for _, syncer := range m.inactiveSyncers { syncers[syncer.cfg.peerPub] = syncer } - for _, syncer := range m.pendingActiveSyncers { - syncers[syncer.cfg.peerPub] = syncer - } for _, syncer := range m.activeSyncers { syncers[syncer.cfg.peerPub] = syncer } diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index 7fee2b3e..ecdd8af1 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -30,11 +30,10 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer { func newTestSyncManager(numActiveSyncers int) *SyncManager { hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight} return newSyncManager(&SyncManagerCfg{ - ChanSeries: newMockChannelGraphTimeSeries(hID), - RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), - ActiveSyncerTimeoutTicker: ticker.NewForce(DefaultActiveSyncerTimeout), - NumActiveSyncers: numActiveSyncers, + ChanSeries: newMockChannelGraphTimeSeries(hID), + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + NumActiveSyncers: numActiveSyncers, }) } @@ -56,15 +55,14 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // should be active and passive to check them later on. for i := 0; i < numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer) + go syncMgr.InitSyncState(peer) // The first syncer registered always attempts a historical // sync. + assertActiveGossipTimestampRange(t, peer) if i == 0 { - assertTransitionToChansSynced(t, syncMgr, peer, true) + assertTransitionToChansSynced(t, syncMgr, peer) } - - assertPassiveSyncerTransition(t, syncMgr, peer) assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync) } @@ -88,9 +86,10 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // peer1 will represent an active syncer that performs a historical // sync since it is the first registered peer with the SyncManager. peer1 := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer1) - assertTransitionToChansSynced(t, syncMgr, peer1, true) - assertPassiveSyncerTransition(t, syncMgr, peer1) + go syncMgr.InitSyncState(peer1) + assertActiveGossipTimestampRange(t, peer1) + assertTransitionToChansSynced(t, syncMgr, peer1) + assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) // It will then be torn down to simulate a disconnection. Since there // are no other candidate syncers available, the active syncer won't be @@ -100,8 +99,9 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // Then, we'll start our active syncer again, but this time we'll also // have a passive syncer available to replace the active syncer after // the peer disconnects. 
- syncMgr.InitSyncState(peer1) - assertPassiveSyncerTransition(t, syncMgr, peer1) + go syncMgr.InitSyncState(peer1) + assertActiveGossipTimestampRange(t, peer1) + assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) // Create our second peer, which should be initialized as a passive // syncer. @@ -111,7 +111,7 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // Disconnect our active syncer, which should trigger the SyncManager to // replace it with our passive syncer. - syncMgr.PruneSyncState(peer1.PubKey()) + go syncMgr.PruneSyncState(peer1.PubKey()) assertPassiveSyncerTransition(t, syncMgr, peer2) } @@ -127,9 +127,10 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // The first syncer registered always performs a historical sync. activeSyncPeer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(activeSyncPeer) - assertTransitionToChansSynced(t, syncMgr, activeSyncPeer, true) - assertPassiveSyncerTransition(t, syncMgr, activeSyncPeer) + go syncMgr.InitSyncState(activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) + assertTransitionToChansSynced(t, syncMgr, activeSyncPeer) + assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync) // We'll send a tick to force a rotation. Since there aren't any // candidates, none of the active syncers will be rotated. @@ -197,192 +198,6 @@ func TestSyncManagerHistoricalSync(t *testing.T) { }) } -// TestSyncManagerRoundRobinQueue ensures that any subsequent active syncers can -// only be started after the previous one has completed its state machine. -func TestSyncManagerRoundRobinQueue(t *testing.T) { - t.Parallel() - - const numActiveSyncers = 3 - - // We'll start by creating our sync manager with support for three - // active syncers. - syncMgr := newTestSyncManager(numActiveSyncers) - syncMgr.Start() - defer syncMgr.Stop() - - peers := make([]*mockPeer, 0, numActiveSyncers) - - // The first syncer registered always attempts a historical sync. - firstPeer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(firstPeer) - peers = append(peers, firstPeer) - assertTransitionToChansSynced(t, syncMgr, firstPeer, true) - - // After completing the historical sync, a sync transition to ActiveSync - // should happen. It should transition immediately since it has no - // dependents. - assertActiveGossipTimestampRange(t, firstPeer) - - // We'll create the remaining numActiveSyncers. These will be queued in - // the round robin since the first syncer has yet to reach chansSynced. - queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1) - for i := 0; i < numActiveSyncers-1; i++ { - peer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer) - peers = append(peers, peer) - queuedPeers = append(queuedPeers, peer) - } - - // Ensure they cannot transition without sending a GossipTimestampRange - // message first. - for _, peer := range queuedPeers { - assertNoMsgSent(t, peer) - } - - // Transition the first syncer to chansSynced, which should allow the - // second to transition next. - assertTransitionToChansSynced(t, syncMgr, firstPeer, false) - - // assertSyncerTransitioned ensures the target peer's syncer is the only - // that has transitioned. 
- assertSyncerTransitioned := func(target *mockPeer) { - t.Helper() - - for _, peer := range peers { - if peer.PubKey() != target.PubKey() { - assertNoMsgSent(t, peer) - continue - } - - assertActiveGossipTimestampRange(t, target) - } - } - - // For each queued syncer, we'll ensure they have transitioned to an - // ActiveSync type and reached their final chansSynced state to allow - // the next one to transition. - for _, peer := range queuedPeers { - assertSyncerTransitioned(peer) - assertTransitionToChansSynced(t, syncMgr, peer, false) - } -} - -// TestSyncManagerRoundRobinTimeout ensures that if we timeout while waiting for -// an active syncer to reach its final chansSynced state, then we will go on to -// start the next. -func TestSyncManagerRoundRobinTimeout(t *testing.T) { - t.Parallel() - - // Create our sync manager with support for two active syncers. - syncMgr := newTestSyncManager(2) - syncMgr.Start() - defer syncMgr.Stop() - - // peer1 will be the first peer we start, which will time out and cause - // peer2 to start. - peer1 := randPeer(t, syncMgr.quit) - peer2 := randPeer(t, syncMgr.quit) - - // The first syncer registered always attempts a historical sync. - syncMgr.InitSyncState(peer1) - assertTransitionToChansSynced(t, syncMgr, peer1, true) - - // We assume the syncer for peer1 has transitioned once we see it send a - // lnwire.GossipTimestampRange message. - assertActiveGossipTimestampRange(t, peer1) - - // We'll then create the syncer for peer2. This should cause it to be - // queued so that it starts once the syncer for peer1 is done. - syncMgr.InitSyncState(peer2) - assertNoMsgSent(t, peer2) - - // Send a force tick to pretend the sync manager has timed out waiting - // for peer1's syncer to reach chansSynced. - syncMgr.cfg.ActiveSyncerTimeoutTicker.(*ticker.Force).Force <- time.Time{} - - // Finally, ensure that the syncer for peer2 has transitioned. - assertActiveGossipTimestampRange(t, peer2) -} - -// TestSyncManagerRoundRobinStaleSyncer ensures that any stale active syncers we -// are currently waiting for or are queued up to start are properly removed and -// stopped. -func TestSyncManagerRoundRobinStaleSyncer(t *testing.T) { - t.Parallel() - - const numActiveSyncers = 4 - - // We'll create and start our sync manager with some active syncers. - syncMgr := newTestSyncManager(numActiveSyncers) - syncMgr.Start() - defer syncMgr.Stop() - - peers := make([]*mockPeer, 0, numActiveSyncers) - - // The first syncer registered always attempts a historical sync. - firstPeer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(firstPeer) - peers = append(peers, firstPeer) - assertTransitionToChansSynced(t, syncMgr, firstPeer, true) - - // After completing the historical sync, a sync transition to ActiveSync - // should happen. It should transition immediately since it has no - // dependents. - assertActiveGossipTimestampRange(t, firstPeer) - assertMsgSent(t, firstPeer, &lnwire.QueryChannelRange{ - FirstBlockHeight: startHeight, - NumBlocks: math.MaxUint32 - startHeight, - }) - - // We'll create the remaining numActiveSyncers. These will be queued in - // the round robin since the first syncer has yet to reach chansSynced. - queuedPeers := make([]*mockPeer, 0, numActiveSyncers-1) - for i := 0; i < numActiveSyncers-1; i++ { - peer := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer) - peers = append(peers, peer) - queuedPeers = append(queuedPeers, peer) - } - - // Ensure they cannot transition without sending a GossipTimestampRange - // message first. 
- for _, peer := range queuedPeers { - assertNoMsgSent(t, peer) - } - - // assertSyncerTransitioned ensures the target peer's syncer is the only - // that has transitioned. - assertSyncerTransitioned := func(target *mockPeer) { - t.Helper() - - for _, peer := range peers { - if peer.PubKey() != target.PubKey() { - assertNoMsgSent(t, peer) - continue - } - - assertPassiveSyncerTransition(t, syncMgr, target) - } - } - - // We'll then remove the syncers in the middle to cover the case where - // they are queued up in the sync manager's pending list. - for i, peer := range peers { - if i == 0 || i == len(peers)-1 { - continue - } - - syncMgr.PruneSyncState(peer.PubKey()) - } - - // We'll then remove the syncer we are currently waiting for. This - // should prompt the last syncer to start since it is the only one left - // pending. We'll do this in a goroutine since the peer behind the new - // active syncer will need to send out its new GossipTimestampRange. - go syncMgr.PruneSyncState(peers[0].PubKey()) - assertSyncerTransitioned(peers[len(peers)-1]) -} - // assertNoMsgSent is a helper function that ensures a peer hasn't sent any // messages. func assertNoMsgSent(t *testing.T, peer *mockPeer) { @@ -480,7 +295,7 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, // assertTransitionToChansSynced asserts the transition of an ActiveSync // GossipSyncer to its final chansSynced state. func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager, - peer *mockPeer, historicalSync bool) { + peer *mockPeer) { t.Helper() @@ -489,13 +304,9 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager, t.Fatalf("gossip syncer for peer %x not found", peer.PubKey()) } - firstBlockHeight := uint32(startHeight) - if historicalSync { - firstBlockHeight = 0 - } assertMsgSent(t, peer, &lnwire.QueryChannelRange{ - FirstBlockHeight: firstBlockHeight, - NumBlocks: math.MaxUint32 - firstBlockHeight, + FirstBlockHeight: 0, + NumBlocks: math.MaxUint32, }) s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil) @@ -531,7 +342,7 @@ func assertPassiveSyncerTransition(t *testing.T, syncMgr *SyncManager, t.Helper() assertActiveGossipTimestampRange(t, peer) - assertTransitionToChansSynced(t, syncMgr, peer, false) + assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync) } // assertActiveSyncerTransition asserts that a gossip syncer goes through all of diff --git a/server.go b/server.go index 50b8fd9e..e4f59dcd 100644 --- a/server.go +++ b/server.go @@ -673,23 +673,22 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.authGossiper = discovery.New(discovery.Config{ - Router: s.chanRouter, - Notifier: s.cc.chainNotifier, - ChainHash: *activeNetParams.GenesisHash, - Broadcast: s.BroadcastMessage, - ChanSeries: chanSeries, - NotifyWhenOnline: s.NotifyWhenOnline, - NotifyWhenOffline: s.NotifyWhenOffline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - WaitingProofStore: waitingProofStore, - MessageStore: gossipMessageStore, - AnnSigner: s.nodeSigner, - RotateTicker: ticker.New(discovery.DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval), - ActiveSyncerTimeoutTicker: ticker.New(discovery.DefaultActiveSyncerTimeout), - NumActiveSyncers: cfg.NumGraphSyncPeers, + Router: s.chanRouter, + Notifier: s.cc.chainNotifier, + ChainHash: *activeNetParams.GenesisHash, + Broadcast: s.BroadcastMessage, + ChanSeries: 
chanSeries,
+		NotifyWhenOnline:     s.NotifyWhenOnline,
+		NotifyWhenOffline:    s.NotifyWhenOffline,
+		ProofMatureDelta:     0,
+		TrickleDelay:         time.Millisecond * time.Duration(cfg.TrickleDelay),
+		RetransmitDelay:      time.Minute * 30,
+		WaitingProofStore:    waitingProofStore,
+		MessageStore:         gossipMessageStore,
+		AnnSigner:            s.nodeSigner,
+		RotateTicker:         ticker.New(discovery.DefaultSyncerRotationInterval),
+		HistoricalSyncTicker: ticker.New(cfg.HistoricalSyncInterval),
+		NumActiveSyncers:     cfg.NumGraphSyncPeers,
 	},
 	s.identityPriv.PubKey(),
 )

From 29baa1225484db227aa784e14852a6c779e5f788 Mon Sep 17 00:00:00 2001
From: Wilmer Paulino
Date: Wed, 10 Apr 2019 19:26:08 -0700
Subject: [PATCH 3/7] discovery: synchronize new/stale GossipSyncers with syncerHandler

Now that the roundRobinHandler is no longer present, this commit aims to
clean up and simplify some of the logic surrounding initializing/tearing
down new/stale GossipSyncers from the SyncManager. Along the way, we also
synchronize these calls with the syncerHandler, which will prove useful
in future work that allows us to recover from initial historical sync
disconnections.

---
 discovery/sync_manager.go      | 331 ++++++++++++++++++++++-----------
 discovery/sync_manager_test.go |  86 +++++----
 2 files changed, 264 insertions(+), 153 deletions(-)

diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index 524db4e9..ae87f97a 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -30,6 +30,30 @@ var (
 	ErrSyncManagerExiting = errors.New("sync manager exiting")
 )
 
+// newSyncer is an internal message we'll use within the SyncManager to signal
+// that we should create a GossipSyncer for a newly connected peer.
+type newSyncer struct {
+	// peer is the newly connected peer.
+	peer lnpeer.Peer
+
+	// doneChan serves as a signal to the caller that the SyncManager's
+	// internal state correctly reflects the newly created syncer.
+	doneChan chan struct{}
+}
+
+// staleSyncer is an internal message we'll use within the SyncManager to signal
+// that a peer has disconnected and its GossipSyncer should be removed.
+type staleSyncer struct {
+	// peer is the peer that has disconnected.
+	peer routing.Vertex
+
+	// doneChan serves as a signal to the caller that the SyncManager's
+	// internal state correctly reflects the stale active syncer. This is
+	// needed to ensure we always create a new syncer for a flappy peer
+	// after they disconnect if they happened to be an active syncer.
+	doneChan chan struct{}
+}
+
 // SyncManagerCfg contains all of the dependencies required for the SyncManager
 // to carry out its duties.
 type SyncManagerCfg struct {
@@ -79,6 +103,18 @@ type SyncManager struct {
 	// _once_ with a peer during the SyncManager's startup.
 	historicalSync sync.Once
 
+	// newSyncers is a channel we'll use to process requests to create
+	// GossipSyncers for newly connected peers.
+	newSyncers chan *newSyncer
+
+	// staleSyncers is a channel we'll use to process requests to tear down
+	// GossipSyncers for disconnected peers.
+	staleSyncers chan *staleSyncer
+
+	// syncersMu guards the read and write access to the activeSyncers and
+	// inactiveSyncers maps below.
+	syncersMu sync.Mutex
+
 	// activeSyncers is the set of all syncers for which we are currently
 	// receiving graph updates from. The number of possible active syncers
 	// is bounded by NumActiveSyncers.
 	activeSyncers map[routing.Vertex]*GossipSyncer
 
 	// inactiveSyncers is the set of all syncers for which we are not
 	// currently receiving new graph updates from.
 	inactiveSyncers map[routing.Vertex]*GossipSyncer
 
-	sync.Mutex
 	wg   sync.WaitGroup
 	quit chan struct{}
 }
 
@@ -96,7 +131,9 @@
 // newSyncManager constructs a new SyncManager backed by the given config.
 func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
 	return &SyncManager{
-		cfg: *cfg,
+		cfg:          *cfg,
+		newSyncers:   make(chan *newSyncer),
+		staleSyncers: make(chan *staleSyncer),
 		activeSyncers: make(
 			map[routing.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
 		),
@@ -119,9 +156,6 @@ func (m *SyncManager) Stop() {
 	close(m.quit)
 	m.wg.Wait()
 
-	m.Lock()
-	defer m.Unlock()
-
 	for _, syncer := range m.inactiveSyncers {
 		syncer.Stop()
 	}
@@ -133,11 +167,13 @@
 
 // syncerHandler is the SyncManager's main event loop responsible for:
 //
-// 1. Finding new peers to receive graph updates from to ensure we don't only
-// receive them from the same set of peers.
-//
-// 2. Finding new peers to force a historical sync with to ensure we have as
-// much of the public network as possible.
+// 1. Creating and tearing down GossipSyncers for connected/disconnected peers.
+//
+// 2. Finding new peers to receive graph updates from to ensure we don't only
+// receive them from the same set of peers.
+//
+// 3. Finding new peers to force a historical sync with to ensure we have as
+// much of the public network as possible.
 //
 // NOTE: This must be run as a goroutine.
 func (m *SyncManager) syncerHandler() {
@@ -151,6 +187,69 @@
 
 	for {
 		select {
+		// A new peer has been connected, so we'll create its
+		// accompanying GossipSyncer.
+		case newSyncer := <-m.newSyncers:
+			// If we already have a syncer, then we'll exit early as
+			// we don't want to override it.
+			if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok {
+				close(newSyncer.doneChan)
+				continue
+			}
+
+			s := m.createGossipSyncer(newSyncer.peer)
+
+			m.syncersMu.Lock()
+			switch {
+			// If we've exceeded our total number of active syncers,
+			// we'll initialize this GossipSyncer as passive.
+			case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
+				s.setSyncType(PassiveSync)
+				m.inactiveSyncers[s.cfg.peerPub] = s
+
+			// Otherwise, it should be initialized as active.
+			default:
+				s.setSyncType(ActiveSync)
+				m.activeSyncers[s.cfg.peerPub] = s
+			}
+			m.syncersMu.Unlock()
+
+			s.Start()
+
+			// Once we create the GossipSyncer, we'll signal to the
+			// caller that they can proceed since the SyncManager's
+			// internal state has been updated.
+			close(newSyncer.doneChan)
+
+			// We'll force a historical sync with the first peer we
+			// connect to, to ensure we get as much of the graph as
+			// possible.
+			var err error
+			m.historicalSync.Do(func() {
+				log.Infof("Attempting historical sync with "+
+					"GossipSyncer(%x)", s.cfg.peerPub)
+				err = s.historicalSync()
+			})
+			if err != nil {
+				log.Errorf("Unable to perform historical sync "+
+					"with GossipSyncer(%x): %v",
+					s.cfg.peerPub, err)
+
+				// Reset historicalSync to ensure it is tried
+				// again with a different peer.
+				m.historicalSync = sync.Once{}
+			}
+
+		// An existing peer has disconnected, so we'll tear down its
+		// corresponding GossipSyncer.
+		case staleSyncer := <-m.staleSyncers:
+			// Once the corresponding GossipSyncer has been stopped
+			// and removed, we'll signal to the caller that they can
+			// proceed since the SyncManager's internal state has
+			// been updated.
+			m.removeGossipSyncer(staleSyncer.peer)
+			close(staleSyncer.doneChan)
+
 		// Our RotateTicker has ticked, so we'll attempt to rotate a
 		// single active syncer with a passive one.
 		case <-m.cfg.RotateTicker.Ticks():
@@ -167,15 +266,83 @@
 	}
 }
 
+// createGossipSyncer creates the GossipSyncer for a newly connected peer.
+func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
+	nodeID := routing.Vertex(peer.PubKey())
+	log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:])
+
+	encoding := lnwire.EncodingSortedPlain
+	s := newGossipSyncer(gossipSyncerCfg{
+		chainHash:     m.cfg.ChainHash,
+		peerPub:       nodeID,
+		channelSeries: m.cfg.ChanSeries,
+		encodingType:  encoding,
+		chunkSize:     encodingTypeToChunkSize[encoding],
+		batchSize:     requestBatchSize,
+		sendToPeer: func(msgs ...lnwire.Message) error {
+			return peer.SendMessageLazy(false, msgs...)
+		},
+	})
+
+	// Gossip syncers are initialized by default in a PassiveSync type
+	// and chansSynced state so that they can reply to any peer queries or
+	// handle any sync transitions.
+	s.setSyncState(chansSynced)
+	s.setSyncType(PassiveSync)
+	return s
+}
+
+// removeGossipSyncer removes all internal references to the disconnected peer's
+// GossipSyncer and stops it. In the event of an active GossipSyncer being
+// disconnected, a passive GossipSyncer, if any, will take its place.
+func (m *SyncManager) removeGossipSyncer(peer routing.Vertex) {
+	m.syncersMu.Lock()
+	defer m.syncersMu.Unlock()
+
+	s, ok := m.gossipSyncer(peer)
+	if !ok {
+		return
+	}
+
+	log.Infof("Removing GossipSyncer for peer=%v", peer)
+
+	// We'll stop the GossipSyncer for the disconnected peer in a goroutine
+	// to prevent blocking the SyncManager.
+	go s.Stop()
+
+	// If it's a non-active syncer, then we can just exit now.
+	if _, ok := m.inactiveSyncers[peer]; ok {
+		delete(m.inactiveSyncers, peer)
+		return
+	}
+
+	// Otherwise, we'll need to find a new one to replace it, if any.
+	delete(m.activeSyncers, peer)
+	newActiveSyncer := m.chooseRandomSyncer(nil, false)
+	if newActiveSyncer == nil {
+		return
+	}
+
+	if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil {
+		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
+			newActiveSyncer.cfg.peerPub, err)
+		return
+	}
+
+	log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
+		peer, newActiveSyncer.cfg.peerPub)
+}
+
 // rotateActiveSyncerCandidate rotates a single active syncer. In order to
 // achieve this, the active syncer must be in a chansSynced state in order to
 // process the sync transition.
 func (m *SyncManager) rotateActiveSyncerCandidate() {
+	m.syncersMu.Lock()
+	defer m.syncersMu.Unlock()
+
 	// If we don't have a candidate to rotate with, we can return early.
-	m.Lock()
 	candidate := m.chooseRandomSyncer(nil, false)
 	if candidate == nil {
-		m.Unlock()
 		log.Debug("No eligible candidate to rotate active syncer")
 		return
 	}
@@ -193,7 +360,6 @@
 		activeSyncer = s
 		break
 	}
-	m.Unlock()
 
 	// If we couldn't find an eligible one, we can return early.
 	if activeSyncer == nil {
 		return
 	}
 
+	// Otherwise, we'll attempt to transition each syncer to their
+	// respective new sync type.
 	log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)",
 		activeSyncer.cfg.peerPub, candidate.cfg.peerPub)
 
-	// Otherwise, we'll attempt to transition each syncer to their
-	// respective new sync type.
We'll avoid performing the transition with - // the lock as it can potentially stall the SyncManager due to the - // syncTransitionTimeout. if err := m.transitionActiveSyncer(activeSyncer); err != nil { log.Errorf("Unable to transition active GossipSyncer(%x): %v", activeSyncer.cfg.peerPub, err) @@ -222,6 +386,8 @@ func (m *SyncManager) rotateActiveSyncerCandidate() { } // transitionActiveSyncer transitions an active syncer to a passive one. +// +// NOTE: This must be called with the syncersMu lock held. func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { log.Debugf("Transitioning active GossipSyncer(%x) to passive", s.cfg.peerPub) @@ -230,15 +396,15 @@ func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { return err } - m.Lock() delete(m.activeSyncers, s.cfg.peerPub) m.inactiveSyncers[s.cfg.peerPub] = s - m.Unlock() return nil } // transitionPassiveSyncer transitions a passive syncer to an active one. +// +// NOTE: This must be called with the syncersMu lock held. func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { log.Debugf("Transitioning passive GossipSyncer(%x) to active", s.cfg.peerPub) @@ -247,10 +413,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { return err } - m.Lock() delete(m.inactiveSyncers, s.cfg.peerPub) m.activeSyncers[s.cfg.peerPub] = s - m.Unlock() return nil } @@ -258,8 +422,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { // forceHistoricalSync chooses a syncer with a remote peer at random and forces // a historical sync with it. func (m *SyncManager) forceHistoricalSync() { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() // We'll choose a random peer with whom we can perform a historical sync // with. We'll set useActive to true to make sure we can still do one if @@ -340,64 +504,23 @@ func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{}, // public channel graph as possible. // // TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer. -func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { - m.Lock() - defer m.Unlock() +func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error { + done := make(chan struct{}) - // If we already have a syncer, then we'll exit early as we don't want - // to override it. - nodeID := routing.Vertex(peer.PubKey()) - if _, ok := m.gossipSyncer(nodeID); ok { - return + select { + case m.newSyncers <- &newSyncer{ + peer: peer, + doneChan: done, + }: + case <-m.quit: + return ErrSyncManagerExiting } - log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:]) - - encoding := lnwire.EncodingSortedPlain - s := newGossipSyncer(gossipSyncerCfg{ - chainHash: m.cfg.ChainHash, - peerPub: nodeID, - channelSeries: m.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], - batchSize: requestBatchSize, - sendToPeer: func(msgs ...lnwire.Message) error { - return peer.SendMessageLazy(false, msgs...) - }, - }) - - // If we've yet to reach our desired number of active syncers, then - // we'll use this one. - if len(m.activeSyncers) < m.cfg.NumActiveSyncers { - s.setSyncType(ActiveSync) - m.activeSyncers[s.cfg.peerPub] = s - } else { - s.setSyncType(PassiveSync) - m.inactiveSyncers[s.cfg.peerPub] = s - } - - // Gossip syncers are initialized by default in a chansSynced state so - // that they can reply to any peer queries or - // handle any sync transitions. 
- s.setSyncState(chansSynced) - s.Start() - - // We'll force a historical sync with the first peer we connect to - // ensure we get as much of the graph as possible. - var err error - m.historicalSync.Do(func() { - log.Infof("Attempting historical sync with GossipSyncer(%x)", - s.cfg.peerPub) - - err = s.historicalSync() - }) - if err != nil { - log.Errorf("Unable to perform historical sync with "+ - "GossipSyncer(%x): %v", s.cfg.peerPub, err) - - // Reset historicalSync to ensure it is tried again with a - // different peer. - m.historicalSync = sync.Once{} + select { + case <-done: + return nil + case <-m.quit: + return ErrSyncManagerExiting } } @@ -405,48 +528,30 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { // previously connected to has been disconnected. In this case we can stop the // existing GossipSyncer assigned to the peer and free up resources. func (m *SyncManager) PruneSyncState(peer routing.Vertex) { - s, ok := m.GossipSyncer(peer) - if !ok { + done := make(chan struct{}) + + // We avoid returning an error when the SyncManager is stopped since the + // GossipSyncer will be stopped then anyway. + select { + case m.staleSyncers <- &staleSyncer{ + peer: peer, + doneChan: done, + }: + case <-m.quit: return } - log.Infof("Removing GossipSyncer for peer=%v", peer) - - // We'll start by stopping the GossipSyncer for the disconnected peer. - s.Stop() - - // If it's a non-active syncer, then we can just exit now. - m.Lock() - if _, ok := m.inactiveSyncers[s.cfg.peerPub]; ok { - delete(m.inactiveSyncers, s.cfg.peerPub) - m.Unlock() - return + select { + case <-done: + case <-m.quit: } - - // Otherwise, we'll need to dequeue it from our pending active syncers - // queue and find a new one to replace it, if any. - delete(m.activeSyncers, s.cfg.peerPub) - newActiveSyncer := m.chooseRandomSyncer(nil, false) - m.Unlock() - if newActiveSyncer == nil { - return - } - - if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil { - log.Errorf("Unable to transition passive GossipSyncer(%x): %v", - newActiveSyncer.cfg.peerPub, err) - return - } - - log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)", - peer, newActiveSyncer.cfg.peerPub) } // GossipSyncer returns the associated gossip syncer of a peer. The boolean // returned signals whether there exists a gossip syncer for the peer. func (m *SyncManager) GossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() return m.gossipSyncer(peer) } @@ -466,8 +571,8 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) { // GossipSyncers returns all of the currently initialized gossip syncers. func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers) diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index ecdd8af1..e965af3c 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -55,21 +55,23 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // should be active and passive to check them later on. 
for i := 0; i < numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(peer) + syncMgr.InitSyncState(peer) + s := assertSyncerExistence(t, syncMgr, peer) // The first syncer registered always attempts a historical // sync. assertActiveGossipTimestampRange(t, peer) if i == 0 { - assertTransitionToChansSynced(t, syncMgr, peer) + assertTransitionToChansSynced(t, s, peer) } - assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync) + assertSyncerStatus(t, s, chansSynced, ActiveSync) } for i := 0; i < numSyncers-numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(peer) - assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync) + s := assertSyncerExistence(t, syncMgr, peer) + assertSyncerStatus(t, s, chansSynced, PassiveSync) } } @@ -86,10 +88,11 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // peer1 will represent an active syncer that performs a historical // sync since it is the first registered peer with the SyncManager. peer1 := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(peer1) + syncMgr.InitSyncState(peer1) + syncer1 := assertSyncerExistence(t, syncMgr, peer1) assertActiveGossipTimestampRange(t, peer1) - assertTransitionToChansSynced(t, syncMgr, peer1) - assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) + assertTransitionToChansSynced(t, syncer1, peer1) + assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) // It will then be torn down to simulate a disconnection. Since there // are no other candidate syncers available, the active syncer won't be @@ -99,20 +102,22 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // Then, we'll start our active syncer again, but this time we'll also // have a passive syncer available to replace the active syncer after // the peer disconnects. - go syncMgr.InitSyncState(peer1) + syncMgr.InitSyncState(peer1) + syncer1 = assertSyncerExistence(t, syncMgr, peer1) assertActiveGossipTimestampRange(t, peer1) - assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) + assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) // Create our second peer, which should be initialized as a passive // syncer. peer2 := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(peer2) - assertSyncerStatus(t, syncMgr, peer2, chansSynced, PassiveSync) + syncer2 := assertSyncerExistence(t, syncMgr, peer2) + assertSyncerStatus(t, syncer2, chansSynced, PassiveSync) // Disconnect our active syncer, which should trigger the SyncManager to // replace it with our passive syncer. go syncMgr.PruneSyncState(peer1.PubKey()) - assertPassiveSyncerTransition(t, syncMgr, peer2) + assertPassiveSyncerTransition(t, syncer2, peer2) } // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully @@ -127,21 +132,23 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // The first syncer registered always performs a historical sync. activeSyncPeer := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(activeSyncPeer) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) assertActiveGossipTimestampRange(t, activeSyncPeer) - assertTransitionToChansSynced(t, syncMgr, activeSyncPeer) - assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync) + assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll send a tick to force a rotation. 
Since there aren't any // candidates, none of the active syncers will be rotated. syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{} assertNoMsgSent(t, activeSyncPeer) - assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll then go ahead and add a passive syncer. passiveSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(passiveSyncPeer) - assertSyncerStatus(t, syncMgr, passiveSyncPeer, chansSynced, PassiveSync) + passiveSyncer := assertSyncerExistence(t, syncMgr, passiveSyncPeer) + assertSyncerStatus(t, passiveSyncer, chansSynced, PassiveSync) // We'll force another rotation - this time, since we have a passive // syncer available, they should be rotated. @@ -150,7 +157,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // The transition from an active syncer to a passive syncer causes the // peer to send out a new GossipTimestampRange in the past so that they // don't receive new graph updates. - assertActiveSyncerTransition(t, syncMgr, activeSyncPeer) + assertActiveSyncerTransition(t, activeSyncer, activeSyncPeer) // The transition from a passive syncer to an active syncer causes the // peer to send a new GossipTimestampRange with the current timestamp to @@ -159,7 +166,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // machine, starting from its initial syncingChans state. We'll then // need to transition it to its final chansSynced state to ensure the // next syncer is properly started in the round-robin. - assertPassiveSyncerTransition(t, syncMgr, passiveSyncPeer) + assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer) } // TestSyncManagerHistoricalSync ensures that we only attempt a single @@ -258,10 +265,9 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) { } } -// assertSyncerStatus asserts that the gossip syncer for the given peer matches -// the expected sync state and type. -func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, - syncState syncerState, syncType SyncerType) { +// assertSyncerExistence asserts that a GossipSyncer exists for the given peer. +func assertSyncerExistence(t *testing.T, syncMgr *SyncManager, + peer *mockPeer) *GossipSyncer { t.Helper() @@ -270,19 +276,29 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, t.Fatalf("gossip syncer for peer %x not found", peer.PubKey()) } + return s +} + +// assertSyncerStatus asserts that the gossip syncer for the given peer matches +// the expected sync state and type. +func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState, + syncType SyncerType) { + + t.Helper() + // We'll check the status of our syncer within a WaitPredicate as some // sync transitions might cause this to be racy. err := lntest.WaitNoError(func() error { state := s.syncState() if s.syncState() != syncState { return fmt.Errorf("expected syncState %v for peer "+ - "%x, got %v", syncState, peer.PubKey(), state) + "%x, got %v", syncState, s.cfg.peerPub, state) } typ := s.SyncType() if s.SyncType() != syncType { return fmt.Errorf("expected syncType %v for peer "+ - "%x, got %v", syncType, peer.PubKey(), typ) + "%x, got %v", syncType, s.cfg.peerPub, typ) } return nil @@ -294,16 +310,9 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, // assertTransitionToChansSynced asserts the transition of an ActiveSync // GossipSyncer to its final chansSynced state. 
-func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
-	peer *mockPeer) {
-
+func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) {
 	t.Helper()
 
-	s, ok := syncMgr.GossipSyncer(peer.PubKey())
-	if !ok {
-		t.Fatalf("gossip syncer for peer %x not found", peer.PubKey())
-	}
-
 	assertMsgSent(t, peer, &lnwire.QueryChannelRange{
 		FirstBlockHeight: 0,
 		NumBlocks:        math.MaxUint32,
@@ -311,7 +320,7 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
 
 	s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil)
 
-	chanSeries := syncMgr.cfg.ChanSeries.(*mockChannelGraphTimeSeries)
+	chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries)
 
 	select {
 	case <-chanSeries.filterReq:
@@ -336,25 +345,22 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager,
 
 // assertPassiveSyncerTransition asserts that a gossip syncer goes through all
 // of its expected steps when transitioning from passive to active.
-func assertPassiveSyncerTransition(t *testing.T, syncMgr *SyncManager,
-	peer *mockPeer) {
+func assertPassiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
 
 	t.Helper()
 
 	assertActiveGossipTimestampRange(t, peer)
-	assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync)
+	assertSyncerStatus(t, s, chansSynced, ActiveSync)
 }
 
 // assertActiveSyncerTransition asserts that a gossip syncer goes through all of
 // its expected steps when transitioning from active to passive.
-func assertActiveSyncerTransition(t *testing.T, syncMgr *SyncManager,
-	peer *mockPeer) {
-
+func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) {
 	t.Helper()
 
 	assertMsgSent(t, peer, &lnwire.GossipTimestampRange{
 		FirstTimestamp: uint32(zeroTimestamp.Unix()),
 		TimestampRange: 0,
 	})
-	assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync)
+	assertSyncerStatus(t, s, chansSynced, PassiveSync)
 }

From 72e9674cff8d09f25dfe0157a30d9abb61faaa7d Mon Sep 17 00:00:00 2001
From: Wilmer Paulino
Date: Wed, 10 Apr 2019 19:26:24 -0700
Subject: [PATCH 4/7] discovery: simplify chooseRandomSyncer helper

---
 discovery/sync_manager.go | 116 ++++++++++++--------------------------
 1 file changed, 36 insertions(+), 80 deletions(-)

diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index ae87f97a..72de0d65 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -318,17 +318,13 @@ func (m *SyncManager) removeGossipSyncer(peer routing.Vertex) {
 	// Otherwise, we'll need to find a new one to replace it, if any.
 	delete(m.activeSyncers, peer)
-	newActiveSyncer := m.chooseRandomSyncer(nil, false)
+	newActiveSyncer := chooseRandomSyncer(
+		m.inactiveSyncers, m.transitionPassiveSyncer,
+	)
 	if newActiveSyncer == nil {
 		return
 	}
 
-	if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil {
-		log.Errorf("Unable to transition passive GossipSyncer(%x): %v",
-			newActiveSyncer.cfg.peerPub, err)
-		return
-	}
-
 	log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)",
 		peer, newActiveSyncer.cfg.peerPub)
 }
@@ -340,30 +336,19 @@ func (m *SyncManager) rotateActiveSyncerCandidate() {
 	m.syncersMu.Lock()
 	defer m.syncersMu.Unlock()
 
-	// If we don't have a candidate to rotate with, we can return early.
-	candidate := m.chooseRandomSyncer(nil, false)
-	if candidate == nil {
-		log.Debug("No eligible candidate to rotate active syncer")
+	// If we couldn't find an eligible active syncer to rotate, we can
+	// return early.
+ activeSyncer := chooseRandomSyncer(m.activeSyncers, nil) + if activeSyncer == nil { + log.Debug("No eligible active syncer to rotate") return } - // We'll choose an active syncer at random that's within a chansSynced - // state to rotate. - var activeSyncer *GossipSyncer - for _, s := range m.activeSyncers { - // The active syncer must be in a chansSynced state in order to - // process sync transitions. - if s.syncState() != chansSynced { - continue - } - - activeSyncer = s - break - } - - // If we couldn't find an eligible one, we can return early. - if activeSyncer == nil { - log.Debug("No eligible active syncer to rotate") + // Similarly, if we don't have a candidate to rotate with, we can return + // early as well. + candidate := chooseRandomSyncer(m.inactiveSyncers, nil) + if candidate == nil { + log.Debug("No eligible candidate to rotate active syncer") return } @@ -425,72 +410,39 @@ func (m *SyncManager) forceHistoricalSync() { m.syncersMu.Lock() defer m.syncersMu.Unlock() - // We'll choose a random peer with whom we can perform a historical sync - // with. We'll set useActive to true to make sure we can still do one if - // we don't happen to have any non-active syncers. - candidatesChosen := make(map[routing.Vertex]struct{}) - s := m.chooseRandomSyncer(candidatesChosen, true) - for s != nil { - // Ensure we properly handle a shutdown signal. - select { - case <-m.quit: - return - default: - } - - // Blacklist the candidate to ensure it's not chosen again. - candidatesChosen[s.cfg.peerPub] = struct{}{} - - err := s.historicalSync() - if err == nil { - return - } - - log.Errorf("Unable to perform historical sync with "+ - "GossipSyncer(%x): %v", s.cfg.peerPub, err) - - s = m.chooseRandomSyncer(candidatesChosen, true) - } + // We'll sample from both sets of active and inactive syncers in the + // event that we don't have any inactive syncers. + _ = chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error { + return s.historicalSync() + }) } -// chooseRandomSyncer returns a random non-active syncer that's eligible for a -// sync transition. A blacklist can be used to skip any previously chosen -// candidates. The useActive boolean can be used to also filter active syncers. +// chooseRandomSyncer iterates through the set of syncers given and returns the +// first one which was able to successfully perform the action enclosed in the +// function closure. // // NOTE: It's possible for a nil value to be returned if there are no eligible // candidate syncers. -// -// NOTE: This method must be called with the syncersMtx lock held. -func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{}, - useActive bool) *GossipSyncer { - - eligible := func(s *GossipSyncer) bool { - // Skip any syncers that exist within the blacklist. - if blacklist != nil { - if _, ok := blacklist[s.cfg.peerPub]; ok { - return false - } - } +func chooseRandomSyncer(syncers map[routing.Vertex]*GossipSyncer, + action func(*GossipSyncer) error) *GossipSyncer { + for _, s := range syncers { // Only syncers in a chansSynced state are viable for sync // transitions, so skip any that aren't. 
-		return s.syncState() == chansSynced
-	}
-
-	for _, s := range m.inactiveSyncers {
-		if !eligible(s) {
+		if s.syncState() != chansSynced {
 			continue
 		}
-		return s
-	}
 
-	if useActive {
-		for _, s := range m.activeSyncers {
-			if !eligible(s) {
+		if action != nil {
+			if err := action(s); err != nil {
+				log.Debugf("Skipping eligible candidate "+
+					"GossipSyncer(%x): %v", s.cfg.peerPub,
+					err)
 				continue
 			}
-			return s
 		}
+
+		return s
 	}
 
 	return nil
@@ -573,7 +525,11 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) {
 func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer {
 	m.syncersMu.Lock()
 	defer m.syncersMu.Unlock()
+	return m.gossipSyncers()
+}
 
+// gossipSyncers returns all of the currently initialized gossip syncers.
+func (m *SyncManager) gossipSyncers() map[routing.Vertex]*GossipSyncer {
 	numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers)
 	syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers)

From 227e492ccf9050cb0491a13833c5c0fad960e464 Mon Sep 17 00:00:00 2001
From: Wilmer Paulino
Date: Wed, 10 Apr 2019 19:26:40 -0700
Subject: [PATCH 5/7] discovery: make historicalSync transition synchronous

We do this to ensure that the state transition from chansSynced to
syncingChans has occurred by the time we return to the caller.

---
 discovery/syncer.go | 33 ++++++++++++++++++++++++++-------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/discovery/syncer.go b/discovery/syncer.go
index 4a421461..09f3ee3f 100644
--- a/discovery/syncer.go
+++ b/discovery/syncer.go
@@ -168,6 +168,14 @@ type syncTransitionReq struct {
 	errChan chan error
 }
 
+// historicalSyncReq encapsulates a request for a gossip syncer to perform a
+// historical sync.
+type historicalSyncReq struct {
+	// doneChan is a channel that serves as a signal and is closed to ensure
+	// the historical sync is attempted by the time we return to the caller.
+	doneChan chan struct{}
+}
+
 // gossipSyncerCfg is a struct that packages all the information a GossipSyncer
 // needs to carry out its duties.
 type gossipSyncerCfg struct {
@@ -253,7 +261,7 @@ type GossipSyncer struct {
 	// gossip syncer to perform a historical sync. These can only be done
 	// once the gossip syncer is in a chansSynced state to ensure its state
 	// machine behaves as expected.
-	historicalSyncReqs chan struct{}
+	historicalSyncReqs chan *historicalSyncReq
 
 	// genHistoricalChanRangeQuery when true signals to the gossip syncer
 	// that it should request the remote peer for all of its known channel
@@ -322,7 +330,7 @@ func newGossipSyncer(cfg gossipSyncerCfg) *GossipSyncer {
 		cfg:                cfg,
 		rateLimiter:        rateLimiter,
 		syncTransitionReqs: make(chan *syncTransitionReq),
-		historicalSyncReqs: make(chan struct{}),
+		historicalSyncReqs: make(chan *historicalSyncReq),
 		gossipMsgs:         make(chan lnwire.Message, 100),
 		quit:               make(chan struct{}),
 	}
@@ -522,8 +530,8 @@ func (g *GossipSyncer) channelGraphSyncer() {
 		case req := <-g.syncTransitionReqs:
 			req.errChan <- g.handleSyncTransition(req)
 
-		case <-g.historicalSyncReqs:
-			g.handleHistoricalSync()
+		case req := <-g.historicalSyncReqs:
+			g.handleHistoricalSync(req)
 
 		case <-g.quit:
 			return
@@ -1183,22 +1191,33 @@ func (g *GossipSyncer) SyncType() SyncerType {
 // NOTE: This can only be done once the gossip syncer has reached its final
 // chansSynced state.
 func (g *GossipSyncer) historicalSync() error {
+	done := make(chan struct{})
+
 	select {
-	case g.historicalSyncReqs <- struct{}{}:
-		return nil
+	case g.historicalSyncReqs <- &historicalSyncReq{
+		doneChan: done,
+	}:
 	case <-time.After(syncTransitionTimeout):
 		return ErrSyncTransitionTimeout
 	case <-g.quit:
 		return ErrGossiperShuttingDown
 	}
+
+	select {
+	case <-done:
+		return nil
+	case <-g.quit:
+		return ErrGossiperShuttingDown
+	}
 }
 
 // handleHistoricalSync handles a request to the gossip syncer to perform a
 // historical sync.
-func (g *GossipSyncer) handleHistoricalSync() {
+func (g *GossipSyncer) handleHistoricalSync(req *historicalSyncReq) {
 	// We'll go back to our initial syncingChans state in order to request
 	// the remote peer to give us all of the channel IDs they know of
 	// starting from the genesis block.
 	g.genHistoricalChanRangeQuery = true
 	g.setSyncState(syncingChans)
+	close(req.doneChan)
 }

From 07136a5bc28e75ce6eb0eac3ecd8ac59a54ebbb9 Mon Sep 17 00:00:00 2001
From: Wilmer Paulino
Date: Wed, 10 Apr 2019 19:26:56 -0700
Subject: [PATCH 6/7] discovery: handle initial historical sync disconnection

In this commit, we add logic to handle a peer with whom we're performing
an initial historical sync disconnecting. This is required to ensure we
get as much of the graph as possible when starting a fresh node. It will
also prove useful to ensure we do not get stalled once we prevent active
GossipSyncers from starting until the initial historical sync has
completed.

---
 discovery/sync_manager.go      | 93 ++++++++++++++++++++++++++------
 discovery/sync_manager_test.go | 99 +++++++++++++++++++++++++---------
 2 files changed, 151 insertions(+), 41 deletions(-)

diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index 72de0d65..f95a784a 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -185,6 +185,23 @@ func (m *SyncManager) syncerHandler() {
 	m.cfg.HistoricalSyncTicker.Resume()
 	defer m.cfg.HistoricalSyncTicker.Stop()
 
+	var (
+		// attemptInitialHistoricalSync determines whether we should
+		// attempt an initial historical sync when a new peer connects.
+		attemptInitialHistoricalSync = true
+
+		// initialHistoricalSyncer is the syncer we are currently
+		// performing an initial historical sync with.
+		initialHistoricalSyncer *GossipSyncer
+
+		// initialHistoricalSyncSignal is a signal that will fire once
+		// the initial historical sync has been completed. This is
+		// crucial to ensure that another historical sync isn't
+		// attempted just because the initialHistoricalSyncer was
+		// disconnected.
+		initialHistoricalSyncSignal chan struct{}
+	)
+
 	for {
 		select {
 		// A new peer has been connected, so we'll create its
@@ -224,22 +241,29 @@ func (m *SyncManager) syncerHandler() {
 			// We'll force a historical sync with the first peer we
 			// connect to, to ensure we get as much of the graph as
 			// possible.
-			var err error
-			m.historicalSync.Do(func() {
-				log.Infof("Attempting historical sync with "+
-					"GossipSyncer(%x)", s.cfg.peerPub)
-				err = s.historicalSync()
-			})
-			if err != nil {
-				log.Errorf("Unable to perform historical sync "+
-					"with GossipSyncer(%x): %v",
-					s.cfg.peerPub, err)
-
-				// Reset historicalSync to ensure it is tried
-				// again with a different peer.
-				m.historicalSync = sync.Once{}
+			if !attemptInitialHistoricalSync {
+				continue
 			}
 
+			log.Debugf("Attempting initial historical sync with "+
+				"GossipSyncer(%x)", s.cfg.peerPub)
+
+			if err := s.historicalSync(); err != nil {
+				log.Errorf("Unable to attempt initial "+
+					"historical sync with "+
+					"GossipSyncer(%x): %v", s.cfg.peerPub,
+					err)
+				continue
+			}
+
+			// Once the historical sync has started, we'll keep
+			// track of the corresponding syncer to properly
+			// handle disconnects. We'll also use a signal to know
+			// when the historical sync completes.
+			attemptInitialHistoricalSync = false
+			initialHistoricalSyncer = s
+			initialHistoricalSyncSignal = s.ResetSyncedSignal()
+
 		// An existing peer has disconnected, so we'll tear down its
 		// corresponding GossipSyncer.
 		case staleSyncer := <-m.staleSyncers:
@@ -250,6 +274,43 @@ func (m *SyncManager) syncerHandler() {
 			m.removeGossipSyncer(staleSyncer.peer)
 			close(staleSyncer.doneChan)
 
+			// If we don't have an initialHistoricalSyncer, or we do
+			// but it is not the peer being disconnected, then we
+			// have nothing left to do and can proceed.
+			switch {
+			case initialHistoricalSyncer == nil:
+				fallthrough
+			case staleSyncer.peer != initialHistoricalSyncer.cfg.peerPub:
+				continue
+			}
+
+			// Otherwise, our initialHistoricalSyncer corresponds to
+			// the peer being disconnected, so we'll have to find a
+			// replacement.
+			log.Debug("Finding replacement for initial " +
+				"historical sync")
+
+			s := m.forceHistoricalSync()
+			if s == nil {
+				log.Debug("No eligible replacement found " +
+					"for initial historical sync")
+				attemptInitialHistoricalSync = true
+				continue
+			}
+
+			log.Debugf("Replaced initial historical "+
+				"GossipSyncer(%v) with GossipSyncer(%x)",
+				staleSyncer.peer, s.cfg.peerPub)
+
+			initialHistoricalSyncer = s
+			initialHistoricalSyncSignal = s.ResetSyncedSignal()
+
+		// Our initial historical sync signal has fired, so we'll
+		// nil all of the relevant fields as they're no longer needed.
+		case <-initialHistoricalSyncSignal:
+			initialHistoricalSyncer = nil
+			initialHistoricalSyncSignal = nil
+
 		// Our RotateTicker has ticked, so we'll attempt to rotate a
 		// single active syncer with a passive one.
 		case <-m.cfg.RotateTicker.Ticks():
@@ -406,13 +467,13 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error {
 
 // forceHistoricalSync chooses a syncer with a remote peer at random and forces
 // a historical sync with it.
-func (m *SyncManager) forceHistoricalSync() {
+func (m *SyncManager) forceHistoricalSync() *GossipSyncer {
 	m.syncersMu.Lock()
 	defer m.syncersMu.Unlock()
 
 	// We'll sample from both sets of active and inactive syncers in the
 	// event that we don't have any inactive syncers.
-	_ = chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
+	return chooseRandomSyncer(m.gossipSyncers(), func(s *GossipSyncer) error {
 		return s.historicalSync()
 	})
 }
diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go
index e965af3c..3c7fd6c7 100644
--- a/discovery/sync_manager_test.go
+++ b/discovery/sync_manager_test.go
@@ -80,44 +80,52 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) {
 	t.Parallel()
 
-	// We'll create our test sync manager to only have one active syncer.
-	syncMgr := newTestSyncManager(1)
+	// We'll create our test sync manager to have two active syncers.
+ syncMgr := newTestSyncManager(2) syncMgr.Start() defer syncMgr.Stop() - // peer1 will represent an active syncer that performs a historical - // sync since it is the first registered peer with the SyncManager. - peer1 := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer1) - syncer1 := assertSyncerExistence(t, syncMgr, peer1) - assertActiveGossipTimestampRange(t, peer1) - assertTransitionToChansSynced(t, syncer1, peer1) - assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) + // The first will be an active syncer that performs a historical sync + // since it is the first one registered with the SyncManager. + historicalSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(historicalSyncPeer) + historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer) + assertActiveGossipTimestampRange(t, historicalSyncPeer) + assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer) + assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync) + + // Then, we'll create the second active syncer, which is the one we'll + // disconnect. + activeSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // It will then be torn down to simulate a disconnection. Since there // are no other candidate syncers available, the active syncer won't be // replaced. - syncMgr.PruneSyncState(peer1.PubKey()) + syncMgr.PruneSyncState(activeSyncPeer.PubKey()) // Then, we'll start our active syncer again, but this time we'll also // have a passive syncer available to replace the active syncer after // the peer disconnects. - syncMgr.InitSyncState(peer1) - syncer1 = assertSyncerExistence(t, syncMgr, peer1) - assertActiveGossipTimestampRange(t, peer1) - assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer = assertSyncerExistence(t, syncMgr, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // Create our second peer, which should be initialized as a passive // syncer. - peer2 := randPeer(t, syncMgr.quit) - syncMgr.InitSyncState(peer2) - syncer2 := assertSyncerExistence(t, syncMgr, peer2) - assertSyncerStatus(t, syncer2, chansSynced, PassiveSync) + newActiveSyncPeer := randPeer(t, syncMgr.quit) + syncMgr.InitSyncState(newActiveSyncPeer) + newActiveSyncer := assertSyncerExistence(t, syncMgr, newActiveSyncPeer) + assertSyncerStatus(t, newActiveSyncer, chansSynced, PassiveSync) // Disconnect our active syncer, which should trigger the SyncManager to // replace it with our passive syncer. - go syncMgr.PruneSyncState(peer1.PubKey()) - assertPassiveSyncerTransition(t, syncer2, peer2) + go syncMgr.PruneSyncState(activeSyncPeer.PubKey()) + assertPassiveSyncerTransition(t, newActiveSyncer, newActiveSyncPeer) } // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully @@ -169,10 +177,51 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer) } -// TestSyncManagerHistoricalSync ensures that we only attempt a single -// historical sync during the SyncManager's startup, and that we can routinely -// force historical syncs whenever the HistoricalSyncTicker fires. 
-func TestSyncManagerHistoricalSync(t *testing.T) {
+// TestSyncManagerInitialHistoricalSync ensures that we only attempt a single
+// historical sync during the SyncManager's startup. If the peer corresponding
+// to the initial historical syncer disconnects, we should attempt to find a
+// replacement.
+func TestSyncManagerInitialHistoricalSync(t *testing.T) {
+	t.Parallel()
+
+	syncMgr := newTestSyncManager(0)
+	syncMgr.Start()
+	defer syncMgr.Stop()
+
+	// We should expect to see a QueryChannelRange message with a
+	// FirstBlockHeight of the genesis block, signaling that an initial
+	// historical sync is being attempted.
+	peer := randPeer(t, syncMgr.quit)
+	syncMgr.InitSyncState(peer)
+	assertMsgSent(t, peer, &lnwire.QueryChannelRange{
+		FirstBlockHeight: 0,
+		NumBlocks:        math.MaxUint32,
+	})
+
+	// If an additional peer connects, then another historical sync should
+	// not be attempted.
+	finalHistoricalPeer := randPeer(t, syncMgr.quit)
+	syncMgr.InitSyncState(finalHistoricalPeer)
+	finalHistoricalSyncer := assertSyncerExistence(t, syncMgr, finalHistoricalPeer)
+	assertNoMsgSent(t, finalHistoricalPeer)
+
+	// If we disconnect the peer performing the initial historical sync, a
+	// new one should be chosen.
+	syncMgr.PruneSyncState(peer.PubKey())
+	assertTransitionToChansSynced(t, finalHistoricalSyncer, finalHistoricalPeer)
+
+	// Once the initial historical sync has succeeded, disconnecting the
+	// peer who performed it should not cause another one to be attempted.
+	extraPeer := randPeer(t, syncMgr.quit)
+	syncMgr.InitSyncState(extraPeer)
+	assertNoMsgSent(t, extraPeer)
+	syncMgr.PruneSyncState(finalHistoricalPeer.PubKey())
+	assertNoMsgSent(t, extraPeer)
+}
+
+// TestSyncManagerForceHistoricalSync ensures that we can perform routine
+// historical syncs whenever the HistoricalSyncTicker fires.
+func TestSyncManagerForceHistoricalSync(t *testing.T) {
 	t.Parallel()
 
 	syncMgr := newTestSyncManager(0)

From d68842ee9ebd27f93b355877df84c3c9e446ec5d Mon Sep 17 00:00:00 2001
From: Wilmer Paulino
Date: Wed, 10 Apr 2019 19:27:12 -0700
Subject: [PATCH 7/7] discovery: queue active syncers until initial historical sync signal

In this commit, we begin to queue any active syncers until the initial
historical sync has completed. We do this to ensure we can properly
handle any new channel updates at tip. This is required for fresh nodes
that are syncing the channel graph for the first time. If we begin
accepting updates at tip while the initial historical sync is still
ongoing, then we risk not processing certain updates since we've yet to
learn of the channels themselves.

---
 discovery/mock_test.go         |  2 +
 discovery/sync_manager.go      | 45 +++++++++++++++++++++-
 discovery/sync_manager_test.go | 68 ++++++++++++++++++++++++++++++++--
 3 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/discovery/mock_test.go b/discovery/mock_test.go
index 9e945193..84ba8636 100644
--- a/discovery/mock_test.go
+++ b/discovery/mock_test.go
@@ -1,6 +1,7 @@
 package discovery
 
 import (
+	"errors"
 	"net"
 	"sync"
 
@@ -30,6 +31,7 @@ func (p *mockPeer) SendMessage(_ bool, msgs ...lnwire.Message) error {
 		select {
 		case p.sentMsgs <- msg:
 		case <-p.quit:
+			return errors.New("peer disconnected")
 		}
 	}
 
diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index f95a784a..06022518 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -190,6 +190,15 @@ func (m *SyncManager) syncerHandler() {
 		// attempt an initial historical sync when a new peer connects.
 		attemptInitialHistoricalSync = true
 
+		// initialHistoricalSyncCompleted serves as a barrier when
+		// initializing new active GossipSyncers. If false, the initial
+		// historical sync has not completed, so we'll defer
+		// initializing any active GossipSyncers. If true, then we can
+		// transition the GossipSyncer immediately. We set up this
+		// barrier to ensure we have most of the graph before attempting
+		// to accept new updates at tip.
+		initialHistoricalSyncCompleted = false
+
 		// initialHistoricalSyncer is the syncer we are currently
 		// performing an initial historical sync with.
 		initialHistoricalSyncer *GossipSyncer
@@ -221,10 +230,18 @@ func (m *SyncManager) syncerHandler() {
 			// If we've exceeded our total number of active syncers,
 			// we'll initialize this GossipSyncer as passive.
 			case len(m.activeSyncers) >= m.cfg.NumActiveSyncers:
+				fallthrough
+
+			// Otherwise, it should be initialized as active. If the
+			// initial historical sync has yet to complete, then
+			// we'll initialize it as passive and attempt to transition
+			// it when the initial historical sync completes.
+			case !initialHistoricalSyncCompleted:
 				s.setSyncType(PassiveSync)
 				m.inactiveSyncers[s.cfg.peerPub] = s
 
-			// Otherwise, it should be initialized as active.
+			// The initial historical sync has completed, so we can
+			// immediately start the GossipSyncer as active.
 			default:
 				s.setSyncType(ActiveSync)
 				m.activeSyncers[s.cfg.peerPub] = s
@@ -310,6 +327,32 @@ func (m *SyncManager) syncerHandler() {
 		case <-initialHistoricalSyncSignal:
 			initialHistoricalSyncer = nil
 			initialHistoricalSyncSignal = nil
+			initialHistoricalSyncCompleted = true
+
+			log.Debug("Initial historical sync completed")
+
+			// With the initial historical sync complete, we can
+			// begin receiving new graph updates at tip. We'll
+			// determine whether we can have any more active
+			// GossipSyncers. If we do, we'll randomly select some
+			// that are currently passive to transition.
+			m.syncersMu.Lock()
+			numActiveLeft := m.cfg.NumActiveSyncers - len(m.activeSyncers)
+			if numActiveLeft <= 0 {
+				m.syncersMu.Unlock()
+				continue
+			}
+
+			log.Debugf("Attempting to transition %v passive "+
+				"GossipSyncers to active", numActiveLeft)
+
+			for i := 0; i < numActiveLeft; i++ {
+				chooseRandomSyncer(
+					m.inactiveSyncers, m.transitionPassiveSyncer,
+				)
+			}
+
+			m.syncersMu.Unlock()
 
 		// Our RotateTicker has ticked, so we'll attempt to rotate a
 		// single active syncer with a passive one.
diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go
index 3c7fd6c7..b61253de 100644
--- a/discovery/sync_manager_test.go
+++ b/discovery/sync_manager_test.go
@@ -60,10 +60,10 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
 
 		// The first syncer registered always attempts a historical
 		// sync.
- assertActiveGossipTimestampRange(t, peer) if i == 0 { assertTransitionToChansSynced(t, s, peer) } + assertActiveGossipTimestampRange(t, peer) assertSyncerStatus(t, s, chansSynced, ActiveSync) } @@ -90,8 +90,8 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { historicalSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(historicalSyncPeer) historicalSyncer := assertSyncerExistence(t, syncMgr, historicalSyncPeer) - assertActiveGossipTimestampRange(t, historicalSyncPeer) assertTransitionToChansSynced(t, historicalSyncer, historicalSyncPeer) + assertActiveGossipTimestampRange(t, historicalSyncPeer) assertSyncerStatus(t, historicalSyncer, chansSynced, ActiveSync) // Then, we'll create the second active syncer, which is the one we'll @@ -142,8 +142,8 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { activeSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(activeSyncPeer) activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) - assertActiveGossipTimestampRange(t, activeSyncPeer) assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer) + assertActiveGossipTimestampRange(t, activeSyncPeer) assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll send a tick to force a rotation. Since there aren't any @@ -254,6 +254,66 @@ func TestSyncManagerForceHistoricalSync(t *testing.T) { }) } +// TestSyncManagerWaitUntilInitialHistoricalSync ensures that no GossipSyncers +// are initialized as ActiveSync until the initial historical sync has been +// completed. Once it does, the pending GossipSyncers should be transitioned to +// ActiveSync. +func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) { + t.Parallel() + + const numActiveSyncers = 2 + + // We'll start by creating our test sync manager which will hold up to + // 2 active syncers. + syncMgr := newTestSyncManager(numActiveSyncers) + syncMgr.Start() + defer syncMgr.Stop() + + // We'll go ahead and create our syncers. + peers := make([]*mockPeer, 0, numActiveSyncers) + syncers := make([]*GossipSyncer, 0, numActiveSyncers) + for i := 0; i < numActiveSyncers; i++ { + peer := randPeer(t, syncMgr.quit) + peers = append(peers, peer) + + syncMgr.InitSyncState(peer) + s := assertSyncerExistence(t, syncMgr, peer) + syncers = append(syncers, s) + + // The first one always attempts a historical sync. We won't + // transition it to chansSynced to ensure the remaining syncers + // aren't started as active. + if i == 0 { + assertSyncerStatus(t, s, syncingChans, PassiveSync) + continue + } + + // The rest should remain in a passive and chansSynced state, + // and they should be queued to transition to active once the + // initial historical sync is completed. + assertNoMsgSent(t, peer) + assertSyncerStatus(t, s, chansSynced, PassiveSync) + } + + // To ensure we don't transition any pending active syncers that have + // previously disconnected, we'll disconnect the last one. + stalePeer := peers[numActiveSyncers-1] + syncMgr.PruneSyncState(stalePeer.PubKey()) + + // Then, we'll complete the initial historical sync by transitioning the + // historical syncer to its final chansSynced state. This should trigger + // all of the pending active syncers to transition, except for the one + // we disconnected. 
+ assertTransitionToChansSynced(t, syncers[0], peers[0]) + for i, s := range syncers { + if i == numActiveSyncers-1 { + assertNoMsgSent(t, peers[i]) + continue + } + assertPassiveSyncerTransition(t, s, peers[i]) + } +} + // assertNoMsgSent is a helper function that ensures a peer hasn't sent any // messages. func assertNoMsgSent(t *testing.T, peer *mockPeer) { @@ -294,7 +354,7 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) { var msgSent lnwire.Message select { case msgSent = <-peer.sentMsgs: - case <-time.After(time.Second): + case <-time.After(2 * time.Second): t.Fatalf("expected peer %x to send lnwire.GossipTimestampRange "+ "message", peer.PubKey()) }
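
A note on the shape chooseRandomSyncer takes after patch 4: it folds candidate selection and the side-effecting action into one pass, skipping syncers that are not yet in the required state and returning the first candidate the supplied closure succeeds on, with Go's randomized map iteration order providing the "random" choice for free. The following standalone sketch illustrates that pattern; the types and names (syncer, chooseFirstEligible) are hypothetical stand-ins, not lnd code.

package main

import "fmt"

type syncer struct {
	id    string
	ready bool
}

// chooseFirstEligible returns the first ready syncer for which action
// succeeds, or nil if none qualifies. A nil action means "select only".
// Map iteration order in Go is randomized, so no explicit shuffle is needed.
func chooseFirstEligible(candidates map[string]*syncer,
	action func(*syncer) error) *syncer {

	for _, s := range candidates {
		// Skip candidates that aren't in the required state.
		if !s.ready {
			continue
		}

		if action != nil {
			// A failed action means "skip and try the next
			// candidate", not "abort the whole selection".
			if err := action(s); err != nil {
				continue
			}
		}

		return s
	}

	return nil
}

func main() {
	candidates := map[string]*syncer{
		"a": {id: "a", ready: false},
		"b": {id: "b", ready: true},
	}

	chosen := chooseFirstEligible(candidates, func(s *syncer) error {
		fmt.Println("starting historical sync with", s.id)
		return nil
	})
	fmt.Println(chosen != nil) // true
}

Folding the action into the selection loop is what lets a caller like forceHistoricalSync treat a failed sync attempt as a reason to move on to the next candidate rather than to give up.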
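Patch 5's synchronous handoff is an instance of a common Go idiom: the caller submits a request carrying a done channel to the single goroutine that owns the state machine, then blocks until that channel is closed, which guarantees the transition has been applied before the call returns. Below is a minimal self-contained sketch of the idiom with illustrative names only (syncReq, stateMachine); it mirrors the structure of historicalSync/handleHistoricalSync without being lnd code.

package main

import (
	"errors"
	"fmt"
	"time"
)

// syncReq models a request whose completion the caller wants to observe.
type syncReq struct {
	done chan struct{}
}

type stateMachine struct {
	reqs chan *syncReq
	quit chan struct{}
}

// run is the single goroutine that owns all state transitions.
func (s *stateMachine) run() {
	for {
		select {
		case req := <-s.reqs:
			// Perform the transition here, then signal completion.
			close(req.done)
		case <-s.quit:
			return
		}
	}
}

// requestSync submits a request and blocks until it has been handled.
func (s *stateMachine) requestSync() error {
	req := &syncReq{done: make(chan struct{})}

	select {
	case s.reqs <- req:
	case <-time.After(time.Second):
		return errors.New("timed out delivering request")
	case <-s.quit:
		return errors.New("shutting down")
	}

	// A second select is still needed: delivery only guarantees the event
	// loop received the request, not that it finished handling it.
	select {
	case <-req.done:
		return nil
	case <-s.quit:
		return errors.New("shutting down")
	}
}

func main() {
	s := &stateMachine{
		reqs: make(chan *syncReq),
		quit: make(chan struct{}),
	}
	go s.run()

	fmt.Println(s.requestSync()) // <nil>
	close(s.quit)
}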
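Finally, the barrier patch 7 introduces reduces to: park every peer that connects before the initial historical sync finishes as passive, then promote parked syncers up to the active quota once the completion signal fires. The toy manager below compresses that logic into direct method calls; the fields (maxActive, syncDone) are hypothetical simplifications of the SyncManager's channel-driven implementation inside syncerHandler.

package main

import "fmt"

type peerSyncer struct {
	id     string
	active bool
}

type manager struct {
	maxActive int
	syncDone  bool
	active    map[string]*peerSyncer
	inactive  map[string]*peerSyncer
}

// register parks new peers as passive until the initial sync has completed,
// mirroring the barrier the patch adds before starting active syncers.
func (m *manager) register(s *peerSyncer) {
	if !m.syncDone || len(m.active) >= m.maxActive {
		m.inactive[s.id] = s
		return
	}
	s.active = true
	m.active[s.id] = s
}

// onInitialSyncDone lifts the barrier and promotes queued passive syncers
// until the active quota is filled.
func (m *manager) onInitialSyncDone() {
	m.syncDone = true
	for id, s := range m.inactive {
		if len(m.active) >= m.maxActive {
			break
		}
		delete(m.inactive, id)
		s.active = true
		m.active[id] = s
	}
}

func main() {
	m := &manager{
		maxActive: 1,
		active:    make(map[string]*peerSyncer),
		inactive:  make(map[string]*peerSyncer),
	}

	m.register(&peerSyncer{id: "early"})
	fmt.Println(len(m.active), len(m.inactive)) // 0 1

	m.onInitialSyncDone()
	fmt.Println(len(m.active), len(m.inactive)) // 1 0
}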