Merge pull request #5006 from wpaulino/new-rate-limit-chan-updates

discovery: use token bucket based rate limiting to throttle gossip
This commit is contained in:
Conner Fromknecht 2021-02-10 17:31:41 -08:00 committed by GitHub
commit 1ee5eb97d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 94 additions and 73 deletions

@ -22,6 +22,19 @@ import (
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
"golang.org/x/time/rate"
)
const (
// DefaultMaxChannelUpdateBurst is the default maximum number of updates
// for a specific channel and direction that we'll accept over an
// interval.
DefaultMaxChannelUpdateBurst = 10
// DefaultChannelUpdateInterval is the default interval we'll use to
// determine how often we should allow a new update for a specific
// channel and direction.
DefaultChannelUpdateInterval = time.Minute
) )
var ( var (
@ -233,15 +246,19 @@ type Config struct {
// graph on connect. // graph on connect.
IgnoreHistoricalFilters bool IgnoreHistoricalFilters bool
// GossipUpdateThrottle if true, then the gossiper will throttle
// gossip updates to once per RebroadcastInterval for any keep-alive
// updates, and once per block for other types of updates.
GossipUpdateThrottle bool
// PinnedSyncers is a set of peers that will always transition to // PinnedSyncers is a set of peers that will always transition to
// ActiveSync upon connection. These peers will never transition to // ActiveSync upon connection. These peers will never transition to
// PassiveSync. // PassiveSync.
PinnedSyncers PinnedSyncers PinnedSyncers PinnedSyncers
// MaxChannelUpdateBurst specifies the maximum number of updates for a
// specific channel and direction that we'll accept over an interval.
MaxChannelUpdateBurst int
// ChannelUpdateInterval specifies the interval we'll use to determine
// how often we should allow a new update for a specific channel and
// direction.
ChannelUpdateInterval time.Duration
} }
// AuthenticatedGossiper is a subsystem which is responsible for receiving // AuthenticatedGossiper is a subsystem which is responsible for receiving
@ -318,12 +335,14 @@ type AuthenticatedGossiper struct {
// network. // network.
reliableSender *reliableSender reliableSender *reliableSender
// heightForLastChanUpdate keeps track of the height at which we // chanUpdateRateLimiter contains rate limiters for each direction of
// processed the latest channel update for a specific direction. // a channel update we've processed. We'll use these to determine
// whether we should accept a new update for a specific channel and
// direction.
// //
// NOTE: This map must be synchronized with the main // NOTE: This map must be synchronized with the main
// AuthenticatedGossiper lock. // AuthenticatedGossiper lock.
heightForLastChanUpdate map[uint64][2]uint32 chanUpdateRateLimiter map[uint64][2]*rate.Limiter
sync.Mutex sync.Mutex
} }
@ -340,7 +359,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
prematureChannelUpdates: make(map[uint64][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
} }
gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
@ -1937,9 +1956,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// If we have a previous version of the edge being updated, // If we have a previous version of the edge being updated,
// we'll want to rate limit its updates to prevent spam // we'll want to rate limit its updates to prevent spam
// throughout the network if we're currently throttling such // throughout the network.
// updates. if nMsg.isRemote && edgeToUpdate != nil {
if d.cfg.GossipUpdateThrottle && nMsg.isRemote && edgeToUpdate != nil {
// If it's a keep-alive update, we'll only propagate one // If it's a keep-alive update, we'll only propagate one
// if it's been a day since the previous. This follows // if it's been a day since the previous. This follows
// our own heuristic of sending keep-alive updates after // our own heuristic of sending keep-alive updates after
@ -1956,21 +1974,32 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
} else { } else {
// If it's not, we'll only allow a single update // If it's not, we'll allow an update per minute
// for this channel per block. // with a maximum burst of 10. If we haven't
// seen an update for this channel before, we'll
// need to initialize a rate limiter for each
// direction.
d.Lock() d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID]
if lastUpdateHeight[direction] == d.bestHeight { if !ok {
log.Debugf("Ignoring update for "+ r := rate.Every(d.cfg.ChannelUpdateInterval)
"channel %v due to previous "+ b := d.cfg.MaxChannelUpdateBurst
"update occurring within the "+ rateLimiters = [2]*rate.Limiter{
"same block %v", shortChanID, rate.NewLimiter(r, b),
d.bestHeight) rate.NewLimiter(r, b),
d.Unlock() }
d.chanUpdateRateLimiter[shortChanID] = rateLimiters
}
d.Unlock()
if !rateLimiters[direction].Allow() {
log.Debugf("Rate limiting update for "+
"channel %v from direction %x",
shortChanID,
pubKey.SerializeCompressed())
nMsg.err <- nil nMsg.err <- nil
return nil return nil
} }
d.Unlock()
} }
} }
@ -2017,15 +2046,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
// With the edge successfully updated on disk, we'll note the
// current height so that we're able to rate limit any future
// updates for the same channel.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
lastUpdateHeight[direction] = d.bestHeight
d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight
d.Unlock()
// If this is a local ChannelUpdate without an AuthProof, it // If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet) // means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However, // supposed to be announced to the greater network. However,

