discovery: add support for PinnedSyncers

A pinned syncer is an ActiveSyncer that is configured to always remain
active for the lifetime of the connection. Pinned syncers do not count
towards the NumActiveSyncers total, whose members are rotated
periodically.

This feature allows nodes to more tightly synchronize their routing
tables by ensuring they are always receiving gossip from a
distinguished subset of peers.
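
As a rough sketch of how a caller might assemble the new PinnedSyncers set, the snippet below parses hex-encoded node pubkeys into route.Vertex keys; the result would then be placed into SyncManagerCfg.PinnedSyncers. The helper name buildPinnedSet, the surrounding package, and the reliance on route.NewVertexFromStr are illustrative assumptions rather than part of this commit:

    package pinnedexample

    import (
    	"fmt"

    	"github.com/lightningnetwork/lnd/discovery"
    	"github.com/lightningnetwork/lnd/routing/route"
    )

    // buildPinnedSet converts hex-encoded node pubkeys into the PinnedSyncers
    // set introduced by this commit. Sketch only; peers listed here will be
    // held in an active (pinned) sync for the lifetime of each connection.
    func buildPinnedSet(hexPubs []string) (discovery.PinnedSyncers, error) {
    	pinned := make(discovery.PinnedSyncers)
    	for _, h := range hexPubs {
    		vertex, err := route.NewVertexFromStr(h)
    		if err != nil {
    			return nil, fmt.Errorf("invalid pinned pubkey %v: %v", h, err)
    		}
    		pinned[vertex] = struct{}{}
    	}
    	return pinned, nil
    }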
Conner Fromknecht 2021-01-29 00:13:38 -08:00
parent 9e932f2a64
commit 2f0d56d539
4 changed files with 114 additions and 8 deletions

@ -93,6 +93,10 @@ type chanPolicyUpdateRequest struct {
errChan chan error
}
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}
// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {

@ -92,6 +92,11 @@ type SyncManagerCfg struct {
// BestHeight returns the latest height known of the chain.
BestHeight func() uint32
// PinnedSyncers is a set of peers that will always transition to
// ActiveSync upon connection. These peers will never transition to
// PassiveSync.
PinnedSyncers PinnedSyncers
}
// SyncManager is a subsystem of the gossiper that manages the gossip syncers
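
For context, a minimal in-package sketch of wiring this field up. It is abbreviated: somePubKey stands in for a hypothetical *btcec.PublicKey, and the remaining required SyncManagerCfg fields are only indicated in a comment (see newPinnedTestSyncManager in the test changes below for a complete construction):

    pinned := PinnedSyncers{
    	route.NewVertex(somePubKey): {}, // somePubKey: hypothetical peer key
    }

    m := newSyncManager(&SyncManagerCfg{
    	NumActiveSyncers: 3, // pinned peers never occupy these slots
    	PinnedSyncers:    pinned,
    	// ChanSeries, BestHeight, tickers, etc. must also be populated.
    })
    m.Start()
    defer m.Stop()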
@ -140,6 +145,12 @@ type SyncManager struct {
// currently receiving new graph updates from.
inactiveSyncers map[route.Vertex]*GossipSyncer
// pinnedActiveSyncers is the set of all syncers which are pinned into
// an active sync. Pinned peers perform an initial historical sync on
// each connection and will continue to receive graph updates for the
// duration of the connection.
pinnedActiveSyncers map[route.Vertex]*GossipSyncer
wg sync.WaitGroup
quit chan struct{}
}
@ -154,7 +165,10 @@ func newSyncManager(cfg *SyncManagerCfg) *SyncManager {
map[route.Vertex]*GossipSyncer, cfg.NumActiveSyncers,
),
inactiveSyncers: make(map[route.Vertex]*GossipSyncer),
- quit: make(chan struct{}),
+ pinnedActiveSyncers: make(
+ map[route.Vertex]*GossipSyncer, len(cfg.PinnedSyncers),
+ ),
+ quit: make(chan struct{}),
}
}
@ -240,6 +254,8 @@ func (m *SyncManager) syncerHandler() {
s := m.createGossipSyncer(newSyncer.peer)
isPinnedSyncer := m.isPinnedSyncer(s)
// attemptHistoricalSync determines whether we should
// attempt an initial historical sync when a new peer
// connects.
@ -247,6 +263,12 @@ func (m *SyncManager) syncerHandler() {
m.syncersMu.Lock()
switch {
// For pinned syncers, we will immediately transition
// the peer into an active (pinned) sync state.
case isPinnedSyncer:
s.setSyncType(PinnedSync)
m.pinnedActiveSyncers[s.cfg.peerPub] = s
// Regardless of whether the initial historical sync
// has completed, we'll re-trigger a historical sync if
// we no longer have any syncers. This might be
@ -416,6 +438,13 @@ func (m *SyncManager) syncerHandler() {
}
}
// isPinnedSyncer returns true if the passed GossipSyncer is one of our pinned
// sync peers.
func (m *SyncManager) isPinnedSyncer(s *GossipSyncer) bool {
_, isPinnedSyncer := m.cfg.PinnedSyncers[s.cfg.peerPub]
return isPinnedSyncer
}
// createGossipSyncer creates the GossipSyncer for a newly connected peer.
func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
nodeID := route.Vertex(peer.PubKey())
@ -474,6 +503,13 @@ func (m *SyncManager) removeGossipSyncer(peer route.Vertex) {
return
}
// If it's a pinned syncer, then we can just exit as this doesn't
// affect our active syncer count.
if _, ok := m.pinnedActiveSyncers[peer]; ok {
delete(m.pinnedActiveSyncers, peer)
return
}
// Otherwise, we'll need to find a new one to replace it, if any.
delete(m.activeSyncers, peer)
newActiveSyncer := chooseRandomSyncer(
@ -676,6 +712,10 @@ func (m *SyncManager) gossipSyncer(peer route.Vertex) (*GossipSyncer, bool) {
if ok {
return syncer, true
}
syncer, ok = m.pinnedActiveSyncers[peer]
if ok {
return syncer, true
}
return nil, false
}

@ -7,9 +7,11 @@ import (
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
)
@ -18,8 +20,13 @@ import (
func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
t.Helper()
pk := randPubKey(t)
return peerWithPubkey(pk, quit)
}
func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
return &mockPeer{
- pk: randPubKey(t),
+ pk: pk,
sentMsgs: make(chan lnwire.Message),
quit: quit,
}
@ -28,6 +35,14 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
// newTestSyncManager creates a new test SyncManager using mock implementations
// of its dependencies.
func newTestSyncManager(numActiveSyncers int) *SyncManager {
return newPinnedTestSyncManager(numActiveSyncers, nil)
}
// newPinnedTestSyncManager creates a new test SyncManager with a set of pinned
// syncers using mock implementations of its dependencies.
func newPinnedTestSyncManager(numActiveSyncers int,
pinnedSyncers PinnedSyncers) *SyncManager {
hID := lnwire.ShortChannelID{BlockHeight: latestKnownHeight}
return newSyncManager(&SyncManagerCfg{
ChanSeries: newMockChannelGraphTimeSeries(hID),
@ -37,6 +52,7 @@ func newTestSyncManager(numActiveSyncers int) *SyncManager {
BestHeight: func() uint32 {
return latestKnownHeight
},
PinnedSyncers: pinnedSyncers,
})
}
@ -48,14 +64,38 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
// We'll start by creating our test sync manager which will hold up to
// 3 active syncers.
const numActiveSyncers = 3
- const numSyncers = numActiveSyncers + 1
+ const numPinnedSyncers = 3
+ const numInactiveSyncers = 1
- syncMgr := newTestSyncManager(numActiveSyncers)
pinnedSyncers := make(PinnedSyncers)
pinnedPubkeys := make(map[route.Vertex]*btcec.PublicKey)
for i := 0; i < numPinnedSyncers; i++ {
pubkey := randPubKey(t)
vertex := route.NewVertex(pubkey)
pinnedSyncers[vertex] = struct{}{}
pinnedPubkeys[vertex] = pubkey
}
syncMgr := newPinnedTestSyncManager(numActiveSyncers, pinnedSyncers)
syncMgr.Start()
defer syncMgr.Stop()
// First we'll start by adding the pinned syncers. These should
// immediately be assigned PinnedSync.
for _, pubkey := range pinnedPubkeys {
peer := peerWithPubkey(pubkey, syncMgr.quit)
err := syncMgr.InitSyncState(peer)
require.NoError(t, err)
s := assertSyncerExistence(t, syncMgr, peer)
assertSyncerStatus(t, s, chansSynced, PinnedSync)
}
// We'll go ahead and create our syncers. We'll gather the ones which
- // should be active and passive to check them later on.
+ // should be active and passive to check them later on. The pinned peers
+ // added above should not influence the active syncer count.
for i := 0; i < numActiveSyncers; i++ {
peer := randPeer(t, syncMgr.quit)
err := syncMgr.InitSyncState(peer)
@ -72,7 +112,7 @@ func TestSyncManagerNumActiveSyncers(t *testing.T) {
assertSyncerStatus(t, s, chansSynced, ActiveSync)
}
- for i := 0; i < numSyncers-numActiveSyncers; i++ {
+ for i := 0; i < numInactiveSyncers; i++ {
peer := randPeer(t, syncMgr.quit)
err := syncMgr.InitSyncState(peer)
require.NoError(t, err)

@ -42,6 +42,13 @@ const (
// They are started in a chansSynced state in order to accomplish their
// responsibilities above.
PassiveSync
// PinnedSync denotes an ActiveSync that doesn't count towards the
// default active syncer limits and is always active throughout the
// duration of the peer's connection. Each pinned syncer will begin by
// performing a historical sync to ensure we are well synchronized with
// their routing table.
PinnedSync
)
// String returns a human readable string describing the target SyncerType.
@ -51,11 +58,24 @@ func (t SyncerType) String() string {
return "ActiveSync"
case PassiveSync:
return "PassiveSync"
case PinnedSync:
return "PinnedSync"
default:
return fmt.Sprintf("unknown sync type %d", t)
}
}
// IsActiveSync returns true if the SyncerType should set a GossipTimestampRange
// allowing new gossip messages to be received from the peer.
func (t SyncerType) IsActiveSync() bool {
switch t {
case ActiveSync, PinnedSync:
return true
default:
return false
}
}
// syncerState is an enum that represents the current state of the GossipSyncer.
// As the syncer is a state machine, we'll gate our actions based off of the
// current state and the next incoming message.
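
A tiny standalone example of what the new IsActiveSync helper reports for each syncer type; it uses only the exported identifiers from the discovery package shown in this diff:

    package main

    import (
    	"fmt"

    	"github.com/lightningnetwork/lnd/discovery"
    )

    func main() {
    	// Only ActiveSync and PinnedSync cause a gossip timestamp range to be
    	// sent to the peer; PassiveSync does not.
    	for _, st := range []discovery.SyncerType{
    		discovery.ActiveSync,
    		discovery.PassiveSync,
    		discovery.PinnedSync,
    	} {
    		fmt.Printf("%v: IsActiveSync=%v\n", st, st.IsActiveSync())
    	}
    }

Both ActiveSync and PinnedSync report true, which is what lets the channelGraphSyncer hunk below treat them identically when deciding whether to send the update horizon.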
@ -560,7 +580,9 @@ func (g *GossipSyncer) channelGraphSyncer() {
// If we haven't yet sent out our update horizon, and
// we want to receive real-time channel updates, we'll
// do so now.
- if g.localUpdateHorizon == nil && syncType == ActiveSync {
+ if g.localUpdateHorizon == nil &&
+ syncType.IsActiveSync() {
err := g.sendGossipTimestampRange(
time.Now(), math.MaxUint32,
)
@ -1418,7 +1440,7 @@ func (g *GossipSyncer) handleSyncTransition(req *syncTransitionReq) error {
switch req.newSyncType {
// If an active sync has been requested, then we should resume receiving
// new graph updates from the remote peer.
- case ActiveSync:
+ case ActiveSync, PinnedSync:
firstTimestamp = time.Now()
timestampRange = math.MaxUint32
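
For reference, a sketch of the remote update filter that an active or pinned syncer ends up applying after this transition. The chain hash field of the message is omitted here purely for brevity; in the syncer it is populated from the configured chain:

    package main

    import (
    	"fmt"
    	"math"
    	"time"

    	"github.com/lightningnetwork/lnd/lnwire"
    )

    func main() {
    	// Mirror of the ActiveSync/PinnedSync branch above: request all gossip
    	// with timestamps from "now" onwards.
    	filter := &lnwire.GossipTimestampRange{
    		FirstTimestamp: uint32(time.Now().Unix()),
    		TimestampRange: math.MaxUint32,
    	}
    	fmt.Printf("first=%d range=%d\n", filter.FirstTimestamp, filter.TimestampRange)
    }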