diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 32eba9ce..b5e4bcb6 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -93,6 +93,10 @@ type chanPolicyUpdateRequest struct { errChan chan error } +// PinnedSyncers is a set of node pubkeys for which we will maintain an active +// syncer at all times. +type PinnedSyncers map[route.Vertex]struct{} + // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 5999449d..4c636b6b 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -92,6 +92,11 @@ type SyncManagerCfg struct { // BestHeight returns the latest height known of the chain. BestHeight func() uint32 + + // PinnedSyncers is a set of peers that will always transition to + // ActiveSync upon connection. These peers will never transition to + // PassiveSync. + PinnedSyncers PinnedSyncers } // SyncManager is a subsystem of the gossiper that manages the gossip syncers @@ -140,6 +145,12 @@ type SyncManager struct { // currently receiving new graph updates from. inactiveSyncers map[route.Vertex]*GossipSyncer + // pinnedActiveSyncers is the set of all syncers which are pinned into + // an active sync. Pinned peers performan an initial historical sync on + // each connection and will continue to receive graph updates for the + // duration of the connection. + pinnedActiveSyncers map[route.Vertex]*GossipSyncer + wg sync.WaitGroup quit chan struct{} } @@ -154,7 +165,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager { map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers, ), inactiveSyncers: make(map[route.Vertex]*GossipSyncer), - quit: make(chan struct{}), + pinnedActiveSyncers: make( + map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers), + ), + quit: make(chan struct{}), } } @@ -240,6 +254,8 @@ func (m *SyncManager) syncerHandler() { s := m.createGossipSyncer(newSyncer.peer) + isPinnedSyncer := m.isPinnedSyncer(s) + // attemptHistoricalSync determines whether we should // attempt an initial historical sync when a new peer // connects. @@ -247,6 +263,12 @@ func (m *SyncManager) syncerHandler() { m.syncersMu.Lock() switch { + // For pinned syncers, we will immediately transition + // the peer into an active (pinned) sync state. + case isPinnedSyncer: + s.setSyncType(PinnedSync) + m.pinnedActiveSyncers[s.cfg.peerPub] = s + // Regardless of whether the initial historical sync // has completed, we'll re-trigger a historical sync if // we no longer have any syncers. This might be @@ -416,6 +438,13 @@ func (m *SyncManager) syncerHandler() { } } +// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned +// sync peers. +func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool { + _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub] + return isPinnedSyncer +} + // createGossipSyncer creates the GossipSyncer for a newly connected peer. func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { nodeID := route.Vertex(peer.PubKey()) @@ -474,6 +503,13 @@ func (m *SyncManager) removeGossipSyncer(peer route.Vertex) { return } + // If it's a pinned syncer, then we can just exit as this doesn't + // affect our active syncer count. + if _, ok := m.pinnedActiveSyncers[peer]; ok { + delete(m.pinnedActiveSyncers, peer) + return + } + // Otherwise, we'll need find a new one to replace it, if any. delete(m.activeSyncers, peer) newActiveSyncer := chooseRandomSyncer( @@ -676,6 +712,10 @@ func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) { if ok { return syncer, true } + syncer, ok = m.pinnedActiveSyncers[peer] + if ok { + return syncer, true + } return nil, false } diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index 35707943..3eb97b4e 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -7,9 +7,11 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcec" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) @@ -18,8 +20,13 @@ import ( func randPeer(t *testing.T, quit chan struct{}) *mockPeer { t.Helper() + pk := randPubKey(t) + return peerWithPubkey(pk, quit) +} + +func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer { return &mockPeer{ - pk: randPubKey(t), + pk: pk, sentMsgs: make(chan lnwire.Message), quit: quit, } @@ -28,6 +35,14 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer { // newTestSyncManager creates a new test SyncManager using mock implementations // of its dependencies. func newTestSyncManager(numActiveSyncers int) *SyncManager { + return newPinnedTestSyncManager(numActiveSyncers, nil) +} + +// newTestSyncManager creates a new test SyncManager with a set of pinned +// syncers using mock implementations of its dependencies. +func newPinnedTestSyncManager(numActiveSyncers int, + pinnedSyncers PinnedSyncers) *SyncManager { + hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight} return newSyncManager(&SyncManagerCfg{ ChanSeries: newMockChannelGraphTimeSeries(hID), @@ -37,6 +52,7 @@ func newTestSyncManager(numActiveSyncers int) *SyncManager { BestHeight: func() uint32 { return latestKnownHeight }, + PinnedSyncers: pinnedSyncers, }) } @@ -48,14 +64,38 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // We'll start by creating our test sync manager which will hold up to // 3 active syncers. const numActiveSyncers = 3 - const numSyncers = numActiveSyncers + 1 + const numPinnedSyncers = 3 + const numInactiveSyncers = 1 - syncMgr := newTestSyncManager(numActiveSyncers) + pinnedSyncers := make(PinnedSyncers) + pinnedPubkeys := make(map[route.Vertex]*btcec.PublicKey) + for i := 0; i < numPinnedSyncers; i++ { + pubkey := randPubKey(t) + vertex := route.NewVertex(pubkey) + + pinnedSyncers[vertex] = struct{}{} + pinnedPubkeys[vertex] = pubkey + + } + + syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers) syncMgr.Start() defer syncMgr.Stop() + // First we'll start by adding the pinned syncers. These should + // immediately be assigned PinnedSync. + for _, pubkey := range pinnedPubkeys { + peer := peerWithPubkey(pubkey, syncMgr.quit) + err := syncMgr.InitSyncState(peer) + require.NoError(t, err) + + s := assertSyncerExistence(t, syncMgr, peer) + assertSyncerStatus(t, s, chansSynced, PinnedSync) + } + // We'll go ahead and create our syncers. We'll gather the ones which - // should be active and passive to check them later on. + // should be active and passive to check them later on. The pinned peers + // added above should not influence the active syncer count. for i := 0; i < numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) err := syncMgr.InitSyncState(peer) @@ -72,7 +112,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { assertSyncerStatus(t, s, chansSynced, ActiveSync) } - for i := 0; i < numSyncers-numActiveSyncers; i++ { + for i := 0; i < numInactiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) err := syncMgr.InitSyncState(peer) require.NoError(t, err) diff --git a/discovery/syncer.go b/discovery/syncer.go index 04a722f2..6d137d71 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -42,6 +42,13 @@ const ( // They are started in a chansSynced state in order to accomplish their // responsibilities above. PassiveSync + + // PinnedSync denotes an ActiveSync that doesn't count towards the + // default active syncer limits and is always active throughout the + // duration of the peer's connection. Each pinned syncer will begin by + // performing a historical sync to ensure we are well synchronized with + // their routing table. + PinnedSync ) // String returns a human readable string describing the target SyncerType. @@ -51,11 +58,24 @@ func (t SyncerType) String() string { return "ActiveSync" case PassiveSync: return "PassiveSync" + case PinnedSync: + return "PinnedSync" default: return fmt.Sprintf("unknown sync type %d", t) } } +// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange +// allowing new gossip messages to be received from the peer. +func (t SyncerType) IsActiveSync() bool { + switch t { + case ActiveSync, PinnedSync: + return true + default: + return false + } +} + // syncerState is an enum that represents the current state of the GossipSyncer. // As the syncer is a state machine, we'll gate our actions based off of the // current state and the next incoming message. @@ -560,7 +580,9 @@ func (g *GossipSyncer) channelGraphSyncer() { // If we haven't yet sent out our update horizon, and // we want to receive real-time channel updates, we'll // do so now. - if g.localUpdateHorizon == nil && syncType == ActiveSync { + if g.localUpdateHorizon == nil && + syncType.IsActiveSync() { + err := g.sendGossipTimestampRange( time.Now(), math.MaxUint32, ) @@ -1418,7 +1440,7 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { switch req.newSyncType { // If an active sync has been requested, then we should resume receiving // new graph updates from the remote peer. - case ActiveSync: + case ActiveSync, PinnedSync: firstTimestamp = time.Now() timestampRange = math.MaxUint32