@ -733,19 +733,21 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
Timestamp: testTimestamp, Timestamp: testTimestamp,
}, nil }, nil
}, },
Router: router, Router: router,
TrickleDelay: trickleDelay, TrickleDelay: trickleDelay,
RetransmitTicker: ticker.NewForce(retransmitDelay), RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval, RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta, ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore, WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(), MessageStore: newMockMessageStore(),
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
NumActiveSyncers: 3, NumActiveSyncers: 3,
AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1}, AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1},
SubBatchDelay: time.Second * 5, SubBatchDelay: time.Second * 5,
MinimumBatchSize: 10, MinimumBatchSize: 10,
MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst,
ChannelUpdateInterval: DefaultChannelUpdateInterval,
}, nodeKeyPub1) }, nodeKeyPub1)
if err := gossiper.Start(); err != nil { if err := gossiper.Start(); err != nil {
@ -3938,7 +3940,8 @@ func TestRateLimitChannelUpdates(t *testing.T) {
} }
defer cleanup() defer cleanup()
ctx.gossiper.cfg.RebroadcastInterval = time.Hour ctx.gossiper.cfg.RebroadcastInterval = time.Hour
ctx.gossiper.cfg.GossipUpdateThrottle = true ctx.gossiper.cfg.MaxChannelUpdateBurst = 5
ctx.gossiper.cfg.ChannelUpdateInterval = 5 * time.Second
// The graph should start empty. // The graph should start empty.
require.Empty(t, ctx.router.infos) require.Empty(t, ctx.router.infos)
@ -4037,24 +4040,40 @@ func TestRateLimitChannelUpdates(t *testing.T) {
// Then, we'll move on to the non keep alive cases. // Then, we'll move on to the non keep alive cases.
// //
// Non keep alive updates are limited to one per block per direction. // For this test, non keep alive updates are rate limited to one per 5
// Since we've already processed updates for both sides, the new updates // seconds with a max burst of 5 per direction. We'll process the max
// for both directions will not be broadcast until a new block arrives. // burst of one direction first. None of these should be rate limited.
updateSameDirection := keepAliveUpdate updateSameDirection := keepAliveUpdate
for i := uint32(0); i < uint32(ctx.gossiper.cfg.MaxChannelUpdateBurst); i++ {
updateSameDirection.Timestamp++
updateSameDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection))
assertRateLimit(&updateSameDirection, nodePeer1, false)
}
// Following with another update should be rate limited as the max burst
// has been reached and we haven't ticked at the next interval yet.
updateSameDirection.Timestamp++ updateSameDirection.Timestamp++
updateSameDirection.BaseFee++ updateSameDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection))
assertRateLimit(&updateSameDirection, nodePeer1, true) assertRateLimit(&updateSameDirection, nodePeer1, true)
// An update for the other direction should not be rate limited.
updateDiffDirection := *batch.chanUpdAnn2 updateDiffDirection := *batch.chanUpdAnn2
updateDiffDirection.Timestamp++ updateDiffDirection.Timestamp++
updateDiffDirection.BaseFee++ updateDiffDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection)) require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection))
assertRateLimit(&updateDiffDirection, nodePeer2, true)
// Notify a new block and reprocess the updates. They should no longer
// be rate limited.
ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1)
assertRateLimit(&updateSameDirection, nodePeer1, false)
assertRateLimit(&updateDiffDirection, nodePeer2, false) assertRateLimit(&updateDiffDirection, nodePeer2, false)
// Wait for the next interval to tick. Since we've only waited for one,
// only one more update is allowed.
<-time.After(ctx.gossiper.cfg.ChannelUpdateInterval)
for i := 0; i < ctx.gossiper.cfg.MaxChannelUpdateBurst; i++ {
updateSameDirection.Timestamp++
updateSameDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection))
shouldRateLimit := i != 0
assertRateLimit(&updateSameDirection, nodePeer1, shouldRateLimit)
}
} }

