Merge pull request #4786 from wpaulino/rate-limit-channel-updates

discovery: rate limit incoming channel updates
This commit is contained in:
Olaoluwa Osuntokun 2020-11-25 17:07:21 -08:00 committed by GitHub
commit cefbf5f637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 7 deletions

@ -2862,8 +2862,7 @@ func (c *ChannelEdgePolicy) SetSigBytes(sig []byte) {
// IsDisabled determines whether the edge has the disabled bit set. // IsDisabled determines whether the edge has the disabled bit set.
func (c *ChannelEdgePolicy) IsDisabled() bool { func (c *ChannelEdgePolicy) IsDisabled() bool {
return c.ChannelFlags&lnwire.ChanUpdateDisabled == return c.ChannelFlags.IsDisabled()
lnwire.ChanUpdateDisabled
} }
// ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over // ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over

@ -315,6 +315,13 @@ type AuthenticatedGossiper struct {
// network. // network.
reliableSender *reliableSender reliableSender *reliableSender
// heightForLastChanUpdate keeps track of the height at which we
// processed the latest channel update for a specific direction.
//
// NOTE: This map must be synchronized with the main
// AuthenticatedGossiper lock.
heightForLastChanUpdate map[uint64][2]uint32
sync.Mutex sync.Mutex
} }
@ -331,6 +338,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
prematureChannelUpdates: make(map[uint64][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg),
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
heightForLastChanUpdate: make(map[uint64][2]uint32),
syncMgr: newSyncManager(&SyncManagerCfg{ syncMgr: newSyncManager(&SyncManagerCfg{
ChainHash: cfg.ChainHash, ChainHash: cfg.ChainHash,
ChanSeries: cfg.ChanSeries, ChanSeries: cfg.ChanSeries,
@ -1847,7 +1855,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// point and when we call UpdateEdge() later. // point and when we call UpdateEdge() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
switch err { switch err {
// No error, break. // No error, break.
case nil: case nil:
@ -1946,12 +1954,56 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// The least-significant bit in the flag on the channel update // The least-significant bit in the flag on the channel update
// announcement tells us "which" side of the channels directed // announcement tells us "which" side of the channels directed
// edge is being updated. // edge is being updated.
var pubKey *btcec.PublicKey var (
switch { pubKey *btcec.PublicKey
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: edgeToUpdate *channeldb.ChannelEdgePolicy
)
direction := msg.ChannelFlags & lnwire.ChanUpdateDirection
switch direction {
case 0:
pubKey, _ = chanInfo.NodeKey1() pubKey, _ = chanInfo.NodeKey1()
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: edgeToUpdate = edge1
case 1:
pubKey, _ = chanInfo.NodeKey2() pubKey, _ = chanInfo.NodeKey2()
edgeToUpdate = edge2
}
// If we have a previous version of the edge being updated,
// we'll want to rate limit its updates to prevent spam
// throughout the network.
if nMsg.isRemote && edgeToUpdate != nil {
// If it's a keep-alive update, we'll only propagate one
// if it's been a day since the previous. This follows
// our own heuristic of sending keep-alive updates after
// the same duration (see retransmitStaleAnns).
timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
if IsKeepAliveUpdate(msg, edgeToUpdate) {
if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
log.Debugf("Ignoring keep alive update "+
"not within %v period for "+
"channel %v",
d.cfg.RebroadcastInterval,
shortChanID)
nMsg.err <- nil
return nil
}
} else {
// If it's not, we'll only allow a single update
// for this channel per block.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
if lastUpdateHeight[direction] == d.bestHeight {
log.Debugf("Ignoring update for "+
"channel %v due to previous "+
"update occurring within the "+
"same block %v", shortChanID,
d.bestHeight)
d.Unlock()
nMsg.err <- nil
return nil
}
d.Unlock()
}
} }
// Validate the channel announcement with the expected public key and // Validate the channel announcement with the expected public key and
@ -1997,6 +2049,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil return nil
} }
// With the edge successfully updated on disk, we'll note the
// current height so that we're able to rate limit any future
// updates for the same channel.
d.Lock()
lastUpdateHeight := d.heightForLastChanUpdate[shortChanID]
lastUpdateHeight[direction] = d.bestHeight
d.heightForLastChanUpdate[shortChanID] = lastUpdateHeight
d.Unlock()
// If this is a local ChannelUpdate without an AuthProof, it // If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet) // means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However, // supposed to be announced to the greater network. However,
@ -2536,3 +2597,49 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
func (d *AuthenticatedGossiper) SyncManager() *SyncManager { func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
return d.syncMgr return d.syncMgr
} }
// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the same
// direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
prev *channeldb.ChannelEdgePolicy) bool {
// Both updates should be from the same direction.
if update.ChannelFlags&lnwire.ChanUpdateDirection !=
prev.ChannelFlags&lnwire.ChanUpdateDirection {
return false
}
// The timestamp should always increase for a keep-alive update.
timestamp := time.Unix(int64(update.Timestamp), 0)
if !timestamp.After(prev.LastUpdate) {
return false
}
// None of the remaining fields should change for a keep-alive update.
if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
return false
}
if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
return false
}
if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
return false
}
if update.TimeLockDelta != prev.TimeLockDelta {
return false
}
if update.HtlcMinimumMsat != prev.MinHTLC {
return false
}
if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
return false
}
if update.HtlcMaximumMsat != prev.MaxHTLC {
return false
}
if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
return false
}
return true
}

