From 9b7b3fa3b6308b13a570bb804a4e55c40104aac1 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:17:14 +0100 Subject: [PATCH 1/8] channellink: make BatchTicker and BatchSize configurable This commit introduces a new Ticker interface, that can be used to control when the batch timer should tick. This is done to be able to more easily control the ticker during tests. The batch timer is wrapped in the new BatchTicker struct, and made part of the config together with BatchSize. --- htlcswitch/link.go | 47 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 0c0d1b66..bdfaadfc 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -83,6 +83,34 @@ func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSa return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 } +// Ticker is an interface used to wrap a time.Ticker in a struct, +// making mocking it easier. +type Ticker interface { + Start() <-chan time.Time + Stop() +} + +// BatchTicker implements the Ticker interface, and wraps a time.Ticker. +type BatchTicker struct { + ticker *time.Ticker +} + +// NewBatchTicker returns a new BatchTicker that wraps the passed +// time.Ticker. +func NewBatchTicker(t *time.Ticker) *BatchTicker { + return &BatchTicker{t} +} + +// Start returns the tick channel for the underlying time.Ticker. +func (t *BatchTicker) Start() <-chan time.Time { + return t.ticker.C +} + +// Stop stops the underlying time.Ticker. +func (t *BatchTicker) Stop() { + t.ticker.Stop() +} + // ChannelLinkConfig defines the configuration for the channel link. ALL // elements within the configuration MUST be non-nil for channel link to carry // out its duties. @@ -167,6 +195,17 @@ type ChannelLinkConfig struct { // reestablishment message to the remote peer. It should be done if our // clients have been restarted, or remote peer have been reconnected. SyncStates bool + + // BatchTicker is the ticker that determines the interval that we'll + // use to check the batch to see if there're any updates we should + // flush out. By batching updates into a single commit, we attempt + // to increase throughput by maximizing the number of updates + // coalesced into a single commit. + BatchTicker Ticker + + // BatchSize is the max size of a batch of updates done to the link + // before we do a state update. + BatchSize uint32 } // channelLink is the service which drives a channel's commitment update @@ -588,8 +627,8 @@ func (l *channelLink) htlcManager() { } } - batchTimer := time.NewTicker(50 * time.Millisecond) - defer batchTimer.Stop() + batchTick := l.cfg.BatchTicker.Start() + defer l.cfg.BatchTicker.Stop() // TODO(roasbeef): fail chan in case of protocol violation out: @@ -667,7 +706,7 @@ out: break out } - case <-batchTimer.C: + case <-batchTick: // If the current batch is empty, then we have no work // here. if l.batchCounter == 0 { @@ -899,7 +938,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // If this newly added update exceeds the min batch size for adds, or // this is a settle request, then initiate an update. - if l.batchCounter >= 10 || isSettle { + if l.batchCounter >= l.cfg.BatchSize || isSettle { if err := l.updateCommitTx(); err != nil { l.fail("unable to update commitment: %v", err) return From cc050f183fc3c4ba4a287de520ab527f8d68c91d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 30 Jan 2018 18:53:39 -0500 Subject: [PATCH 2/8] htlcswitch tests: add mockTicker struct --- htlcswitch/mock.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 3c336ef2..f17029e0 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "testing" + "time" "io" "sync/atomic" @@ -617,3 +618,14 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S Spend: make(chan *chainntnfs.SpendDetail), }, nil } + +type mockTicker struct { + ticker <-chan time.Time +} + +func (m *mockTicker) Start() <-chan time.Time { + return m.ticker +} + +func (m *mockTicker) Stop() { +} From 26a80f86b8634765735b7bb56738f35eeb92ae0f Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:18:14 +0100 Subject: [PATCH 3/8] peer: set BatchTicker and BatchSize in channellink config --- peer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/peer.go b/peer.go index e69255d3..62fdf116 100644 --- a/peer.go +++ b/peer.go @@ -397,6 +397,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { ) }, SyncStates: true, + BatchTicker: htlcswitch.NewBatchTicker( + time.NewTicker(50 * time.Millisecond)), + BatchSize: 10, } link := htlcswitch.NewChannelLink(linkCfg, lnChan, uint32(currentHeight)) @@ -1289,6 +1292,9 @@ out: ) }, SyncStates: false, + BatchTicker: htlcswitch.NewBatchTicker( + time.NewTicker(50 * time.Millisecond)), + BatchSize: 10, } link := htlcswitch.NewChannelLink(linkConfig, newChan, uint32(currentHeight)) From 83b368d20eb41c5967a93ba16e1dbd0381df3704 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:19:04 +0100 Subject: [PATCH 4/8] lnwallet/channel: use remoteACKedIndex instead of logIndex in availableBalance --- lnwallet/channel.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 23f383b8..6c2b8712 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4948,7 +4948,8 @@ func (lc *LightningChannel) availableBalance() (lnwire.MilliSatoshi, int64) { // Next we'll grab the current set of log updates that are still active // and haven't been garbage collected. - htlcView := lc.fetchHTLCView(lc.remoteUpdateLog.logIndex, + remoteACKedIndex := lc.localCommitChain.tip().theirMessageIndex + htlcView := lc.fetchHTLCView(remoteACKedIndex, lc.localUpdateLog.logIndex) feePerKw := lc.channelState.LocalCommitment.FeePerKw dustLimit := lc.channelState.LocalChanCfg.DustLimit From f83f47541d58aef7530f52221ee5f6f93f61d75e Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:20:00 +0100 Subject: [PATCH 5/8] channel test: add TestDesyncHTLCs This commit adds a test that trigger a case where the balance could end up being negative when we used the logIndex when calculating the channel's available balance. This could happen when the logs got out of sync, and we would use the balance from a settled HTLC even though we wouldn't include it when signing the next state. --- lnwallet/channel_test.go | 70 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 03487741..6d903477 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -3914,6 +3914,13 @@ func TestChanAvailableBandwidth(t *testing.T) { t.Fatalf("unable to recv htlc cancel: %v", err) } + // We must do a state transition before the balance is available + // for Alice. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete alice's state "+ + "transition: %v", err) + } + // With the HTLC's settled in the log, we'll now assert that if we // initiate a state transition, then our guess was correct. assertBandwidthEstimateCorrect(false) @@ -4293,4 +4300,67 @@ func TestChannelUnilateralCloseHtlcResolution(t *testing.T) { } } +// TestDesyncHTLCs checks that we cannot add HTLCs that would make the +// balance negative, when the remote and local update logs are desynced. +func TestDesyncHTLCs(t *testing.T) { + t.Parallel() + + // We'll kick off the test by creating our channels which both are + // loaded with 5 BTC each. + aliceChannel, bobChannel, cleanUp, err := createTestChannels(1) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUp() + + // First add one HTLC of value 4.1 BTC. + htlcAmt := lnwire.NewMSatFromSatoshis(4.1 * btcutil.SatoshiPerBitcoin) + htlc, _ := createHTLC(0, htlcAmt) + aliceIndex, err := aliceChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + bobIndex, err := bobChannel.ReceiveHTLC(htlc) + if err != nil { + t.Fatalf("unable to recv htlc: %v", err) + } + + // Lock this HTLC in. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete state update: %v", err) + } + + // Now let let Bob fail this HTLC. + if err := bobChannel.FailHTLC(bobIndex, []byte("failreason")); err != nil { + t.Fatalf("unable to cancel HTLC: %v", err) + } + if err := aliceChannel.ReceiveFailHTLC(aliceIndex, []byte("bad")); err != nil { + t.Fatalf("unable to recv htlc cancel: %v", err) + } + + // Alice now has gotten all here original balance (5 BTC) back, + // however, adding a new HTLC at this point SHOULD fail, since + // if she add the HTLC and sign the next state, Bob cannot assume + // she received the FailHTLC, and must assume she doesn't have + // the necessary balance available. + // + // We try adding an HTLC of value 1 BTC, which should fail + // because the balance is unavailable. + htlcAmt = lnwire.NewMSatFromSatoshis(1 * btcutil.SatoshiPerBitcoin) + htlc, _ = createHTLC(1, htlcAmt) + if _, err = aliceChannel.AddHTLC(htlc); err != ErrInsufficientBalance { + t.Fatalf("expected ErrInsufficientBalance, instead received: %v", + err) + } + + // Now do a state transition, which will ACK the FailHTLC, making + // Alice able to add the new HTLC. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete state update: %v", err) + } + if _, err = aliceChannel.AddHTLC(htlc); err != nil { + t.Fatalf("unable to add htlc: %v", err) + } +} + // TODO(roasbeef): testing.Quick test case for retrans!!! From ab75cd3a4dcd7d2aeff04dbe59bd7d8c25284756 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:23:56 +0100 Subject: [PATCH 6/8] htlcswitch/test_utils: set BatchTicker mock and BatchSize This commit adds the mockTicker and BatchSize to the link config. It also exits the goroutines draining the HtlcUpdates gracefully. --- htlcswitch/test_utils.go | 49 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 85718582..ca9609c7 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -437,17 +437,21 @@ type threeHopNetwork struct { aliceServer *mockServer aliceChannelLink *channelLink aliceBlockEpoch chan *chainntnfs.BlockEpoch + aliceTicker *time.Ticker firstBobChannelLink *channelLink bobFirstBlockEpoch chan *chainntnfs.BlockEpoch + firstBobTicker *time.Ticker bobServer *mockServer secondBobChannelLink *channelLink bobSecondBlockEpoch chan *chainntnfs.BlockEpoch + secondBobTicker *time.Ticker carolChannelLink *channelLink carolServer *mockServer carolBlockEpoch chan *chainntnfs.BlockEpoch + carolTicker *time.Ticker feeEstimator *mockFeeEstimator @@ -625,6 +629,11 @@ func (n *threeHopNetwork) stop() { done <- struct{}{} }() + n.aliceTicker.Stop() + n.firstBobTicker.Stop() + n.secondBobTicker.Stop() + n.carolTicker.Stop() + for i := 0; i < 3; i++ { <-done } @@ -743,6 +752,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + aliceTicker := time.NewTicker(50 * time.Millisecond) aliceChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -763,6 +773,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{aliceTicker.C}, + BatchSize: 10, }, aliceChannel, startingHeight, @@ -772,7 +784,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-aliceChannelLink.(*channelLink).htlcUpdates + select { + case <-aliceChannelLink.(*channelLink).htlcUpdates: + case <-aliceChannelLink.(*channelLink).quit: + return + } } }() @@ -782,6 +798,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + firstBobTicker := time.NewTicker(50 * time.Millisecond) firstBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -802,6 +819,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{firstBobTicker.C}, + BatchSize: 10, }, firstBobChannel, startingHeight, @@ -811,7 +830,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-firstBobChannelLink.(*channelLink).htlcUpdates + select { + case <-firstBobChannelLink.(*channelLink).htlcUpdates: + case <-firstBobChannelLink.(*channelLink).quit: + return + } } }() @@ -821,6 +844,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + secondBobTicker := time.NewTicker(50 * time.Millisecond) secondBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -841,6 +865,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{secondBobTicker.C}, + BatchSize: 10, }, secondBobChannel, startingHeight, @@ -850,7 +876,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-secondBobChannelLink.(*channelLink).htlcUpdates + select { + case <-secondBobChannelLink.(*channelLink).htlcUpdates: + case <-secondBobChannelLink.(*channelLink).quit: + return + } } }() @@ -860,6 +890,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + carolTicker := time.NewTicker(50 * time.Millisecond) carolChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -880,6 +911,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{carolTicker.C}, + BatchSize: 10, }, carolChannel, startingHeight, @@ -889,7 +922,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-carolChannelLink.(*channelLink).htlcUpdates + select { + case <-carolChannelLink.(*channelLink).htlcUpdates: + case <-carolChannelLink.(*channelLink).quit: + return + } } }() @@ -897,17 +934,21 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, aliceServer: aliceServer, aliceChannelLink: aliceChannelLink.(*channelLink), aliceBlockEpoch: aliceEpochChan, + aliceTicker: aliceTicker, firstBobChannelLink: firstBobChannelLink.(*channelLink), bobFirstBlockEpoch: bobFirstEpochChan, + firstBobTicker: firstBobTicker, bobServer: bobServer, secondBobChannelLink: secondBobChannelLink.(*channelLink), bobSecondBlockEpoch: bobSecondEpochChan, + secondBobTicker: secondBobTicker, carolChannelLink: carolChannelLink.(*channelLink), carolServer: carolServer, carolBlockEpoch: carolBlockEpoch, + carolTicker: carolTicker, feeEstimator: feeEstimator, globalPolicy: globalPolicy, From 12d33278335b9f8e1c86a80ef524d37c838e5c4d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 16 Jan 2018 21:24:59 +0100 Subject: [PATCH 7/8] htlcswitch/link_test: update Bandwidth tests This commit updates the tests for checking a links Bandwidth() calculation, after the change that made us use the remoteACKedIndex instead of the logIndex when calculating it. The main result of this change is that we never consider incoming updates before they are acked, when calculating the bandwidth. This is because this was inconsistent with the state we actually end up signing later on. --- htlcswitch/link_test.go | 543 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 488 insertions(+), 55 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 76cdb81d..f71a4b0d 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1389,13 +1389,16 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) { type mockPeer struct { sync.Mutex - sentMsgs []lnwire.Message + sentMsgs chan lnwire.Message + quit chan struct{} } func (m *mockPeer) SendMessage(msg lnwire.Message) error { - m.Lock() - m.sentMsgs = append(m.sentMsgs, msg) - m.Unlock() + select { + case m.sentMsgs <- msg: + case <-m.quit: + return fmt.Errorf("mockPeer shutting down") + } return nil } func (m *mockPeer) WipeChannel(*wire.OutPoint) error { @@ -1406,19 +1409,11 @@ func (m *mockPeer) PubKey() [33]byte { } func (m *mockPeer) Disconnect(reason error) { } -func (m *mockPeer) popSentMsg() lnwire.Message { - m.Lock() - msg := m.sentMsgs[0] - m.sentMsgs[0] = nil - m.sentMsgs = m.sentMsgs[1:] - m.Unlock() - - return msg -} var _ Peer = (*mockPeer)(nil) -func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), error) { +func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, + *lnwallet.LightningChannel, chan time.Time, func(), error) { globalEpoch := &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() { @@ -1426,18 +1421,21 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro } chanID := lnwire.NewShortChanIDFromInt(4) - aliceChannel, _, fCleanUp, _, err := createTestChannel( + aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel( alicePrivKey, bobPrivKey, chanAmt, chanAmt, chanID, ) if err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } var ( invoiveRegistry = newMockRegistry() decoder = &mockIteratorDecoder{} obfuscator = newMockObfuscator() - alicePeer mockPeer + alicePeer = &mockPeer{ + sentMsgs: make(chan lnwire.Message, 2000), + quit: make(chan struct{}), + } globalPolicy = ForwardingPolicy{ MinHTLC: lnwire.NewMSatFromSatoshis(5), @@ -1451,9 +1449,11 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro preimageMap: make(map[[32]byte][]byte), } + t := make(chan time.Time) + ticker := &mockTicker{t} aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, - Peer: &alicePeer, + Peer: alicePeer, Switch: New(Config{}), DecodeHopIterator: decoder.DecodeHopIterator, DecodeOnionObfuscator: func(io.Reader) (ErrorEncrypter, lnwire.FailCode) { @@ -1467,20 +1467,35 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro Registry: invoiveRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, BlockEpochs: globalEpoch, + BatchTicker: ticker, + // Make the BatchSize large enough to not + // trigger commit update automatically during tests. + BatchSize: 10000, } const startingHeight = 100 aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) if err := aliceLink.Start(); err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } + go func() { + for { + select { + case <-aliceLink.(*channelLink).htlcUpdates: + case <-aliceLink.(*channelLink).quit: + return + } + } + }() cleanUp := func() { + close(alicePeer.quit) defer fCleanUp() defer aliceLink.Stop() + defer bobChannel.Stop() } - return aliceLink, cleanUp, nil + return aliceLink, bobChannel, t, cleanUp, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -1494,6 +1509,148 @@ func assertLinkBandwidth(t *testing.T, link ChannelLink, } } +// handleStateUpdate handles the messages sent from the link after +// the batch ticker has triggered a state update. +func handleStateUpdate(link *channelLink, + remoteChannel *lnwallet.LightningChannel) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + // The link should be sending a commit sig at this point. + commitSig, ok := msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + // Let the remote channel receive the commit sig, and + // respond with a revocation + commitsig. + err := remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + commitSig = &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // This should make the link respond with a revocation. + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + + return nil +} + +// updateState is used exchange the messages necessary to do a full state +// transition. If initiateUpdate=true, then this call will make the link +// trigger an update by sending on the batchTick channel, if not, it will +// make the remoteChannel initiate the state update. +func updateState(batchTick chan time.Time, link *channelLink, + remoteChannel *lnwallet.LightningChannel, + initiateUpdate bool) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + + if initiateUpdate { + // Trigger update by ticking the batchTicker. + select { + case batchTick <- time.Now(): + case <-link.quit: + return fmt.Errorf("link shuttin down") + } + return handleStateUpdate(link, remoteChannel) + } + + // The remote is triggering the state update, emulate this by + // signing and sending CommitSig to the link. + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + + commitSig := &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // The link should respond with a revocation + commit sig. + var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", + msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + commitSig, ok = msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + err = remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + // Lastly, send a revocation back to the link. + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + // Sleep to make sure Alice has handled the remote revocation. + time.Sleep(500 * time.Millisecond) + + return nil +} + // TestChannelLinkBandwidthConsistency ensures that the reported bandwidth of a // given ChannelLink is properly updated in response to downstream messages // from the switch, and upstream messages from its channel peer. @@ -1509,7 +1666,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // We'll start the test by creating a single instance of const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + link, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1517,11 +1674,18 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { var ( mockBlob [lnwire.OnionPacketSize]byte - coreChan = aliceLink.(*channelLink).channel - defaultCommitFee = coreChan.StateSnapshot().CommitFee + aliceLink = link.(*channelLink) + aliceChannel = aliceLink.channel + defaultCommitFee = aliceChannel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = aliceLink.cfg.Peer.(*mockPeer).sentMsgs ) + // We put Alice into HodlHTLC mode, such that she won't settle + // incoming HTLCs automatically. + aliceLink.cfg.HodlHTLC = true + aliceLink.cfg.DebugHTLC = true + estimator := &lnwallet.StaticFeeEstimator{ FeeRate: 24, } @@ -1552,17 +1716,61 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // The resulting bandwidth should reflect that Alice is paying the + // htlc amount in addition to the htlc fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Alice should send the HTLC to Bob. + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Locking in the HTLC should not change Alice's bandwidth. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // If we now send in a valid HTLC settle for the prior HTLC we added, // then the bandwidth should remain unchanged as the remote party will // gain additional channel balance. + err = bobChannel.SettleHTLC(invoice.Terms.PaymentPreimage, bobIndex) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } htlcSettle := &lnwire.UpdateFufillHTLC{ - ID: 0, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, } aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 500) + + // Since the settle is not locked in yet, Alice's bandwidth should still + // reflect that she has to pay the fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Lock in the settle. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now that it is settled, Alice should have gotten the htlc fee back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Next, we'll add another HTLC initiated by the switch (of the same @@ -1576,31 +1784,96 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // Again, Alice's bandwidth decreases by htlcAmt+htlcFee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-2*htlcAmt-htlcFee) + + // Alice will send the HTLC to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + addHtlc, ok = msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err = bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC, which should not affect the bandwidth. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) // With that processed, we'll now generate an HTLC fail (sent by the // remote peer) to cancel the HTLC we just added. This should return us // back to the bandwidth of the link right before the HTLC was sent. + err = bobChannel.FailHTLC(bobIndex, []byte("nop")) + if err != nil { + t.Fatalf("unable to fail htlc: %v", err) + } failMsg := &lnwire.UpdateFailHTLC{ - ID: 1, // As this is the second HTLC. + ID: bobIndex, Reason: lnwire.OpaqueReason([]byte("nop")), } aliceLink.HandleChannelUpdate(failMsg) time.Sleep(time.Millisecond * 500) + + // Before the Fail gets locked in, the bandwidth should remain unchanged. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) + + // Lock in the Fail. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now the bancdwidth should reflect the failed HTLC. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Moving along, we'll now receive a new HTLC from the remote peer, // with an ID of 0 as this is their first HTLC. The bandwidth should // remain unchanged (but Alice will need to pay the fee for the extra // HTLC). - updateMsg := &lnwire.UpdateAddHTLC{ - ID: 0, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, // Re-using the same payment hash. + htlcAmt, totalTimelock, hops := generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err := generateRoute(hops...) + if err != nil { + t.Fatalf("unable to gen route: %v", err) } - aliceLink.HandleChannelUpdate(updateMsg) - time.Sleep(time.Millisecond * 500) + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, + totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + + // We must add the invoice to the registry, such that Alice expects + // this payment. + err = aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice) + if err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) + + // Alice's balance remains unchanged until this HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Since Bob is adding this HTLC, Alice only needs to pay the fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // Next, we'll settle the HTLC with our knowledge of the pre-image that @@ -1608,32 +1881,112 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // of the channel should now be re-balanced to the starting point. settlePkt := htlcPacket{ htlc: &lnwire.UpdateFufillHTLC{ - ID: 2, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, }, } + aliceLink.HandleSwitchPacket(&settlePkt) time.Sleep(time.Millisecond * 500) + + // Settling this HTLC gives Alice all her original bandwidth back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) - // Finally, we'll test the scenario of failing an HTLC received by the - // remote node. This should result in no perceived bandwidth changes. - htlcAdd := &lnwire.UpdateAddHTLC{ - ID: 1, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, + // Alice wil send the Settle to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") } - aliceLink.HandleChannelUpdate(htlcAdd) + + settleHtlc, ok := msg.(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatalf("expected UpdateFufillHTLC, got %T", msg) + } + pre := settleHtlc.PaymentPreimage + idx := settleHtlc.ID + err = bobChannel.ReceiveHTLCSettle(pre, idx) + if err != nil { + t.Fatalf("unable to receive settle: %v", err) + } + + // After a settle the link should do a state transition automatically, + // so we don't have to trigger it. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Finally, we'll test the scenario of failing an HTLC received from the + // remote node. This should result in no perceived bandwidth changes. + htlcAmt, totalTimelock, hops = generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err = generateRoute(hops...) + if err != nil { + t.Fatalf("unable to gen route: %v", err) + } + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + if err := aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice); err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + // Since we are not using the link to handle HTLC IDs for the + // remote channel, we must set this manually. This is the second + // HTLC we add, hence it should have an ID of 1 (Alice's channel + // link will set this automatically for her side). + htlc.ID = 1 + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) time.Sleep(time.Millisecond * 500) + + // No changes before the HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After lock-in, Alice will have to pay the htlc fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcFee) + + // Now fail this HTLC. failPkt := htlcPacket{ + incomingHTLCID: bobIndex, htlc: &lnwire.UpdateFailHTLC{ - ID: 3, + ID: bobIndex, }, } aliceLink.HandleSwitchPacket(&failPkt) time.Sleep(time.Millisecond * 500) + + // Alice should get all her bandwidth back. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Message should be sent to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + failMsg, ok = msg.(*lnwire.UpdateFailHTLC) + if !ok { + t.Fatalf("expected UpdateFailHTLC, got %T", msg) + } + err = bobChannel.ReceiveFailHTLC(failMsg.ID, []byte("fail")) + if err != nil { + t.Fatalf("failed receiving fail htlc: %v", err) + } + + // After failing an HTLC, the link will automatically trigger + // a state update. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) } @@ -1646,7 +1999,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { var mockBlob [lnwire.OnionPacketSize]byte const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + aliceLink, bobChannel, batchTick, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1656,6 +2009,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { coreLink = aliceLink.(*channelLink) defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs ) estimator := &lnwallet.StaticFeeEstimator{ @@ -1667,6 +2021,11 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { } feePerKw := feePerWeight * 1000 + // The starting bandwidth of the channel should be exactly the amount + // that we created the channel between her and Bob. + expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + addLinkHTLC := func(amt lnwire.MilliSatoshi) [32]byte { invoice, htlc, err := generatePayment(amt, amt, 5, mockBlob) if err != nil { @@ -1676,7 +2035,6 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { htlc: htlc, amount: amt, }) - return invoice.Terms.PaymentPreimage } @@ -1694,13 +2052,39 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // The HTLCs should all be sent to the remote. + var msg lnwire.Message + for i := 0; i < numHTLCs; i++ { + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + // TODO(roasbeef): increase sleep time.Sleep(time.Second * 1) commitWeight := lnwallet.CommitWeight + lnwallet.HtlcWeight*numHTLCs htlcFee := lnwire.NewMSatFromSatoshis( btcutil.Amount((int64(feePerKw) * commitWeight) / 1000), ) - expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee + expectedBandwidth = aliceStartingBandwidth - totalHtlcAmt - htlcFee expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee) assertLinkBandwidth(t, aliceLink, expectedBandwidth) @@ -1722,23 +2106,43 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // No messages should be sent to the remote at this point. + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + time.Sleep(time.Second * 2) expectedBandwidth -= (numOverFlowHTLCs * htlcAmt) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // With the extra HTLC's added, the overflow queue should now be - // populated with our 10 additional HTLC's. + // populated with our 20 additional HTLC's. if coreLink.overflowQueue.Length() != numOverFlowHTLCs { t.Fatalf("wrong overflow queue length: expected %v, got %v", numOverFlowHTLCs, coreLink.overflowQueue.Length()) } - // At this point, we'll now settle one of the HTLC's that were added. - // The resulting bandwidth change should be non-existent as this will - // simply transfer over funds to the remote party. However, the size of - // the overflow queue should be decreasing + // We trigger a state update to lock in the HTLCs. This should + // not change Alice's bandwidth. + if err := updateState(batchTick, coreLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + + // At this point, we'll now settle enough HTLCs to empty the overflow + // queue. The resulting bandwidth change should be non-existent as this + // will simply transfer over funds to the remote party. However, the + // size of the overflow queue should be decreasing for i := 0; i < numOverFlowHTLCs; i++ { + err = bobChannel.SettleHTLC(preImages[i], uint64(i)) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } + htlcSettle := &lnwire.UpdateFufillHTLC{ ID: uint64(i), PaymentPreimage: preImages[i], @@ -1746,19 +2150,48 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 50) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) - // As we're not actually initiating a full state update, we'll - // trigger a free-slot signal manually here. - coreLink.overflowQueue.SignalFreeSlot() + // We trigger a state update to lock in the Settles. + if err := updateState(batchTick, coreLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After the state update is done, Alice should start sending + // HTLCs from the overflow queue. + for i := 0; i < numOverFlowHTLCs; i++ { + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): } - time.Sleep(time.Millisecond * 500) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // Finally, at this point, the queue itself should be fully empty. As // enough slots have been drained from the commitment transaction to // allocate the queue items to. - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 500) if coreLink.overflowQueue.Length() != 0 { t.Fatalf("wrong overflow queue length: expected %v, got %v", 0, coreLink.overflowQueue.Length()) From 9f2ec87ed6211cf725e3252a6438d909966159e4 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 1 Feb 2018 09:32:57 -0500 Subject: [PATCH 8/8] lnwallet/sigpool: exit SubmitSignBatch on quit --- lnwallet/sigpool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index 2354ca4e..1e8cf3a6 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -259,6 +259,8 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { case s.signJobs <- job: case <-job.cancel: // TODO(roasbeef): return error? + case <-s.quit: + return } } }