diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 76cdb81d..f71a4b0d 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -1389,13 +1389,16 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) { type mockPeer struct { sync.Mutex - sentMsgs []lnwire.Message + sentMsgs chan lnwire.Message + quit chan struct{} } func (m *mockPeer) SendMessage(msg lnwire.Message) error { - m.Lock() - m.sentMsgs = append(m.sentMsgs, msg) - m.Unlock() + select { + case m.sentMsgs <- msg: + case <-m.quit: + return fmt.Errorf("mockPeer shutting down") + } return nil } func (m *mockPeer) WipeChannel(*wire.OutPoint) error { @@ -1406,19 +1409,11 @@ func (m *mockPeer) PubKey() [33]byte { } func (m *mockPeer) Disconnect(reason error) { } -func (m *mockPeer) popSentMsg() lnwire.Message { - m.Lock() - msg := m.sentMsgs[0] - m.sentMsgs[0] = nil - m.sentMsgs = m.sentMsgs[1:] - m.Unlock() - - return msg -} var _ Peer = (*mockPeer)(nil) -func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), error) { +func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, + *lnwallet.LightningChannel, chan time.Time, func(), error) { globalEpoch := &chainntnfs.BlockEpochEvent{ Epochs: make(chan *chainntnfs.BlockEpoch), Cancel: func() { @@ -1426,18 +1421,21 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro } chanID := lnwire.NewShortChanIDFromInt(4) - aliceChannel, _, fCleanUp, _, err := createTestChannel( + aliceChannel, bobChannel, fCleanUp, _, err := createTestChannel( alicePrivKey, bobPrivKey, chanAmt, chanAmt, chanID, ) if err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } var ( invoiveRegistry = newMockRegistry() decoder = &mockIteratorDecoder{} obfuscator = newMockObfuscator() - alicePeer mockPeer + alicePeer = &mockPeer{ + sentMsgs: make(chan lnwire.Message, 2000), + quit: make(chan struct{}), + } globalPolicy = ForwardingPolicy{ MinHTLC: lnwire.NewMSatFromSatoshis(5), @@ -1451,9 +1449,11 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro preimageMap: make(map[[32]byte][]byte), } + t := make(chan time.Time) + ticker := &mockTicker{t} aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, - Peer: &alicePeer, + Peer: alicePeer, Switch: New(Config{}), DecodeHopIterator: decoder.DecodeHopIterator, DecodeOnionObfuscator: func(io.Reader) (ErrorEncrypter, lnwire.FailCode) { @@ -1467,20 +1467,35 @@ func newSingleLinkTestHarness(chanAmt btcutil.Amount) (ChannelLink, func(), erro Registry: invoiveRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, BlockEpochs: globalEpoch, + BatchTicker: ticker, + // Make the BatchSize large enough to not + // trigger commit update automatically during tests. + BatchSize: 10000, } const startingHeight = 100 aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight) if err := aliceLink.Start(); err != nil { - return nil, nil, err + return nil, nil, nil, nil, err } + go func() { + for { + select { + case <-aliceLink.(*channelLink).htlcUpdates: + case <-aliceLink.(*channelLink).quit: + return + } + } + }() cleanUp := func() { + close(alicePeer.quit) defer fCleanUp() defer aliceLink.Stop() + defer bobChannel.Stop() } - return aliceLink, cleanUp, nil + return aliceLink, bobChannel, t, cleanUp, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -1494,6 +1509,148 @@ func assertLinkBandwidth(t *testing.T, link ChannelLink, } } +// handleStateUpdate handles the messages sent from the link after +// the batch ticker has triggered a state update. +func handleStateUpdate(link *channelLink, + remoteChannel *lnwallet.LightningChannel) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + // The link should be sending a commit sig at this point. + commitSig, ok := msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + // Let the remote channel receive the commit sig, and + // respond with a revocation + commitsig. + err := remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + commitSig = &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // This should make the link respond with a revocation. + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + + return nil +} + +// updateState is used exchange the messages necessary to do a full state +// transition. If initiateUpdate=true, then this call will make the link +// trigger an update by sending on the batchTick channel, if not, it will +// make the remoteChannel initiate the state update. +func updateState(batchTick chan time.Time, link *channelLink, + remoteChannel *lnwallet.LightningChannel, + initiateUpdate bool) error { + sentMsgs := link.cfg.Peer.(*mockPeer).sentMsgs + + if initiateUpdate { + // Trigger update by ticking the batchTicker. + select { + case batchTick <- time.Now(): + case <-link.quit: + return fmt.Errorf("link shuttin down") + } + return handleStateUpdate(link, remoteChannel) + } + + // The remote is triggering the state update, emulate this by + // signing and sending CommitSig to the link. + remoteSig, remoteHtlcSigs, err := remoteChannel.SignNextCommitment() + if err != nil { + return err + } + + commitSig := &lnwire.CommitSig{ + CommitSig: remoteSig, + HtlcSigs: remoteHtlcSigs, + } + link.HandleChannelUpdate(commitSig) + + // The link should respond with a revocation + commit sig. + var msg lnwire.Message + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive RevokeAndAck from Alice") + } + + revoke, ok := msg.(*lnwire.RevokeAndAck) + if !ok { + return fmt.Errorf("expected RevokeAndAck got %T", + msg) + } + _, err = remoteChannel.ReceiveRevocation(revoke) + if err != nil { + return fmt.Errorf("unable to recieve "+ + "revocation: %v", err) + } + select { + case msg = <-sentMsgs: + case <-time.After(20 * time.Second): + return fmt.Errorf("did not receive CommitSig from Alice") + } + + commitSig, ok = msg.(*lnwire.CommitSig) + if !ok { + return fmt.Errorf("expected CommitSig, got %T", msg) + } + + err = remoteChannel.ReceiveNewCommitment( + commitSig.CommitSig, commitSig.HtlcSigs) + if err != nil { + return err + } + + // Lastly, send a revocation back to the link. + remoteRev, _, err := remoteChannel.RevokeCurrentCommitment() + if err != nil { + return err + } + link.HandleChannelUpdate(remoteRev) + + // Sleep to make sure Alice has handled the remote revocation. + time.Sleep(500 * time.Millisecond) + + return nil +} + // TestChannelLinkBandwidthConsistency ensures that the reported bandwidth of a // given ChannelLink is properly updated in response to downstream messages // from the switch, and upstream messages from its channel peer. @@ -1509,7 +1666,7 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // We'll start the test by creating a single instance of const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + link, bobChannel, tmr, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1517,11 +1674,18 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { var ( mockBlob [lnwire.OnionPacketSize]byte - coreChan = aliceLink.(*channelLink).channel - defaultCommitFee = coreChan.StateSnapshot().CommitFee + aliceLink = link.(*channelLink) + aliceChannel = aliceLink.channel + defaultCommitFee = aliceChannel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = aliceLink.cfg.Peer.(*mockPeer).sentMsgs ) + // We put Alice into HodlHTLC mode, such that she won't settle + // incoming HTLCs automatically. + aliceLink.cfg.HodlHTLC = true + aliceLink.cfg.DebugHTLC = true + estimator := &lnwallet.StaticFeeEstimator{ FeeRate: 24, } @@ -1552,17 +1716,61 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // The resulting bandwidth should reflect that Alice is paying the + // htlc amount in addition to the htlc fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Alice should send the HTLC to Bob. + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Locking in the HTLC should not change Alice's bandwidth. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // If we now send in a valid HTLC settle for the prior HTLC we added, // then the bandwidth should remain unchanged as the remote party will // gain additional channel balance. + err = bobChannel.SettleHTLC(invoice.Terms.PaymentPreimage, bobIndex) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } htlcSettle := &lnwire.UpdateFufillHTLC{ - ID: 0, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, } aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 500) + + // Since the settle is not locked in yet, Alice's bandwidth should still + // reflect that she has to pay the fee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) + + // Lock in the settle. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now that it is settled, Alice should have gotten the htlc fee back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Next, we'll add another HTLC initiated by the switch (of the same @@ -1576,31 +1784,96 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { } aliceLink.HandleSwitchPacket(&addPkt) time.Sleep(time.Millisecond * 500) + + // Again, Alice's bandwidth decreases by htlcAmt+htlcFee. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-2*htlcAmt-htlcFee) + + // Alice will send the HTLC to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + addHtlc, ok = msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + bobIndex, err = bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + + // Lock in the HTLC, which should not affect the bandwidth. + if err := updateState(tmr, aliceLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) // With that processed, we'll now generate an HTLC fail (sent by the // remote peer) to cancel the HTLC we just added. This should return us // back to the bandwidth of the link right before the HTLC was sent. + err = bobChannel.FailHTLC(bobIndex, []byte("nop")) + if err != nil { + t.Fatalf("unable to fail htlc: %v", err) + } failMsg := &lnwire.UpdateFailHTLC{ - ID: 1, // As this is the second HTLC. + ID: bobIndex, Reason: lnwire.OpaqueReason([]byte("nop")), } aliceLink.HandleChannelUpdate(failMsg) time.Sleep(time.Millisecond * 500) + + // Before the Fail gets locked in, the bandwidth should remain unchanged. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt*2-htlcFee) + + // Lock in the Fail. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Now the bancdwidth should reflect the failed HTLC. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) // Moving along, we'll now receive a new HTLC from the remote peer, // with an ID of 0 as this is their first HTLC. The bandwidth should // remain unchanged (but Alice will need to pay the fee for the extra // HTLC). - updateMsg := &lnwire.UpdateAddHTLC{ - ID: 0, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, // Re-using the same payment hash. + htlcAmt, totalTimelock, hops := generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err := generateRoute(hops...) + if err != nil { + t.Fatalf("unable to gen route: %v", err) } - aliceLink.HandleChannelUpdate(updateMsg) - time.Sleep(time.Millisecond * 500) + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, + totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + + // We must add the invoice to the registry, such that Alice expects + // this payment. + err = aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice) + if err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) + + // Alice's balance remains unchanged until this HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt) + + // Lock in the HTLC. + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // Since Bob is adding this HTLC, Alice only needs to pay the fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcAmt-htlcFee) // Next, we'll settle the HTLC with our knowledge of the pre-image that @@ -1608,32 +1881,112 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { // of the channel should now be re-balanced to the starting point. settlePkt := htlcPacket{ htlc: &lnwire.UpdateFufillHTLC{ - ID: 2, + ID: bobIndex, PaymentPreimage: invoice.Terms.PaymentPreimage, }, } + aliceLink.HandleSwitchPacket(&settlePkt) time.Sleep(time.Millisecond * 500) + + // Settling this HTLC gives Alice all her original bandwidth back. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) - // Finally, we'll test the scenario of failing an HTLC received by the - // remote node. This should result in no perceived bandwidth changes. - htlcAdd := &lnwire.UpdateAddHTLC{ - ID: 1, - Amount: htlcAmt, - Expiry: 9, - PaymentHash: htlc.PaymentHash, + // Alice wil send the Settle to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") } - aliceLink.HandleChannelUpdate(htlcAdd) + + settleHtlc, ok := msg.(*lnwire.UpdateFufillHTLC) + if !ok { + t.Fatalf("expected UpdateFufillHTLC, got %T", msg) + } + pre := settleHtlc.PaymentPreimage + idx := settleHtlc.ID + err = bobChannel.ReceiveHTLCSettle(pre, idx) + if err != nil { + t.Fatalf("unable to receive settle: %v", err) + } + + // After a settle the link should do a state transition automatically, + // so we don't have to trigger it. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Finally, we'll test the scenario of failing an HTLC received from the + // remote node. This should result in no perceived bandwidth changes. + htlcAmt, totalTimelock, hops = generateHops(htlcAmt, testStartingHeight, + aliceLink) + blob, err = generateRoute(hops...) + if err != nil { + t.Fatalf("unable to gen route: %v", err) + } + invoice, htlc, err = generatePayment(htlcAmt, htlcAmt, totalTimelock, blob) + if err != nil { + t.Fatalf("unable to create payment: %v", err) + } + if err := aliceLink.cfg.Registry.(*mockInvoiceRegistry).AddInvoice(*invoice); err != nil { + t.Fatalf("unable to add invoice to registry: %v", err) + } + + // Since we are not using the link to handle HTLC IDs for the + // remote channel, we must set this manually. This is the second + // HTLC we add, hence it should have an ID of 1 (Alice's channel + // link will set this automatically for her side). + htlc.ID = 1 + bobIndex, err = bobChannel.AddHTLC(htlc) + if err != nil { + t.Fatalf("unable to add htlc: %v", err) + } + aliceLink.HandleChannelUpdate(htlc) time.Sleep(time.Millisecond * 500) + + // No changes before the HTLC is locked in. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + if err := updateState(tmr, aliceLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After lock-in, Alice will have to pay the htlc fee. assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth-htlcFee) + + // Now fail this HTLC. failPkt := htlcPacket{ + incomingHTLCID: bobIndex, htlc: &lnwire.UpdateFailHTLC{ - ID: 3, + ID: bobIndex, }, } aliceLink.HandleSwitchPacket(&failPkt) time.Sleep(time.Millisecond * 500) + + // Alice should get all her bandwidth back. + assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) + + // Message should be sent to Bob. + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + failMsg, ok = msg.(*lnwire.UpdateFailHTLC) + if !ok { + t.Fatalf("expected UpdateFailHTLC, got %T", msg) + } + err = bobChannel.ReceiveFailHTLC(failMsg.ID, []byte("fail")) + if err != nil { + t.Fatalf("failed receiving fail htlc: %v", err) + } + + // After failing an HTLC, the link will automatically trigger + // a state update. + if err := handleStateUpdate(aliceLink, bobChannel); err != nil { + t.Fatalf("unable to update state: %v", err) + } assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) } @@ -1646,7 +1999,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { var mockBlob [lnwire.OnionPacketSize]byte const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, cleanUp, err := newSingleLinkTestHarness(chanAmt) + aliceLink, bobChannel, batchTick, cleanUp, err := newSingleLinkTestHarness(chanAmt) if err != nil { t.Fatalf("unable to create link: %v", err) } @@ -1656,6 +2009,7 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { coreLink = aliceLink.(*channelLink) defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee aliceStartingBandwidth = aliceLink.Bandwidth() + aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs ) estimator := &lnwallet.StaticFeeEstimator{ @@ -1667,6 +2021,11 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { } feePerKw := feePerWeight * 1000 + // The starting bandwidth of the channel should be exactly the amount + // that we created the channel between her and Bob. + expectedBandwidth := lnwire.NewMSatFromSatoshis(chanAmt - defaultCommitFee) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + addLinkHTLC := func(amt lnwire.MilliSatoshi) [32]byte { invoice, htlc, err := generatePayment(amt, amt, 5, mockBlob) if err != nil { @@ -1676,7 +2035,6 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { htlc: htlc, amount: amt, }) - return invoice.Terms.PaymentPreimage } @@ -1694,13 +2052,39 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // The HTLCs should all be sent to the remote. + var msg lnwire.Message + for i := 0; i < numHTLCs; i++ { + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + // TODO(roasbeef): increase sleep time.Sleep(time.Second * 1) commitWeight := lnwallet.CommitWeight + lnwallet.HtlcWeight*numHTLCs htlcFee := lnwire.NewMSatFromSatoshis( btcutil.Amount((int64(feePerKw) * commitWeight) / 1000), ) - expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee + expectedBandwidth = aliceStartingBandwidth - totalHtlcAmt - htlcFee expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee) assertLinkBandwidth(t, aliceLink, expectedBandwidth) @@ -1722,23 +2106,43 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { totalHtlcAmt += htlcAmt } + // No messages should be sent to the remote at this point. + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): + } + time.Sleep(time.Second * 2) expectedBandwidth -= (numOverFlowHTLCs * htlcAmt) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // With the extra HTLC's added, the overflow queue should now be - // populated with our 10 additional HTLC's. + // populated with our 20 additional HTLC's. if coreLink.overflowQueue.Length() != numOverFlowHTLCs { t.Fatalf("wrong overflow queue length: expected %v, got %v", numOverFlowHTLCs, coreLink.overflowQueue.Length()) } - // At this point, we'll now settle one of the HTLC's that were added. - // The resulting bandwidth change should be non-existent as this will - // simply transfer over funds to the remote party. However, the size of - // the overflow queue should be decreasing + // We trigger a state update to lock in the HTLCs. This should + // not change Alice's bandwidth. + if err := updateState(batchTick, coreLink, bobChannel, true); err != nil { + t.Fatalf("unable to update state: %v", err) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) + + // At this point, we'll now settle enough HTLCs to empty the overflow + // queue. The resulting bandwidth change should be non-existent as this + // will simply transfer over funds to the remote party. However, the + // size of the overflow queue should be decreasing for i := 0; i < numOverFlowHTLCs; i++ { + err = bobChannel.SettleHTLC(preImages[i], uint64(i)) + if err != nil { + t.Fatalf("unable to settle htlc: %v", err) + } + htlcSettle := &lnwire.UpdateFufillHTLC{ ID: uint64(i), PaymentPreimage: preImages[i], @@ -1746,19 +2150,48 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { aliceLink.HandleChannelUpdate(htlcSettle) time.Sleep(time.Millisecond * 50) + } + time.Sleep(time.Millisecond * 500) + assertLinkBandwidth(t, aliceLink, expectedBandwidth) - // As we're not actually initiating a full state update, we'll - // trigger a free-slot signal manually here. - coreLink.overflowQueue.SignalFreeSlot() + // We trigger a state update to lock in the Settles. + if err := updateState(batchTick, coreLink, bobChannel, false); err != nil { + t.Fatalf("unable to update state: %v", err) + } + + // After the state update is done, Alice should start sending + // HTLCs from the overflow queue. + for i := 0; i < numOverFlowHTLCs; i++ { + var msg lnwire.Message + select { + case msg = <-aliceMsgs: + case <-time.After(2 * time.Second): + t.Fatalf("did not receive message") + } + + addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) + if !ok { + t.Fatalf("expected UpdateAddHTLC, got %T", msg) + } + + _, err := bobChannel.ReceiveHTLC(addHtlc) + if err != nil { + t.Fatalf("bob failed receiving htlc: %v", err) + } + } + + select { + case msg = <-aliceMsgs: + t.Fatalf("unexpected message: %T", msg) + case <-time.After(20 * time.Millisecond): } - time.Sleep(time.Millisecond * 500) assertLinkBandwidth(t, aliceLink, expectedBandwidth) // Finally, at this point, the queue itself should be fully empty. As // enough slots have been drained from the commitment transaction to // allocate the queue items to. - time.Sleep(time.Millisecond * 100) + time.Sleep(time.Millisecond * 500) if coreLink.overflowQueue.Length() != 0 { t.Fatalf("wrong overflow queue length: expected %v, got %v", 0, coreLink.overflowQueue.Length())