@ -31,6 +31,7 @@ import (
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
) )
var ( var (
@ -3942,3 +3943,136 @@ func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
} }
assertBroadcast(chanAnn2, true, true) assertBroadcast(chanAnn2, true, true)
} }
// TestRateLimitChannelUpdates ensures that we properly rate limit incoming
// channel updates.
func TestRateLimitChannelUpdates(t *testing.T) {
t.Parallel()
// Create our test harness.
const blockHeight = 100
ctx, cleanup, err := createTestCtx(blockHeight)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
ctx.gossiper.cfg.RebroadcastInterval = time.Hour
// The graph should start empty.
require.Empty(t, ctx.router.infos)
require.Empty(t, ctx.router.edges)
// We'll create a batch of signed announcements, including updates for
// both sides, for a channel and process them. They should all be
// forwarded as this is our first time learning about the channel.
batch, err := createAnnouncements(blockHeight)
require.NoError(t, err)
nodePeer1 := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteChanAnn, nodePeer1,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn1, nodePeer1,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
nodePeer2 := &mockPeer{nodeKeyPriv2.PubKey(), nil, nil}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, nodePeer2,
):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
timeout := time.After(2 * trickleDelay)
for i := 0; i < 3; i++ {
select {
case <-ctx.broadcastedMessage:
case <-timeout:
t.Fatal("expected announcement to be broadcast")
}
}
shortChanID := batch.remoteChanAnn.ShortChannelID.ToUint64()
require.Contains(t, ctx.router.infos, shortChanID)
require.Contains(t, ctx.router.edges, shortChanID)
// We'll define a helper to assert whether updates should be rate
// limited or not depending on their contents.
assertRateLimit := func(update *lnwire.ChannelUpdate, peer lnpeer.Peer,
shouldRateLimit bool) {
t.Helper()
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(update, peer):
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("remote announcement not processed")
}
select {
case <-ctx.broadcastedMessage:
if shouldRateLimit {
t.Fatal("unexpected channel update broadcast")
}
case <-time.After(2 * trickleDelay):
if !shouldRateLimit {
t.Fatal("expected channel update broadcast")
}
}
}
// We'll start with the keep alive case.
//
// We rate limit any keep alive updates that have not at least spanned
// our rebroadcast interval.
rateLimitKeepAliveUpdate := *batch.chanUpdAnn1
rateLimitKeepAliveUpdate.Timestamp++
require.NoError(t, signUpdate(nodeKeyPriv1, &rateLimitKeepAliveUpdate))
assertRateLimit(&rateLimitKeepAliveUpdate, nodePeer1, true)
keepAliveUpdate := *batch.chanUpdAnn1
keepAliveUpdate.Timestamp = uint32(
time.Unix(int64(batch.chanUpdAnn1.Timestamp), 0).
Add(ctx.gossiper.cfg.RebroadcastInterval).Unix(),
)
require.NoError(t, signUpdate(nodeKeyPriv1, &keepAliveUpdate))
assertRateLimit(&keepAliveUpdate, nodePeer1, false)
// Then, we'll move on to the non keep alive cases.
//
// Non keep alive updates are limited to one per block per direction.
// Since we've already processed updates for both sides, the new updates
// for both directions will not be broadcast until a new block arrives.
updateSameDirection := keepAliveUpdate
updateSameDirection.Timestamp++
updateSameDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv1, &updateSameDirection))
assertRateLimit(&updateSameDirection, nodePeer1, true)
updateDiffDirection := *batch.chanUpdAnn2
updateDiffDirection.Timestamp++
updateDiffDirection.BaseFee++
require.NoError(t, signUpdate(nodeKeyPriv2, &updateDiffDirection))
assertRateLimit(&updateDiffDirection, nodePeer2, true)
// Notify a new block and reprocess the updates. They should no longer
// be rate limited.
ctx.notifier.notifyBlock(chainhash.Hash{}, blockHeight+1)
assertRateLimit(&updateSameDirection, nodePeer1, false)
assertRateLimit(&updateDiffDirection, nodePeer2, false)
}

@ -47,6 +47,11 @@ const (
ChanUpdateDisabled ChanUpdateDisabled
) )
// IsDisabled determines whether the channel flags has the disabled bit set.
func (c ChanUpdateChanFlags) IsDisabled() bool {
return c&ChanUpdateDisabled == ChanUpdateDisabled
}
// String returns the bitfield flags as a string. // String returns the bitfield flags as a string.
func (c ChanUpdateChanFlags) String() string { func (c ChanUpdateChanFlags) String() string {
return fmt.Sprintf("%08b", c) return fmt.Sprintf("%08b", c)