htlcswitch/link: remove overflow queue

This commit removes the overflowQueue from the link. We do so in order
to promote better UX for senders, so that HTLCs are failed faster when
the commitment is full. This gives the sender the opportunity to try
another, more open path, rather than perceive the HTLC as being stuck.

At the same time, we reduce the number of active goroutines in lnd by N,
where N is the number of active channels.
Conner Fromknecht 2020-04-07 11:56:47 -07:00
parent 68d1753dcb
commit f50649d21b
5 changed files with 199 additions and 813 deletions
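To illustrate the sender-facing effect described in the commit message, here is a minimal, hypothetical retry loop. It is not lnd's router code; only LinkError, WireMessage, and lnwire.FailTemporaryChannelFailure are taken from the diff below. With the overflow queue gone, a saturated commitment comes back promptly as a temporary channel failure, so the caller can immediately try a different path.

package retrysketch

import (
	"errors"
	"fmt"

	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/lnwire"
)

// sendWithRetry walks a list of candidate paths, treating a temporary
// channel failure from a saturated channel as "try the next path".
func sendWithRetry(paths []string, send func(path string) error) error {
	for _, path := range paths {
		err := send(path)
		if err == nil {
			return nil
		}

		// With the overflow queue removed, a full commitment surfaces
		// promptly as a LinkError carrying a temporary channel
		// failure, as asserted in the new test added by this commit.
		if linkErr, ok := err.(*htlcswitch.LinkError); ok {
			if _, ok := linkErr.WireMessage().(*lnwire.FailTemporaryChannelFailure); ok {
				fmt.Printf("path %s is saturated, trying the next one\n", path)
				continue
			}
		}

		// Any other failure is handed back to the caller unchanged.
		return err
	}

	return errors.New("no candidate path could carry the HTLC")
}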

@ -19,7 +19,6 @@ import (
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
@ -327,10 +326,6 @@ type channelLink struct {
// which may affect behaviour of the service.
cfg ChannelLinkConfig
// overflowQueue is used to store the htlc add updates which haven't
// been processed because of the commitment transaction overflow.
overflowQueue *packetQueue
// mailBox is the main interface between the outside world and the
// link. All incoming messages will be sent over this mailBox. Messages
// include new updates from our connected peer, and new packets to be
@ -395,12 +390,11 @@ func NewChannelLink(cfg ChannelLinkConfig,
channel: channel,
shortChanID: channel.ShortChanID(),
// TODO(roasbeef): just do reserve here?
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
}
}
@ -436,7 +430,6 @@ func (l *channelLink) Start() error {
}
l.mailBox.ResetMessages()
l.overflowQueue.Start()
l.hodlQueue.Start()
// Before launching the htlcManager messages, revert any circuits that
@ -511,7 +504,6 @@ func (l *channelLink) Stop() {
}
l.updateFeeTimer.Stop()
l.overflowQueue.Stop()
l.hodlQueue.Stop()
close(l.quit)
@ -1100,37 +1092,10 @@ out:
break out
}
// A packet that previously overflowed the commitment
// transaction is now eligible for processing once again. So
// we'll attempt to re-process the packet in order to allow it
// to continue propagating within the network.
case packet := <-l.overflowQueue.outgoingPkts:
msg := packet.htlc.(*lnwire.UpdateAddHTLC)
l.log.Tracef("reprocessing downstream add update "+
"with payment hash(%x)", msg.PaymentHash[:])
l.handleDownstreamPkt(packet)
// A message from the switch was just received. This indicates
// that the link is an intermediate hop in a multi-hop HTLC
// circuit.
case pkt := <-l.downstream:
// If we have non empty processing queue then we'll add
// this to the overflow rather than processing it
// directly. Once an active HTLC is either settled or
// failed, then we'll free up a new slot.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.overflowQueue.Length() != 0 {
l.log.Infof("downstream htlc add update with "+
"payment hash(%x) have been added to "+
"reprocessing queue, pend_updates=%v",
htlc.PaymentHash[:],
l.channel.PendingLocalUpdateCount())
l.overflowQueue.AddPkt(pkt)
continue
}
l.handleDownstreamPkt(pkt)
// A message from the connected peer was just received. This
@ -1301,109 +1266,89 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
openCircuitRef := pkt.inKey()
index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
if err != nil {
switch err {
// The HTLC was unable to be added to the state machine,
// as a result, we'll signal the switch to cancel the
// pending payment.
l.log.Warnf("Unable to handle downstream add HTLC: %v",
err)
// The channels spare bandwidth is fully allocated, so
// we'll put this HTLC into the overflow queue.
case lnwallet.ErrMaxHTLCNumber:
l.log.Infof("downstream htlc add update with "+
"payment hash(%x) have been added to "+
"reprocessing queue, pend_updates: %v",
htlc.PaymentHash[:],
l.channel.PendingLocalUpdateCount())
var (
localFailure = false
reason lnwire.OpaqueReason
)
l.overflowQueue.AddPkt(pkt)
return
// Create a temporary channel failure which we will send
// back to our peer if this is a forward, or report to
// the user if the failed payment was locally initiated.
failure := l.createFailureWithUpdate(
func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
return lnwire.NewTemporaryChannelFailure(
upd,
)
},
)
// The HTLC was unable to be added to the state
// machine, as a result, we'll signal the switch to
// cancel the pending payment.
default:
l.log.Warnf("unable to handle downstream add "+
"HTLC: %v", err)
var (
localFailure = false
reason lnwire.OpaqueReason
)
// Create a temporary channel failure which we
// will send back to our peer if this is a
// forward, or report to the user if the failed
// payment was locally initiated.
failure := l.createFailureWithUpdate(
func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
return lnwire.NewTemporaryChannelFailure(
upd,
)
},
)
// If the payment was locally initiated (which
// is indicated by a nil obfuscator), we do
// not need to encrypt it back to the sender.
if pkt.obfuscator == nil {
var b bytes.Buffer
err := lnwire.EncodeFailure(&b, failure, 0)
if err != nil {
l.log.Errorf("unable to "+
"encode failure: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
reason = lnwire.OpaqueReason(b.Bytes())
localFailure = true
} else {
// If the packet is part of a forward,
// (identified by a non-nil obfuscator)
// we need to encrypt the error back to
// the source.
var err error
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
if err != nil {
l.log.Errorf("unable to "+
"obfuscate error: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
// If the payment was locally initiated (which is
// indicated by a nil obfuscator), we do not need to
// encrypt it back to the sender.
if pkt.obfuscator == nil {
var b bytes.Buffer
err := lnwire.EncodeFailure(&b, failure, 0)
if err != nil {
l.log.Errorf("unable to encode "+
"failure: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
// Create a link error containing the temporary
// channel failure and a detail which indicates
// that we failed to add the htlc.
linkError := NewDetailedLinkError(
failure,
OutgoingFailureDownstreamHtlcAdd,
)
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
linkFailure: linkError,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
reason = lnwire.OpaqueReason(b.Bytes())
localFailure = true
} else {
// If the packet is part of a forward,
// (identified by a non-nil obfuscator) we need
// to encrypt the error back to the source.
var err error
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
if err != nil {
l.log.Errorf("unable to "+
"obfuscate error: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox,
// this prevents it from being reprocessed if
// the link restarts and resets its mailbox. If
// this response doesn't make it back to the
// originating link, it will be rejected upon
// attempting to reforward the Add to the
// switch, since the circuit was never fully
// opened, and the forwarding package shows it
// as unacknowledged.
l.mailBox.AckPacket(pkt.inKey())
return
}
// Create a link error containing the temporary channel
// failure and a detail which indicates that we failed to
// add the htlc.
linkError := NewDetailedLinkError(
failure, OutgoingFailureDownstreamHtlcAdd,
)
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
linkFailure: linkError,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
}
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox, this
// prevents it from being reprocessed if the link
// restarts and resets its mailbox. If this response
// doesn't make it back to the originating link, it will
// be rejected upon attempting to reforward the Add to
// the switch, since the circuit was never fully opened,
// and the forwarding package shows it as
// unacknowledged.
l.mailBox.AckPacket(pkt.inKey())
return
}
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
@ -2179,18 +2124,7 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
// Get the balance available on the channel for new HTLCs. This takes
// the channel reserve into account so HTLCs up to this value won't
// violate it.
channelBandwidth := l.channel.AvailableBalance()
// To compute the total bandwidth, we'll take the current available
// bandwidth, then subtract the overflow bandwidth as we'll eventually
// also need to evaluate those HTLC's once space on the commitment
// transaction is free.
overflowBandwidth := l.overflowQueue.TotalHtlcAmount()
if channelBandwidth < overflowBandwidth {
return 0
}
return channelBandwidth - overflowBandwidth
return l.channel.AvailableBalance()
}
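For reference, a self-contained sketch of the arithmetic removed above versus what remains. The function names are hypothetical; lnwire.MilliSatoshi and NewMSatFromSatoshis are the real lnwire helpers used elsewhere in this diff.

package main

import (
	"fmt"

	"github.com/lightningnetwork/lnd/lnwire"
)

// oldBandwidth mirrors the deleted logic: available balance minus the value
// still waiting in the overflow queue, clamped at zero.
func oldBandwidth(avail, queued lnwire.MilliSatoshi) lnwire.MilliSatoshi {
	if avail < queued {
		return 0
	}
	return avail - queued
}

// newBandwidth mirrors the remaining logic: without an overflow queue the
// link's bandwidth is simply the channel's available balance.
func newBandwidth(avail lnwire.MilliSatoshi) lnwire.MilliSatoshi {
	return avail
}

func main() {
	avail := lnwire.NewMSatFromSatoshis(50_000)
	queued := lnwire.NewMSatFromSatoshis(80_000)
	fmt.Println(oldBandwidth(avail, queued)) // 0 mSAT
	fmt.Println(newBandwidth(avail))         // 50000000 mSAT
}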
// AttachMailBox updates the current mailbox used by this link, and hooks up
@ -2513,7 +2447,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
switchPackets = append(switchPackets, settlePacket)
l.overflowQueue.SignalFreeSlot()
// A failureCode message for a previously forwarded HTLC has
// been received. As a result a new slot will be freed up in
@ -2559,7 +2492,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
switchPackets = append(switchPackets, failPacket)
l.overflowQueue.SignalFreeSlot()
}
}

@ -7,7 +7,6 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"net"
"reflect"
"runtime"
@ -263,141 +262,6 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
}
}
// TestChannelLinkBidirectionalOneHopPayments tests the ability of the channel
// link to cope with more payment updates than the commitment transaction
// can hold.
func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) {
t.Parallel()
channels, cleanUp, _, err := createClusterChannels(
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5)
if err != nil {
t.Fatalf("unable to create channel: %v", err)
}
defer cleanUp()
n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice,
channels.bobToCarol, channels.carolToBob, testStartingHeight)
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
bobBandwidthBefore := n.firstBobChannelLink.Bandwidth()
aliceBandwidthBefore := n.aliceChannelLink.Bandwidth()
debug := false
if debug {
// Log message that alice receives.
n.aliceServer.intersect(createLogFunc("alice",
n.aliceChannelLink.ChanID()))
// Log message that bob receives.
n.bobServer.intersect(createLogFunc("bob",
n.firstBobChannelLink.ChanID()))
}
amt := lnwire.NewMSatFromSatoshis(20000)
htlcAmt, totalTimelock, hopsForwards := generateHops(amt,
testStartingHeight, n.firstBobChannelLink)
_, _, hopsBackwards := generateHops(amt,
testStartingHeight, n.aliceChannelLink)
type result struct {
err error
start time.Time
number int
sender string
}
// Send the maximum available number of payments in both directions, thereby
// testing the channel link's ability to cope with overflowing.
count := 2 * input.MaxHTLCNumber
resultChan := make(chan *result, count)
for i := 0; i < count/2; i++ {
go func(i int) {
r := &result{
start: time.Now(),
number: i,
sender: "alice",
}
firstHop := n.firstBobChannelLink.ShortChanID()
_, r.err = makePayment(
n.aliceServer, n.bobServer, firstHop,
hopsForwards, amt, htlcAmt, totalTimelock,
).Wait(5 * time.Minute)
resultChan <- r
}(i)
}
for i := 0; i < count/2; i++ {
go func(i int) {
r := &result{
start: time.Now(),
number: i,
sender: "bob",
}
firstHop := n.aliceChannelLink.ShortChanID()
_, r.err = makePayment(
n.bobServer, n.aliceServer, firstHop,
hopsBackwards, amt, htlcAmt, totalTimelock,
).Wait(5 * time.Minute)
resultChan <- r
}(i)
}
maxDelay := time.Duration(0)
minDelay := time.Duration(math.MaxInt64)
averageDelay := time.Duration(0)
// Check that alice invoice was settled and bandwidth of HTLC
// links was changed.
for i := 0; i < count; i++ {
select {
case r := <-resultChan:
if r.err != nil {
t.Fatalf("unable to make payment: %v", r.err)
}
delay := time.Since(r.start)
if delay > maxDelay {
maxDelay = delay
}
if delay < minDelay {
minDelay = delay
}
averageDelay += delay
case <-time.After(5 * time.Minute):
t.Fatalf("timeout: (%v/%v)", i+1, count)
}
}
// TODO(roasbeef): should instead consume async notifications from both
// links
time.Sleep(time.Second * 2)
// At the end Bob and Alice balances should be the same as previous,
// because they sent the equal amount of money to each other.
if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() {
t.Fatalf("alice bandwidth shouldn't have changed: expected %v, got %x",
aliceBandwidthBefore, n.aliceChannelLink.Bandwidth())
}
if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() {
t.Fatalf("bob bandwidth shouldn't have changed: expected %v, got %v",
bobBandwidthBefore, n.firstBobChannelLink.Bandwidth())
}
t.Logf("Max waiting: %v", maxDelay)
t.Logf("Min waiting: %v", minDelay)
t.Logf("Average waiting: %v", time.Duration(int(averageDelay)/count))
}
// TestChannelLinkMultiHopPayment checks the ability to send payment over two
// hops. In this test we send the payment from Carol to Alice over Bob peer.
// (Carol -> Bob -> Alice) and checking that HTLC was settled properly and
@ -536,6 +400,105 @@ func testChannelLinkMultiHopPayment(t *testing.T,
}
}
// TestChannelLinkCancelFullCommitment tests the ability for links to cancel
// forwarded HTLCs once all of their commitment slots are full.
func TestChannelLinkCancelFullCommitment(t *testing.T) {
t.Parallel()
channels, cleanUp, _, err := createClusterChannels(
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5)
if err != nil {
t.Fatalf("unable to create channel: %v", err)
}
defer cleanUp()
n := newTwoHopNetwork(
t, channels.aliceToBob, channels.bobToAlice, testStartingHeight,
)
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
// Fill up the commitment from Alice's side with 20 sat payments.
count := (input.MaxHTLCNumber / 2)
amt := lnwire.NewMSatFromSatoshis(20000)
htlcAmt, totalTimelock, hopsForwards := generateHops(amt,
testStartingHeight, n.bobChannelLink)
firstHop := n.aliceChannelLink.ShortChanID()
// Create channels to buffer the preimage and error channels used in
// making the preliminary payments.
preimages := make([]lntypes.Preimage, count)
aliceErrChan := make(chan chan error, count)
var wg sync.WaitGroup
for i := 0; i < count; i++ {
preimages[i] = lntypes.Preimage{byte(i >> 8), byte(i)}
wg.Add(1)
go func(i int) {
defer wg.Done()
errChan := n.makeHoldPayment(
n.aliceServer, n.bobServer, firstHop,
hopsForwards, amt, htlcAmt, totalTimelock,
preimages[i],
)
aliceErrChan <- errChan
}(i)
}
// Wait for Alice to finish filling her commitment.
wg.Wait()
close(aliceErrChan)
// Now make an additional payment from Alice to Bob, this should be
// canceled because the commitment in this direction is full.
err = <-makePayment(
n.aliceServer, n.bobServer, firstHop, hopsForwards, amt,
htlcAmt, totalTimelock,
).err
if err == nil {
t.Fatalf("overflow payment should have failed")
}
lerr, ok := err.(*LinkError)
if !ok {
t.Fatalf("expected LinkError, got: %T", err)
}
msg := lerr.WireMessage()
if _, ok := msg.(*lnwire.FailTemporaryChannelFailure); !ok {
t.Fatalf("expected TemporaryChannelFailure, got: %T", msg)
}
// Now, settle all htlcs held by bob and clear the commitment of htlcs.
for _, preimage := range preimages {
preimage := preimage
// It's possible that the HTLCs have not been delivered to the
// invoice registry at this point, so we poll until we are able
// to settle.
err = wait.NoError(func() error {
return n.bobServer.registry.SettleHodlInvoice(preimage)
}, time.Minute)
if err != nil {
t.Fatal(err)
}
}
// Ensure that all of the payments sent by alice eventually succeed.
for errChan := range aliceErrChan {
err := <-errChan
if err != nil {
t.Fatalf("alice payment failed: %v", err)
}
}
}
// TestExitNodeTimelockPayloadMismatch tests that when an exit node receives an
// incoming HTLC, if the time lock encoded in the payload of the forwarded HTLC
// doesn't match the expected payment value, then the HTLC will be rejected
@ -2369,227 +2332,6 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
}
// TestChannelLinkBandwidthConsistencyOverflow tests that in the case of a
// commitment overflow (no more space for new HTLC's), the bandwidth is updated
// properly as items are being added and removed from the overflow queue.
func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
t.Parallel()
var mockBlob [lnwire.OnionPacketSize]byte
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, batchTick, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee
aliceStartingBandwidth = aliceLink.Bandwidth()
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
estimator := chainfee.NewStaticEstimator(6000, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
}
var htlcID uint64
addLinkHTLC := func(id uint64, amt lnwire.MilliSatoshi) [32]byte {
invoice, htlc, _, err := generatePayment(
amt, amt, 5, mockBlob,
)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
addPkt := &htlcPacket{
htlc: htlc,
incomingHTLCID: id,
amount: amt,
obfuscator: NewMockObfuscator(),
}
circuit := makePaymentCircuit(&htlc.PaymentHash, addPkt)
_, err = coreLink.cfg.Switch.commitCircuits(&circuit)
if err != nil {
t.Fatalf("unable to commit circuit: %v", err)
}
addPkt.circuit = &circuit
aliceLink.HandleSwitchPacket(addPkt)
return invoice.Terms.PaymentPreimage
}
// We'll first start by adding enough HTLC's to overflow the commitment
// transaction, checking the reported link bandwidth for proper
// consistency along the way
htlcAmt := lnwire.NewMSatFromSatoshis(100000)
totalHtlcAmt := lnwire.MilliSatoshi(0)
const numHTLCs = input.MaxHTLCNumber / 2
var preImages [][32]byte
for i := 0; i < numHTLCs; i++ {
preImage := addLinkHTLC(htlcID, htlcAmt)
preImages = append(preImages, preImage)
totalHtlcAmt += htlcAmt
htlcID++
}
// The HTLCs should all be sent to the remote.
var msg lnwire.Message
for i := 0; i < numHTLCs; i++ {
select {
case msg = <-aliceMsgs:
case <-time.After(15 * time.Second):
t.Fatalf("did not receive message %d", i)
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
// TODO(roasbeef): increase sleep
time.Sleep(time.Second * 1)
commitWeight := int64(input.CommitWeight + input.HTLCWeight*numHTLCs)
htlcFee := lnwire.NewMSatFromSatoshis(
feePerKw.FeeForWeight(commitWeight),
)
expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee
expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// The overflow queue should be empty at this point, as the commitment
// transaction should be full, but not yet overflown.
if coreLink.overflowQueue.Length() != 0 {
t.Fatalf("wrong overflow queue length: expected %v, got %v", 0,
coreLink.overflowQueue.Length())
}
// At this point, the commitment transaction should now be fully
// saturated. We'll continue adding HTLC's, and asserting that the
// bandwidth accounting is done properly.
const numOverFlowHTLCs = 20
for i := 0; i < numOverFlowHTLCs; i++ {
preImage := addLinkHTLC(htlcID, htlcAmt)
preImages = append(preImages, preImage)
totalHtlcAmt += htlcAmt
htlcID++
}
// No messages should be sent to the remote at this point.
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
time.Sleep(time.Second * 2)
expectedBandwidth -= (numOverFlowHTLCs * htlcAmt)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// With the extra HTLC's added, the overflow queue should now be
// populated with our 20 additional HTLC's.
if coreLink.overflowQueue.Length() != numOverFlowHTLCs {
t.Fatalf("wrong overflow queue length: expected %v, got %v",
numOverFlowHTLCs,
coreLink.overflowQueue.Length())
}
// We trigger a state update to lock in the HTLCs. This should
// not change Alice's bandwidth.
if err := updateState(batchTick, coreLink, bobChannel, true); err != nil {
t.Fatalf("unable to update state: %v", err)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// At this point, we'll now settle enough HTLCs to empty the overflow
// queue. The resulting bandwidth change should be non-existent as this
// will simply transfer over funds to the remote party. However, the
// size of the overflow queue should be decreasing
for i := 0; i < numOverFlowHTLCs; i++ {
err = bobChannel.SettleHTLC(preImages[i], uint64(i), nil, nil, nil)
if err != nil {
t.Fatalf("unable to settle htlc: %v", err)
}
htlcSettle := &lnwire.UpdateFulfillHTLC{
ID: uint64(i),
PaymentPreimage: preImages[i],
}
aliceLink.HandleChannelUpdate(htlcSettle)
time.Sleep(time.Millisecond * 50)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// We trigger a state update to lock in the Settles.
if err := updateState(batchTick, coreLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// After the state update is done, Alice should start sending
// HTLCs from the overflow queue.
for i := 0; i < numOverFlowHTLCs; i++ {
var msg lnwire.Message
select {
case msg = <-aliceMsgs:
case <-time.After(15 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// Finally, at this point, the queue itself should be fully empty. As
// enough slots have been drained from the commitment transaction to
// allocate the queue items to.
time.Sleep(time.Millisecond * 500)
if coreLink.overflowQueue.Length() != 0 {
t.Fatalf("wrong overflow queue length: expected %v, got %v", 0,
coreLink.overflowQueue.Length())
}
}
// genAddsAndCircuits creates `numHtlcs` sequential ADD packets and their
// corresponding circuits. The provided `htlc` is used in all test packets.
func genAddsAndCircuits(numHtlcs int, htlc *lnwire.UpdateAddHTLC) (
@ -5699,9 +5441,8 @@ func TestCheckHtlcForward(t *testing.T) {
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
HtlcNotifier: &mockHTLCNotifier{},
},
log: log,
channel: testChannel.channel,
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
log: log,
channel: testChannel.channel,
}
var hash [32]byte

@ -1,208 +0,0 @@
package htlcswitch
import (
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// packetQueue is a goroutine-safe queue of htlc packets which overflow the
// current commitment transaction. An HTLC will overflow the current commitment
// transaction if one attempts to add a new HTLC to the state machine which
// already has the max number of pending HTLC's present on the commitment
// transaction. Packets are removed from the queue by the channelLink itself
// as additional slots become available on the commitment transaction itself.
// In order to synchronize properly we use a semaphore to allow the channelLink
// to signal the number of slots available, and a condition variable to allow
// the packetQueue to know when new items have been added to the queue.
type packetQueue struct {
// totalHtlcAmt is the sum of the value of all pending HTLC's currently
// residing within the overflow queue. This value should only read or
// modified *atomically*.
totalHtlcAmt int64 // To be used atomically.
// queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be used atomically
// as this value is used by internal methods to obtain the length of
// the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send
// with the lock held.
queueLen int32 // To be used atomically.
streamShutdown int32 // To be used atomically.
queue []*htlcPacket
wg sync.WaitGroup
// freeSlots serves as a semaphore whose current value signals the
// number of available slots on the commitment transaction.
freeSlots chan struct{}
queueCond *sync.Cond
queueMtx sync.Mutex
// outgoingPkts is a channel that the channelLink will receive on in
// order to drain the packetQueue as new slots become available on the
// commitment transaction.
outgoingPkts chan *htlcPacket
quit chan struct{}
}
// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLC's that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
p := &packetQueue{
outgoingPkts: make(chan *htlcPacket),
freeSlots: make(chan struct{}, maxFreeSlots),
quit: make(chan struct{}),
}
p.queueCond = sync.NewCond(&p.queueMtx)
return p
}
// Start starts all goroutines that packetQueue needs to perform its normal
// duties.
func (p *packetQueue) Start() {
p.wg.Add(1)
go p.packetCoordinator()
}
// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
close(p.quit)
// Now that we've closed the channel, we'll repeatedly signal the msg
// consumer until we've detected that it has exited.
for atomic.LoadInt32(&p.streamShutdown) == 0 {
p.queueCond.Signal()
time.Sleep(time.Millisecond * 100)
}
}
// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back pressure handling heuristics
// like reg congestion avoidance:
// * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
defer atomic.StoreInt32(&p.streamShutdown, 1)
for {
// First, we'll check our condition. If the queue of packets is
// empty, then we'll wait until a new item is added.
p.queueCond.L.Lock()
for len(p.queue) == 0 {
p.queueCond.Wait()
// If we were woken up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
}
nextPkt := p.queue[0]
p.queueCond.L.Unlock()
// If there aren't any further messages to send (or the link
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case <-p.freeSlots:
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and
// slide down the reference one to re-position
// the head pointer. This will set us up for
// the next iteration. If the queue is empty
// at this point, then we'll block at the top.
p.queueCond.L.Lock()
p.queue[0] = nil
p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1)
atomic.AddInt64(&p.totalHtlcAmt, int64(-nextPkt.amount))
p.queueCond.L.Unlock()
case <-p.quit:
return
}
case <-p.quit:
return
default:
}
}
}
// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue, and increment the internal atomic for tracking
// the queue's length.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1)
atomic.AddInt64(&p.totalHtlcAmt, int64(pkt.amount))
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
p.queueCond.Signal()
}
// SignalFreeSlot signals to the queue that a new slot has opened up within the
// commitment transaction. The max amount of free slots has been defined when
// initially creating the packetQueue itself. This method, combined with AddPkt
// creates the following abstraction: a synchronized queue of infinite length
// which can be added to at will, which flows onto a commitment of fixed
// capacity.
func (p *packetQueue) SignalFreeSlot() {
// We'll only send over a free slot signal if the queue *is not* empty.
// Otherwise, it's possible that we attempt to overfill the free slots
// semaphore and block indefinitely below.
if atomic.LoadInt32(&p.queueLen) == 0 {
return
}
select {
case p.freeSlots <- struct{}{}:
case <-p.quit:
return
}
}
// Length returns the number of pending htlc packets present within the
// overflow queue.
func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen)
}
// TotalHtlcAmount is the total amount (in mSAT) of all HTLC's currently
// residing within the overflow queue.
func (p *packetQueue) TotalHtlcAmount() lnwire.MilliSatoshi {
// TODO(roasbeef): also factor in fee rate?
return lnwire.MilliSatoshi(atomic.LoadInt64(&p.totalHtlcAmt))
}
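The doc comments in the file removed above describe its design as a condition-variable-guarded queue drained through a semaphore of commitment slots. As a standalone illustration of that pattern only, a simplified version might look roughly like this; slotQueue and its methods are hypothetical names, and the shutdown and length/amount tracking of the real packetQueue are omitted.

package main

import (
	"fmt"
	"sync"
)

// slotQueue is an unbounded, cond-protected queue on the producer side,
// drained one item per free "slot" signalled through a buffered channel
// acting as a semaphore.
type slotQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int

	freeSlots chan struct{} // capacity = max in-flight items
	out       chan int
}

func newSlotQueue(maxSlots int) *slotQueue {
	q := &slotQueue{
		freeSlots: make(chan struct{}, maxSlots),
		out:       make(chan int),
	}
	q.cond = sync.NewCond(&q.mu)
	go q.run()
	return q
}

// Add appends an item and wakes the coordinator, mirroring AddPkt.
func (q *slotQueue) Add(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

// SignalFreeSlot reports that one in-flight item has completed.
func (q *slotQueue) SignalFreeSlot() {
	q.freeSlots <- struct{}{}
}

// run waits for queued items, then forwards one item per free slot.
func (q *slotQueue) run() {
	for {
		q.mu.Lock()
		for len(q.items) == 0 {
			q.cond.Wait()
		}
		next := q.items[0]
		q.items = q.items[1:]
		q.mu.Unlock()

		<-q.freeSlots // wait for capacity on the "commitment"
		q.out <- next
	}
}

func main() {
	q := newSlotQueue(2)
	for i := 0; i < 4; i++ {
		q.Add(i)
	}
	for i := 0; i < 4; i++ {
		q.SignalFreeSlot()
		fmt.Println("dequeued:", <-q.out)
	}
}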

@ -1,64 +0,0 @@
package htlcswitch
import (
"reflect"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// TestWaitingQueueThreadSafety tests the thread safety properties of the
// waiting queue by executing methods in separate goroutines that operate
// on the same data.
func TestWaitingQueueThreadSafety(t *testing.T) {
t.Parallel()
const numPkts = 1000
q := newPacketQueue(numPkts)
q.Start()
defer q.Stop()
a := make([]uint64, numPkts)
for i := 0; i < numPkts; i++ {
a[i] = uint64(i)
q.AddPkt(&htlcPacket{
incomingHTLCID: a[i],
htlc: &lnwire.UpdateAddHTLC{},
})
}
// The reported length of the queue should be the exact number of
// packets we added above.
queueLength := q.Length()
if queueLength != numPkts {
t.Fatalf("queue has wrong length: expected %v, got %v", numPkts,
queueLength)
}
var b []uint64
for i := 0; i < numPkts; i++ {
q.SignalFreeSlot()
select {
case packet := <-q.outgoingPkts:
b = append(b, packet.incomingHTLCID)
case <-time.After(2 * time.Second):
t.Fatal("timeout")
}
}
// The length of the queue should be zero at this point.
time.Sleep(time.Millisecond * 50)
queueLength = q.Length()
if queueLength != 0 {
t.Fatalf("queue has wrong length: expected %v, got %v", 0,
queueLength)
}
if !reflect.DeepEqual(b, a) {
t.Fatal("wrong order of the objects")
}
}

@ -10344,9 +10344,7 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) {
closeChannelAndAssert(ctxt, t, net, net.Alice, aliceBobCh, false)
}
// testAsyncPayments tests the performance of the async payments, and also
// checks that balances of both sides can't become negative under stress
// payment strikes.
// testAsyncPayments tests the performance of the async payments.
func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
@ -10372,18 +10370,16 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to get alice channel info: %v", err)
}
// Calculate the number of invoices. We will deplete the channel
// all the way down to the channel reserve.
chanReserve := channelCapacity / 100
availableBalance := btcutil.Amount(info.LocalBalance) - chanReserve
numInvoices := int(availableBalance / paymentAmt)
// We'll create a number of invoices equal to the max number of HTLCs that
// can be carried in one direction. The number on the commitment will
// likely be lower, but we can't guarantee that any more HTLCs will
// succeed due to the limited path diversity and inability of the router
// to retry via another path.
numInvoices := int(input.MaxHTLCNumber / 2)
bobAmt := int64(numInvoices * paymentAmt)
aliceAmt := info.LocalBalance - bobAmt
// Send one more payment in order to cause insufficient capacity error.
numInvoices++
// With the channel open, we'll create invoices for Bob that Alice
// will pay to in order to advance the state of the channel.
bobPayReqs, _, _, err := createPayReqs(
@ -10424,28 +10420,13 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
}
}
// We should receive one insufficient capacity error, because we sent
// one more payment than we can actually handle with the current
// channel capacity.
errorReceived := false
// Wait until all the payments have settled.
for i := 0; i < numInvoices; i++ {
if resp, err := alicePayStream.Recv(); err != nil {
if _, err := alicePayStream.Recv(); err != nil {
t.Fatalf("payment stream have been closed: %v", err)
} else if resp.PaymentError != "" {
if errorReceived {
t.Fatalf("redundant payment error: %v",
resp.PaymentError)
}
errorReceived = true
continue
}
}
if !errorReceived {
t.Fatalf("insufficient capacity error haven't been received")
}
// All payments have been sent, mark the finish time.
timeTaken := time.Since(now)
@ -10535,8 +10516,12 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest)
t.Fatalf("unable to get alice channel info: %v", err)
}
// Calculate the number of invoices.
numInvoices := int(info.LocalBalance / paymentAmt)
// We'll create a number of invoices equal to the max number of HTLCs that
// can be carried in one direction. The number on the commitment will
// likely be lower, but we can't guarantee that any more HTLCs will
// succeed due to the limited path diversity and inability of the router
// to retry via another path.
numInvoices := int(input.MaxHTLCNumber / 2)
// Nodes should exchange the same amount of money and because of this
// at the end balances should remain the same.