peer+htlcswitch: randomize link commitment fee updates

In this commit, we modify the behavior of links updating their
commitment fees. Rather than attempting to update the commitment fee for
each link every time a new block comes in, we'll use a timer with a
random interval between 10 and 60 minutes for each link to determine
when to update their corresponding commitment fee. This prevents us from
oscillating the fee rate for our various commitment transactions.
This commit is contained in:
Wilmer Paulino 2018-05-10 17:40:29 -04:00
parent c1a1b3ba3d
commit 4cc60493d2
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
4 changed files with 111 additions and 93 deletions

@ -3,6 +3,7 @@ package htlcswitch
import ( import (
"bytes" "bytes"
"fmt" "fmt"
prand "math/rand"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -21,6 +22,10 @@ import (
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
) )
func init() {
prand.Seed(time.Now().UnixNano())
}
const ( const (
// expiryGraceDelta is a grace period that the timeout of incoming // expiryGraceDelta is a grace period that the timeout of incoming
// HTLC's that pay directly to us (i.e we're the "exit node") must up // HTLC's that pay directly to us (i.e we're the "exit node") must up
@ -36,6 +41,12 @@ const (
// for a new fee update. We'll use this as a fee floor when proposing // for a new fee update. We'll use this as a fee floor when proposing
// and accepting updates. // and accepting updates.
minCommitFeePerKw = 253 minCommitFeePerKw = 253
// DefaultMinLinkFeeUpdateTimeout and DefaultMaxLinkFeeUpdateTimeout
// represent the default timeout bounds in which a link should propose
// to update its commitment fee rate.
DefaultMinLinkFeeUpdateTimeout = 10 * time.Minute
DefaultMaxLinkFeeUpdateTimeout = 60 * time.Minute
) )
// ForwardingPolicy describes the set of constraints that a given ChannelLink // ForwardingPolicy describes the set of constraints that a given ChannelLink
@ -248,6 +259,12 @@ type ChannelLinkConfig struct {
// in testing, it is here to ensure the sphinx replay detection on the // in testing, it is here to ensure the sphinx replay detection on the
// receiving node is persistent. // receiving node is persistent.
UnsafeReplay bool UnsafeReplay bool
// MinFeeUpdateTimeout and MaxFeeUpdateTimeout represent the timeout
// interval bounds in which a link will propose to update its commitment
// fee rate. A random timeout will be selected between these values.
MinFeeUpdateTimeout time.Duration
MaxFeeUpdateTimeout time.Duration
} }
// channelLink is the service which drives a channel's commitment update // channelLink is the service which drives a channel's commitment update
@ -342,6 +359,10 @@ type channelLink struct {
logCommitTimer *time.Timer logCommitTimer *time.Timer
logCommitTick <-chan time.Time logCommitTick <-chan time.Time
// updateFeeTimer is the timer responsible for updating the link's
// commitment fee every time it fires.
updateFeeTimer *time.Timer
sync.RWMutex sync.RWMutex
wg sync.WaitGroup wg sync.WaitGroup
@ -427,6 +448,8 @@ func (l *channelLink) Start() error {
} }
} }
l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout())
l.wg.Add(1) l.wg.Add(1)
go l.htlcManager() go l.htlcManager()
@ -449,8 +472,8 @@ func (l *channelLink) Stop() {
l.cfg.ChainEvents.Cancel() l.cfg.ChainEvents.Cancel()
} }
l.updateFeeTimer.Stop()
l.channel.Stop() l.channel.Stop()
l.overflowQueue.Stop() l.overflowQueue.Stop()
close(l.quit) close(l.quit)
@ -835,7 +858,6 @@ func (l *channelLink) htlcManager() {
out: out:
for { for {
// We must always check if we failed at some point processing // We must always check if we failed at some point processing
// the last update before processing the next. // the last update before processing the next.
if l.failed { if l.failed {
@ -844,16 +866,10 @@ out:
} }
select { select {
// Our update fee timer has fired, so we'll check the network
// A new block has arrived, we'll check the network fee to see // fee to see if we should adjust our commitment fee.
// if we should adjust our commitment fee, and also update our case <-l.updateFeeTimer.C:
// track of the best current height. l.updateFeeTimer.Reset(l.randomFeeUpdateTimeout())
case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs:
if !ok {
break out
}
l.bestHeight = uint32(blockEpoch.Height)
// If we're not the initiator of the channel, don't we // If we're not the initiator of the channel, don't we
// don't control the fees, so we can ignore this. // don't control the fees, so we can ignore this.
@ -983,6 +999,20 @@ out:
} }
} }
// randomFeeUpdateTimeout returns a random timeout between the bounds defined
// within the link's configuration that will be used to determine when the link
// should propose an update to its commitment fee rate.
func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
lower := int64(l.cfg.MinFeeUpdateTimeout)
upper := int64(l.cfg.MaxFeeUpdateTimeout)
rand := prand.Int63n(upper)
if rand < lower {
rand = lower
}
return time.Duration(rand)
}
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC // handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new // Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently // HTLCs, timeout previously cleared HTLCs, and finally to settle currently