@ -20,8 +20,3 @@ func (l *LegacyProtocol) LegacyOnion() bool {
func (l *LegacyProtocol) NoStaticRemoteKey() bool { func (l *LegacyProtocol) NoStaticRemoteKey() bool {
return false return false
} }
// NoGossipThrottle returns true if gossip updates shouldn't be throttled.
func (l *LegacyProtocol) NoGossipThrottle() bool {
return false
}

@ -16,12 +16,6 @@ type LegacyProtocol struct {
// remote party's output in the commitment. If set to true, then we // remote party's output in the commitment. If set to true, then we
// won't signal StaticRemoteKeyOptional. // won't signal StaticRemoteKeyOptional.
CommitmentTweak bool `long:"committweak" description:"force node to not advertise the new commitment format"` CommitmentTweak bool `long:"committweak" description:"force node to not advertise the new commitment format"`
// NoGossipUpdateThrottle if true, then gossip updates won't be
// throttled using the current set of heuristics. This should mainly be
// used for integration tests where we want nearly instant propagation
// of gossip updates.
NoGossipUpdateThrottle bool `long:"no-gossip-throttle" description:"if true, then gossip updates will not be throttled to once per rebroadcast interval for non keep-alive updates"`
} }
// LegacyOnion returns true if the old legacy onion format should be used when // LegacyOnion returns true if the old legacy onion format should be used when
@ -36,8 +30,3 @@ func (l *LegacyProtocol) LegacyOnion() bool {
func (l *LegacyProtocol) NoStaticRemoteKey() bool { func (l *LegacyProtocol) NoStaticRemoteKey() bool {
return l.CommitmentTweak return l.CommitmentTweak
} }
// NoGossipThrottle returns true if gossip updates shouldn't be throttled.
func (l *LegacyProtocol) NoGossipThrottle() bool {
return l.NoGossipUpdateThrottle
}

@ -281,7 +281,6 @@ func (cfg NodeConfig) genArgs() []string {
args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath)) args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath))
args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay))
args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort)) args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort))
args = append(args, fmt.Sprintf("--protocol.legacy.no-gossip-throttle"))
if !cfg.HasSeed { if !cfg.HasSeed {
args = append(args, "--noseedbackup") args = append(args, "--noseedbackup")

@ -819,7 +819,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
MinimumBatchSize: 10, MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5, SubBatchDelay: time.Second * 5,
IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters, IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters,
GossipUpdateThrottle: !cfg.ProtocolOptions.NoGossipThrottle(),
PinnedSyncers: cfg.Gossip.PinnedSyncers, PinnedSyncers: cfg.Gossip.PinnedSyncers,
}, },
s.identityECDH.PubKey(), s.identityECDH.PubKey(),