diff --git a/htlcswitch/link.go b/htlcswitch/link.go index ec65c6c0..f2f059dd 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -83,6 +83,34 @@ func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSa return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 } +// Ticker is an interface used to wrap a time.Ticker in a struct, +// making mocking it easier. +type Ticker interface { + Start() <-chan time.Time + Stop() +} + +// BatchTicker implements the Ticker interface, and wraps a time.Ticker. +type BatchTicker struct { + ticker *time.Ticker +} + +// NewBatchTicker returns a new BatchTicker that wraps the passed +// time.Ticker. +func NewBatchTicker(t *time.Ticker) *BatchTicker { + return &BatchTicker{t} +} + +// Start returns the tick channel for the underlying time.Ticker. +func (t *BatchTicker) Start() <-chan time.Time { + return t.ticker.C +} + +// Stop stops the underlying time.Ticker. +func (t *BatchTicker) Stop() { + t.ticker.Stop() +} + // ChannelLinkConfig defines the configuration for the channel link. ALL // elements within the configuration MUST be non-nil for channel link to carry // out its duties. @@ -167,6 +195,17 @@ type ChannelLinkConfig struct { // reestablishment message to the remote peer. It should be done if our // clients have been restarted, or remote peer have been reconnected. SyncStates bool + + // BatchTicker is the ticker that determines the interval that we'll + // use to check the batch to see if there're any updates we should + // flush out. By batching updates into a single commit, we attempt + // to increase throughput by maximizing the number of updates + // coalesced into a single commit. + BatchTicker Ticker + + // BatchSize is the max size of a batch of updates done to the link + // before we do a state update. + BatchSize uint32 } // channelLink is the service which drives a channel's commitment update @@ -594,8 +633,8 @@ func (l *channelLink) htlcManager() { } } - batchTimer := time.NewTicker(50 * time.Millisecond) - defer batchTimer.Stop() + batchTick := l.cfg.BatchTicker.Start() + defer l.cfg.BatchTicker.Stop() // TODO(roasbeef): fail chan in case of protocol violation out: @@ -673,7 +712,7 @@ out: break out } - case <-batchTimer.C: + case <-batchTick: // If the current batch is empty, then we have no work // here. if l.batchCounter == 0 { @@ -905,7 +944,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // If this newly added update exceeds the min batch size for adds, or // this is a settle request, then initiate an update. 
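// Aside (not part of the patch): a minimal, self-contained sketch of the
// Ticker pattern introduced above. Production code wraps a real time.Ticker
// via BatchTicker, while a test can substitute a channel it controls so that
// commitment batches are flushed exactly when the test fires a tick. Only the
// Ticker/BatchTicker/NewBatchTicker names mirror the diff; the mockTicker and
// main driver here are illustrative.
package main

import (
	"fmt"
	"time"
)

// Ticker abstracts time.Ticker so it can be mocked in tests.
type Ticker interface {
	Start() <-chan time.Time
	Stop()
}

// BatchTicker wraps a real time.Ticker for production use.
type BatchTicker struct {
	ticker *time.Ticker
}

// NewBatchTicker returns a BatchTicker wrapping the given time.Ticker.
func NewBatchTicker(t *time.Ticker) *BatchTicker { return &BatchTicker{t} }

// Start returns the underlying tick channel.
func (t *BatchTicker) Start() <-chan time.Time { return t.ticker.C }

// Stop stops the underlying ticker.
func (t *BatchTicker) Stop() { t.ticker.Stop() }

// mockTicker is driven by hand, as the htlcswitch tests below do.
type mockTicker struct {
	ticks chan time.Time
}

func (m *mockTicker) Start() <-chan time.Time { return m.ticks }
func (m *mockTicker) Stop()                   {}

func main() {
	ticks := make(chan time.Time)
	var t Ticker = &mockTicker{ticks: ticks}

	// The "test" decides when the batch ticker fires.
	go func() { ticks <- time.Now() }()
	<-t.Start()
	fmt.Println("batch flush triggered on demand")
}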
- if l.batchCounter >= 10 || isSettle { + if l.batchCounter >= l.cfg.BatchSize || isSettle { if err := l.updateCommitTx(); err != nil { l.fail("unable to update commitment: %v", err) return diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 76cdb81d..f71a4b0d 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1389,13 +1389,16 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) { type mockPeer struct { sync.Mutex - sentMsgs []lnwire.Message + sentMsgs chan lnwire.Message + quit chan struct{} } func (m *mockPeer) SendMessage(msg lnwire.Message) error { - m.Lock() - m.sentMsgs = append(m.sentMsgs, msg) - m.Unlock() + select { + case m.sentMsgs <- msg: + case <-m.quit: + return fmt.Errorf("mockPeer shutting down") + } return nil } func (m *mockPeer) WipeChannel(*wire.OutPoint) error { @@ -1406,19 +1409,11 @@ func (m *mockPeer) PubKey() [33]byte { } func (m *mockPeer) Disconnect(reason error) { } -func (m *mockPeer) popSentMsg() lnwire.Message { - m.Lock() - msg := m.sentMsgs[0] - m.sentMsgs[0] = nil - m.sentMsgs = m.sentMsgs[1:] - m.Unlock() - - return msg -} var _ Peer = (*mockPeer)(nil) -func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), error) { +func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, + *lnwallet.LightningChannel, chan time.Time, func(), error) { globalEpoch := &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() { @@ -1426,18 +1421,21 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro } chanID := lnwire.NewShortChanIDFromInt(4) - aliceChannel, _, fCleanUp, _, err := createTestChannel( + aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel( alicePrivKey, bobPrivKey, chanAmt, chanAmt, chanID, ) if err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } var ( invoiveRegistry = newMockRegistry() decoder = &mockIteratorDecoder{} obfuscator = newMockObfuscator() - alicePeer mockPeer + alicePeer = &mockPeer{ + sentMsgs: make(chan lnwire.Message, 2000), + quit: make(chan struct{}), + } globalPolicy = ForwardingPolicy{ MinHTLC: lnwire.NewMSatFromSatoshis(5), @@ -1451,9 +1449,11 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro preimageMap: make(map[[32]byte][]byte), } + t := make(chan time.Time) + ticker := &mockTicker{t} aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, - Peer: &alicePeer, + Peer: alicePeer, Switch: New(Config{}), DecodeHopIterator: decoder.DecodeHopIterator, DecodeOnionObfuscator: func(io.Reader) (ErrorEncrypter, lnwire.FailCode) { @@ -1467,20 +1467,35 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro Registry: invoiveRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, BlockEpochs: globalEpoch, + BatchTicker: ticker, + // Make the BatchSize large enough to not + // trigger commit update automatically during tests. 
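// Aside (not part of the patch): a standalone sketch of the mockPeer change
// above. SendMessage now delivers onto a buffered channel and bails out on a
// quit signal, instead of appending to a mutex-guarded slice, so the test can
// read messages concurrently and a busy link cannot wedge test teardown. The
// message type is reduced to a string purely for illustration.
package main

import (
	"errors"
	"fmt"
)

type mockPeer struct {
	sentMsgs chan string
	quit     chan struct{}
}

// SendMessage forwards the message to the test, or fails fast once the mock
// peer has been shut down.
func (m *mockPeer) SendMessage(msg string) error {
	select {
	case m.sentMsgs <- msg:
		return nil
	case <-m.quit:
		return errors.New("mockPeer shutting down")
	}
}

func main() {
	p := &mockPeer{
		sentMsgs: make(chan string, 1),
		quit:     make(chan struct{}),
	}

	fmt.Println(p.SendMessage("update_add_htlc")) // <nil>, buffered for the test

	// The buffer is now full and nothing is draining it. After quit is
	// closed, SendMessage still returns instead of blocking forever.
	close(p.quit)
	fmt.Println(p.SendMessage("commit_sig")) // mockPeer shutting down
}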
+ BatchSize: 10000, } const startingHeight = 100 aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) if err := aliceLink.Start(); err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } + go func() { + for { + select { + case <-aliceLink.(*channelLink).htlcUpdates: + case <-aliceLink.(*channelLink).quit: + return + } + } + }() cleanUp := func() { + close(alicePeer.quit) defer fCleanUp() defer aliceLink.Stop() + defer bobChannel.Stop() } - return aliceLink, cleanUp, nil + return aliceLink, bobChannel, t, cleanUp, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -1494,6 +1509,148 @@ func assertLinkBandwidth(t *testing.T, link ChannelLink, } } +// handleStateUpdate handles the messages sent from the link after +// the batch ticker has triggered a state update. +func handleStateUpdate(link *channelLink, + remoteChannel *lnwallet.LightningChannel) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + // The link should be sending a commit sig at this point. + commitSig, ok := msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + // Let the remote channel receive the commit sig, and + // respond with a revocation + commitsig. + err := remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + commitSig = &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // This should make the link respond with a revocation. + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + + return nil +} + +// updateState is used exchange the messages necessary to do a full state +// transition. If initiateUpdate=true, then this call will make the link +// trigger an update by sending on the batchTick channel, if not, it will +// make the remoteChannel initiate the state update. +func updateState(batchTick chan time.Time, link *channelLink, + remoteChannel *lnwallet.LightningChannel, + initiateUpdate bool) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + + if initiateUpdate { + // Trigger update by ticking the batchTicker. + select { + case batchTick <- time.Now(): + case <-link.quit: + return fmt.Errorf("link shuttin down") + } + return handleStateUpdate(link, remoteChannel) + } + + // The remote is triggering the state update, emulate this by + // signing and sending CommitSig to the link. + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + + commitSig := &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // The link should respond with a revocation + commit sig. 
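// Aside (not part of the patch): handleStateUpdate and updateState above
// script lnd's commitment dance by hand. This toy model captures only the
// wire-message ordering those helpers expect (no signatures, no channel
// state): whoever initiates sends CommitSig first, the other side answers
// with RevokeAndAck plus its own CommitSig, and the initiator closes the loop
// with a final RevokeAndAck. All types here are invented for illustration.
package main

import "fmt"

// stateUpdateMsgs lists the messages, in order, for one full state
// transition initiated by `initiator` toward `responder`.
func stateUpdateMsgs(initiator, responder string) []string {
	return []string{
		initiator + " -> " + responder + ": CommitSig",
		responder + " -> " + initiator + ": RevokeAndAck",
		responder + " -> " + initiator + ": CommitSig",
		initiator + " -> " + responder + ": RevokeAndAck",
	}
}

func main() {
	// updateState(..., initiateUpdate=true): the link (Alice) starts.
	for _, m := range stateUpdateMsgs("alice", "bob") {
		fmt.Println(m)
	}
	fmt.Println()
	// updateState(..., initiateUpdate=false): the remote (Bob) starts.
	for _, m := range stateUpdateMsgs("bob", "alice") {
		fmt.Println(m)
	}
}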
+ var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", + msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + commitSig, ok = msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + err = remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + // Lastly, send a revocation back to the link. + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + // Sleep to make sure Alice has handled the remote revocation. + time.Sleep(500 * time.Millisecond) + + return nil +} + // TestChannelLinkBandwidthConsistency ensures that the reported bandwidth of a // given ChannelLink is properly updated in response to downstream messages // from the switch, and upstream messages from its channel peer. @@ -1509,7 +1666,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // We'll start the test by creating a single instance of const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + link, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1517,11 +1674,18 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { var ( mockBlob [lnwire.OnionPacketSize]byte - coreChan = aliceLink.(*channelLink).channel - defaultCommitFee = coreChan.StateSnapshot().CommitFee + aliceLink = link.(*channelLink) + aliceChannel = aliceLink.channel + defaultCommitFee = aliceChannel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = aliceLink.cfg.Peer.(*mockPeer).sentMsgs ) + // We put Alice into HodlHTLC mode, such that she won't settle + // incoming HTLCs automatically. + aliceLink.cfg.HodlHTLC = true + aliceLink.cfg.DebugHTLC = true + estimator := &lnwallet.StaticFeeEstimator{ FeeRate: 24, } @@ -1552,17 +1716,61 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // The resulting bandwidth should reflect that Alice is paying the + // htlc amount in addition to the htlc fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Alice should send the HTLC to Bob. + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Locking in the HTLC should not change Alice's bandwidth. 
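// Aside (not part of the patch): a back-of-the-envelope sketch of the
// bookkeeping these bandwidth assertions check. As the channel initiator,
// Alice pays the commitment fee, so offering an HTLC costs her the HTLC
// amount plus the fee increase for the extra HTLC output; a remote settle
// only returns the fee, while a remote fail returns both. The weight
// constants and amounts below are assumptions for illustration (they mirror
// lnd's CommitWeight/HtlcWeight values), not taken from the test itself.
package main

import "fmt"

const (
	commitWeight = 724 // commitment weight with no HTLCs (assumed)
	htlcWeight   = 172 // extra weight per HTLC output (assumed)
)

// feeForWeight mirrors fee = feePerKw * weight / 1000.
func feeForWeight(feePerKw, weight int64) int64 {
	return feePerKw * weight / 1000
}

func main() {
	const (
		capacity = int64(5_0000_0000) // 5 BTC channel, in satoshis
		feePerKw = int64(24_000)      // 24 sat/weight static estimator
		htlcAmt  = int64(10_0000)     // illustrative HTLC amount
	)

	commitFee := feeForWeight(feePerKw, commitWeight)
	start := capacity - commitFee
	htlcFee := feeForWeight(feePerKw, htlcWeight)

	fmt.Println("starting bandwidth:     ", start)
	fmt.Println("after offering the HTLC:", start-htlcAmt-htlcFee)
	fmt.Println("after a remote settle:  ", start-htlcAmt) // fee comes back
	fmt.Println("after a remote fail:    ", start)         // everything back
}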
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // If we now send in a valid HTLC settle for the prior HTLC we added, // then the bandwidth should remain unchanged as the remote party will // gain additional channel balance. + err = bobChannel.SettleHTLC(invoice.Terms.PaymentPreimage, bobIndex) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } htlcSettle := &lnwire.UpdateFufillHTLC{ - ID: 0, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, } aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 500) + + // Since the settle is not locked in yet, Alice's bandwidth should still + // reflect that she has to pay the fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Lock in the settle. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now that it is settled, Alice should have gotten the htlc fee back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Next, we'll add another HTLC initiated by the switch (of the same @@ -1576,31 +1784,96 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // Again, Alice's bandwidth decreases by htlcAmt+htlcFee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-2*htlcAmt-htlcFee) + + // Alice will send the HTLC to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + addHtlc, ok = msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err = bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC, which should not affect the bandwidth. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) // With that processed, we'll now generate an HTLC fail (sent by the // remote peer) to cancel the HTLC we just added. This should return us // back to the bandwidth of the link right before the HTLC was sent. + err = bobChannel.FailHTLC(bobIndex, []byte("nop")) + if err != nil { + t.Fatalf("unable to fail htlc: %v", err) + } failMsg := &lnwire.UpdateFailHTLC{ - ID: 1, // As this is the second HTLC. + ID: bobIndex, Reason: lnwire.OpaqueReason([]byte("nop")), } aliceLink.HandleChannelUpdate(failMsg) time.Sleep(time.Millisecond * 500) + + // Before the Fail gets locked in, the bandwidth should remain unchanged. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) + + // Lock in the Fail. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now the bancdwidth should reflect the failed HTLC. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Moving along, we'll now receive a new HTLC from the remote peer, // with an ID of 0 as this is their first HTLC. The bandwidth should // remain unchanged (but Alice will need to pay the fee for the extra // HTLC). - updateMsg := &lnwire.UpdateAddHTLC{ - ID: 0, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, // Re-using the same payment hash. + htlcAmt, totalTimelock, hops := generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err := generateRoute(hops...) 
+ if err != nil { + t.Fatalf("unable to gen route: %v", err) } - aliceLink.HandleChannelUpdate(updateMsg) - time.Sleep(time.Millisecond * 500) + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, + totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + + // We must add the invoice to the registry, such that Alice expects + // this payment. + err = aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice) + if err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) + + // Alice's balance remains unchanged until this HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Since Bob is adding this HTLC, Alice only needs to pay the fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // Next, we'll settle the HTLC with our knowledge of the pre-image that @@ -1608,32 +1881,112 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // of the channel should now be re-balanced to the starting point. settlePkt := htlcPacket{ htlc: &lnwire.UpdateFufillHTLC{ - ID: 2, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, }, } + aliceLink.HandleSwitchPacket(&settlePkt) time.Sleep(time.Millisecond * 500) + + // Settling this HTLC gives Alice all her original bandwidth back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) - // Finally, we'll test the scenario of failing an HTLC received by the - // remote node. This should result in no perceived bandwidth changes. - htlcAdd := &lnwire.UpdateAddHTLC{ - ID: 1, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, + // Alice wil send the Settle to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") } - aliceLink.HandleChannelUpdate(htlcAdd) + + settleHtlc, ok := msg.(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatalf("expected UpdateFufillHTLC, got %T", msg) + } + pre := settleHtlc.PaymentPreimage + idx := settleHtlc.ID + err = bobChannel.ReceiveHTLCSettle(pre, idx) + if err != nil { + t.Fatalf("unable to receive settle: %v", err) + } + + // After a settle the link should do a state transition automatically, + // so we don't have to trigger it. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Finally, we'll test the scenario of failing an HTLC received from the + // remote node. This should result in no perceived bandwidth changes. + htlcAmt, totalTimelock, hops = generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err = generateRoute(hops...) + if err != nil { + t.Fatalf("unable to gen route: %v", err) + } + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + if err := aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice); err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + // Since we are not using the link to handle HTLC IDs for the + // remote channel, we must set this manually. 
This is the second + // HTLC we add, hence it should have an ID of 1 (Alice's channel + // link will set this automatically for her side). + htlc.ID = 1 + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) time.Sleep(time.Millisecond * 500) + + // No changes before the HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After lock-in, Alice will have to pay the htlc fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcFee) + + // Now fail this HTLC. failPkt := htlcPacket{ + incomingHTLCID: bobIndex, htlc: &lnwire.UpdateFailHTLC{ - ID: 3, + ID: bobIndex, }, } aliceLink.HandleSwitchPacket(&failPkt) time.Sleep(time.Millisecond * 500) + + // Alice should get all her bandwidth back. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Message should be sent to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + failMsg, ok = msg.(*lnwire.UpdateFailHTLC) + if !ok { + t.Fatalf("expected UpdateFailHTLC, got %T", msg) + } + err = bobChannel.ReceiveFailHTLC(failMsg.ID, []byte("fail")) + if err != nil { + t.Fatalf("failed receiving fail htlc: %v", err) + } + + // After failing an HTLC, the link will automatically trigger + // a state update. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) } @@ -1646,7 +1999,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { var mockBlob [lnwire.OnionPacketSize]byte const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + aliceLink, bobChannel, batchTick, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1656,6 +2009,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { coreLink = aliceLink.(*channelLink) defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs ) estimator := &lnwallet.StaticFeeEstimator{ @@ -1667,6 +2021,11 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { } feePerKw := feePerWeight * 1000 + // The starting bandwidth of the channel should be exactly the amount + // that we created the channel between her and Bob. + expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + addLinkHTLC := func(amt lnwire.MilliSatoshi) [32]byte { invoice, htlc, err := generatePayment(amt, amt, 5, mockBlob) if err != nil { @@ -1676,7 +2035,6 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { htlc: htlc, amount: amt, }) - return invoice.Terms.PaymentPreimage } @@ -1694,13 +2052,39 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // The HTLCs should all be sent to the remote. 
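// Aside (not part of the patch): the expectedBandwidth computation just below
// is easy to misread. The starting bandwidth already has the empty-commitment
// fee subtracted, while htlcFee there covers the *whole* new commitment (base
// weight plus one HTLC output per add), so defaultCommitFee is added back to
// avoid charging it twice. A standalone sketch of that arithmetic, with lnd's
// weight constants assumed and the amounts invented for illustration:
package main

import "fmt"

const (
	commitWeight = 724 // base commitment weight (assumed)
	htlcWeight   = 172 // weight added per HTLC output (assumed)
)

func main() {
	const (
		capacity = int64(5_0000_0000) // 5 BTC channel, satoshis
		feePerKw = int64(24_000)
		numHTLCs = int64(20)
		htlcAmt  = int64(20_0000) // per-HTLC amount, satoshis
	)

	defaultCommitFee := feePerKw * commitWeight / 1000
	starting := capacity - defaultCommitFee

	totalHtlcAmt := numHTLCs * htlcAmt
	newCommitFee := feePerKw * (commitWeight + htlcWeight*numHTLCs) / 1000

	// Subtract the fee of the fully loaded commitment, then restore the
	// old fee that the starting bandwidth had already accounted for.
	expected := starting - totalHtlcAmt - newCommitFee + defaultCommitFee
	fmt.Println("expected bandwidth after", numHTLCs, "adds:", expected)
}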
+ var msg lnwire.Message + for i := 0; i < numHTLCs; i++ { + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + // TODO(roasbeef): increase sleep time.Sleep(time.Second * 1) commitWeight := lnwallet.CommitWeight + lnwallet.HtlcWeight*numHTLCs htlcFee := lnwire.NewMSatFromSatoshis( btcutil.Amount((int64(feePerKw) * commitWeight) / 1000), ) - expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee + expectedBandwidth = aliceStartingBandwidth - totalHtlcAmt - htlcFee expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee) assertLinkBandwidth(t, aliceLink, expectedBandwidth) @@ -1722,23 +2106,43 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // No messages should be sent to the remote at this point. + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + time.Sleep(time.Second * 2) expectedBandwidth -= (numOverFlowHTLCs * htlcAmt) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // With the extra HTLC's added, the overflow queue should now be - // populated with our 10 additional HTLC's. + // populated with our 20 additional HTLC's. if coreLink.overflowQueue.Length() != numOverFlowHTLCs { t.Fatalf("wrong overflow queue length: expected %v, got %v", numOverFlowHTLCs, coreLink.overflowQueue.Length()) } - // At this point, we'll now settle one of the HTLC's that were added. - // The resulting bandwidth change should be non-existent as this will - // simply transfer over funds to the remote party. However, the size of - // the overflow queue should be decreasing + // We trigger a state update to lock in the HTLCs. This should + // not change Alice's bandwidth. + if err := updateState(batchTick, coreLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + + // At this point, we'll now settle enough HTLCs to empty the overflow + // queue. The resulting bandwidth change should be non-existent as this + // will simply transfer over funds to the remote party. However, the + // size of the overflow queue should be decreasing for i := 0; i < numOverFlowHTLCs; i++ { + err = bobChannel.SettleHTLC(preImages[i], uint64(i)) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } + htlcSettle := &lnwire.UpdateFufillHTLC{ ID: uint64(i), PaymentPreimage: preImages[i], @@ -1746,19 +2150,48 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 50) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) - // As we're not actually initiating a full state update, we'll - // trigger a free-slot signal manually here. - coreLink.overflowQueue.SignalFreeSlot() + // We trigger a state update to lock in the Settles. 
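// Aside (not part of the patch): once the settles below are locked in, the
// link starts draining its overflow queue onto the freed commitment slots.
// This is a toy model of that admission logic only (fixed slot count, FIFO
// promotion, no fees) and is not lnd's actual packetQueue; numbers are
// illustrative.
package main

import "fmt"

type overflowLink struct {
	maxSlots int
	inFlight int
	queue    []int64 // queued HTLC amounts
}

// addHTLC either uses a commitment slot or parks the HTLC in the queue.
func (l *overflowLink) addHTLC(amt int64) (queued bool) {
	if l.inFlight < l.maxSlots {
		l.inFlight++
		return false
	}
	l.queue = append(l.queue, amt)
	return true
}

// settleHTLC frees a slot and immediately promotes one queued HTLC.
func (l *overflowLink) settleHTLC() {
	l.inFlight--
	if len(l.queue) > 0 {
		l.queue = l.queue[1:]
		l.inFlight++
	}
}

func main() {
	l := &overflowLink{maxSlots: 3}
	for i := 0; i < 5; i++ {
		fmt.Printf("add %d queued=%v\n", i, l.addHTLC(1000))
	}
	fmt.Println("queue length:", len(l.queue)) // 2

	l.settleHTLC()
	fmt.Println("queue length after settle:", len(l.queue)) // 1
}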
+ if err := updateState(batchTick, coreLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After the state update is done, Alice should start sending + // HTLCs from the overflow queue. + for i := 0; i < numOverFlowHTLCs; i++ { + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): } - time.Sleep(time.Millisecond * 500) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // Finally, at this point, the queue itself should be fully empty. As // enough slots have been drained from the commitment transaction to // allocate the queue items to. - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 500) if coreLink.overflowQueue.Length() != 0 { t.Fatalf("wrong overflow queue length: expected %v, got %v", 0, coreLink.overflowQueue.Length()) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 36aef97b..3addde3d 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "testing" + "time" "io" "sync/atomic" @@ -618,3 +619,14 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S Spend: make(chan *chainntnfs.SpendDetail), }, nil } + +type mockTicker struct { + ticker <-chan time.Time +} + +func (m *mockTicker) Start() <-chan time.Time { + return m.ticker +} + +func (m *mockTicker) Stop() { +} diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 85718582..ca9609c7 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -437,17 +437,21 @@ type threeHopNetwork struct { aliceServer *mockServer aliceChannelLink *channelLink aliceBlockEpoch chan *chainntnfs.BlockEpoch + aliceTicker *time.Ticker firstBobChannelLink *channelLink bobFirstBlockEpoch chan *chainntnfs.BlockEpoch + firstBobTicker *time.Ticker bobServer *mockServer secondBobChannelLink *channelLink bobSecondBlockEpoch chan *chainntnfs.BlockEpoch + secondBobTicker *time.Ticker carolChannelLink *channelLink carolServer *mockServer carolBlockEpoch chan *chainntnfs.BlockEpoch + carolTicker *time.Ticker feeEstimator *mockFeeEstimator @@ -625,6 +629,11 @@ func (n *threeHopNetwork) stop() { done <- struct{}{} }() + n.aliceTicker.Stop() + n.firstBobTicker.Stop() + n.secondBobTicker.Stop() + n.carolTicker.Stop() + for i := 0; i < 3; i++ { <-done } @@ -743,6 +752,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + aliceTicker := time.NewTicker(50 * time.Millisecond) aliceChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -763,6 +773,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{aliceTicker.C}, + BatchSize: 10, }, aliceChannel, startingHeight, @@ -772,7 +784,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-aliceChannelLink.(*channelLink).htlcUpdates + select { + case <-aliceChannelLink.(*channelLink).htlcUpdates: + case <-aliceChannelLink.(*channelLink).quit: + return + } } }() @@ -782,6 +798,7 @@ func 
newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + firstBobTicker := time.NewTicker(50 * time.Millisecond) firstBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -802,6 +819,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{firstBobTicker.C}, + BatchSize: 10, }, firstBobChannel, startingHeight, @@ -811,7 +830,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-firstBobChannelLink.(*channelLink).htlcUpdates + select { + case <-firstBobChannelLink.(*channelLink).htlcUpdates: + case <-firstBobChannelLink.(*channelLink).quit: + return + } } }() @@ -821,6 +844,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + secondBobTicker := time.NewTicker(50 * time.Millisecond) secondBobChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -841,6 +865,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{secondBobTicker.C}, + BatchSize: 10, }, secondBobChannel, startingHeight, @@ -850,7 +876,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-secondBobChannelLink.(*channelLink).htlcUpdates + select { + case <-secondBobChannelLink.(*channelLink).htlcUpdates: + case <-secondBobChannelLink.(*channelLink).quit: + return + } } }() @@ -860,6 +890,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, Cancel: func() { }, } + carolTicker := time.NewTicker(50 * time.Millisecond) carolChannelLink := NewChannelLink( ChannelLinkConfig{ FwrdingPolicy: globalPolicy, @@ -880,6 +911,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, }, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, + BatchTicker: &mockTicker{carolTicker.C}, + BatchSize: 10, }, carolChannel, startingHeight, @@ -889,7 +922,11 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } go func() { for { - <-carolChannelLink.(*channelLink).htlcUpdates + select { + case <-carolChannelLink.(*channelLink).htlcUpdates: + case <-carolChannelLink.(*channelLink).quit: + return + } } }() @@ -897,17 +934,21 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, aliceServer: aliceServer, aliceChannelLink: aliceChannelLink.(*channelLink), aliceBlockEpoch: aliceEpochChan, + aliceTicker: aliceTicker, firstBobChannelLink: firstBobChannelLink.(*channelLink), bobFirstBlockEpoch: bobFirstEpochChan, + firstBobTicker: firstBobTicker, bobServer: bobServer, secondBobChannelLink: secondBobChannelLink.(*channelLink), bobSecondBlockEpoch: bobSecondEpochChan, + secondBobTicker: secondBobTicker, carolChannelLink: carolChannelLink.(*channelLink), carolServer: carolServer, carolBlockEpoch: carolBlockEpoch, + carolTicker: carolTicker, feeEstimator: feeEstimator, globalPolicy: globalPolicy, diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 23f383b8..6c2b8712 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4948,7 +4948,8 @@ func (lc *LightningChannel) availableBalance() (lnwire.MilliSatoshi, int64) { // Next we'll grab the current set of log updates that are still active // and haven't been garbage collected. 
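// Aside (not part of the patch): each link in the three-hop test network
// above now drains its htlcUpdates channel in a goroutine that also watches
// the link's quit channel, so the goroutine exits when the link stops instead
// of leaking. A minimal standalone sketch of that drain-until-quit pattern
// (channel types are placeholders):
package main

import (
	"fmt"
	"sync"
)

// drainUntilQuit discards everything received on updates until quit is
// closed, then returns.
func drainUntilQuit(updates <-chan int, quit <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-updates:
		case <-quit:
			return
		}
	}
}

func main() {
	updates := make(chan int)
	quit := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(1)
	go drainUntilQuit(updates, quit, &wg)

	updates <- 1 // consumed and dropped
	updates <- 2

	close(quit) // stopping the "link" stops the drainer
	wg.Wait()
	fmt.Println("drain goroutine exited cleanly")
}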
- htlcView := lc.fetchHTLCView(lc.remoteUpdateLog.logIndex, + remoteACKedIndex := lc.localCommitChain.tip().theirMessageIndex + htlcView := lc.fetchHTLCView(remoteACKedIndex, lc.localUpdateLog.logIndex) feePerKw := lc.channelState.LocalCommitment.FeePerKw dustLimit := lc.channelState.LocalChanCfg.DustLimit diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 03487741..6d903477 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -3914,6 +3914,13 @@ func TestChanAvailableBandwidth(t *testing.T) { t.Fatalf("unable to recv htlc cancel: %v", err) } + // We must do a state transition before the balance is available + // for Alice. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete alice's state "+ + "transition: %v", err) + } + // With the HTLC's settled in the log, we'll now assert that if we // initiate a state transition, then our guess was correct. assertBandwidthEstimateCorrect(false) @@ -4293,4 +4300,67 @@ func TestChannelUnilateralCloseHtlcResolution(t *testing.T) { } } +// TestDesyncHTLCs checks that we cannot add HTLCs that would make the +// balance negative, when the remote and local update logs are desynced. +func TestDesyncHTLCs(t *testing.T) { + t.Parallel() + + // We'll kick off the test by creating our channels which both are + // loaded with 5 BTC each. + aliceChannel, bobChannel, cleanUp, err := createTestChannels(1) + if err != nil { + t.Fatalf("unable to create test channels: %v", err) + } + defer cleanUp() + + // First add one HTLC of value 4.1 BTC. + htlcAmt := lnwire.NewMSatFromSatoshis(4.1 * btcutil.SatoshiPerBitcoin) + htlc, _ := createHTLC(0, htlcAmt) + aliceIndex, err := aliceChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + bobIndex, err := bobChannel.ReceiveHTLC(htlc) + if err != nil { + t.Fatalf("unable to recv htlc: %v", err) + } + + // Lock this HTLC in. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete state update: %v", err) + } + + // Now let let Bob fail this HTLC. + if err := bobChannel.FailHTLC(bobIndex, []byte("failreason")); err != nil { + t.Fatalf("unable to cancel HTLC: %v", err) + } + if err := aliceChannel.ReceiveFailHTLC(aliceIndex, []byte("bad")); err != nil { + t.Fatalf("unable to recv htlc cancel: %v", err) + } + + // Alice now has gotten all here original balance (5 BTC) back, + // however, adding a new HTLC at this point SHOULD fail, since + // if she add the HTLC and sign the next state, Bob cannot assume + // she received the FailHTLC, and must assume she doesn't have + // the necessary balance available. + // + // We try adding an HTLC of value 1 BTC, which should fail + // because the balance is unavailable. + htlcAmt = lnwire.NewMSatFromSatoshis(1 * btcutil.SatoshiPerBitcoin) + htlc, _ = createHTLC(1, htlcAmt) + if _, err = aliceChannel.AddHTLC(htlc); err != ErrInsufficientBalance { + t.Fatalf("expected ErrInsufficientBalance, instead received: %v", + err) + } + + // Now do a state transition, which will ACK the FailHTLC, making + // Alice able to add the new HTLC. + if err := forceStateTransition(aliceChannel, bobChannel); err != nil { + t.Fatalf("unable to complete state update: %v", err) + } + if _, err = aliceChannel.AddHTLC(htlc); err != nil { + t.Fatalf("unable to add htlc: %v", err) + } +} + // TODO(roasbeef): testing.Quick test case for retrans!!! 
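TestDesyncHTLCs above exercises the availableBalance change at the top of this hunk: when deciding how much Alice can still add, only remote updates that have been ACKed into her local commitment may be counted, otherwise a fail or settle the remote has not yet committed to would inflate her spendable balance. The toy model below (log structure and amounts invented, not lnd's actual types) shows the difference between using the remote log tip and using the ACKed index.

package main

import "fmt"

type update struct {
	logIndex uint64
	delta    int64 // effect on Alice's balance, in satoshis
}

// available sums Alice's balance using only updates whose log index is
// strictly below the given ACKed index.
func available(start int64, log []update, ackedIndex uint64) int64 {
	balance := start
	for _, u := range log {
		if u.logIndex < ackedIndex {
			balance += u.delta
		}
	}
	return balance
}

func main() {
	const start = 5_0000_0000 // 5 BTC, satoshis

	// Alice already offered a 4.1 BTC HTLC, so 0.9 BTC is left.
	afterAdd := int64(start - 4_1000_0000)

	// Bob sent UpdateFailHTLC for it (remote log index 0), but Alice has
	// not yet locked that fail into a new commitment, so it is un-ACKed.
	remoteLog := []update{{logIndex: 0, delta: +4_1000_0000}}

	// Buggy view: count every remote update -> looks like 5 BTC is free.
	fmt.Println("using remote log tip:  ", available(afterAdd, remoteLog, 1))

	// Fixed view: count only ACKed updates -> still only 0.9 BTC free.
	fmt.Println("using ACKed index only:", available(afterAdd, remoteLog, 0))
}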
diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index 2354ca4e..1e8cf3a6 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -259,6 +259,8 @@ func (s *sigPool) SubmitSignBatch(signJobs []signJob) { case s.signJobs <- job: case <-job.cancel: // TODO(roasbeef): return error? + case <-s.quit: + return } } } diff --git a/peer.go b/peer.go index e69255d3..62fdf116 100644 --- a/peer.go +++ b/peer.go @@ -397,6 +397,9 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { ) }, SyncStates: true, + BatchTicker: htlcswitch.NewBatchTicker( + time.NewTicker(50 * time.Millisecond)), + BatchSize: 10, } link := htlcswitch.NewChannelLink(linkCfg, lnChan, uint32(currentHeight)) @@ -1289,6 +1292,9 @@ out: ) }, SyncStates: false, + BatchTicker: htlcswitch.NewBatchTicker( + time.NewTicker(50 * time.Millisecond)), + BatchSize: 10, } link := htlcswitch.NewChannelLink(linkConfig, newChan, uint32(currentHeight))
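The sigpool change above adds a quit case so that submitting sign jobs during shutdown cannot block forever, and peer.go wires a real 50 ms BatchTicker into each link via htlcswitch.NewBatchTicker. Below is a minimal sketch of the same submit-with-quit pattern for a generic worker pool; the pool type and channels are invented for illustration and are not lnd's sigPool.

package main

import (
	"fmt"
	"time"
)

type pool struct {
	jobs chan func()
	quit chan struct{}
}

// submit hands a job to the pool unless the job is cancelled or the pool
// itself is shutting down.
func (p *pool) submit(job func(), cancel <-chan struct{}) {
	select {
	case p.jobs <- job:
	case <-cancel:
	case <-p.quit:
	}
}

func main() {
	p := &pool{jobs: make(chan func()), quit: make(chan struct{})}

	// No worker is reading jobs; with quit closed, submit still returns.
	close(p.quit)
	done := make(chan struct{})
	go func() {
		p.submit(func() {}, nil)
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("submit returned despite no workers")
	case <-time.After(time.Second):
		fmt.Println("submit blocked (would leak without the quit case)")
	}
}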