diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e669f61a..348f1742 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -22,6 +22,19 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" + "golang.org/x/time/rate" +) + +const ( + // DefaultMaxChannelUpdateBurst is the default maximum number of updates + // for a specific channel and direction that we'll accept over an + // interval. + DefaultMaxChannelUpdateBurst = 10 + + // DefaultChannelUpdateInterval is the default interval we'll use to + // determine how often we should allow a new update for a specific + // channel and direction. + DefaultChannelUpdateInterval = time.Minute ) var ( @@ -233,15 +246,19 @@ type Config struct { // graph on connect. IgnoreHistoricalFilters bool - // GossipUpdateThrottle if true, then the gossiper will throttle - // gossip updates to once per RebroadcastInterval for any keep-alive - // updates, and once per block for other types of updates. - GossipUpdateThrottle bool - // PinnedSyncers is a set of peers that will always transition to // ActiveSync upon connection. These peers will never transition to // PassiveSync. PinnedSyncers PinnedSyncers + + // MaxChannelUpdateBurst specifies the maximum number of updates for a + // specific channel and direction that we'll accept over an interval. + MaxChannelUpdateBurst int + + // ChannelUpdateInterval specifies the interval we'll use to determine + // how often we should allow a new update for a specific channel and + // direction. + ChannelUpdateInterval time.Duration } // AuthenticatedGossiper is a subsystem which is responsible for receiving @@ -318,12 +335,14 @@ type AuthenticatedGossiper struct { // network. reliableSender *reliableSender - // heightForLastChanUpdate keeps track of the height at which we - // processed the latest channel update for a specific direction. + // chanUpdateRateLimiter contains rate limiters for each direction of + // a channel update we've processed. We'll use these to determine + // whether we should accept a new update for a specific channel and + // direction. // // NOTE: This map must be synchronized with the main // AuthenticatedGossiper lock. - heightForLastChanUpdate map[uint64][2]uint32 + chanUpdateRateLimiter map[uint64][2]*rate.Limiter sync.Mutex } @@ -340,7 +359,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), - heightForLastChanUpdate: make(map[uint64][2]uint32), + chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), } gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ @@ -1937,9 +1956,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // If we have a previous version of the edge being updated, // we'll want to rate limit its updates to prevent spam - // throughout the network if we're currently throttling such - // updates. - if d.cfg.GossipUpdateThrottle && nMsg.isRemote && edgeToUpdate != nil { + // throughout the network. + if nMsg.isRemote && edgeToUpdate != nil { // If it's a keep-alive update, we'll only propagate one // if it's been a day since the previous. This follows // our own heuristic of sending keep-alive updates after @@ -1956,21 +1974,32 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } } else { - // If it's not, we'll only allow a single update - // for this channel per block. + // If it's not, we'll allow an update per minute + // with a maximum burst of 10. If we haven't + // seen an update for this channel before, we'll + // need to initialize a rate limiter for each + // direction. d.Lock() - lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] - if lastUpdateHeight[direction] == d.bestHeight { - log.Debugf("Ignoring update for "+ - "channel %v due to previous "+ - "update occurring within the "+ - "same block %v", shortChanID, - d.bestHeight) - d.Unlock() + rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID] + if !ok { + r := rate.Every(d.cfg.ChannelUpdateInterval) + b := d.cfg.MaxChannelUpdateBurst + rateLimiters = [2]*rate.Limiter{ + rate.NewLimiter(r, b), + rate.NewLimiter(r, b), + } + d.chanUpdateRateLimiter[shortChanID] = rateLimiters + } + d.Unlock() + + if !rateLimiters[direction].Allow() { + log.Debugf("Rate limiting update for "+ + "channel %v from direction %x", + shortChanID, + pubKey.SerializeCompressed()) nMsg.err <- nil return nil } - d.Unlock() } } @@ -2017,15 +2046,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - // With the edge successfully updated on disk, we'll note the - // current height so that we're able to rate limit any future - // updates for the same channel. - d.Lock() - lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] - lastUpdateHeight[direction] = d.bestHeight - d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight - d.Unlock() - // If this is a local ChannelUpdate without an AuthProof, it // means it is an update to a channel that is not (yet) // supposed to be announced to the greater network. However, diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 1c1e60ca..a551f4cb 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -733,19 +733,21 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { Timestamp: testTimestamp, }, nil }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitTicker: ticker.NewForce(retransmitDelay), - RebroadcastInterval: rebroadcastInterval, - ProofMatureDelta: proofMatureDelta, - WaitingProofStore: waitingProofStore, - MessageStore: newMockMessageStore(), - RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), - HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), - NumActiveSyncers: 3, - AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1}, - SubBatchDelay: time.Second * 5, - MinimumBatchSize: 10, + Router: router, + TrickleDelay: trickleDelay, + RetransmitTicker: ticker.NewForce(retransmitDelay), + RebroadcastInterval: rebroadcastInterval, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), + RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval), + HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval), + NumActiveSyncers: 3, + AnnSigner: &mock.SingleSigner{Privkey: nodeKeyPriv1}, + SubBatchDelay: time.Second * 5, + MinimumBatchSize: 10, + MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst, + ChannelUpdateInterval: DefaultChannelUpdateInterval, }, nodeKeyPub1) if err := gossiper.Start(); err != nil { @@ -3938,7 +3940,8 @@ func TestRateLimitChannelUpdates(t *testing.T) { } defer cleanup() ctx.gossiper.cfg.RebroadcastInterval = time.Hour - ctx.gossiper.cfg.GossipUpdateThrottle = true + ctx.gossiper.cfg.MaxChannelUpdateBurst = 5 + ctx.gossiper.cfg.ChannelUpdateInterval = 5 * time.Second // The graph should start empty. require.Empty(t, ctx.router.infos) @@ -4037,24 +4040,40 @@ func TestRateLimitChannelUpdates(t *testing.T) { // Then, we'll move on to the non keep alive cases. // - // Non keep alive updates are limited to one per block per direction. - // Since we've already processed updates for both sides, the new updates - // for both directions will not be broadcast until a new block arrives. + // For this test, non keep alive updates are rate limited to one per 5 + // seconds with a max burst of 5 per direction. We'll process the max + // burst of one direction first. None of these should be rate limited. updateSameDirection := keepAliveUpdate + for i := uint32(0); i < uint32(ctx.gossiper.cfg.MaxChannelUpdateBurst); i++ { + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + assertRateLimit(&updateSameDirection, nodePeer1, false) + } + + // Following with another update should be rate limited as the max burst + // has been reached and we haven't ticked at the next interval yet. updateSameDirection.Timestamp++ updateSameDirection.BaseFee++ require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) assertRateLimit(&updateSameDirection, nodePeer1, true) + // An update for the other direction should not be rate limited. updateDiffDirection := *batch.chanUpdAnn2 updateDiffDirection.Timestamp++ updateDiffDirection.BaseFee++ require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection)) - assertRateLimit(&updateDiffDirection, nodePeer2, true) - - // Notify a new block and reprocess the updates. They should no longer - // be rate limited. - ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1) - assertRateLimit(&updateSameDirection, nodePeer1, false) assertRateLimit(&updateDiffDirection, nodePeer2, false) + + // Wait for the next interval to tick. Since we've only waited for one, + // only one more update is allowed. + <-time.After(ctx.gossiper.cfg.ChannelUpdateInterval) + for i := 0; i < ctx.gossiper.cfg.MaxChannelUpdateBurst; i++ { + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + + shouldRateLimit := i != 0 + assertRateLimit(&updateSameDirection, nodePeer1, shouldRateLimit) + } } diff --git a/lncfg/protocol_legacy_off.go b/lncfg/protocol_legacy_off.go index c0a8ef8d..060569d8 100644 --- a/lncfg/protocol_legacy_off.go +++ b/lncfg/protocol_legacy_off.go @@ -20,8 +20,3 @@ func (l *LegacyProtocol) LegacyOnion() bool { func (l *LegacyProtocol) NoStaticRemoteKey() bool { return false } - -// NoGossipThrottle returns true if gossip updates shouldn't be throttled. -func (l *LegacyProtocol) NoGossipThrottle() bool { - return false -} diff --git a/lncfg/protocol_legacy_on.go b/lncfg/protocol_legacy_on.go index ee0ee3b4..712d5fed 100644 --- a/lncfg/protocol_legacy_on.go +++ b/lncfg/protocol_legacy_on.go @@ -16,12 +16,6 @@ type LegacyProtocol struct { // remote party's output in the commitment. If set to true, then we // won't signal StaticRemoteKeyOptional. CommitmentTweak bool `long:"committweak" description:"force node to not advertise the new commitment format"` - - // NoGossipUpdateThrottle if true, then gossip updates won't be - // throttled using the current set of heuristics. This should mainly be - // used for integration tests where we want nearly instant propagation - // of gossip updates. - NoGossipUpdateThrottle bool `long:"no-gossip-throttle" description:"if true, then gossip updates will not be throttled to once per rebroadcast interval for non keep-alive updates"` } // LegacyOnion returns true if the old legacy onion format should be used when @@ -36,8 +30,3 @@ func (l *LegacyProtocol) LegacyOnion() bool { func (l *LegacyProtocol) NoStaticRemoteKey() bool { return l.CommitmentTweak } - -// NoGossipThrottle returns true if gossip updates shouldn't be throttled. -func (l *LegacyProtocol) NoGossipThrottle() bool { - return l.NoGossipUpdateThrottle -} diff --git a/lntest/node.go b/lntest/node.go index 1bf009db..1181b785 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -281,7 +281,6 @@ func (cfg NodeConfig) genArgs() []string { args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath)) args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay)) args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort)) - args = append(args, fmt.Sprintf("--protocol.legacy.no-gossip-throttle")) if !cfg.HasSeed { args = append(args, "--noseedbackup") diff --git a/server.go b/server.go index 8bf77cc6..7435e4bf 100644 --- a/server.go +++ b/server.go @@ -819,7 +819,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr, MinimumBatchSize: 10, SubBatchDelay: time.Second * 5, IgnoreHistoricalFilters: cfg.IgnoreHistoricalGossipFilters, - GossipUpdateThrottle: !cfg.ProtocolOptions.NoGossipThrottle(), PinnedSyncers: cfg.Gossip.PinnedSyncers, }, s.identityECDH.PubKey(),