@ -1498,9 +1498,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
BlockEpochs: globalEpoch, BlockEpochs: globalEpoch,
BatchTicker: ticker, BatchTicker: ticker,
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
// Make the BatchSize large enough to not // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// trigger commit update automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 30 * time.Minute,
} }
const startingHeight = 100 const startingHeight = 100
@ -3451,22 +3453,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
defer n.stop() defer n.stop()
defer n.feeEstimator.Stop() defer n.feeEstimator.Stop()
// First, we'll start off all channels at "height" 9000 by sending a // For the sake of this test, we'll reset the timer to fire in a second
// new epoch to all the clients. // so that Alice's link queries for a new network fee.
select { n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond)
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
startingFeeRate := channels.aliceToBob.CommitFeeRate() startingFeeRate := channels.aliceToBob.CommitFeeRate()
@ -3480,20 +3469,15 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
select { select {
case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte: case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte:
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " + t.Fatalf("alice didn't query for the new network fee")
"network fee")
} }
time.Sleep(time.Millisecond * 500) time.Sleep(time.Second)
// The fee rate on the alice <-> bob channel should still be the same // The fee rate on the alice <-> bob channel should still be the same
// on both sides. // on both sides.
aliceFeeRate := channels.aliceToBob.CommitFeeRate() aliceFeeRate := channels.aliceToBob.CommitFeeRate()
bobFeeRate := channels.bobToAlice.CommitFeeRate() bobFeeRate := channels.bobToAlice.CommitFeeRate()
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
if aliceFeeRate != startingFeeRate { if aliceFeeRate != startingFeeRate {
t.Fatalf("alice's fee rate shouldn't have changed: "+ t.Fatalf("alice's fee rate shouldn't have changed: "+
"expected %v, got %v", aliceFeeRate, startingFeeRate) "expected %v, got %v", aliceFeeRate, startingFeeRate)
@ -3503,22 +3487,9 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
"expected %v, got %v", bobFeeRate, startingFeeRate) "expected %v, got %v", bobFeeRate, startingFeeRate)
} }
// Now we'll send a new block update to all end points, with a new // We'll reset the timer once again to ensure Alice's link queries for a
// height THAT'S OVER 9000!!! // new network fee.
select { n.aliceChannelLink.updateFeeTimer.Reset(time.Millisecond)
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
// Next, we'll set up a deliver a fee rate that's triple the current // Next, we'll set up a deliver a fee rate that's triple the current
// fee rate. This should cause the Alice (the initiator) to trigger a // fee rate. This should cause the Alice (the initiator) to trigger a
@ -3527,11 +3498,10 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
select { select {
case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte * 3: case n.feeEstimator.byteFeeIn <- startingFeeRateSatPerVByte * 3:
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " + t.Fatalf("alice didn't query for the new network fee")
"network fee")
} }
time.Sleep(time.Second * 2) time.Sleep(time.Second)
// At this point, Alice should've triggered a new fee update that // At this point, Alice should've triggered a new fee update that
// increased the fee rate to match the new rate. // increased the fee rate to match the new rate.
@ -3545,10 +3515,6 @@ func TestChannelLinkUpdateCommitFee(t *testing.T) {
t.Fatalf("bob's fee rate didn't change: expected %v, got %v", t.Fatalf("bob's fee rate didn't change: expected %v, got %v",
newFeeRate, aliceFeeRate) newFeeRate, aliceFeeRate)
} }
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
} }
// TestChannelLinkAcceptDuplicatePayment tests that if a link receives an // TestChannelLinkAcceptDuplicatePayment tests that if a link receives an
@ -3917,9 +3883,11 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
BlockEpochs: globalEpoch, BlockEpochs: globalEpoch,
BatchTicker: ticker, BatchTicker: ticker,
FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)), FwdPkgGCTicker: NewBatchTicker(time.NewTicker(5 * time.Second)),
// Make the BatchSize large enough to not // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// trigger commit update automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
MinFeeUpdateTimeout: 30 * time.Minute,
MaxFeeUpdateTimeout: 30 * time.Minute,
// Set any hodl flags requested for the new link. // Set any hodl flags requested for the new link.
HodlMask: hodl.MaskFromFlags(hodlFlags...), HodlMask: hodl.MaskFromFlags(hodlFlags...),
DebugHTLC: len(hodlFlags) > 0, DebugHTLC: len(hodlFlags) > 0,

