diff --git a/channeldb/graph.go b/channeldb/graph.go index 67910852..5d599cf6 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -2862,8 +2862,7 @@ func (c *ChannelEdgePolicy) SetSigBytes(sig []byte) { // IsDisabled determines whether the edge has the disabled bit set. func (c *ChannelEdgePolicy) IsDisabled() bool { - return c.ChannelFlags&lnwire.ChanUpdateDisabled == - lnwire.ChanUpdateDisabled + return c.ChannelFlags.IsDisabled() } // ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over diff --git a/discovery/gossiper.go b/discovery/gossiper.go index a72ddcd7..a20fb270 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -315,6 +315,13 @@ type AuthenticatedGossiper struct { // network. reliableSender *reliableSender + // heightForLastChanUpdate keeps track of the height at which we + // processed the latest channel update for a specific direction. + // + // NOTE: This map must be synchronized with the main + // AuthenticatedGossiper lock. + heightForLastChanUpdate map[uint64][2]uint32 + sync.Mutex } @@ -331,6 +338,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { prematureChannelUpdates: make(map[uint64][]*networkMsg), channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), + heightForLastChanUpdate: make(map[uint64][2]uint32), syncMgr: newSyncManager(&SyncManagerCfg{ ChainHash: cfg.ChainHash, ChanSeries: cfg.ChanSeries, @@ -1847,7 +1855,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // point and when we call UpdateEdge() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) - chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) + chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) switch err { // No error, break. case nil: @@ -1946,12 +1954,56 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // The least-significant bit in the flag on the channel update // announcement tells us "which" side of the channels directed // edge is being updated. - var pubKey *btcec.PublicKey - switch { - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: + var ( + pubKey *btcec.PublicKey + edgeToUpdate *channeldb.ChannelEdgePolicy + ) + direction := msg.ChannelFlags & lnwire.ChanUpdateDirection + switch direction { + case 0: pubKey, _ = chanInfo.NodeKey1() - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: + edgeToUpdate = edge1 + case 1: pubKey, _ = chanInfo.NodeKey2() + edgeToUpdate = edge2 + } + + // If we have a previous version of the edge being updated, + // we'll want to rate limit its updates to prevent spam + // throughout the network. + if nMsg.isRemote && edgeToUpdate != nil { + // If it's a keep-alive update, we'll only propagate one + // if it's been a day since the previous. This follows + // our own heuristic of sending keep-alive updates after + // the same duration (see retransmitStaleAnns). + timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) + if IsKeepAliveUpdate(msg, edgeToUpdate) { + if timeSinceLastUpdate < d.cfg.RebroadcastInterval { + log.Debugf("Ignoring keep alive update "+ + "not within %v period for "+ + "channel %v", + d.cfg.RebroadcastInterval, + shortChanID) + nMsg.err <- nil + return nil + } + } else { + // If it's not, we'll only allow a single update + // for this channel per block. + d.Lock() + lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] + if lastUpdateHeight[direction] == d.bestHeight { + log.Debugf("Ignoring update for "+ + "channel %v due to previous "+ + "update occurring within the "+ + "same block %v", shortChanID, + d.bestHeight) + d.Unlock() + nMsg.err <- nil + return nil + } + d.Unlock() + } } // Validate the channel announcement with the expected public key and @@ -1997,6 +2049,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } + // With the edge successfully updated on disk, we'll note the + // current height so that we're able to rate limit any future + // updates for the same channel. + d.Lock() + lastUpdateHeight := d.heightForLastChanUpdate[shortChanID] + lastUpdateHeight[direction] = d.bestHeight + d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight + d.Unlock() + // If this is a local ChannelUpdate without an AuthProof, it // means it is an update to a channel that is not (yet) // supposed to be announced to the greater network. However, @@ -2536,3 +2597,49 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, func (d *AuthenticatedGossiper) SyncManager() *SyncManager { return d.syncMgr } + +// IsKeepAliveUpdate determines whether this channel update is considered a +// keep-alive update based on the previous channel update processed for the same +// direction. +func IsKeepAliveUpdate(update *lnwire.ChannelUpdate, + prev *channeldb.ChannelEdgePolicy) bool { + + // Both updates should be from the same direction. + if update.ChannelFlags&lnwire.ChanUpdateDirection != + prev.ChannelFlags&lnwire.ChanUpdateDirection { + return false + } + + // The timestamp should always increase for a keep-alive update. + timestamp := time.Unix(int64(update.Timestamp), 0) + if !timestamp.After(prev.LastUpdate) { + return false + } + + // None of the remaining fields should change for a keep-alive update. + if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() { + return false + } + if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat { + return false + } + if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths { + return false + } + if update.TimeLockDelta != prev.TimeLockDelta { + return false + } + if update.HtlcMinimumMsat != prev.MinHTLC { + return false + } + if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() { + return false + } + if update.HtlcMaximumMsat != prev.MaxHTLC { + return false + } + if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) { + return false + } + return true +} diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index f37caa8d..fc227148 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -31,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" + "github.com/stretchr/testify/require" ) var ( @@ -3942,3 +3943,136 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) { } assertBroadcast(chanAnn2, true, true) } + +// TestRateLimitChannelUpdates ensures that we properly rate limit incoming +// channel updates. +func TestRateLimitChannelUpdates(t *testing.T) { + t.Parallel() + + // Create our test harness. + const blockHeight = 100 + ctx, cleanup, err := createTestCtx(blockHeight) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + ctx.gossiper.cfg.RebroadcastInterval = time.Hour + + // The graph should start empty. + require.Empty(t, ctx.router.infos) + require.Empty(t, ctx.router.edges) + + // We'll create a batch of signed announcements, including updates for + // both sides, for a channel and process them. They should all be + // forwarded as this is our first time learning about the channel. + batch, err := createAnnouncements(blockHeight) + require.NoError(t, err) + + nodePeer1 := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil} + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.remoteChanAnn, nodePeer1, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn1, nodePeer1, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + nodePeer2 := &mockPeer{nodeKeyPriv2.PubKey(), nil, nil} + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement( + batch.chanUpdAnn2, nodePeer2, + ): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + timeout := time.After(2 * trickleDelay) + for i := 0; i < 3; i++ { + select { + case <-ctx.broadcastedMessage: + case <-timeout: + t.Fatal("expected announcement to be broadcast") + } + } + + shortChanID := batch.remoteChanAnn.ShortChannelID.ToUint64() + require.Contains(t, ctx.router.infos, shortChanID) + require.Contains(t, ctx.router.edges, shortChanID) + + // We'll define a helper to assert whether updates should be rate + // limited or not depending on their contents. + assertRateLimit := func(update *lnwire.ChannelUpdate, peer lnpeer.Peer, + shouldRateLimit bool) { + + t.Helper() + + select { + case err := <-ctx.gossiper.ProcessRemoteAnnouncement(update, peer): + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("remote announcement not processed") + } + + select { + case <-ctx.broadcastedMessage: + if shouldRateLimit { + t.Fatal("unexpected channel update broadcast") + } + case <-time.After(2 * trickleDelay): + if !shouldRateLimit { + t.Fatal("expected channel update broadcast") + } + } + } + + // We'll start with the keep alive case. + // + // We rate limit any keep alive updates that have not at least spanned + // our rebroadcast interval. + rateLimitKeepAliveUpdate := *batch.chanUpdAnn1 + rateLimitKeepAliveUpdate.Timestamp++ + require.NoError(t, signUpdate(nodeKeyPriv1, &rateLimitKeepAliveUpdate)) + assertRateLimit(&rateLimitKeepAliveUpdate, nodePeer1, true) + + keepAliveUpdate := *batch.chanUpdAnn1 + keepAliveUpdate.Timestamp = uint32( + time.Unix(int64(batch.chanUpdAnn1.Timestamp), 0). + Add(ctx.gossiper.cfg.RebroadcastInterval).Unix(), + ) + require.NoError(t, signUpdate(nodeKeyPriv1, &keepAliveUpdate)) + assertRateLimit(&keepAliveUpdate, nodePeer1, false) + + // Then, we'll move on to the non keep alive cases. + // + // Non keep alive updates are limited to one per block per direction. + // Since we've already processed updates for both sides, the new updates + // for both directions will not be broadcast until a new block arrives. + updateSameDirection := keepAliveUpdate + updateSameDirection.Timestamp++ + updateSameDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection)) + assertRateLimit(&updateSameDirection, nodePeer1, true) + + updateDiffDirection := *batch.chanUpdAnn2 + updateDiffDirection.Timestamp++ + updateDiffDirection.BaseFee++ + require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection)) + assertRateLimit(&updateDiffDirection, nodePeer2, true) + + // Notify a new block and reprocess the updates. They should no longer + // be rate limited. + ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1) + assertRateLimit(&updateSameDirection, nodePeer1, false) + assertRateLimit(&updateDiffDirection, nodePeer2, false) +} diff --git a/lnwire/channel_update.go b/lnwire/channel_update.go index fd627646..c0e9e74d 100644 --- a/lnwire/channel_update.go +++ b/lnwire/channel_update.go @@ -47,6 +47,11 @@ const ( ChanUpdateDisabled ) +// IsDisabled determines whether the channel flags has the disabled bit set. +func (c ChanUpdateChanFlags) IsDisabled() bool { + return c&ChanUpdateDisabled == ChanUpdateDisabled +} + // String returns the bitfield flags as a string. func (c ChanUpdateChanFlags) String() string { return fmt.Sprintf("%08b", c)