From 2f0d56d53976f58070c02b18d4cb072b32d0954a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 29 Jan 2021 00:13:38 -0800 Subject: [PATCH] discovery: add support for PinnedSyncers A pinned syncer is an ActiveSyncer that is configured to always remain active for the lifetime of the connection. Pinned syncers do not count towards the total NumActiveSyncer count, which are rotated periodically. This features allows nodes to more tightly synchronize their routing tables by ensuring they are always receiving gossip from distinguished subset of peers. --- discovery/gossiper.go | 4 +++ discovery/sync_manager.go | 42 +++++++++++++++++++++++++++- discovery/sync_manager_test.go | 50 ++++++++++++++++++++++++++++++---- discovery/syncer.go | 26 ++++++++++++++++-- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 32eba9ce..b5e4bcb6 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -93,6 +93,10 @@ type chanPolicyUpdateRequest struct { errChan chan error } +// PinnedSyncers is a set of node pubkeys for which we will maintain an active +// syncer at all times. +type PinnedSyncers map[route.Vertex]struct{} + // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 5999449d..4c636b6b 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -92,6 +92,11 @@ type SyncManagerCfg struct { // BestHeight returns the latest height known of the chain. BestHeight func() uint32 + + // PinnedSyncers is a set of peers that will always transition to + // ActiveSync upon connection. These peers will never transition to + // PassiveSync. + PinnedSyncers PinnedSyncers } // SyncManager is a subsystem of the gossiper that manages the gossip syncers @@ -140,6 +145,12 @@ type SyncManager struct { // currently receiving new graph updates from. inactiveSyncers map[route.Vertex]*GossipSyncer + // pinnedActiveSyncers is the set of all syncers which are pinned into + // an active sync. Pinned peers performan an initial historical sync on + // each connection and will continue to receive graph updates for the + // duration of the connection. + pinnedActiveSyncers map[route.Vertex]*GossipSyncer + wg sync.WaitGroup quit chan struct{} } @@ -154,7 +165,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager { map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers, ), inactiveSyncers: make(map[route.Vertex]*GossipSyncer), - quit: make(chan struct{}), + pinnedActiveSyncers: make( + map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers), + ), + quit: make(chan struct{}), } } @@ -240,6 +254,8 @@ func (m *SyncManager) syncerHandler() { s := m.createGossipSyncer(newSyncer.peer) + isPinnedSyncer := m.isPinnedSyncer(s) + // attemptHistoricalSync determines whether we should // attempt an initial historical sync when a new peer // connects. @@ -247,6 +263,12 @@ func (m *SyncManager) syncerHandler() { m.syncersMu.Lock() switch { + // For pinned syncers, we will immediately transition + // the peer into an active (pinned) sync state. + case isPinnedSyncer: + s.setSyncType(PinnedSync) + m.pinnedActiveSyncers[s.cfg.peerPub] = s + // Regardless of whether the initial historical sync // has completed, we'll re-trigger a historical sync if // we no longer have any syncers. This might be @@ -416,6 +438,13 @@ func (m *SyncManager) syncerHandler() { } } +// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned +// sync peers. +func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool { + _, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub] + return isPinnedSyncer +} + // createGossipSyncer creates the GossipSyncer for a newly connected peer. func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { nodeID := route.Vertex(peer.PubKey()) @@ -474,6 +503,13 @@ func (m *SyncManager) removeGossipSyncer(peer route.Vertex) { return } + // If it's a pinned syncer, then we can just exit as this doesn't + // affect our active syncer count. + if _, ok := m.pinnedActiveSyncers[peer]; ok { + delete(m.pinnedActiveSyncers, peer) + return + } + // Otherwise, we'll need find a new one to replace it, if any. delete(m.activeSyncers, peer) newActiveSyncer := chooseRandomSyncer( @@ -676,6 +712,10 @@ func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) { if ok { return syncer, true } + syncer, ok = m.pinnedActiveSyncers[peer] + if ok { + return syncer, true + } return nil, false } diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index 35707943..3eb97b4e 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -7,9 +7,11 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcec" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) @@ -18,8 +20,13 @@ import ( func randPeer(t *testing.T, quit chan struct{}) *mockPeer { t.Helper() + pk := randPubKey(t) + return peerWithPubkey(pk, quit) +} + +func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer { return &mockPeer{ - pk: randPubKey(t), + pk: pk, sentMsgs: make(chan lnwire.Message), quit: quit, } @@ -28,6 +35,14 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer { // newTestSyncManager creates a new test SyncManager using mock implementations // of its dependencies. func newTestSyncManager(numActiveSyncers int) *SyncManager { + return newPinnedTestSyncManager(numActiveSyncers, nil) +} + +// newTestSyncManager creates a new test SyncManager with a set of pinned +// syncers using mock implementations of its dependencies. +func newPinnedTestSyncManager(numActiveSyncers int, + pinnedSyncers PinnedSyncers) *SyncManager { + hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight} return newSyncManager(&SyncManagerCfg{ ChanSeries: newMockChannelGraphTimeSeries(hID), @@ -37,6 +52,7 @@ func newTestSyncManager(numActiveSyncers int) *SyncManager { BestHeight: func() uint32 { return latestKnownHeight }, + PinnedSyncers: pinnedSyncers, }) } @@ -48,14 +64,38 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { // We'll start by creating our test sync manager which will hold up to // 3 active syncers. const numActiveSyncers = 3 - const numSyncers = numActiveSyncers + 1 + const numPinnedSyncers = 3 + const numInactiveSyncers = 1 - syncMgr := newTestSyncManager(numActiveSyncers) + pinnedSyncers := make(PinnedSyncers) + pinnedPubkeys := make(map[route.Vertex]*btcec.PublicKey) + for i := 0; i < numPinnedSyncers; i++ { + pubkey := randPubKey(t) + vertex := route.NewVertex(pubkey) + + pinnedSyncers[vertex] = struct{}{} + pinnedPubkeys[vertex] = pubkey + + } + + syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers) syncMgr.Start() defer syncMgr.Stop() + // First we'll start by adding the pinned syncers. These should + // immediately be assigned PinnedSync. + for _, pubkey := range pinnedPubkeys { + peer := peerWithPubkey(pubkey, syncMgr.quit) + err := syncMgr.InitSyncState(peer) + require.NoError(t, err) + + s := assertSyncerExistence(t, syncMgr, peer) + assertSyncerStatus(t, s, chansSynced, PinnedSync) + } + // We'll go ahead and create our syncers. We'll gather the ones which - // should be active and passive to check them later on. + // should be active and passive to check them later on. The pinned peers + // added above should not influence the active syncer count. for i := 0; i < numActiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) err := syncMgr.InitSyncState(peer) @@ -72,7 +112,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) { assertSyncerStatus(t, s, chansSynced, ActiveSync) } - for i := 0; i < numSyncers-numActiveSyncers; i++ { + for i := 0; i < numInactiveSyncers; i++ { peer := randPeer(t, syncMgr.quit) err := syncMgr.InitSyncState(peer) require.NoError(t, err) diff --git a/discovery/syncer.go b/discovery/syncer.go index 04a722f2..6d137d71 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -42,6 +42,13 @@ const ( // They are started in a chansSynced state in order to accomplish their // responsibilities above. PassiveSync + + // PinnedSync denotes an ActiveSync that doesn't count towards the + // default active syncer limits and is always active throughout the + // duration of the peer's connection. Each pinned syncer will begin by + // performing a historical sync to ensure we are well synchronized with + // their routing table. + PinnedSync ) // String returns a human readable string describing the target SyncerType. @@ -51,11 +58,24 @@ func (t SyncerType) String() string { return "ActiveSync" case PassiveSync: return "PassiveSync" + case PinnedSync: + return "PinnedSync" default: return fmt.Sprintf("unknown sync type %d", t) } } +// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange +// allowing new gossip messages to be received from the peer. +func (t SyncerType) IsActiveSync() bool { + switch t { + case ActiveSync, PinnedSync: + return true + default: + return false + } +} + // syncerState is an enum that represents the current state of the GossipSyncer. // As the syncer is a state machine, we'll gate our actions based off of the // current state and the next incoming message. @@ -560,7 +580,9 @@ func (g *GossipSyncer) channelGraphSyncer() { // If we haven't yet sent out our update horizon, and // we want to receive real-time channel updates, we'll // do so now. - if g.localUpdateHorizon == nil && syncType == ActiveSync { + if g.localUpdateHorizon == nil && + syncType.IsActiveSync() { + err := g.sendGossipTimestampRange( time.Now(), math.MaxUint32, ) @@ -1418,7 +1440,7 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error { switch req.newSyncType { // If an active sync has been requested, then we should resume receiving // new graph updates from the remote peer. - case ActiveSync: + case ActiveSync, PinnedSync: firstTimestamp = time.Now() timestampRange = math.MaxUint32