From 791ba3eb508a852ef1cd363e875937926cc0705c Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 19 Nov 2020 17:31:58 -0800 Subject: [PATCH] discovery: rate limit incoming channel updates This change was largely motivated by an increase in disk usage as a result of channel update spam. With an in-memory graph, this would've gone mostly undetected except for the increased bandwidth usage, which this doesn't aim to solve yet. To minimize the effect on disks, we begin to rate limit channel updates in two ways. Keep alive updates, those which only increase their timestamps to signal liveness, are now limited to one per lnd's rebroadcast interval (current default of 24H). Non keep alive updates are now limited to one per block per direction. --- channeldb/graph.go | 3 +- discovery/gossiper.go | 117 ++++++++++++++++++++++++++++++-- discovery/gossiper_test.go | 134 +++++++++++++++++++++++++++++++++++++ lnwire/channel_update.go | 5 ++ 4 files changed, 252 insertions(+), 7 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 67910852..5d599cf6 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -2862,8 +2862,7 @@ func (c *ChannelEdgePolicy) SetSigBytes(sig []byte) { // IsDisabled determines whether the edge has the disabled bit set. func (c *ChannelEdgePolicy) IsDisabled() bool { - return c.ChannelFlags&lnwire.ChanUpdateDisabled == - lnwire.ChanUpdateDisabled + return c.ChannelFlags.IsDisabled() } // ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a72ddcd7..a20fb270 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -315,6 +315,13 @@ type AuthenticatedGossiper struct { // network. reliableSender *reliableSender + // heightForLastChanUpdate keeps track of the height at which we + // processed the latest channel update for a specific direction. 
+ // + // NOTE: This map must be synchronized with the main + // AuthenticatedGossiper lock. + heightForLastChanUpdate map[uint64][2]uint32 + sync.Mutex } @@ -331,6 +338,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), + heightForLastChanUpdate: make(map[uint64][2]uint32), syncMgr: newSyncManager(&SyncManagerCfg{ ChainHash: cfg.ChainHash, ChanSeries: cfg.ChanSeries, @@ -1847,7 +1855,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // point and when we call UpdateEdge() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) switch err { // No error, break. case nil: @@ -1946,12 +1954,56 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // The least-significant bit in the flag on the channel update // announcement tells us "which" side of the channels directed // edge is being updated. - var pubKey *btcec.PublicKey - switch { - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: + var ( + pubKey *btcec.PublicKey + edgeToUpdate *channeldb.ChannelEdgePolicy + ) + direction := msg.ChannelFlags & lnwire.ChanUpdateDirection + switch direction { + case 0: pubKey, _ = chanInfo.NodeKey1() - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: + edgeToUpdate = edge1 + case 1: pubKey, _ = chanInfo.NodeKey2() + edgeToUpdate = edge2 + } + + // If we have a previous version of the edge being updated, + // we'll want to rate limit its updates to prevent spam + // throughout the network. + if nMsg.isRemote && edgeToUpdate != nil { + // If it's a keep-alive update, we'll only propagate one + // if it's been a day since the previous. 
This follows + // our own heuristic of sending keep-alive updates after + // the same duration (see retransmitStaleAnns). + timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) + if IsKeepAliveUpdate(msg, edgeToUpdate) { + if timeSinceLastUpdate < d.cfg.RebroadcastInterval { + log.Debugf("Ignoring keep alive update "+ + "not within %v period for "+ + "channel %v", + d.cfg.RebroadcastInterval, + shortChanID) + nMsg.err <- nil + return nil + } + } else { + // If it's not, we'll only allow a single update + // for this channel per block. + d.Lock() + lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] + if lastUpdateHeight[direction] == d.bestHeight { + log.Debugf("Ignoring update for "+ + "channel %v due to previous "+ + "update occurring within the "+ + "same block %v", shortChanID, + d.bestHeight) + d.Unlock() + nMsg.err <- nil + return nil + } + d.Unlock() + } } // Validate the channel announcement with the expected public key and @@ -1997,6 +2049,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } + // With the edge successfully updated on disk, we'll note the + // current height so that we're able to rate limit any future + // updates for the same channel. + d.Lock() + lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] + lastUpdateHeight[direction] = d.bestHeight + d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight + d.Unlock() + // If this is a local ChannelUpdate without an AuthProof, it // means it is an update to a channel that is not (yet) // supposed to be announced to the greater network. However, @@ -2536,3 +2597,49 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, func (d *AuthenticatedGossiper) SyncManager() *SyncManager { return d.syncMgr } + +// IsKeepAliveUpdate determines whether this channel update is considered a +// keep-alive update based on the previous channel update processed for the same +// direction. 
+func IsKeepAliveUpdate(update *lnwire.ChannelUpdate, + prev *channeldb.ChannelEdgePolicy) bool { + + // Both updates should be from the same direction. + if update.ChannelFlags&lnwire.ChanUpdateDirection != + prev.ChannelFlags&lnwire.ChanUpdateDirection { + return false + } + + // The timestamp should always increase for a keep-alive update. + timestamp := time.Unix(int64(update.Timestamp), 0) + if !timestamp.After(prev.LastUpdate) { + return false + } + + // None of the remaining fields should change for a keep-alive update. + if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() { + return false + } + if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat { + return false + } + if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths { + return false + } + if update.TimeLockDelta != prev.TimeLockDelta { + return false + } + if update.HtlcMinimumMsat != prev.MinHTLC { + return false + } + if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() { + return false + } + if update.HtlcMaximumMsat != prev.MaxHTLC { + return false + } + if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) { + return false + } + return true +} diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index f37caa8d..fc227148 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -31,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" + "github.com/stretchr/testify/require" ) var ( @@ -3942,3 +3943,136 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { } assertBroadcast(chanAnn2, true, true) } + +// TestRateLimitChannelUpdates ensures that we properly rate limit incoming +// channel updates. +func TestRateLimitChannelUpdates(t *testing.T) { + t.Parallel() + + // Create our test harness. 
+ const blockHeight = 100 + ctx, cleanup, err := createTestCtx(blockHeight) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + ctx.gossiper.cfg.RebroadcastInterval = time.Hour + + // The graph should start empty. + require.Empty(t, ctx.router.infos) + require.Empty(t, ctx.router.edges) + + // We'll create a batch of signed announcements, including updates for + // both sides, for a channel and process them. They should all be + // forwarded as this is our first time learning about the channel. + batch, err := createAnnouncements(blockHeight) + require.NoError(t, err) + + nodePeer1 := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteChanAnn, nodePeer1, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn1, nodePeer1, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + nodePeer2 := &mockPeer{nodeKeyPriv2.PubKey(), nil, nil} + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, nodePeer2, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + timeout := time.After(2 * trickleDelay) + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-timeout: + t.Fatal("expected announcement to be broadcast") + } + } + + shortChanID := batch.remoteChanAnn.ShortChannelID.ToUint64() + require.Contains(t, ctx.router.infos, shortChanID) + require.Contains(t, ctx.router.edges, shortChanID) + + // We'll define a helper to assert whether updates should be rate + // limited or not depending on their contents. 
+ assertRateLimit := func(update *lnwire.ChannelUpdate, peer lnpeer.Peer, + shouldRateLimit bool) { + + t.Helper() + + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement(update, peer): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + select { + case <-ctx.broadcastedMessage: + if shouldRateLimit { + t.Fatal("unexpected channel update broadcast") + } + case <-time.After(2 * trickleDelay): + if !shouldRateLimit { + t.Fatal("expected channel update broadcast") + } + } + } + + // We'll start with the keep alive case. + // + // We rate limit any keep alive updates that have not at least spanned + // our rebroadcast interval. + rateLimitKeepAliveUpdate := *batch.chanUpdAnn1 + rateLimitKeepAliveUpdate.Timestamp++ + require.NoError(t, signUpdate(nodeKeyPriv1, &rateLimitKeepAliveUpdate)) + assertRateLimit(&rateLimitKeepAliveUpdate, nodePeer1, true) + + keepAliveUpdate := *batch.chanUpdAnn1 + keepAliveUpdate.Timestamp = uint32( + time.Unix(int64(batch.chanUpdAnn1.Timestamp), 0). + Add(ctx.gossiper.cfg.RebroadcastInterval).Unix(), + ) + require.NoError(t, signUpdate(nodeKeyPriv1, &keepAliveUpdate)) + assertRateLimit(&keepAliveUpdate, nodePeer1, false) + + // Then, we'll move on to the non keep alive cases. + // + // Non keep alive updates are limited to one per block per direction. + // Since we've already processed updates for both sides, the new updates + // for both directions will not be broadcast until a new block arrives. 
+ updateSameDirection := keepAliveUpdate + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + assertRateLimit(&updateSameDirection, nodePeer1, true) + + updateDiffDirection := *batch.chanUpdAnn2 + updateDiffDirection.Timestamp++ + updateDiffDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection)) + assertRateLimit(&updateDiffDirection, nodePeer2, true) + + // Notify a new block and reprocess the updates. They should no longer + // be rate limited. + ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1) + assertRateLimit(&updateSameDirection, nodePeer1, false) + assertRateLimit(&updateDiffDirection, nodePeer2, false) +} diff --git a/lnwire/channel_update.go b/lnwire/channel_update.go index fd627646..c0e9e74d 100644 --- a/lnwire/channel_update.go +++ b/lnwire/channel_update.go @@ -47,6 +47,11 @@ const ( ChanUpdateDisabled ) +// IsDisabled determines whether the channel flags has the disabled bit set. +func (c ChanUpdateChanFlags) IsDisabled() bool { + return c&ChanUpdateDisabled == ChanUpdateDisabled +} + // String returns the bitfield flags as a string. func (c ChanUpdateChanFlags) String() string { return fmt.Sprintf("%08b", c)