diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 5a07926b..8baa2f6d 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -118,6 +118,11 @@ type ChannelLinkConfig struct { // in thread-safe manner. Registry InvoiceDatabase + // FeeEstimator is an instance of a live fee estimator which will be + // used to dynamically regulate the current fee of the commitment + // transaction to ensure timely confirmation. + FeeEstimator lnwallet.FeeEstimator + // BlockEpochs is an active block epoch event stream backed by an // active ChainNotifier instance. The ChannelLink will use new block // notifications sent over this channel to decide when a _new_ HTLC is @@ -138,6 +143,7 @@ type ChannelLinkConfig struct { // HodlHTLC should be active if you want this node to refrain from // settling all incoming HTLCs with the sender if it finds itself to be // the exit node. + // // NOTE: HodlHTLC should be active in conjunction with DebugHTLC. HodlHTLC bool @@ -286,6 +292,27 @@ func (l *channelLink) Stop() { l.cfg.BlockEpochs.Cancel() } +// sampleNetworkFee samples the current fee rate on the network to get into the +// chain in a timely manner. The returned value is expressed in fee-per-kw, as +// this is the native rate used when computing the fee for commitment +// transactions, and the second-level HTLC transactions. +func (l *channelLink) sampleNetworkFee() (btcutil.Amount, error) { + // We'll first query for the sat/weight recommended to be confirmed + // within 3blocks. + feePerWeight, err := l.cfg.FeeEstimator.EstimateFeePerWeight(3) + if err != nil { + return 0, err + } + + // Once we have this fee rate, we'll convert to sat-per-kw. + feePerKw := feePerWeight * 1000 + + log.Debugf("ChannelLink(%v): sampled fee rate for 3 block conf: %v "+ + "sat/kw", l, int64(feePerKw)) + + return feePerKw, nil +} + // shouldAdjustCommitFee returns true if we should update our commitment fee to // match that of the network fee. We'll only update our commitment fee if the // network fee is +/- 10% to our network fee. @@ -417,22 +444,45 @@ func (l *channelLink) htlcManager() { out: for { select { - // A new block has arrived, we'll examine all the active HTLC's - // to see if any of them have expired, and also update our + // A new block has arrived, we'll check the network fee to see + // if we should adjust our commitment fee , and also update our // track of the best current height. case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs: if !ok { break out } - log.Tracef("ChannelPoint(%v): new block(height=%v, "+ - "hash=%v) examining active HTLC's", - l.channel.ChannelPoint(), blockEpoch.Height, - blockEpoch.Hash) - - // TODO(roasbeef): check HTLC's for expiry l.bestHeight = uint32(blockEpoch.Height) + // If we're not the initiator of the channel, don't we + // don't control the fees, so we can ignore this. + if !l.channel.IsInitiator() { + continue + } + + // If we are the initiator, then we'll sample the + // current fee rate to get into the chain within 3 + // blocks. + feePerKw, err := l.sampleNetworkFee() + if err != nil { + log.Errorf("unable to sample network fee: %v", err) + continue + } + + // We'll check to see if we should update the fee rate + // based on our current set fee rate. + commitFee := l.channel.CommitFeeRate() + if !shouldAdjustCommitFee(feePerKw, commitFee) { + continue + } + + // If we do, then we'll send a new UpdateFee message to + // the remote party, to be locked in with a new update. + if err := l.updateChannelFee(feePerKw); err != nil { + log.Errorf("unable to update fee rate: %v", err) + continue + } + // The underlying channel has notified us of a unilateral close // carried out by the remote peer. In the case of such an // event, we'll wipe the channel state from the peer, and mark @@ -733,7 +783,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // need to re-transmit any messages to the remote party. msgsToReSend, err := l.channel.ProcessChanSyncMsg(msg) if err != nil { - // TODO(roasbeef): check conrete type of error, act + // TODO(roasbeef): check concrete type of error, act // accordingly l.fail("unable to handle upstream reestablish "+ "message: %v", err) @@ -1071,14 +1121,22 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { // updateChannelFee updates the commitment fee-per-kw on this channel by // committing to an update_fee message. func (l *channelLink) updateChannelFee(feePerKw btcutil.Amount) error { - // Update local fee. + + log.Infof("ChannelPoint(%v): updating commit fee to %v sat/kw", l, + feePerKw) + + // First, we'll update the local fee on our commitment. if err := l.channel.UpdateFee(feePerKw); err != nil { return err } - // Send fee update to remote. + // We'll then attempt to send a new UpdateFee message, and also lock it + // in immediately by triggering a commitment update. msg := lnwire.NewUpdateFee(l.ChanID(), feePerKw) - return l.cfg.Peer.SendMessage(msg) + if err := l.cfg.Peer.SendMessage(msg); err != nil { + return err + } + return l.updateCommitTx() } // processLockedInHtlcs serially processes each of the log updates which have diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index be161fa3..c9a53e42 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -2119,3 +2119,148 @@ func TestShouldAdjustCommitFee(t *testing.T) { } } } + +// TestChannelLinkUpdateCommitFee tests that when a new block comes in, the +// channel link properly checks to see if it should update the commitment fee. +func TestChannelLinkUpdateCommitFee(t *testing.T) { + t.Parallel() + + // First, we'll create our traditional three hop network. We'll only be + // interacting with and asserting the state of two of the end points + // for this test. + channels, cleanUp, _, err := createClusterChannels( + btcutil.SatoshiPerBitcoin*3, + btcutil.SatoshiPerBitcoin*5) + if err != nil { + t.Fatalf("unable to create channel: %v", err) + } + defer cleanUp() + + n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, + channels.bobToCarol, channels.carolToBob, testStartingHeight) + + // First, we'll set up some message interceptors to ensure that the + // proper messages are sent when updating fees. + chanID := n.aliceChannelLink.ChanID() + messages := []expectedMessage{ + {"alice", "bob", &lnwire.ChannelReestablish{}, false}, + {"bob", "alice", &lnwire.ChannelReestablish{}, false}, + + {"alice", "bob", &lnwire.FundingLocked{}, false}, + {"bob", "alice", &lnwire.FundingLocked{}, false}, + + {"alice", "bob", &lnwire.UpdateFee{}, false}, + + {"alice", "bob", &lnwire.CommitSig{}, false}, + {"bob", "alice", &lnwire.RevokeAndAck{}, false}, + {"bob", "alice", &lnwire.CommitSig{}, false}, + {"alice", "bob", &lnwire.RevokeAndAck{}, false}, + } + n.aliceServer.intersect(createInterceptorFunc("[alice] <-- [bob]", + "alice", messages, chanID, false)) + n.bobServer.intersect(createInterceptorFunc("[alice] --> [bob]", + "bob", messages, chanID, false)) + + if err := n.start(); err != nil { + t.Fatal(err) + } + defer n.stop() + defer n.feeEstimator.Stop() + + // First, we'll start off all channels at "height" 9000 by sending a + // new epoch to all the clients. + select { + case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ + Height: 9000, + }: + case <-time.After(time.Second * 5): + t.Fatalf("link didn't read block epoch") + } + select { + case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ + Height: 9000, + }: + case <-time.After(time.Second * 5): + t.Fatalf("link didn't read block epoch") + } + + startingFeeRate := channels.aliceToBob.CommitFeeRate() + + // Next, we'll send the first fee rate response to Alice. + select { + case n.feeEstimator.weightFeeIn <- startingFeeRate / 1000: + case <-time.After(time.Second * 5): + t.Fatalf("alice didn't query for the new " + + "network fee") + } + + time.Sleep(time.Millisecond * 500) + + // The fee rate on the alice <-> bob channel should still be the same + // on both sides. + aliceFeeRate := channels.aliceToBob.CommitFeeRate() + bobFeeRate := channels.bobToAlice.CommitFeeRate() + if aliceFeeRate != bobFeeRate { + t.Fatalf("fee rates don't match: expected %v got %v", + aliceFeeRate, bobFeeRate) + } + if aliceFeeRate != startingFeeRate { + t.Fatalf("alice's fee rate shouldn't have changed: "+ + "expected %v, got %v", aliceFeeRate, startingFeeRate) + } + if bobFeeRate != startingFeeRate { + t.Fatalf("bob's fee rate shouldn't have changed: "+ + "expected %v, got %v", bobFeeRate, startingFeeRate) + } + + // Now we'll send a new block update to all end points, with a new + // height THAT'S OVER 9000!!! + select { + case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{ + Height: 9001, + }: + case <-time.After(time.Second * 5): + t.Fatalf("link didn't read block epoch") + } + select { + case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{ + Height: 9001, + }: + case <-time.After(time.Second * 5): + t.Fatalf("link didn't read block epoch") + } + + // Next, we'll set up a deliver a fee rate that's triple the current + // fee rate. This should cause the Alice (the initiator) to trigger a + // fee update. + newFeeRate := startingFeeRate * 3 + select { + case n.feeEstimator.weightFeeIn <- newFeeRate: + case <-time.After(time.Second * 5): + t.Fatalf("alice didn't query for the new " + + "network fee") + } + + time.Sleep(time.Second * 1) + + // At this point, Alice should've triggered a new fee update that + // increased the fee rate to match the new rate. + // + // We'll scale the new fee rate by 100 as we deal with units of fee + // per-kw. + expectedFeeRate := newFeeRate * 1000 + aliceFeeRate = channels.aliceToBob.CommitFeeRate() + bobFeeRate = channels.bobToAlice.CommitFeeRate() + if aliceFeeRate != expectedFeeRate { + t.Fatalf("alice's fee rate didn't change: expected %v, got %v", + expectedFeeRate, aliceFeeRate) + } + if bobFeeRate != expectedFeeRate { + t.Fatalf("bob's fee rate didn't change: expected %v, got %v", + expectedFeeRate, aliceFeeRate) + } + if aliceFeeRate != bobFeeRate { + t.Fatalf("fee rates don't match: expected %v got %v", + aliceFeeRate, bobFeeRate) + } +} diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 20b9fbef..8721ccb3 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -22,8 +22,44 @@ import ( "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" ) +type mockFeeEstimator struct { + byteFeeIn chan btcutil.Amount + weightFeeIn chan btcutil.Amount + + quit chan struct{} +} + +func (m *mockFeeEstimator) EstimateFeePerByte(numBlocks uint32) (btcutil.Amount, error) { + select { + case feeRate := <-m.byteFeeIn: + return feeRate, nil + case <-m.quit: + return 0, fmt.Errorf("exiting") + } +} + +func (m *mockFeeEstimator) EstimateFeePerWeight(numBlocks uint32) (btcutil.Amount, error) { + select { + case feeRate := <-m.weightFeeIn: + return feeRate, nil + case <-m.quit: + return 0, fmt.Errorf("exiting") + } +} + +func (m *mockFeeEstimator) Start() error { + return nil +} +func (m *mockFeeEstimator) Stop() error { + close(m.quit) + return nil +} + +var _ lnwallet.FeeEstimator = (*mockFeeEstimator)(nil) + type mockServer struct { started int32 shutdown int32 @@ -304,6 +340,8 @@ func (s *mockServer) readHandler(message lnwire.Message) error { return nil case *lnwire.ChannelReestablish: targetChan = msg.ChanID + case *lnwire.UpdateFee: + targetChan = msg.ChanID default: return fmt.Errorf("unknown message type: %T", msg) } diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 11f3fd75..e1e1fe16 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -365,6 +365,8 @@ func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) { chanID = msg.ChanID case *lnwire.FundingLocked: chanID = msg.ChanID + case *lnwire.UpdateFee: + chanID = msg.ChanID default: return chanID, fmt.Errorf("unknown type: %T", msg) } @@ -426,13 +428,20 @@ func generateRoute(hops ...ForwardingInfo) ([lnwire.OnionPacketSize]byte, error) type threeHopNetwork struct { aliceServer *mockServer aliceChannelLink *channelLink + aliceBlockEpoch chan *chainntnfs.BlockEpoch + + firstBobChannelLink *channelLink + bobFirstBlockEpoch chan *chainntnfs.BlockEpoch - firstBobChannelLink *channelLink bobServer *mockServer secondBobChannelLink *channelLink + bobSecondBlockEpoch chan *chainntnfs.BlockEpoch carolChannelLink *channelLink carolServer *mockServer + carolBlockEpoch chan *chainntnfs.BlockEpoch + + feeEstimator *mockFeeEstimator globalPolicy ForwardingPolicy } @@ -698,21 +707,29 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, bobServer := newMockServer(t, "bob") carolServer := newMockServer(t, "carol") - // Create mock decoder instead of sphinx one in order to mock the - // route which htlc should follow. + // Create mock decoder instead of sphinx one in order to mock the route + // which htlc should follow. decoder := &mockIteratorDecoder{} - globalEpoch := &chainntnfs.BlockEpochEvent{ - Epochs: make(chan *chainntnfs.BlockEpoch), - Cancel: func() { - }, + feeEstimator := &mockFeeEstimator{ + byteFeeIn: make(chan btcutil.Amount), + weightFeeIn: make(chan btcutil.Amount), + quit: make(chan struct{}), } + globalPolicy := ForwardingPolicy{ MinHTLC: lnwire.NewMSatFromSatoshis(5), BaseFee: lnwire.NewMSatFromSatoshis(1), TimeLockDelta: 6, } obfuscator := newMockObfuscator() + + aliceEpochChan := make(chan *chainntnfs.BlockEpoch) + aliceEpoch := &chainntnfs.BlockEpochEvent{ + Epochs: aliceEpochChan, + Cancel: func() { + }, + } aliceChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -725,7 +742,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, GetLastChannelUpdate: mockGetChanUpdateMessage, Registry: aliceServer.registry, - BlockEpochs: globalEpoch, + BlockEpochs: aliceEpoch, + FeeEstimator: feeEstimator, SyncStates: true, }, aliceChannel, @@ -735,6 +753,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, t.Fatalf("unable to add alice channel link: %v", err) } + bobFirstEpochChan := make(chan *chainntnfs.BlockEpoch) + bobFirstEpoch := &chainntnfs.BlockEpochEvent{ + Epochs: bobFirstEpochChan, + Cancel: func() { + }, + } firstBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -747,7 +771,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, GetLastChannelUpdate: mockGetChanUpdateMessage, Registry: bobServer.registry, - BlockEpochs: globalEpoch, + BlockEpochs: bobFirstEpoch, + FeeEstimator: feeEstimator, SyncStates: true, }, firstBobChannel, @@ -757,6 +782,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, t.Fatalf("unable to add first bob channel link: %v", err) } + bobSecondEpochChan := make(chan *chainntnfs.BlockEpoch) + bobSecondEpoch := &chainntnfs.BlockEpochEvent{ + Epochs: bobSecondEpochChan, + Cancel: func() { + }, + } secondBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -769,7 +800,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, GetLastChannelUpdate: mockGetChanUpdateMessage, Registry: bobServer.registry, - BlockEpochs: globalEpoch, + BlockEpochs: bobSecondEpoch, + FeeEstimator: feeEstimator, SyncStates: true, }, secondBobChannel, @@ -779,6 +811,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, t.Fatalf("unable to add second bob channel link: %v", err) } + carolBlockEpoch := make(chan *chainntnfs.BlockEpoch) + carolEpoch := &chainntnfs.BlockEpochEvent{ + Epochs: bobSecondEpochChan, + Cancel: func() { + }, + } carolChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -791,7 +829,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, GetLastChannelUpdate: mockGetChanUpdateMessage, Registry: carolServer.registry, - BlockEpochs: globalEpoch, + BlockEpochs: carolEpoch, + FeeEstimator: feeEstimator, SyncStates: true, }, carolChannel, @@ -802,14 +841,22 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } return &threeHopNetwork{ - aliceServer: aliceServer, - aliceChannelLink: aliceChannelLink.(*channelLink), - firstBobChannelLink: firstBobChannelLink.(*channelLink), + aliceServer: aliceServer, + aliceChannelLink: aliceChannelLink.(*channelLink), + aliceBlockEpoch: aliceEpochChan, + + firstBobChannelLink: firstBobChannelLink.(*channelLink), + bobFirstBlockEpoch: bobFirstEpochChan, + bobServer: bobServer, secondBobChannelLink: secondBobChannelLink.(*channelLink), - carolChannelLink: carolChannelLink.(*channelLink), - carolServer: carolServer, + bobSecondBlockEpoch: bobSecondEpochChan, + carolChannelLink: carolChannelLink.(*channelLink), + carolServer: carolServer, + carolBlockEpoch: carolBlockEpoch, + + feeEstimator: feeEstimator, globalPolicy: globalPolicy, } }