diff --git a/htlcswitch/link.go b/htlcswitch/link.go index e12e325d..b8a50eea 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -19,7 +19,6 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hop" - "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lntypes" @@ -327,10 +326,6 @@ type channelLink struct { // which may affect behaviour of the service. cfg ChannelLinkConfig - // overflowQueue is used to store the htlc add updates which haven't - // been processed because of the commitment transaction overflow. - overflowQueue *packetQueue - // mailBox is the main interface between the outside world and the // link. All incoming messages will be sent over this mailBox. Messages // include new updates from our connected peer, and new packets to be @@ -395,12 +390,11 @@ func NewChannelLink(cfg ChannelLinkConfig, channel: channel, shortChanID: channel.ShortChanID(), // TODO(roasbeef): just do reserve here? - overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2), - htlcUpdates: make(chan *contractcourt.ContractUpdate), - hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), - hodlQueue: queue.NewConcurrentQueue(10), - log: build.NewPrefixLog(logPrefix, log), - quit: make(chan struct{}), + htlcUpdates: make(chan *contractcourt.ContractUpdate), + hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), + hodlQueue: queue.NewConcurrentQueue(10), + log: build.NewPrefixLog(logPrefix, log), + quit: make(chan struct{}), } } @@ -436,7 +430,6 @@ func (l *channelLink) Start() error { } l.mailBox.ResetMessages() - l.overflowQueue.Start() l.hodlQueue.Start() // Before launching the htlcManager messages, revert any circuits that @@ -511,7 +504,6 @@ func (l *channelLink) Stop() { } l.updateFeeTimer.Stop() - l.overflowQueue.Stop() l.hodlQueue.Stop() close(l.quit) @@ -1100,37 +1092,10 @@ out: break out } - // A packet that previously overflowed the commitment - // transaction is now eligible for processing once again. So - // we'll attempt to re-process the packet in order to allow it - // to continue propagating within the network. - case packet := <-l.overflowQueue.outgoingPkts: - msg := packet.htlc.(*lnwire.UpdateAddHTLC) - l.log.Tracef("reprocessing downstream add update "+ - "with payment hash(%x)", msg.PaymentHash[:]) - - l.handleDownstreamPkt(packet) - // A message from the switch was just received. This indicates // that the link is an intermediate hop in a multi-hop HTLC // circuit. case pkt := <-l.downstream: - // If we have non empty processing queue then we'll add - // this to the overflow rather than processing it - // directly. Once an active HTLC is either settled or - // failed, then we'll free up a new slot. - htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) - if ok && l.overflowQueue.Length() != 0 { - l.log.Infof("downstream htlc add update with "+ - "payment hash(%x) have been added to "+ - "reprocessing queue, pend_updates=%v", - htlc.PaymentHash[:], - l.channel.PendingLocalUpdateCount()) - - l.overflowQueue.AddPkt(pkt) - continue - } - l.handleDownstreamPkt(pkt) // A message from the connected peer was just received. This @@ -1301,109 +1266,89 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { openCircuitRef := pkt.inKey() index, err := l.channel.AddHTLC(htlc, &openCircuitRef) if err != nil { - switch err { + // The HTLC was unable to be added to the state machine, + // as a result, we'll signal the switch to cancel the + // pending payment. + l.log.Warnf("Unable to handle downstream add HTLC: %v", + err) - // The channels spare bandwidth is fully allocated, so - // we'll put this HTLC into the overflow queue. - case lnwallet.ErrMaxHTLCNumber: - l.log.Infof("downstream htlc add update with "+ - "payment hash(%x) have been added to "+ - "reprocessing queue, pend_updates: %v", - htlc.PaymentHash[:], - l.channel.PendingLocalUpdateCount()) + var ( + localFailure = false + reason lnwire.OpaqueReason + ) - l.overflowQueue.AddPkt(pkt) - return + // Create a temporary channel failure which we will send + // back to our peer if this is a forward, or report to + // the user if the failed payment was locally initiated. + failure := l.createFailureWithUpdate( + func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage { + return lnwire.NewTemporaryChannelFailure( + upd, + ) + }, + ) - // The HTLC was unable to be added to the state - // machine, as a result, we'll signal the switch to - // cancel the pending payment. - default: - l.log.Warnf("unable to handle downstream add "+ - "HTLC: %v", err) - - var ( - localFailure = false - reason lnwire.OpaqueReason - ) - - // Create a temporary channel failure which we - // will send back to our peer if this is a - // forward, or report to the user if the failed - // payment was locally initiated. - failure := l.createFailureWithUpdate( - func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage { - return lnwire.NewTemporaryChannelFailure( - upd, - ) - }, - ) - - // If the payment was locally initiated (which - // is indicated by a nil obfuscator), we do - // not need to encrypt it back to the sender. - if pkt.obfuscator == nil { - var b bytes.Buffer - err := lnwire.EncodeFailure(&b, failure, 0) - if err != nil { - l.log.Errorf("unable to "+ - "encode failure: %v", err) - l.mailBox.AckPacket(pkt.inKey()) - return - } - reason = lnwire.OpaqueReason(b.Bytes()) - localFailure = true - } else { - // If the packet is part of a forward, - // (identified by a non-nil obfuscator) - // we need to encrypt the error back to - // the source. - var err error - reason, err = pkt.obfuscator.EncryptFirstHop(failure) - if err != nil { - l.log.Errorf("unable to "+ - "obfuscate error: %v", err) - l.mailBox.AckPacket(pkt.inKey()) - return - } + // If the payment was locally initiated (which is + // indicated by a nil obfuscator), we do not need to + // encrypt it back to the sender. + if pkt.obfuscator == nil { + var b bytes.Buffer + err := lnwire.EncodeFailure(&b, failure, 0) + if err != nil { + l.log.Errorf("unable to encode "+ + "failure: %v", err) + l.mailBox.AckPacket(pkt.inKey()) + return } - - // Create a link error containing the temporary - // channel failure and a detail which indicates - // the we failed to add the htlc. - linkError := NewDetailedLinkError( - failure, - OutgoingFailureDownstreamHtlcAdd, - ) - - failPkt := &htlcPacket{ - incomingChanID: pkt.incomingChanID, - incomingHTLCID: pkt.incomingHTLCID, - circuit: pkt.circuit, - sourceRef: pkt.sourceRef, - hasSource: true, - localFailure: localFailure, - linkFailure: linkError, - htlc: &lnwire.UpdateFailHTLC{ - Reason: reason, - }, + reason = lnwire.OpaqueReason(b.Bytes()) + localFailure = true + } else { + // If the packet is part of a forward, + // (identified by a non-nil obfuscator) we need + // to encrypt the error back to the source. + var err error + reason, err = pkt.obfuscator.EncryptFirstHop(failure) + if err != nil { + l.log.Errorf("unable to "+ + "obfuscate error: %v", err) + l.mailBox.AckPacket(pkt.inKey()) + return } - - go l.forwardBatch(failPkt) - - // Remove this packet from the link's mailbox, - // this prevents it from being reprocessed if - // the link restarts and resets it mailbox. If - // this response doesn't make it back to the - // originating link, it will be rejected upon - // attempting to reforward the Add to the - // switch, since the circuit was never fully - // opened, and the forwarding package shows it - // as unacknowledged. - l.mailBox.AckPacket(pkt.inKey()) - - return } + + // Create a link error containing the temporary channel + // failure and a detail which indicates the we failed to + // add the htlc. + linkError := NewDetailedLinkError( + failure, OutgoingFailureDownstreamHtlcAdd, + ) + + failPkt := &htlcPacket{ + incomingChanID: pkt.incomingChanID, + incomingHTLCID: pkt.incomingHTLCID, + circuit: pkt.circuit, + sourceRef: pkt.sourceRef, + hasSource: true, + localFailure: localFailure, + linkFailure: linkError, + htlc: &lnwire.UpdateFailHTLC{ + Reason: reason, + }, + } + + go l.forwardBatch(failPkt) + + // Remove this packet from the link's mailbox, this + // prevents it from being reprocessed if the link + // restarts and resets it mailbox. If this response + // doesn't make it back to the originating link, it will + // be rejected upon attempting to reforward the Add to + // the switch, since the circuit was never fully opened, + // and the forwarding package shows it as + // unacknowledged. + l.mailBox.AckPacket(pkt.inKey()) + + return } l.log.Tracef("received downstream htlc: payment_hash=%x, "+ @@ -2179,18 +2124,7 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { // Get the balance available on the channel for new HTLCs. This takes // the channel reserve into account so HTLCs up to this value won't // violate it. - channelBandwidth := l.channel.AvailableBalance() - - // To compute the total bandwidth, we'll take the current available - // bandwidth, then subtract the overflow bandwidth as we'll eventually - // also need to evaluate those HTLC's once space on the commitment - // transaction is free. - overflowBandwidth := l.overflowQueue.TotalHtlcAmount() - if channelBandwidth < overflowBandwidth { - return 0 - } - - return channelBandwidth - overflowBandwidth + return l.channel.AvailableBalance() } // AttachMailBox updates the current mailbox used by this link, and hooks up @@ -2513,7 +2447,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, // notify the overflow queue that a spare spot has been // freed up within the commitment state. switchPackets = append(switchPackets, settlePacket) - l.overflowQueue.SignalFreeSlot() // A failureCode message for a previously forwarded HTLC has // been received. As a result a new slot will be freed up in @@ -2559,7 +2492,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, // notify the overflow queue that a spare spot has been // freed up within the commitment state. switchPackets = append(switchPackets, failPacket) - l.overflowQueue.SignalFreeSlot() } } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 072e7f9e..ffd53a47 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -7,7 +7,6 @@ import ( "encoding/binary" "fmt" "io" - "math" "net" "reflect" "runtime" @@ -263,141 +262,6 @@ func TestChannelLinkSingleHopPayment(t *testing.T) { } } -// TestChannelLinkBidirectionalOneHopPayments tests the ability of channel -// link to cope with bigger number of payment updates that commitment -// transaction may consist. -func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) { - t.Parallel() - - channels, cleanUp, _, err := createClusterChannels( - btcutil.SatoshiPerBitcoin*3, - btcutil.SatoshiPerBitcoin*5) - if err != nil { - t.Fatalf("unable to create channel: %v", err) - } - defer cleanUp() - - n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, - channels.bobToCarol, channels.carolToBob, testStartingHeight) - if err := n.start(); err != nil { - t.Fatal(err) - } - defer n.stop() - bobBandwidthBefore := n.firstBobChannelLink.Bandwidth() - aliceBandwidthBefore := n.aliceChannelLink.Bandwidth() - - debug := false - if debug { - // Log message that alice receives. - n.aliceServer.intersect(createLogFunc("alice", - n.aliceChannelLink.ChanID())) - - // Log message that bob receives. - n.bobServer.intersect(createLogFunc("bob", - n.firstBobChannelLink.ChanID())) - } - - amt := lnwire.NewMSatFromSatoshis(20000) - - htlcAmt, totalTimelock, hopsForwards := generateHops(amt, - testStartingHeight, n.firstBobChannelLink) - _, _, hopsBackwards := generateHops(amt, - testStartingHeight, n.aliceChannelLink) - - type result struct { - err error - start time.Time - number int - sender string - } - - // Send max available payment number in both sides, thereby testing - // the property of channel link to cope with overflowing. - count := 2 * input.MaxHTLCNumber - resultChan := make(chan *result, count) - for i := 0; i < count/2; i++ { - go func(i int) { - r := &result{ - start: time.Now(), - number: i, - sender: "alice", - } - - firstHop := n.firstBobChannelLink.ShortChanID() - _, r.err = makePayment( - n.aliceServer, n.bobServer, firstHop, - hopsForwards, amt, htlcAmt, totalTimelock, - ).Wait(5 * time.Minute) - resultChan <- r - }(i) - } - - for i := 0; i < count/2; i++ { - go func(i int) { - r := &result{ - start: time.Now(), - number: i, - sender: "bob", - } - - firstHop := n.aliceChannelLink.ShortChanID() - _, r.err = makePayment( - n.bobServer, n.aliceServer, firstHop, - hopsBackwards, amt, htlcAmt, totalTimelock, - ).Wait(5 * time.Minute) - resultChan <- r - }(i) - } - - maxDelay := time.Duration(0) - minDelay := time.Duration(math.MaxInt64) - averageDelay := time.Duration(0) - - // Check that alice invoice was settled and bandwidth of HTLC - // links was changed. - for i := 0; i < count; i++ { - select { - case r := <-resultChan: - if r.err != nil { - t.Fatalf("unable to make payment: %v", r.err) - } - - delay := time.Since(r.start) - if delay > maxDelay { - maxDelay = delay - } - - if delay < minDelay { - minDelay = delay - } - averageDelay += delay - - case <-time.After(5 * time.Minute): - t.Fatalf("timeout: (%v/%v)", i+1, count) - } - } - - // TODO(roasbeef): should instead consume async notifications from both - // links - time.Sleep(time.Second * 2) - - // At the end Bob and Alice balances should be the same as previous, - // because they sent the equal amount of money to each other. - if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() { - t.Fatalf("alice bandwidth shouldn't have changed: expected %v, got %x", - aliceBandwidthBefore, n.aliceChannelLink.Bandwidth()) - } - - if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() { - t.Fatalf("bob bandwidth shouldn't have changed: expected %v, got %v", - bobBandwidthBefore, n.firstBobChannelLink.Bandwidth()) - } - - t.Logf("Max waiting: %v", maxDelay) - t.Logf("Min waiting: %v", minDelay) - t.Logf("Average waiting: %v", time.Duration(int(averageDelay)/count)) -} - // TestChannelLinkMultiHopPayment checks the ability to send payment over two // hops. In this test we send the payment from Carol to Alice over Bob peer. // (Carol -> Bob -> Alice) and checking that HTLC was settled properly and @@ -536,6 +400,105 @@ func testChannelLinkMultiHopPayment(t *testing.T, } } +// TestChannelLinkCancelFullCommitment tests the ability for links to cancel +// forwarded HTLCs once all of their commitment slots are full. +func TestChannelLinkCancelFullCommitment(t *testing.T) { + t.Parallel() + + channels, cleanUp, _, err := createClusterChannels( + btcutil.SatoshiPerBitcoin*3, + btcutil.SatoshiPerBitcoin*5) + if err != nil { + t.Fatalf("unable to create channel: %v", err) + } + defer cleanUp() + + n := newTwoHopNetwork( + t, channels.aliceToBob, channels.bobToAlice, testStartingHeight, + ) + if err := n.start(); err != nil { + t.Fatal(err) + } + defer n.stop() + + // Fill up the commitment from Alice's side with 20 sat payments. + count := (input.MaxHTLCNumber / 2) + amt := lnwire.NewMSatFromSatoshis(20000) + + htlcAmt, totalTimelock, hopsForwards := generateHops(amt, + testStartingHeight, n.bobChannelLink) + + firstHop := n.aliceChannelLink.ShortChanID() + + // Create channels to buffer the preimage and error channels used in + // making the preliminary payments. + preimages := make([]lntypes.Preimage, count) + aliceErrChan := make(chan chan error, count) + + var wg sync.WaitGroup + for i := 0; i < count; i++ { + preimages[i] = lntypes.Preimage{byte(i >> 8), byte(i)} + + wg.Add(1) + go func(i int) { + defer wg.Done() + + errChan := n.makeHoldPayment( + n.aliceServer, n.bobServer, firstHop, + hopsForwards, amt, htlcAmt, totalTimelock, + preimages[i], + ) + aliceErrChan <- errChan + }(i) + } + + // Wait for Alice to finish filling her commitment. + wg.Wait() + close(aliceErrChan) + + // Now make an additional payment from Alice to Bob, this should be + // canceled because the commitment in this direction is full. + err = <-makePayment( + n.aliceServer, n.bobServer, firstHop, hopsForwards, amt, + htlcAmt, totalTimelock, + ).err + if err == nil { + t.Fatalf("overflow payment should have failed") + } + lerr, ok := err.(*LinkError) + if !ok { + t.Fatalf("expected LinkError, got: %T", err) + } + + msg := lerr.WireMessage() + if _, ok := msg.(*lnwire.FailTemporaryChannelFailure); !ok { + t.Fatalf("expected TemporaryChannelFailure, got: %T", msg) + } + + // Now, settle all htlcs held by bob and clear the commitment of htlcs. + for _, preimage := range preimages { + preimage := preimage + + // It's possible that the HTLCs have not been delivered to the + // invoice registry at this point, so we poll until we are able + // to settle. + err = wait.NoError(func() error { + return n.bobServer.registry.SettleHodlInvoice(preimage) + }, time.Minute) + if err != nil { + t.Fatal(err) + } + } + + // Ensure that all of the payments sent by alice eventually succeed. + for errChan := range aliceErrChan { + err := <-errChan + if err != nil { + t.Fatalf("alice payment failed: %v", err) + } + } +} + // TestExitNodeTimelockPayloadMismatch tests that when an exit node receives an // incoming HTLC, if the time lock encoded in the payload of the forwarded HTLC // doesn't match the expected payment value, then the HTLC will be rejected @@ -2369,227 +2332,6 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) { assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth) } -// TestChannelLinkBandwidthConsistencyOverflow tests that in the case of a -// commitment overflow (no more space for new HTLC's), the bandwidth is updated -// properly as items are being added and removed from the overflow queue. -func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) { - t.Parallel() - - var mockBlob [lnwire.OnionPacketSize]byte - - const chanAmt = btcutil.SatoshiPerBitcoin * 5 - aliceLink, bobChannel, batchTick, start, cleanUp, _, err := - newSingleLinkTestHarness(chanAmt, 0) - if err != nil { - t.Fatalf("unable to create link: %v", err) - } - defer cleanUp() - - if err := start(); err != nil { - t.Fatalf("unable to start test harness: %v", err) - } - - var ( - coreLink = aliceLink.(*channelLink) - defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee - aliceStartingBandwidth = aliceLink.Bandwidth() - aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs - ) - - estimator := chainfee.NewStaticEstimator(6000, 0) - feePerKw, err := estimator.EstimateFeePerKW(1) - if err != nil { - t.Fatalf("unable to query fee estimator: %v", err) - } - - var htlcID uint64 - addLinkHTLC := func(id uint64, amt lnwire.MilliSatoshi) [32]byte { - invoice, htlc, _, err := generatePayment( - amt, amt, 5, mockBlob, - ) - if err != nil { - t.Fatalf("unable to create payment: %v", err) - } - - addPkt := &htlcPacket{ - htlc: htlc, - incomingHTLCID: id, - amount: amt, - obfuscator: NewMockObfuscator(), - } - circuit := makePaymentCircuit(&htlc.PaymentHash, addPkt) - _, err = coreLink.cfg.Switch.commitCircuits(&circuit) - if err != nil { - t.Fatalf("unable to commit circuit: %v", err) - } - - addPkt.circuit = &circuit - aliceLink.HandleSwitchPacket(addPkt) - return invoice.Terms.PaymentPreimage - } - - // We'll first start by adding enough HTLC's to overflow the commitment - // transaction, checking the reported link bandwidth for proper - // consistency along the way - htlcAmt := lnwire.NewMSatFromSatoshis(100000) - totalHtlcAmt := lnwire.MilliSatoshi(0) - const numHTLCs = input.MaxHTLCNumber / 2 - var preImages [][32]byte - for i := 0; i < numHTLCs; i++ { - preImage := addLinkHTLC(htlcID, htlcAmt) - preImages = append(preImages, preImage) - - totalHtlcAmt += htlcAmt - htlcID++ - } - - // The HTLCs should all be sent to the remote. - var msg lnwire.Message - for i := 0; i < numHTLCs; i++ { - select { - case msg = <-aliceMsgs: - case <-time.After(15 * time.Second): - t.Fatalf("did not receive message %d", i) - } - - addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) - if !ok { - t.Fatalf("expected UpdateAddHTLC, got %T", msg) - } - - _, err := bobChannel.ReceiveHTLC(addHtlc) - if err != nil { - t.Fatalf("bob failed receiving htlc: %v", err) - } - } - - select { - case msg = <-aliceMsgs: - t.Fatalf("unexpected message: %T", msg) - case <-time.After(20 * time.Millisecond): - } - - // TODO(roasbeef): increase sleep - time.Sleep(time.Second * 1) - commitWeight := int64(input.CommitWeight + input.HTLCWeight*numHTLCs) - htlcFee := lnwire.NewMSatFromSatoshis( - feePerKw.FeeForWeight(commitWeight), - ) - expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee - expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee) - assertLinkBandwidth(t, aliceLink, expectedBandwidth) - - // The overflow queue should be empty at this point, as the commitment - // transaction should be full, but not yet overflown. - if coreLink.overflowQueue.Length() != 0 { - t.Fatalf("wrong overflow queue length: expected %v, got %v", 0, - coreLink.overflowQueue.Length()) - } - - // At this point, the commitment transaction should now be fully - // saturated. We'll continue adding HTLC's, and asserting that the - // bandwidth accounting is done properly. - const numOverFlowHTLCs = 20 - for i := 0; i < numOverFlowHTLCs; i++ { - preImage := addLinkHTLC(htlcID, htlcAmt) - preImages = append(preImages, preImage) - - totalHtlcAmt += htlcAmt - htlcID++ - } - - // No messages should be sent to the remote at this point. - select { - case msg = <-aliceMsgs: - t.Fatalf("unexpected message: %T", msg) - case <-time.After(20 * time.Millisecond): - } - - time.Sleep(time.Second * 2) - expectedBandwidth -= (numOverFlowHTLCs * htlcAmt) - assertLinkBandwidth(t, aliceLink, expectedBandwidth) - - // With the extra HTLC's added, the overflow queue should now be - // populated with our 20 additional HTLC's. - if coreLink.overflowQueue.Length() != numOverFlowHTLCs { - t.Fatalf("wrong overflow queue length: expected %v, got %v", - numOverFlowHTLCs, - coreLink.overflowQueue.Length()) - } - - // We trigger a state update to lock in the HTLCs. This should - // not change Alice's bandwidth. - if err := updateState(batchTick, coreLink, bobChannel, true); err != nil { - t.Fatalf("unable to update state: %v", err) - } - time.Sleep(time.Millisecond * 500) - assertLinkBandwidth(t, aliceLink, expectedBandwidth) - - // At this point, we'll now settle enough HTLCs to empty the overflow - // queue. The resulting bandwidth change should be non-existent as this - // will simply transfer over funds to the remote party. However, the - // size of the overflow queue should be decreasing - for i := 0; i < numOverFlowHTLCs; i++ { - err = bobChannel.SettleHTLC(preImages[i], uint64(i), nil, nil, nil) - if err != nil { - t.Fatalf("unable to settle htlc: %v", err) - } - - htlcSettle := &lnwire.UpdateFulfillHTLC{ - ID: uint64(i), - PaymentPreimage: preImages[i], - } - - aliceLink.HandleChannelUpdate(htlcSettle) - time.Sleep(time.Millisecond * 50) - } - time.Sleep(time.Millisecond * 500) - assertLinkBandwidth(t, aliceLink, expectedBandwidth) - - // We trigger a state update to lock in the Settles. - if err := updateState(batchTick, coreLink, bobChannel, false); err != nil { - t.Fatalf("unable to update state: %v", err) - } - - // After the state update is done, Alice should start sending - // HTLCs from the overflow queue. - for i := 0; i < numOverFlowHTLCs; i++ { - var msg lnwire.Message - select { - case msg = <-aliceMsgs: - case <-time.After(15 * time.Second): - t.Fatalf("did not receive message") - } - - addHtlc, ok := msg.(*lnwire.UpdateAddHTLC) - if !ok { - t.Fatalf("expected UpdateAddHTLC, got %T", msg) - } - - _, err := bobChannel.ReceiveHTLC(addHtlc) - if err != nil { - t.Fatalf("bob failed receiving htlc: %v", err) - } - } - - select { - case msg = <-aliceMsgs: - t.Fatalf("unexpected message: %T", msg) - case <-time.After(20 * time.Millisecond): - } - - assertLinkBandwidth(t, aliceLink, expectedBandwidth) - - // Finally, at this point, the queue itself should be fully empty. As - // enough slots have been drained from the commitment transaction to - // allocate the queue items to. - time.Sleep(time.Millisecond * 500) - if coreLink.overflowQueue.Length() != 0 { - t.Fatalf("wrong overflow queue length: expected %v, got %v", 0, - coreLink.overflowQueue.Length()) - } -} - // genAddsAndCircuits creates `numHtlcs` sequential ADD packets and there // corresponding circuits. The provided `htlc` is used in all test packets. func genAddsAndCircuits(numHtlcs int, htlc *lnwire.UpdateAddHTLC) ( @@ -5699,9 +5441,8 @@ func TestCheckHtlcForward(t *testing.T) { MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry, HtlcNotifier: &mockHTLCNotifier{}, }, - log: log, - channel: testChannel.channel, - overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2), + log: log, + channel: testChannel.channel, } var hash [32]byte diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go deleted file mode 100644 index 420d1f9b..00000000 --- a/htlcswitch/queue.go +++ /dev/null @@ -1,208 +0,0 @@ -package htlcswitch - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/lightningnetwork/lnd/lnwire" -) - -// packetQueue is a goroutine-safe queue of htlc packets which over flow the -// current commitment transaction. An HTLC will overflow the current commitment -// transaction if one attempts to add a new HTLC to the state machine which -// already has the max number of pending HTLC's present on the commitment -// transaction. Packets are removed from the queue by the channelLink itself -// as additional slots become available on the commitment transaction itself. -// In order to synchronize properly we use a semaphore to allow the channelLink -// to signal the number of slots available, and a condition variable to allow -// the packetQueue to know when new items have been added to the queue. -type packetQueue struct { - // totalHtlcAmt is the sum of the value of all pending HTLC's currently - // residing within the overflow queue. This value should only read or - // modified *atomically*. - totalHtlcAmt int64 // To be used atomically. - - // queueLen is an internal counter that reflects the size of the queue - // at any given instance. This value is intended to be use atomically - // as this value is used by internal methods to obtain the length of - // the queue w/o grabbing the main lock. This allows callers to avoid a - // deadlock situation where the main goroutine is attempting a send - // with the lock held. - queueLen int32 // To be used atomically. - - streamShutdown int32 // To be used atomically. - - queue []*htlcPacket - - wg sync.WaitGroup - - // freeSlots serves as a semaphore who's current value signals the - // number of available slots on the commitment transaction. - freeSlots chan struct{} - - queueCond *sync.Cond - queueMtx sync.Mutex - - // outgoingPkts is a channel that the channelLink will receive on in - // order to drain the packetQueue as new slots become available on the - // commitment transaction. - outgoingPkts chan *htlcPacket - - quit chan struct{} -} - -// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots -// value should reflect the max number of HTLC's that we're allowed to have -// outstanding within the commitment transaction. -func newPacketQueue(maxFreeSlots int) *packetQueue { - p := &packetQueue{ - outgoingPkts: make(chan *htlcPacket), - freeSlots: make(chan struct{}, maxFreeSlots), - quit: make(chan struct{}), - } - p.queueCond = sync.NewCond(&p.queueMtx) - - return p -} - -// Start starts all goroutines that packetQueue needs to perform its normal -// duties. -func (p *packetQueue) Start() { - p.wg.Add(1) - go p.packetCoordinator() -} - -// Stop signals the packetQueue for a graceful shutdown, and waits for all -// goroutines to exit. -func (p *packetQueue) Stop() { - close(p.quit) - - // Now that we've closed the channel, we'll repeatedly signal the msg - // consumer until we've detected that it has exited. - for atomic.LoadInt32(&p.streamShutdown) == 0 { - p.queueCond.Signal() - time.Sleep(time.Millisecond * 100) - } -} - -// packetCoordinator is a goroutine that handles the packet overflow queue. -// Using a synchronized queue, outside callers are able to append to the end of -// the queue, waking up the coordinator when the queue transitions from empty -// to non-empty. The packetCoordinator will then aggressively try to empty out -// the queue, passing new htlcPackets to the channelLink as slots within the -// commitment transaction become available. -// -// Future iterations of the packetCoordinator will implement congestion -// avoidance logic in the face of persistent htlcPacket back-pressure. -// -// TODO(roasbeef): later will need to add back pressure handling heuristics -// like reg congestion avoidance: -// * random dropping, RED, etc -func (p *packetQueue) packetCoordinator() { - defer atomic.StoreInt32(&p.streamShutdown, 1) - - for { - // First, we'll check our condition. If the queue of packets is - // empty, then we'll wait until a new item is added. - p.queueCond.L.Lock() - for len(p.queue) == 0 { - p.queueCond.Wait() - - // If we were woke up in order to exit, then we'll do - // so. Otherwise, we'll check the message queue for any - // new items. - select { - case <-p.quit: - p.queueCond.L.Unlock() - return - default: - } - } - - nextPkt := p.queue[0] - - p.queueCond.L.Unlock() - - // If there aren't any further messages to sent (or the link - // didn't immediately read our message), then we'll block and - // wait for a new message to be sent into the overflow queue, - // or for the link's htlcForwarder to wake up. - select { - case <-p.freeSlots: - - select { - case p.outgoingPkts <- nextPkt: - // Pop the item off the front of the queue and - // slide down the reference one to re-position - // the head pointer. This will set us up for - // the next iteration. If the queue is empty - // at this point, then we'll block at the top. - p.queueCond.L.Lock() - p.queue[0] = nil - p.queue = p.queue[1:] - atomic.AddInt32(&p.queueLen, -1) - atomic.AddInt64(&p.totalHtlcAmt, int64(-nextPkt.amount)) - p.queueCond.L.Unlock() - case <-p.quit: - return - } - - case <-p.quit: - return - - default: - } - } -} - -// AddPkt adds the referenced packet to the overflow queue, preserving ordering -// of the existing items. -func (p *packetQueue) AddPkt(pkt *htlcPacket) { - // First, we'll lock the condition, and add the message to the end of - // the message queue, and increment the internal atomic for tracking - // the queue's length. - p.queueCond.L.Lock() - p.queue = append(p.queue, pkt) - atomic.AddInt32(&p.queueLen, 1) - atomic.AddInt64(&p.totalHtlcAmt, int64(pkt.amount)) - p.queueCond.L.Unlock() - - // With the message added, we signal to the msgConsumer that there are - // additional messages to consume. - p.queueCond.Signal() -} - -// SignalFreeSlot signals to the queue that a new slot has opened up within the -// commitment transaction. The max amount of free slots has been defined when -// initially creating the packetQueue itself. This method, combined with AddPkt -// creates the following abstraction: a synchronized queue of infinite length -// which can be added to at will, which flows onto a commitment of fixed -// capacity. -func (p *packetQueue) SignalFreeSlot() { - // We'll only send over a free slot signal if the queue *is not* empty. - // Otherwise, it's possible that we attempt to overfill the free slots - // semaphore and block indefinitely below. - if atomic.LoadInt32(&p.queueLen) == 0 { - return - } - - select { - case p.freeSlots <- struct{}{}: - case <-p.quit: - return - } -} - -// Length returns the number of pending htlc packets present within the over -// flow queue. -func (p *packetQueue) Length() int32 { - return atomic.LoadInt32(&p.queueLen) -} - -// TotalHtlcAmount is the total amount (in mSAT) of all HTLC's currently -// residing within the overflow queue. -func (p *packetQueue) TotalHtlcAmount() lnwire.MilliSatoshi { - // TODO(roasbeef): also factor in fee rate? - return lnwire.MilliSatoshi(atomic.LoadInt64(&p.totalHtlcAmt)) -} diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go deleted file mode 100644 index 92d31164..00000000 --- a/htlcswitch/queue_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package htlcswitch - -import ( - "reflect" - "testing" - "time" - - "github.com/lightningnetwork/lnd/lnwire" -) - -// TestWaitingQueueThreadSafety test the thread safety properties of the -// waiting queue, by executing methods in separate goroutines which operates -// with the same data. -func TestWaitingQueueThreadSafety(t *testing.T) { - t.Parallel() - - const numPkts = 1000 - - q := newPacketQueue(numPkts) - q.Start() - defer q.Stop() - - a := make([]uint64, numPkts) - for i := 0; i < numPkts; i++ { - a[i] = uint64(i) - q.AddPkt(&htlcPacket{ - incomingHTLCID: a[i], - htlc: &lnwire.UpdateAddHTLC{}, - }) - } - - // The reported length of the queue should be the exact number of - // packets we added above. - queueLength := q.Length() - if queueLength != numPkts { - t.Fatalf("queue has wrong length: expected %v, got %v", numPkts, - queueLength) - } - - var b []uint64 - for i := 0; i < numPkts; i++ { - q.SignalFreeSlot() - - select { - case packet := <-q.outgoingPkts: - b = append(b, packet.incomingHTLCID) - - case <-time.After(2 * time.Second): - t.Fatal("timeout") - } - } - - // The length of the queue should be zero at this point. - time.Sleep(time.Millisecond * 50) - queueLength = q.Length() - if queueLength != 0 { - t.Fatalf("queue has wrong length: expected %v, got %v", 0, - queueLength) - } - - if !reflect.DeepEqual(b, a) { - t.Fatal("wrong order of the objects") - } -} diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 11edadf1..aef7dbdf 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -10344,9 +10344,7 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, aliceBobCh, false) } -// testAsyncPayments tests the performance of the async payments, and also -// checks that balances of both sides can't be become negative under stress -// payment strikes. +// testAsyncPayments tests the performance of the async payments. func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() @@ -10372,18 +10370,16 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to get alice channel info: %v", err) } - // Calculate the number of invoices. We will deplete the channel - // all the way down to the channel reserve. - chanReserve := channelCapacity / 100 - availableBalance := btcutil.Amount(info.LocalBalance) - chanReserve - numInvoices := int(availableBalance / paymentAmt) + // We'll create a number of invoices equal the max number of HTLCs that + // can be carried in one direction. The number on the commitment will + // likely be lower, but we can't guarantee that any more HTLCs will + // succeed due to the limited path diversity and inability of the router + // to retry via another path. + numInvoices := int(input.MaxHTLCNumber / 2) bobAmt := int64(numInvoices * paymentAmt) aliceAmt := info.LocalBalance - bobAmt - // Send one more payment in order to cause insufficient capacity error. - numInvoices++ - // With the channel open, we'll create invoices for Bob that Alice // will pay to in order to advance the state of the channel. bobPayReqs, _, _, err := createPayReqs( @@ -10424,28 +10420,13 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) { } } - // We should receive one insufficient capacity error, because we sent - // one more payment than we can actually handle with the current - // channel capacity. - errorReceived := false + // Wait until all the payments have settled. for i := 0; i < numInvoices; i++ { - if resp, err := alicePayStream.Recv(); err != nil { + if _, err := alicePayStream.Recv(); err != nil { t.Fatalf("payment stream have been closed: %v", err) - } else if resp.PaymentError != "" { - if errorReceived { - t.Fatalf("redundant payment error: %v", - resp.PaymentError) - } - - errorReceived = true - continue } } - if !errorReceived { - t.Fatalf("insufficient capacity error haven't been received") - } - // All payments have been sent, mark the finish time. timeTaken := time.Since(now) @@ -10535,8 +10516,12 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) t.Fatalf("unable to get alice channel info: %v", err) } - // Calculate the number of invoices. - numInvoices := int(info.LocalBalance / paymentAmt) + // We'll create a number of invoices equal the max number of HTLCs that + // can be carried in one direction. The number on the commitment will + // likely be lower, but we can't guarantee that any more HTLCs will + // succeed due to the limited path diversity and inability of the router + // to retry via another path. + numInvoices := int(input.MaxHTLCNumber / 2) // Nodes should exchange the same amount of money and because of this // at the end balances should remain the same.