@ -882,6 +882,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
const (
batchTimeout = 50 * time.Millisecond
fwdPkgTimeout = 5 * time.Second
feeUpdateTimeout = 30 * time.Minute
)
pCache := &mockPreimageCache{ pCache := &mockPreimageCache{
// hash -> preimage // hash -> preimage
preimageMap: make(map[[32]byte][]byte), preimageMap: make(map[[32]byte][]byte),
@ -921,11 +927,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true, SyncStates: true,
BatchTicker: &mockTicker{aliceTicker.C}, BatchSize: 10,
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
BatchSize: 10, FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
}, },
aliceChannel, aliceChannel,
startingHeight, startingHeight,
@ -970,11 +979,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true, SyncStates: true,
BatchTicker: &mockTicker{firstBobTicker.C}, BatchSize: 10,
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
BatchSize: 10, FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
}, },
firstBobChannel, firstBobChannel,
startingHeight, startingHeight,
@ -1019,11 +1031,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true, SyncStates: true,
BatchTicker: &mockTicker{secondBobTicker.C}, BatchSize: 10,
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
BatchSize: 10, FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
}, },
secondBobChannel, secondBobChannel,
startingHeight, startingHeight,
@ -1068,11 +1083,14 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
SyncStates: true, SyncStates: true,
BatchTicker: &mockTicker{carolTicker.C}, BatchSize: 10,
FwdPkgGCTicker: &mockTicker{time.NewTicker(5 * time.Second).C}, BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
BatchSize: 10, FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
MinFeeUpdateTimeout: feeUpdateTimeout,
MaxFeeUpdateTimeout: feeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
}, },
carolChannel, carolChannel,
startingHeight, startingHeight,

20
peer.go

@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"container/list" "container/list"
"fmt" "fmt"
"net" "net"
@ -9,14 +10,11 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/contractcourt"
"bytes"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
@ -551,11 +549,15 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
time.NewTicker(50 * time.Millisecond)), time.NewTicker(50 * time.Millisecond)),
FwdPkgGCTicker: htlcswitch.NewBatchTicker( FwdPkgGCTicker: htlcswitch.NewBatchTicker(
time.NewTicker(time.Minute)), time.NewTicker(time.Minute)),
BatchSize: 10, BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay, UnsafeReplay: cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
MaxFeeUpdateTimeout: htlcswitch.DefaultMaxLinkFeeUpdateTimeout,
} }
link := htlcswitch.NewChannelLink(linkCfg, lnChan,
uint32(currentHeight)) link := htlcswitch.NewChannelLink(
linkCfg, lnChan, uint32(currentHeight),
)
// With the channel link created, we'll now notify the htlc switch so // With the channel link created, we'll now notify the htlc switch so
// this channel can be used to dispatch local payments and also // this channel can be used to dispatch local payments and also