From 4cc60493d233b4dcb4bc1ffab85a56f135d061e5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 10 May 2018 17:40:29 -0400 Subject: [PATCH] peer+htlcswitch: randomize link commitment fee updates In this commit, we modify the behavior of links updating their commitment fees. Rather than attempting to update the commitment fee for each link every time a new block comes in, we'll use a timer with a random interval between 10 and 60 minutes for each link to determine when to update their corresponding commitment fee. This prevents us from oscillating the fee rate for our various commitment transactions. --- htlcswitch/link.go | 54 +++++++++++++++++++++++------- htlcswitch/link_test.go | 72 +++++++++++----------------------------- htlcswitch/test_utils.go | 58 +++++++++++++++++++++----------- peer.go | 20 ++++++----- 4 files changed, 111 insertions(+), 93 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c674a952..66821444 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3,6 +3,7 @@ package htlcswitch import ( "bytes" "fmt" + prand "math/rand" "sync" "sync/atomic" "time" @@ -21,6 +22,10 @@ import ( "github.com/roasbeef/btcd/chaincfg/chainhash" ) +func init() { + prand.Seed(time.Now().UnixNano()) +} + const ( // expiryGraceDelta is a grace period that the timeout of incoming // HTLC's that pay directly to us (i.e we're the "exit node") must up @@ -36,6 +41,12 @@ const ( // for a new fee update. We'll use this as a fee floor when proposing // and accepting updates. minCommitFeePerKw = 253 + + // DefaultMinLinkFeeUpdateTimeout and DefaultMaxLinkFeeUpdateTimeout + // represent the default timeout bounds in which a link should propose + // to update its commitment fee rate. + DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute + DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute ) // ForwardingPolicy describes the set of constraints that a given ChannelLink @@ -248,6 +259,12 @@ type ChannelLinkConfig struct { // in testing, it is here to ensure the sphinx replay detection on the // receiving node is persistent. UnsafeReplay bool + + // MinFeeUpdateTimeout and MaxFeeUpdateTimeout represent the timeout + // interval bounds in which a link will propose to update its commitment + // fee rate. A random timeout will be selected between these values. + MinFeeUpdateTimeout time.Duration + MaxFeeUpdateTimeout time.Duration } // channelLink is the service which drives a channel's commitment update @@ -342,6 +359,10 @@ type channelLink struct { logCommitTimer *time.Timer logCommitTick <-chan time.Time + // updateFeeTimer is the timer responsible for updating the link's + // commitment fee every time it fires. + updateFeeTimer *time.Timer + sync.RWMutex wg sync.WaitGroup @@ -427,6 +448,8 @@ func (l *channelLink) Start() error { } } + l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) + l.wg.Add(1) go l.htlcManager() @@ -449,8 +472,8 @@ func (l *channelLink) Stop() { l.cfg.ChainEvents.Cancel() } + l.updateFeeTimer.Stop() l.channel.Stop() - l.overflowQueue.Stop() close(l.quit) @@ -835,7 +858,6 @@ func (l *channelLink) htlcManager() { out: for { - // We must always check if we failed at some point processing // the last update before processing the next. if l.failed { @@ -844,16 +866,10 @@ out: } select { - - // A new block has arrived, we'll check the network fee to see - // if we should adjust our commitment fee, and also update our - // track of the best current height. - case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs: - if !ok { - break out - } - - l.bestHeight = uint32(blockEpoch.Height) + // Our update fee timer has fired, so we'll check the network + // fee to see if we should adjust our commitment fee. + case <-l.updateFeeTimer.C: + l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout()) // If we're not the initiator of the channel, don't we // don't control the fees, so we can ignore this. @@ -983,6 +999,20 @@ out: } } +// randomFeeUpdateTimeout returns a random timeout between the bounds defined +// within the link's configuration that will be used to determine when the link +// should propose an update to its commitment fee rate. +func (l *channelLink) randomFeeUpdateTimeout() time.Duration { + lower := int64(l.cfg.MinFeeUpdateTimeout) + upper := int64(l.cfg.MaxFeeUpdateTimeout) + rand := prand.Int63n(upper) + if rand < lower { + rand = lower + } + + return time.Duration(rand) +} + // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // Switch. Possible messages sent by the switch include requests to forward new // HTLCs, timeout previously cleared HTLCs, and finally to settle currently diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index ea4ba96b..f145e33b 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1498,9 +1498,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), - // Make the BatchSize large enough to not - // trigger commit update automatically during tests. - BatchSize: 10000, + // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough + // to not trigger commit updates automatically during tests. + BatchSize: 10000, + MinFeeUpdateTimeout: 30 * time.Minute, + MaxFeeUpdateTimeout: 30 * time.Minute, } const startingHeight = 100 @@ -3451,22 +3453,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { defer n.stop() defer n.feeEstimator.Stop() - // First, we'll start off all channels at "height" 9000 by sending a - // new epoch to all the clients. - select { - case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9000, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } - select { - case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9000, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } + // For the sake of this test, we'll reset the timer to fire in a second + // so that Alice's link queries for a new network fee. + n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond) startingFeeRate := channels.aliceToBob.CommitFeeRate() @@ -3480,20 +3469,15 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { select { case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte: case <-time.After(time.Second * 5): - t.Fatalf("alice didn't query for the new " + - "network fee") + t.Fatalf("alice didn't query for the new network fee") } - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Second) // The fee rate on the alice <-> bob channel should still be the same // on both sides. aliceFeeRate := channels.aliceToBob.CommitFeeRate() bobFeeRate := channels.bobToAlice.CommitFeeRate() - if aliceFeeRate != bobFeeRate { - t.Fatalf("fee rates don't match: expected %v got %v", - aliceFeeRate, bobFeeRate) - } if aliceFeeRate != startingFeeRate { t.Fatalf("alice's fee rate shouldn't have changed: "+ "expected %v, got %v", aliceFeeRate, startingFeeRate) @@ -3503,22 +3487,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { "expected %v, got %v", bobFeeRate, startingFeeRate) } - // Now we'll send a new block update to all end points, with a new - // height THAT'S OVER 9000!!! - select { - case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9001, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } - select { - case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ - Height: 9001, - }: - case <-time.After(time.Second * 5): - t.Fatalf("link didn't read block epoch") - } + // We'll reset the timer once again to ensure Alice's link queries for a + // new network fee. + n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond) // Next, we'll set up a deliver a fee rate that's triple the current // fee rate. This should cause the Alice (the initiator) to trigger a @@ -3527,11 +3498,10 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { select { case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte * 3: case <-time.After(time.Second * 5): - t.Fatalf("alice didn't query for the new " + - "network fee") + t.Fatalf("alice didn't query for the new network fee") } - time.Sleep(time.Second * 2) + time.Sleep(time.Second) // At this point, Alice should've triggered a new fee update that // increased the fee rate to match the new rate. @@ -3545,10 +3515,6 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) { t.Fatalf("bob's fee rate didn't change: expected %v, got %v", newFeeRate, aliceFeeRate) } - if aliceFeeRate != bobFeeRate { - t.Fatalf("fee rates don't match: expected %v got %v", - aliceFeeRate, bobFeeRate) - } } // TestChannelLinkAcceptDuplicatePayment tests that if a link receives an @@ -3917,9 +3883,11 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, BlockEpochs: globalEpoch, BatchTicker: ticker, FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), - // Make the BatchSize large enough to not - // trigger commit update automatically during tests. - BatchSize: 10000, + // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough + // to not trigger commit updates automatically during tests. + BatchSize: 10000, + MinFeeUpdateTimeout: 30 * time.Minute, + MaxFeeUpdateTimeout: 30 * time.Minute, // Set any hodl flags requested for the new link. HodlMask: hodl.MaskFromFlags(hodlFlags...), DebugHTLC: len(hodlFlags) > 0, diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index da48a412..06760bd1 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -882,6 +882,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, quit: make(chan struct{}), } + const ( + batchTimeout = 50 * time.Millisecond + fwdPkgTimeout = 5 * time.Second + feeUpdateTimeout = 30 * time.Minute + ) + pCache := &mockPreimageCache{ // hash -> preimage preimageMap: make(map[[32]byte][]byte), @@ -921,11 +927,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{aliceTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, aliceChannel, startingHeight, @@ -970,11 +979,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{firstBobTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, firstBobChannel, startingHeight, @@ -1019,11 +1031,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{secondBobTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, secondBobChannel, startingHeight, @@ -1068,11 +1083,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, UpdateContractSignals: func(*contractcourt.ContractSignals) error { return nil }, - ChainEvents: &contractcourt.ChainEventSubscription{}, - SyncStates: true, - BatchTicker: &mockTicker{carolTicker.C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, - BatchSize: 10, + ChainEvents: &contractcourt.ChainEventSubscription{}, + SyncStates: true, + BatchSize: 10, + BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, + FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + MinFeeUpdateTimeout: feeUpdateTimeout, + MaxFeeUpdateTimeout: feeUpdateTimeout, + OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, }, carolChannel, startingHeight, diff --git a/peer.go b/peer.go index f21ece2e..26537cd2 100644 --- a/peer.go +++ b/peer.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "container/list" "fmt" "net" @@ -9,14 +10,11 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/brontide" - "github.com/lightningnetwork/lnd/contractcourt" - - "bytes" - "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" @@ -551,11 +549,15 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, time.NewTicker(50 * time.Millisecond)), FwdPkgGCTicker: htlcswitch.NewBatchTicker( time.NewTicker(time.Minute)), - BatchSize: 10, - UnsafeReplay: cfg.UnsafeReplay, + BatchSize: 10, + UnsafeReplay: cfg.UnsafeReplay, + MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, + MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout, } - link := htlcswitch.NewChannelLink(linkCfg, lnChan, - uint32(currentHeight)) + + link := htlcswitch.NewChannelLink( + linkCfg, lnChan, uint32(currentHeight), + ) // With the channel link created, we'll now notify the htlc switch so // this channel can be used to dispatch local payments and also