diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 5901dce1..348f1742 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -22,6 +22,19 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" + "golang.org/x/time/rate" +) + +const ( + // DefaultMaxChannelUpdateBurst is the default maximum number of updates + // for a specific channel and direction that we'll accept over an + // interval. + DefaultMaxChannelUpdateBurst = 10 + + // DefaultChannelUpdateInterval is the default interval we'll use to + // determine how often we should allow a new update for a specific + // channel and direction. + DefaultChannelUpdateInterval = time.Minute ) var ( @@ -237,6 +250,15 @@ type Config struct { // ActiveSync upon connection. These peers will never transition to // PassiveSync. PinnedSyncers PinnedSyncers + + // MaxChannelUpdateBurst specifies the maximum number of updates for a + // specific channel and direction that we'll accept over an interval. + MaxChannelUpdateBurst int + + // ChannelUpdateInterval specifies the interval we'll use to determine + // how often we should allow a new update for a specific channel and + // direction. + ChannelUpdateInterval time.Duration } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -313,12 +335,14 @@ type AuthenticatedGossiper struct { // network. reliableSender *reliableSender - // heightForLastChanUpdate keeps track of the height at which we - // processed the latest channel update for a specific direction. + // chanUpdateRateLimiter contains rate limiters for each direction of + // a channel update we've processed. We'll use these to determine + // whether we should accept a new update for a specific channel and + // direction. // // NOTE: This map must be synchronized with the main // AuthenticatedGossiper lock. - heightForLastChanUpdate map[uint64][2]uint32 + chanUpdateRateLimiter map[uint64][2]*rate.Limiter sync.Mutex } @@ -335,7 +359,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), - heightForLastChanUpdate: make(map[uint64][2]uint32), + chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), } gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ @@ -1950,21 +1974,32 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } } else { - // If it's not, we'll only allow a single update - // for this channel per block. + // If it's not, we'll allow an update per minute + // with a maximum burst of 10. If we haven't + // seen an update for this channel before, we'll + // need to initialize a rate limiter for each + // direction. d.Lock() - lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] - if lastUpdateHeight[direction] == d.bestHeight { - log.Debugf("Ignoring update for "+ - "channel %v due to previous "+ - "update occurring within the "+ - "same block %v", shortChanID, - d.bestHeight) - d.Unlock() + rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID] + if !ok { + r := rate.Every(d.cfg.ChannelUpdateInterval) + b := d.cfg.MaxChannelUpdateBurst + rateLimiters = [2]*rate.Limiter{ + rate.NewLimiter(r, b), + rate.NewLimiter(r, b), + } + d.chanUpdateRateLimiter[shortChanID] = rateLimiters + } + d.Unlock() + + if !rateLimiters[direction].Allow() { + log.Debugf("Rate limiting update for "+ + "channel %v from direction %x", + shortChanID, + pubKey.SerializeCompressed()) nMsg.err <- nil return nil } - d.Unlock() } } @@ -2011,15 +2046,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // With the edge successfully updated on disk, we'll note the - // current height so that we're able to rate limit any future - // updates for the same channel. - d.Lock() - lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] - lastUpdateHeight[direction] = d.bestHeight - d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight - d.Unlock() - // If this is a local ChannelUpdate without an AuthProof, it // means it is an update to a channel that is not (yet) // supposed to be announced to the greater network. However, diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index cab9e5f3..a551f4cb 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -733,19 +733,21 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { Timestamp: testTimestamp, }, nil }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitTicker: ticker.NewForce(retransmitDelay), - RebroadcastInterval: rebroadcastInterval, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: waitingProofStore, - MessageStore: newMockMessageStore(), - RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), - NumActiveSyncers: 3, - AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1}, - SubBatchDelay: time.Second * 5, - MinimumBatchSize: 10, + Router: router, + TrickleDelay: trickleDelay, + RetransmitTicker: ticker.NewForce(retransmitDelay), + RebroadcastInterval: rebroadcastInterval, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + NumActiveSyncers: 3, + AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1}, + SubBatchDelay: time.Second * 5, + MinimumBatchSize: 10, + MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst, + ChannelUpdateInterval: DefaultChannelUpdateInterval, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -3938,6 +3940,8 @@ func TestRateLimitChannelUpdates(t *testing.T) { } defer cleanup() ctx.gossiper.cfg.RebroadcastInterval = time.Hour + ctx.gossiper.cfg.MaxChannelUpdateBurst = 5 + ctx.gossiper.cfg.ChannelUpdateInterval = 5 * time.Second // The graph should start empty. require.Empty(t, ctx.router.infos) @@ -4036,24 +4040,40 @@ func TestRateLimitChannelUpdates(t *testing.T) { // Then, we'll move on to the non keep alive cases. // - // Non keep alive updates are limited to one per block per direction. - // Since we've already processed updates for both sides, the new updates - // for both directions will not be broadcast until a new block arrives. + // For this test, non keep alive updates are rate limited to one per 5 + // seconds with a max burst of 5 per direction. We'll process the max + // burst of one direction first. None of these should be rate limited. updateSameDirection := keepAliveUpdate + for i := uint32(0); i < uint32(ctx.gossiper.cfg.MaxChannelUpdateBurst); i++ { + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + assertRateLimit(&updateSameDirection, nodePeer1, false) + } + + // Following with another update should be rate limited as the max burst + // has been reached and we haven't ticked at the next interval yet. updateSameDirection.Timestamp++ updateSameDirection.BaseFee++ require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) assertRateLimit(&updateSameDirection, nodePeer1, true) + // An update for the other direction should not be rate limited. updateDiffDirection := *batch.chanUpdAnn2 updateDiffDirection.Timestamp++ updateDiffDirection.BaseFee++ require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection)) - assertRateLimit(&updateDiffDirection, nodePeer2, true) - - // Notify a new block and reprocess the updates. They should no longer - // be rate limited. - ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1) - assertRateLimit(&updateSameDirection, nodePeer1, false) assertRateLimit(&updateDiffDirection, nodePeer2, false) + + // Wait for the next interval to tick. Since we've only waited for one, + // only one more update is allowed. + <-time.After(ctx.gossiper.cfg.ChannelUpdateInterval) + for i := 0; i < ctx.gossiper.cfg.MaxChannelUpdateBurst; i++ { + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + + shouldRateLimit := i != 0 + assertRateLimit(&updateSameDirection, nodePeer1, shouldRateLimit) + } }