diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 524db4e9..ae87f97a 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -30,6 +30,30 @@ var ( ErrSyncManagerExiting = errors.New("sync manager exiting") ) +// newSyncer in an internal message we'll use within the SyncManager to signal +// that we should create a GossipSyncer for a newly connected peer. +type newSyncer struct { + // peer is the newly connected peer. + peer lnpeer.Peer + + // doneChan serves as a signal to the caller that the SyncManager's + // internal state correctly reflects the stale active syncer. + doneChan chan struct{} +} + +// staleSyncer is an internal message we'll use within the SyncManager to signal +// that a peer has disconnected and its GossipSyncer should be removed. +type staleSyncer struct { + // peer is the peer that has disconnected. + peer routing.Vertex + + // doneChan serves as a signal to the caller that the SyncManager's + // internal state correctly reflects the stale active syncer. This is + // needed to ensure we always create a new syncer for a flappy peer + // after they disconnect if they happened to be an active syncer. + doneChan chan struct{} +} + // SyncManagerCfg contains all of the dependencies required for the SyncManager // to carry out its duties. type SyncManagerCfg struct { @@ -79,6 +103,18 @@ type SyncManager struct { // _once_ with a peer during the SyncManager's startup. historicalSync sync.Once + // newSyncers is a channel we'll use to process requests to create + // GossipSyncers for newly connected peers. + newSyncers chan *newSyncer + + // staleSyncers is a channel we'll use to process requests to tear down + // GossipSyncers for disconnected peers. + staleSyncers chan *staleSyncer + + // syncersMu guards the read and write access to the activeSyncers and + // inactiveSyncers maps below. + syncersMu sync.Mutex + // activeSyncers is the set of all syncers for which we are currently // receiving graph updates from. The number of possible active syncers // is bounded by NumActiveSyncers. @@ -88,7 +124,6 @@ type SyncManager struct { // currently receiving new graph updates from. inactiveSyncers map[routing.Vertex]*GossipSyncer - sync.Mutex wg sync.WaitGroup quit chan struct{} } @@ -96,7 +131,9 @@ type SyncManager struct { // newSyncManager constructs a new SyncManager backed by the given config. func newSyncManager(cfg *SyncManagerCfg) *SyncManager { return &SyncManager{ - cfg: *cfg, + cfg: *cfg, + newSyncers: make(chan *newSyncer), + staleSyncers: make(chan *staleSyncer), activeSyncers: make( map[routing.Vertex]*GossipSyncer, cfg.NumActiveSyncers, ), @@ -119,9 +156,6 @@ func (m *SyncManager) Stop() { close(m.quit) m.wg.Wait() - m.Lock() - defer m.Unlock() - for _, syncer := range m.inactiveSyncers { syncer.Stop() } @@ -133,11 +167,13 @@ func (m *SyncManager) Stop() { // syncerHandler is the SyncManager's main event loop responsible for: // -// 1. Finding new peers to receive graph updates from to ensure we don't only -// receive them from the same set of peers. -// -// 2. Finding new peers to force a historical sync with to ensure we have as -// much of the public network as possible. +// 1. Creating and tearing down GossipSyncers for connected/disconnected peers. + +// 2. Finding new peers to receive graph updates from to ensure we don't only +// receive them from the same set of peers. + +// 3. Finding new peers to force a historical sync with to ensure we have as +// much of the public network as possible. // // NOTE: This must be run as a goroutine. func (m *SyncManager) syncerHandler() { @@ -151,6 +187,69 @@ func (m *SyncManager) syncerHandler() { for { select { + // A new peer has been connected, so we'll create its + // accompanying GossipSyncer. + case newSyncer := <-m.newSyncers: + // If we already have a syncer, then we'll exit early as + // we don't want to override it. + if _, ok := m.GossipSyncer(newSyncer.peer.PubKey()); ok { + close(newSyncer.doneChan) + continue + } + + s := m.createGossipSyncer(newSyncer.peer) + + m.syncersMu.Lock() + switch { + // If we've exceeded our total number of active syncers, + // we'll initialize this GossipSyncer as passive. + case len(m.activeSyncers) >= m.cfg.NumActiveSyncers: + s.setSyncType(PassiveSync) + m.inactiveSyncers[s.cfg.peerPub] = s + + // Otherwise, it should be initialized as active. + default: + s.setSyncType(ActiveSync) + m.activeSyncers[s.cfg.peerPub] = s + } + m.syncersMu.Unlock() + + s.Start() + + // Once we create the GossipSyncer, we'll signal to the + // caller that they can proceed since the SyncManager's + // internal state has been updated. + close(newSyncer.doneChan) + + // We'll force a historical sync with the first peer we + // connect to, to ensure we get as much of the graph as + // possible. + var err error + m.historicalSync.Do(func() { + log.Infof("Attempting historical sync with "+ + "GossipSyncer(%x)", s.cfg.peerPub) + err = s.historicalSync() + }) + if err != nil { + log.Errorf("Unable to perform historical sync "+ + "with GossipSyncer(%x): %v", + s.cfg.peerPub, err) + + // Reset historicalSync to ensure it is tried + // again with a different peer. + m.historicalSync = sync.Once{} + } + + // An existing peer has disconnected, so we'll tear down its + // corresponding GossipSyncer. + case staleSyncer := <-m.staleSyncers: + // Once the corresponding GossipSyncer has been stopped + // and removed, we'll signal to the caller that they can + // proceed since the SyncManager's internal state has + // been updated. + m.removeGossipSyncer(staleSyncer.peer) + close(staleSyncer.doneChan) + // Our RotateTicker has ticked, so we'll attempt to rotate a // single active syncer with a passive one. case <-m.cfg.RotateTicker.Ticks(): @@ -167,15 +266,83 @@ func (m *SyncManager) syncerHandler() { } } +// createGossipSyncer creates the GossipSyncer for a newly connected peer. +func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { + nodeID := routing.Vertex(peer.PubKey()) + log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:]) + + encoding := lnwire.EncodingSortedPlain + s := newGossipSyncer(gossipSyncerCfg{ + chainHash: m.cfg.ChainHash, + peerPub: nodeID, + channelSeries: m.cfg.ChanSeries, + encodingType: encoding, + chunkSize: encodingTypeToChunkSize[encoding], + batchSize: requestBatchSize, + sendToPeer: func(msgs ...lnwire.Message) error { + return peer.SendMessageLazy(false, msgs...) + }, + }) + + // Gossip syncers are initialized by default in a PassiveSync type + // and chansSynced state so that they can reply to any peer queries or + // handle any sync transitions. + s.setSyncState(chansSynced) + s.setSyncType(PassiveSync) + return s +} + +// removeGossipSyncer removes all internal references to the disconnected peer's +// GossipSyncer and stops it. In the event of an active GossipSyncer being +// disconnected, a passive GossipSyncer, if any, will take its place. +func (m *SyncManager) removeGossipSyncer(peer routing.Vertex) { + m.syncersMu.Lock() + defer m.syncersMu.Unlock() + + s, ok := m.gossipSyncer(peer) + if !ok { + return + } + + log.Infof("Removing GossipSyncer for peer=%v", peer) + + // We'll stop the GossipSyncer for the disconnected peer in a goroutine + // to prevent blocking the SyncManager. + go s.Stop() + + // If it's a non-active syncer, then we can just exit now. + if _, ok := m.inactiveSyncers[peer]; ok { + delete(m.inactiveSyncers, peer) + return + } + + // Otherwise, we'll need find a new one to replace it, if any. + delete(m.activeSyncers, peer) + newActiveSyncer := m.chooseRandomSyncer(nil, false) + if newActiveSyncer == nil { + return + } + + if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil { + log.Errorf("Unable to transition passive GossipSyncer(%x): %v", + newActiveSyncer.cfg.peerPub, err) + return + } + + log.Debugf("Replaced active GossipSyncer(%x) with GossipSyncer(%x)", + peer, newActiveSyncer.cfg.peerPub) +} + // rotateActiveSyncerCandidate rotates a single active syncer. In order to // achieve this, the active syncer must be in a chansSynced state in order to // process the sync transition. func (m *SyncManager) rotateActiveSyncerCandidate() { + m.syncersMu.Lock() + defer m.syncersMu.Unlock() + // If we don't have a candidate to rotate with, we can return early. - m.Lock() candidate := m.chooseRandomSyncer(nil, false) if candidate == nil { - m.Unlock() log.Debug("No eligible candidate to rotate active syncer") return } @@ -193,7 +360,6 @@ func (m *SyncManager) rotateActiveSyncerCandidate() { activeSyncer = s break } - m.Unlock() // If we couldn't find an eligible one, we can return early. if activeSyncer == nil { @@ -201,13 +367,11 @@ func (m *SyncManager) rotateActiveSyncerCandidate() { return } + // Otherwise, we'll attempt to transition each syncer to their + // respective new sync type. log.Debugf("Rotating active GossipSyncer(%x) with GossipSyncer(%x)", activeSyncer.cfg.peerPub, candidate.cfg.peerPub) - // Otherwise, we'll attempt to transition each syncer to their - // respective new sync type. We'll avoid performing the transition with - // the lock as it can potentially stall the SyncManager due to the - // syncTransitionTimeout. if err := m.transitionActiveSyncer(activeSyncer); err != nil { log.Errorf("Unable to transition active GossipSyncer(%x): %v", activeSyncer.cfg.peerPub, err) @@ -222,6 +386,8 @@ func (m *SyncManager) rotateActiveSyncerCandidate() { } // transitionActiveSyncer transitions an active syncer to a passive one. +// +// NOTE: This must be called with the syncersMu lock held. func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { log.Debugf("Transitioning active GossipSyncer(%x) to passive", s.cfg.peerPub) @@ -230,15 +396,15 @@ func (m *SyncManager) transitionActiveSyncer(s *GossipSyncer) error { return err } - m.Lock() delete(m.activeSyncers, s.cfg.peerPub) m.inactiveSyncers[s.cfg.peerPub] = s - m.Unlock() return nil } // transitionPassiveSyncer transitions a passive syncer to an active one. +// +// NOTE: This must be called with the syncersMu lock held. func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { log.Debugf("Transitioning passive GossipSyncer(%x) to active", s.cfg.peerPub) @@ -247,10 +413,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { return err } - m.Lock() delete(m.inactiveSyncers, s.cfg.peerPub) m.activeSyncers[s.cfg.peerPub] = s - m.Unlock() return nil } @@ -258,8 +422,8 @@ func (m *SyncManager) transitionPassiveSyncer(s *GossipSyncer) error { // forceHistoricalSync chooses a syncer with a remote peer at random and forces // a historical sync with it. func (m *SyncManager) forceHistoricalSync() { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() // We'll choose a random peer with whom we can perform a historical sync // with. We'll set useActive to true to make sure we can still do one if @@ -340,64 +504,23 @@ func (m *SyncManager) chooseRandomSyncer(blacklist map[routing.Vertex]struct{}, // public channel graph as possible. // // TODO(wilmer): Only mark as ActiveSync if this isn't a channel peer. -func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { - m.Lock() - defer m.Unlock() +func (m *SyncManager) InitSyncState(peer lnpeer.Peer) error { + done := make(chan struct{}) - // If we already have a syncer, then we'll exit early as we don't want - // to override it. - nodeID := routing.Vertex(peer.PubKey()) - if _, ok := m.gossipSyncer(nodeID); ok { - return + select { + case m.newSyncers <- &newSyncer{ + peer: peer, + doneChan: done, + }: + case <-m.quit: + return ErrSyncManagerExiting } - log.Infof("Creating new GossipSyncer for peer=%x", nodeID[:]) - - encoding := lnwire.EncodingSortedPlain - s := newGossipSyncer(gossipSyncerCfg{ - chainHash: m.cfg.ChainHash, - peerPub: nodeID, - channelSeries: m.cfg.ChanSeries, - encodingType: encoding, - chunkSize: encodingTypeToChunkSize[encoding], - batchSize: requestBatchSize, - sendToPeer: func(msgs ...lnwire.Message) error { - return peer.SendMessageLazy(false, msgs...) - }, - }) - - // If we've yet to reach our desired number of active syncers, then - // we'll use this one. - if len(m.activeSyncers) < m.cfg.NumActiveSyncers { - s.setSyncType(ActiveSync) - m.activeSyncers[s.cfg.peerPub] = s - } else { - s.setSyncType(PassiveSync) - m.inactiveSyncers[s.cfg.peerPub] = s - } - - // Gossip syncers are initialized by default in a chansSynced state so - // that they can reply to any peer queries or - // handle any sync transitions. - s.setSyncState(chansSynced) - s.Start() - - // We'll force a historical sync with the first peer we connect to - // ensure we get as much of the graph as possible. - var err error - m.historicalSync.Do(func() { - log.Infof("Attempting historical sync with GossipSyncer(%x)", - s.cfg.peerPub) - - err = s.historicalSync() - }) - if err != nil { - log.Errorf("Unable to perform historical sync with "+ - "GossipSyncer(%x): %v", s.cfg.peerPub, err) - - // Reset historicalSync to ensure it is tried again with a - // different peer. - m.historicalSync = sync.Once{} + select { + case <-done: + return nil + case <-m.quit: + return ErrSyncManagerExiting } } @@ -405,48 +528,30 @@ func (m *SyncManager) InitSyncState(peer lnpeer.Peer) { // previously connected to has been disconnected. In this case we can stop the // existing GossipSyncer assigned to the peer and free up resources. func (m *SyncManager) PruneSyncState(peer routing.Vertex) { - s, ok := m.GossipSyncer(peer) - if !ok { + done := make(chan struct{}) + + // We avoid returning an error when the SyncManager is stopped since the + // GossipSyncer will be stopped then anyway. + select { + case m.staleSyncers <- &staleSyncer{ + peer: peer, + doneChan: done, + }: + case <-m.quit: return } - log.Infof("Removing GossipSyncer for peer=%v", peer) - - // We'll start by stopping the GossipSyncer for the disconnected peer. - s.Stop() - - // If it's a non-active syncer, then we can just exit now. - m.Lock() - if _, ok := m.inactiveSyncers[s.cfg.peerPub]; ok { - delete(m.inactiveSyncers, s.cfg.peerPub) - m.Unlock() - return + select { + case <-done: + case <-m.quit: } - - // Otherwise, we'll need to dequeue it from our pending active syncers - // queue and find a new one to replace it, if any. - delete(m.activeSyncers, s.cfg.peerPub) - newActiveSyncer := m.chooseRandomSyncer(nil, false) - m.Unlock() - if newActiveSyncer == nil { - return - } - - if err := m.transitionPassiveSyncer(newActiveSyncer); err != nil { - log.Errorf("Unable to transition passive GossipSyncer(%x): %v", - newActiveSyncer.cfg.peerPub, err) - return - } - - log.Debugf("Replaced active GossipSyncer(%v) with GossipSyncer(%x)", - peer, newActiveSyncer.cfg.peerPub) } // GossipSyncer returns the associated gossip syncer of a peer. The boolean // returned signals whether there exists a gossip syncer for the peer. func (m *SyncManager) GossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() return m.gossipSyncer(peer) } @@ -466,8 +571,8 @@ func (m *SyncManager) gossipSyncer(peer routing.Vertex) (*GossipSyncer, bool) { // GossipSyncers returns all of the currently initialized gossip syncers. func (m *SyncManager) GossipSyncers() map[routing.Vertex]*GossipSyncer { - m.Lock() - defer m.Unlock() + m.syncersMu.Lock() + defer m.syncersMu.Unlock() numSyncers := len(m.inactiveSyncers) + len(m.activeSyncers) syncers := make(map[routing.Vertex]*GossipSyncer, numSyncers) diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index ecdd8af1..e965af3c 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -55,21 +55,23 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // should be active and passive to check them later on. for i := 0; i < numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(peer) + syncMgr.InitSyncState(peer) + s := assertSyncerExistence(t, syncMgr, peer) // The first syncer registered always attempts a historical // sync. assertActiveGossipTimestampRange(t, peer) if i == 0 { - assertTransitionToChansSynced(t, syncMgr, peer) + assertTransitionToChansSynced(t, s, peer) } - assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync) + assertSyncerStatus(t, s, chansSynced, ActiveSync) } for i := 0; i < numSyncers-numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(peer) - assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync) + s := assertSyncerExistence(t, syncMgr, peer) + assertSyncerStatus(t, s, chansSynced, PassiveSync) } } @@ -86,10 +88,11 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // peer1 will represent an active syncer that performs a historical // sync since it is the first registered peer with the SyncManager. peer1 := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(peer1) + syncMgr.InitSyncState(peer1) + syncer1 := assertSyncerExistence(t, syncMgr, peer1) assertActiveGossipTimestampRange(t, peer1) - assertTransitionToChansSynced(t, syncMgr, peer1) - assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) + assertTransitionToChansSynced(t, syncer1, peer1) + assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) // It will then be torn down to simulate a disconnection. Since there // are no other candidate syncers available, the active syncer won't be @@ -99,20 +102,22 @@ func TestSyncManagerNewActiveSyncerAfterDisconnect(t *testing.T) { // Then, we'll start our active syncer again, but this time we'll also // have a passive syncer available to replace the active syncer after // the peer disconnects. - go syncMgr.InitSyncState(peer1) + syncMgr.InitSyncState(peer1) + syncer1 = assertSyncerExistence(t, syncMgr, peer1) assertActiveGossipTimestampRange(t, peer1) - assertSyncerStatus(t, syncMgr, peer1, chansSynced, ActiveSync) + assertSyncerStatus(t, syncer1, chansSynced, ActiveSync) // Create our second peer, which should be initialized as a passive // syncer. peer2 := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(peer2) - assertSyncerStatus(t, syncMgr, peer2, chansSynced, PassiveSync) + syncer2 := assertSyncerExistence(t, syncMgr, peer2) + assertSyncerStatus(t, syncer2, chansSynced, PassiveSync) // Disconnect our active syncer, which should trigger the SyncManager to // replace it with our passive syncer. go syncMgr.PruneSyncState(peer1.PubKey()) - assertPassiveSyncerTransition(t, syncMgr, peer2) + assertPassiveSyncerTransition(t, syncer2, peer2) } // TestSyncManagerRotateActiveSyncerCandidate tests that we can successfully @@ -127,21 +132,23 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // The first syncer registered always performs a historical sync. activeSyncPeer := randPeer(t, syncMgr.quit) - go syncMgr.InitSyncState(activeSyncPeer) + syncMgr.InitSyncState(activeSyncPeer) + activeSyncer := assertSyncerExistence(t, syncMgr, activeSyncPeer) assertActiveGossipTimestampRange(t, activeSyncPeer) - assertTransitionToChansSynced(t, syncMgr, activeSyncPeer) - assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync) + assertTransitionToChansSynced(t, activeSyncer, activeSyncPeer) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll send a tick to force a rotation. Since there aren't any // candidates, none of the active syncers will be rotated. syncMgr.cfg.RotateTicker.(*ticker.Force).Force <- time.Time{} assertNoMsgSent(t, activeSyncPeer) - assertSyncerStatus(t, syncMgr, activeSyncPeer, chansSynced, ActiveSync) + assertSyncerStatus(t, activeSyncer, chansSynced, ActiveSync) // We'll then go ahead and add a passive syncer. passiveSyncPeer := randPeer(t, syncMgr.quit) syncMgr.InitSyncState(passiveSyncPeer) - assertSyncerStatus(t, syncMgr, passiveSyncPeer, chansSynced, PassiveSync) + passiveSyncer := assertSyncerExistence(t, syncMgr, passiveSyncPeer) + assertSyncerStatus(t, passiveSyncer, chansSynced, PassiveSync) // We'll force another rotation - this time, since we have a passive // syncer available, they should be rotated. @@ -150,7 +157,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // The transition from an active syncer to a passive syncer causes the // peer to send out a new GossipTimestampRange in the past so that they // don't receive new graph updates. - assertActiveSyncerTransition(t, syncMgr, activeSyncPeer) + assertActiveSyncerTransition(t, activeSyncer, activeSyncPeer) // The transition from a passive syncer to an active syncer causes the // peer to send a new GossipTimestampRange with the current timestamp to @@ -159,7 +166,7 @@ func TestSyncManagerRotateActiveSyncerCandidate(t *testing.T) { // machine, starting from its initial syncingChans state. We'll then // need to transition it to its final chansSynced state to ensure the // next syncer is properly started in the round-robin. - assertPassiveSyncerTransition(t, syncMgr, passiveSyncPeer) + assertPassiveSyncerTransition(t, passiveSyncer, passiveSyncPeer) } // TestSyncManagerHistoricalSync ensures that we only attempt a single @@ -258,10 +265,9 @@ func assertActiveGossipTimestampRange(t *testing.T, peer *mockPeer) { } } -// assertSyncerStatus asserts that the gossip syncer for the given peer matches -// the expected sync state and type. -func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, - syncState syncerState, syncType SyncerType) { +// assertSyncerExistence asserts that a GossipSyncer exists for the given peer. +func assertSyncerExistence(t *testing.T, syncMgr *SyncManager, + peer *mockPeer) *GossipSyncer { t.Helper() @@ -270,19 +276,29 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, t.Fatalf("gossip syncer for peer %x not found", peer.PubKey()) } + return s +} + +// assertSyncerStatus asserts that the gossip syncer for the given peer matches +// the expected sync state and type. +func assertSyncerStatus(t *testing.T, s *GossipSyncer, syncState syncerState, + syncType SyncerType) { + + t.Helper() + // We'll check the status of our syncer within a WaitPredicate as some // sync transitions might cause this to be racy. err := lntest.WaitNoError(func() error { state := s.syncState() if s.syncState() != syncState { return fmt.Errorf("expected syncState %v for peer "+ - "%x, got %v", syncState, peer.PubKey(), state) + "%x, got %v", syncState, s.cfg.peerPub, state) } typ := s.SyncType() if s.SyncType() != syncType { return fmt.Errorf("expected syncType %v for peer "+ - "%x, got %v", syncType, peer.PubKey(), typ) + "%x, got %v", syncType, s.cfg.peerPub, typ) } return nil @@ -294,16 +310,9 @@ func assertSyncerStatus(t *testing.T, syncMgr *SyncManager, peer *mockPeer, // assertTransitionToChansSynced asserts the transition of an ActiveSync // GossipSyncer to its final chansSynced state. -func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager, - peer *mockPeer) { - +func assertTransitionToChansSynced(t *testing.T, s *GossipSyncer, peer *mockPeer) { t.Helper() - s, ok := syncMgr.GossipSyncer(peer.PubKey()) - if !ok { - t.Fatalf("gossip syncer for peer %x not found", peer.PubKey()) - } - assertMsgSent(t, peer, &lnwire.QueryChannelRange{ FirstBlockHeight: 0, NumBlocks: math.MaxUint32, @@ -311,7 +320,7 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager, s.ProcessQueryMsg(&lnwire.ReplyChannelRange{Complete: 1}, nil) - chanSeries := syncMgr.cfg.ChanSeries.(*mockChannelGraphTimeSeries) + chanSeries := s.cfg.channelSeries.(*mockChannelGraphTimeSeries) select { case <-chanSeries.filterReq: @@ -336,25 +345,22 @@ func assertTransitionToChansSynced(t *testing.T, syncMgr *SyncManager, // assertPassiveSyncerTransition asserts that a gossip syncer goes through all // of its expected steps when transitioning from passive to active. -func assertPassiveSyncerTransition(t *testing.T, syncMgr *SyncManager, - peer *mockPeer) { +func assertPassiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) { t.Helper() assertActiveGossipTimestampRange(t, peer) - assertSyncerStatus(t, syncMgr, peer, chansSynced, ActiveSync) + assertSyncerStatus(t, s, chansSynced, ActiveSync) } // assertActiveSyncerTransition asserts that a gossip syncer goes through all of // its expected steps when transitioning from active to passive. -func assertActiveSyncerTransition(t *testing.T, syncMgr *SyncManager, - peer *mockPeer) { - +func assertActiveSyncerTransition(t *testing.T, s *GossipSyncer, peer *mockPeer) { t.Helper() assertMsgSent(t, peer, &lnwire.GossipTimestampRange{ FirstTimestamp: uint32(zeroTimestamp.Unix()), TimestampRange: 0, }) - assertSyncerStatus(t, syncMgr, peer, chansSynced, PassiveSync) + assertSyncerStatus(t, s, chansSynced, PassiveSync) }