Merge pull request #4145 from cfromknecht/remove-overflow-queue

htlcswitch: remove overflow queue
Conner Fromknecht 2020-04-07 14:33:30 -07:00 committed by GitHub
commit b8da7e54fb
10 changed files with 251 additions and 856 deletions
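
In effect, an HTLC add that cannot fit on the commitment transaction is now failed back immediately with a temporary channel failure, rather than being buffered in a link-local overflow queue. A hedged sketch (not lnd code; it assumes the htlcswitch package scope used by the new TestChannelLinkCancelFullCommitment below) of how a caller can recognize the new failure mode:

// isCommitmentFullFailure reports whether a payment error is the
// temporary channel failure the link now returns once its commitment
// slots are exhausted.
func isCommitmentFullFailure(err error) bool {
	lerr, ok := err.(*LinkError)
	if !ok {
		return false
	}
	_, ok = lerr.WireMessage().(*lnwire.FailTemporaryChannelFailure)
	return ok
}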

View File

@ -203,6 +203,12 @@ type ContractTerm struct {
Features *lnwire.FeatureVector
}
// String returns a human-readable description of the prominent contract terms.
func (c ContractTerm) String() string {
return fmt.Sprintf("amt=%v, expiry=%v, final_cltv_delta=%v", c.Value,
c.Expiry, c.FinalCltvDelta)
}
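
As a hedged illustration (values hypothetical; field types assumed from the surrounding code to be lnwire.MilliSatoshi, time.Duration, and int32), the new method gives compact log output:

// Sketch of the Stringer output for an example set of terms.
term := ContractTerm{
	Value:          lnwire.NewMSatFromSatoshis(20000),
	Expiry:         24 * time.Hour,
	FinalCltvDelta: 40,
}
fmt.Println(term.String())
// e.g. "amt=20000000 mSAT, expiry=24h0m0s, final_cltv_delta=40"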
// Invoice is a payment invoice generated by a payee in order to request
// payment for some good or service. The inclusion of invoices within Lightning
// creates a payment work flow for merchants very similar to that of the

View File

@ -19,7 +19,6 @@ import (
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntypes"
@ -327,10 +326,6 @@ type channelLink struct {
// which may affect behaviour of the service.
cfg ChannelLinkConfig
// overflowQueue is used to store the htlc add updates which haven't
// been processed because of the commitment transaction overflow.
overflowQueue *packetQueue
// mailBox is the main interface between the outside world and the
// link. All incoming messages will be sent over this mailBox. Messages
// include new updates from our connected peer, and new packets to be
@ -395,12 +390,11 @@ func NewChannelLink(cfg ChannelLinkConfig,
channel: channel,
shortChanID: channel.ShortChanID(),
// TODO(roasbeef): just do reserve here?
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10),
log: build.NewPrefixLog(logPrefix, log),
quit: make(chan struct{}),
}
}
@ -436,7 +430,6 @@ func (l *channelLink) Start() error {
}
l.mailBox.ResetMessages()
l.overflowQueue.Start()
l.hodlQueue.Start()
// Before launching the htlcManager messages, revert any circuits that
@ -511,7 +504,6 @@ func (l *channelLink) Stop() {
}
l.updateFeeTimer.Stop()
l.overflowQueue.Stop()
l.hodlQueue.Stop()
close(l.quit)
@ -1100,38 +1092,11 @@ out:
break out
}
// A packet that previously overflowed the commitment
// transaction is now eligible for processing once again. So
// we'll attempt to re-process the packet in order to allow it
// to continue propagating within the network.
case packet := <-l.overflowQueue.outgoingPkts:
msg := packet.htlc.(*lnwire.UpdateAddHTLC)
l.log.Tracef("reprocessing downstream add update "+
"with payment hash(%x)", msg.PaymentHash[:])
l.handleDownStreamPkt(packet, true)
// A message from the switch was just received. This indicates
// that the link is an intermediate hop in a multi-hop HTLC
// circuit.
case pkt := <-l.downstream:
// If we have a non-empty processing queue then we'll add
// this to the overflow rather than processing it
// directly. Once an active HTLC is either settled or
// failed, then we'll free up a new slot.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.overflowQueue.Length() != 0 {
l.log.Infof("downstream htlc add update with "+
"payment hash(%x) have been added to "+
"reprocessing queue, pend_updates=%v",
htlc.PaymentHash[:],
l.channel.PendingLocalUpdateCount())
l.overflowQueue.AddPkt(pkt)
continue
}
l.handleDownStreamPkt(pkt, false)
l.handleDownstreamPkt(pkt)
// A message from the connected peer was just received. This
// indicates that we have a new incoming HTLC, either directly
@ -1275,13 +1240,13 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration {
return time.Duration(prand.Int63n(upper-lower) + lower)
}
// handleDownStreamPkt processes an HTLC packet sent from the downstream HTLC
// handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC
// Switch. Possible messages sent by the switch include requests to forward new
// HTLCs, timeout previously cleared HTLCs, and finally to settle currently
// cleared HTLCs with the upstream peer.
//
// TODO(roasbeef): add sync ntfn to ensure switch always has consistent view?
func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) {
var isSettle bool
switch htlc := pkt.htlc.(type) {
case *lnwire.UpdateAddHTLC:
@ -1301,109 +1266,89 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
openCircuitRef := pkt.inKey()
index, err := l.channel.AddHTLC(htlc, &openCircuitRef)
if err != nil {
switch err {
// The HTLC was unable to be added to the state machine,
// as a result, we'll signal the switch to cancel the
// pending payment.
l.log.Warnf("Unable to handle downstream add HTLC: %v",
err)
// The channels spare bandwidth is fully allocated, so
// we'll put this HTLC into the overflow queue.
case lnwallet.ErrMaxHTLCNumber:
l.log.Infof("downstream htlc add update with "+
"payment hash(%x) have been added to "+
"reprocessing queue, pend_updates: %v",
htlc.PaymentHash[:],
l.channel.PendingLocalUpdateCount())
var (
localFailure = false
reason lnwire.OpaqueReason
)
l.overflowQueue.AddPkt(pkt)
return
// Create a temporary channel failure which we will send
// back to our peer if this is a forward, or report to
// the user if the failed payment was locally initiated.
failure := l.createFailureWithUpdate(
func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
return lnwire.NewTemporaryChannelFailure(
upd,
)
},
)
// The HTLC was unable to be added to the state
// machine, as a result, we'll signal the switch to
// cancel the pending payment.
default:
l.log.Warnf("unable to handle downstream add "+
"HTLC: %v", err)
var (
localFailure = false
reason lnwire.OpaqueReason
)
// Create a temporary channel failure which we
// will send back to our peer if this is a
// forward, or report to the user if the failed
// payment was locally initiated.
failure := l.createFailureWithUpdate(
func(upd *lnwire.ChannelUpdate) lnwire.FailureMessage {
return lnwire.NewTemporaryChannelFailure(
upd,
)
},
)
// If the payment was locally initiated (which
// is indicated by a nil obfuscator), we do
// not need to encrypt it back to the sender.
if pkt.obfuscator == nil {
var b bytes.Buffer
err := lnwire.EncodeFailure(&b, failure, 0)
if err != nil {
l.log.Errorf("unable to "+
"encode failure: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
reason = lnwire.OpaqueReason(b.Bytes())
localFailure = true
} else {
// If the packet is part of a forward,
// (identified by a non-nil obfuscator)
// we need to encrypt the error back to
// the source.
var err error
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
if err != nil {
l.log.Errorf("unable to "+
"obfuscate error: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
// If the payment was locally initiated (which is
// indicated by a nil obfuscator), we do not need to
// encrypt it back to the sender.
if pkt.obfuscator == nil {
var b bytes.Buffer
err := lnwire.EncodeFailure(&b, failure, 0)
if err != nil {
l.log.Errorf("unable to encode "+
"failure: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
// Create a link error containing the temporary
// channel failure and a detail which indicates
// that we failed to add the htlc.
linkError := NewDetailedLinkError(
failure,
OutgoingFailureDownstreamHtlcAdd,
)
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
linkFailure: linkError,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
reason = lnwire.OpaqueReason(b.Bytes())
localFailure = true
} else {
// If the packet is part of a forward,
// (identified by a non-nil obfuscator) we need
// to encrypt the error back to the source.
var err error
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
if err != nil {
l.log.Errorf("unable to "+
"obfuscate error: %v", err)
l.mailBox.AckPacket(pkt.inKey())
return
}
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox,
// this prevents it from being reprocessed if
// the link restarts and resets its mailbox. If
// this response doesn't make it back to the
// originating link, it will be rejected upon
// attempting to reforward the Add to the
// switch, since the circuit was never fully
// opened, and the forwarding package shows it
// as unacknowledged.
l.mailBox.AckPacket(pkt.inKey())
return
}
// Create a link error containing the temporary channel
// failure and a detail which indicates that we failed
// to add the htlc.
linkError := NewDetailedLinkError(
failure, OutgoingFailureDownstreamHtlcAdd,
)
failPkt := &htlcPacket{
incomingChanID: pkt.incomingChanID,
incomingHTLCID: pkt.incomingHTLCID,
circuit: pkt.circuit,
sourceRef: pkt.sourceRef,
hasSource: true,
localFailure: localFailure,
linkFailure: linkError,
htlc: &lnwire.UpdateFailHTLC{
Reason: reason,
},
}
go l.forwardBatch(failPkt)
// Remove this packet from the link's mailbox. This
// prevents it from being reprocessed if the link
// restarts and resets its mailbox. If this response
// doesn't make it back to the originating link, it will
// be rejected upon attempting to reforward the Add to
// the switch, since the circuit was never fully opened,
// and the forwarding package shows it as
// unacknowledged.
l.mailBox.AckPacket(pkt.inKey())
return
}
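
The two encoding branches above can be condensed into a hedged, self-contained sketch; the helper name and the minimal encrypter interface are illustrative, not lnd API:

// errorEncrypter mirrors the single obfuscator method this path
// relies on.
type errorEncrypter interface {
	EncryptFirstHop(lnwire.FailureMessage) (lnwire.OpaqueReason, error)
}

// encodeFailureReason returns the opaque reason and a localFailure
// flag: plaintext-encoded when the payment was locally initiated
// (nil obfuscator), encrypted back toward the source for forwards.
func encodeFailureReason(obf errorEncrypter,
	failure lnwire.FailureMessage) (lnwire.OpaqueReason, bool, error) {

	if obf == nil {
		var b bytes.Buffer
		if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
			return nil, false, err
		}
		return lnwire.OpaqueReason(b.Bytes()), true, nil
	}

	reason, err := obf.EncryptFirstHop(failure)
	return reason, false, err
}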
l.log.Tracef("received downstream htlc: payment_hash=%x, "+
@ -2179,18 +2124,7 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
// Get the balance available on the channel for new HTLCs. This takes
// the channel reserve into account so HTLCs up to this value won't
// violate it.
channelBandwidth := l.channel.AvailableBalance()
// To compute the total bandwidth, we'll take the current available
// bandwidth, then subtract the overflow bandwidth as we'll eventually
// also need to evaluate those HTLC's once space on the commitment
// transaction is free.
overflowBandwidth := l.overflowQueue.TotalHtlcAmount()
if channelBandwidth < overflowBandwidth {
return 0
}
return channelBandwidth - overflowBandwidth
return l.channel.AvailableBalance()
}
// AttachMailBox updates the current mailbox used by this link, and hooks up
@ -2513,7 +2447,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
switchPackets = append(switchPackets, settlePacket)
l.overflowQueue.SignalFreeSlot()
// A failureCode message for a previously forwarded HTLC has
// been received. As a result a new slot will be freed up in
@ -2559,7 +2492,6 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
switchPackets = append(switchPackets, failPacket)
l.overflowQueue.SignalFreeSlot()
}
}

View File

@ -7,7 +7,6 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"net"
"reflect"
"runtime"
@ -30,6 +29,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
@ -262,141 +262,6 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
}
}
// TestChannelLinkBidirectionalOneHopPayments tests the ability of a
// channel link to cope with a larger number of payment updates than the
// commitment transaction can hold at once.
func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) {
t.Parallel()
channels, cleanUp, _, err := createClusterChannels(
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5)
if err != nil {
t.Fatalf("unable to create channel: %v", err)
}
defer cleanUp()
n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice,
channels.bobToCarol, channels.carolToBob, testStartingHeight)
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
bobBandwidthBefore := n.firstBobChannelLink.Bandwidth()
aliceBandwidthBefore := n.aliceChannelLink.Bandwidth()
debug := false
if debug {
// Log message that alice receives.
n.aliceServer.intersect(createLogFunc("alice",
n.aliceChannelLink.ChanID()))
// Log message that bob receives.
n.bobServer.intersect(createLogFunc("bob",
n.firstBobChannelLink.ChanID()))
}
amt := lnwire.NewMSatFromSatoshis(20000)
htlcAmt, totalTimelock, hopsForwards := generateHops(amt,
testStartingHeight, n.firstBobChannelLink)
_, _, hopsBackwards := generateHops(amt,
testStartingHeight, n.aliceChannelLink)
type result struct {
err error
start time.Time
number int
sender string
}
// Send the max available number of payments in both directions,
// thereby testing the channel link's ability to cope with overflow.
count := 2 * input.MaxHTLCNumber
resultChan := make(chan *result, count)
for i := 0; i < count/2; i++ {
go func(i int) {
r := &result{
start: time.Now(),
number: i,
sender: "alice",
}
firstHop := n.firstBobChannelLink.ShortChanID()
_, r.err = makePayment(
n.aliceServer, n.bobServer, firstHop,
hopsForwards, amt, htlcAmt, totalTimelock,
).Wait(5 * time.Minute)
resultChan <- r
}(i)
}
for i := 0; i < count/2; i++ {
go func(i int) {
r := &result{
start: time.Now(),
number: i,
sender: "bob",
}
firstHop := n.aliceChannelLink.ShortChanID()
_, r.err = makePayment(
n.bobServer, n.aliceServer, firstHop,
hopsBackwards, amt, htlcAmt, totalTimelock,
).Wait(5 * time.Minute)
resultChan <- r
}(i)
}
maxDelay := time.Duration(0)
minDelay := time.Duration(math.MaxInt64)
averageDelay := time.Duration(0)
// Check that Alice's invoices were settled and that the bandwidth of
// the HTLC links changed.
for i := 0; i < count; i++ {
select {
case r := <-resultChan:
if r.err != nil {
t.Fatalf("unable to make payment: %v", r.err)
}
delay := time.Since(r.start)
if delay > maxDelay {
maxDelay = delay
}
if delay < minDelay {
minDelay = delay
}
averageDelay += delay
case <-time.After(5 * time.Minute):
t.Fatalf("timeout: (%v/%v)", i+1, count)
}
}
// TODO(roasbeef): should instead consume async notifications from both
// links
time.Sleep(time.Second * 2)
// At the end, Bob's and Alice's balances should be the same as before,
// because they sent equal amounts of money to each other.
if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() {
t.Fatalf("alice bandwidth shouldn't have changed: expected %v, got %v",
aliceBandwidthBefore, n.aliceChannelLink.Bandwidth())
}
if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() {
t.Fatalf("bob bandwidth shouldn't have changed: expected %v, got %v",
bobBandwidthBefore, n.firstBobChannelLink.Bandwidth())
}
t.Logf("Max waiting: %v", maxDelay)
t.Logf("Min waiting: %v", minDelay)
t.Logf("Average waiting: %v", time.Duration(int(averageDelay)/count))
}
// TestChannelLinkMultiHopPayment checks the ability to send payment over two
// hops. In this test we send the payment from Carol to Alice over Bob peer.
// (Carol -> Bob -> Alice) and checking that HTLC was settled properly and
@ -535,6 +400,105 @@ func testChannelLinkMultiHopPayment(t *testing.T,
}
}
// TestChannelLinkCancelFullCommitment tests the ability for links to cancel
// forwarded HTLCs once all of their commitment slots are full.
func TestChannelLinkCancelFullCommitment(t *testing.T) {
t.Parallel()
channels, cleanUp, _, err := createClusterChannels(
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5)
if err != nil {
t.Fatalf("unable to create channel: %v", err)
}
defer cleanUp()
n := newTwoHopNetwork(
t, channels.aliceToBob, channels.bobToAlice, testStartingHeight,
)
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
// Fill up the commitment from Alice's side with 20k sat payments.
count := (input.MaxHTLCNumber / 2)
amt := lnwire.NewMSatFromSatoshis(20000)
htlcAmt, totalTimelock, hopsForwards := generateHops(amt,
testStartingHeight, n.bobChannelLink)
firstHop := n.aliceChannelLink.ShortChanID()
// Create channels to buffer the preimage and error channels used in
// making the preliminary payments.
preimages := make([]lntypes.Preimage, count)
aliceErrChan := make(chan chan error, count)
var wg sync.WaitGroup
for i := 0; i < count; i++ {
preimages[i] = lntypes.Preimage{byte(i >> 8), byte(i)}
wg.Add(1)
go func(i int) {
defer wg.Done()
errChan := n.makeHoldPayment(
n.aliceServer, n.bobServer, firstHop,
hopsForwards, amt, htlcAmt, totalTimelock,
preimages[i],
)
aliceErrChan <- errChan
}(i)
}
// Wait for Alice to finish filling her commitment.
wg.Wait()
close(aliceErrChan)
// Now make an additional payment from Alice to Bob, this should be
// canceled because the commitment in this direction is full.
err = <-makePayment(
n.aliceServer, n.bobServer, firstHop, hopsForwards, amt,
htlcAmt, totalTimelock,
).err
if err == nil {
t.Fatalf("overflow payment should have failed")
}
lerr, ok := err.(*LinkError)
if !ok {
t.Fatalf("expected LinkError, got: %T", err)
}
msg := lerr.WireMessage()
if _, ok := msg.(*lnwire.FailTemporaryChannelFailure); !ok {
t.Fatalf("expected TemporaryChannelFailure, got: %T", msg)
}
// Now, settle all htlcs held by bob and clear the commitment of htlcs.
for _, preimage := range preimages {
preimage := preimage
// It's possible that the HTLCs have not been delivered to the
// invoice registry at this point, so we poll until we are able
// to settle.
err = wait.NoError(func() error {
return n.bobServer.registry.SettleHodlInvoice(preimage)
}, time.Minute)
if err != nil {
t.Fatal(err)
}
}
// Ensure that all of the payments sent by alice eventually succeed.
for errChan := range aliceErrChan {
err := <-errChan
if err != nil {
t.Fatalf("alice payment failed: %v", err)
}
}
}
// TestExitNodeTimelockPayloadMismatch tests that when an exit node receives an
// incoming HTLC, if the time lock encoded in the payload of the forwarded HTLC
// doesn't match the expected time lock, then the HTLC will be rejected
@ -2368,227 +2332,6 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
assertLinkBandwidth(t, aliceLink, aliceStartingBandwidth)
}
// TestChannelLinkBandwidthConsistencyOverflow tests that in the case of a
// commitment overflow (no more space for new HTLC's), the bandwidth is updated
// properly as items are being added and removed from the overflow queue.
func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
t.Parallel()
var mockBlob [lnwire.OnionPacketSize]byte
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, batchTick, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee
aliceStartingBandwidth = aliceLink.Bandwidth()
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
estimator := chainfee.NewStaticEstimator(6000, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
t.Fatalf("unable to query fee estimator: %v", err)
}
var htlcID uint64
addLinkHTLC := func(id uint64, amt lnwire.MilliSatoshi) [32]byte {
invoice, htlc, _, err := generatePayment(
amt, amt, 5, mockBlob,
)
if err != nil {
t.Fatalf("unable to create payment: %v", err)
}
addPkt := &htlcPacket{
htlc: htlc,
incomingHTLCID: id,
amount: amt,
obfuscator: NewMockObfuscator(),
}
circuit := makePaymentCircuit(&htlc.PaymentHash, addPkt)
_, err = coreLink.cfg.Switch.commitCircuits(&circuit)
if err != nil {
t.Fatalf("unable to commit circuit: %v", err)
}
addPkt.circuit = &circuit
aliceLink.HandleSwitchPacket(addPkt)
return invoice.Terms.PaymentPreimage
}
// We'll first start by adding enough HTLC's to overflow the commitment
// transaction, checking the reported link bandwidth for proper
// consistency along the way
htlcAmt := lnwire.NewMSatFromSatoshis(100000)
totalHtlcAmt := lnwire.MilliSatoshi(0)
const numHTLCs = input.MaxHTLCNumber / 2
var preImages [][32]byte
for i := 0; i < numHTLCs; i++ {
preImage := addLinkHTLC(htlcID, htlcAmt)
preImages = append(preImages, preImage)
totalHtlcAmt += htlcAmt
htlcID++
}
// The HTLCs should all be sent to the remote.
var msg lnwire.Message
for i := 0; i < numHTLCs; i++ {
select {
case msg = <-aliceMsgs:
case <-time.After(15 * time.Second):
t.Fatalf("did not receive message %d", i)
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
// TODO(roasbeef): increase sleep
time.Sleep(time.Second * 1)
commitWeight := int64(input.CommitWeight + input.HTLCWeight*numHTLCs)
htlcFee := lnwire.NewMSatFromSatoshis(
feePerKw.FeeForWeight(commitWeight),
)
expectedBandwidth := aliceStartingBandwidth - totalHtlcAmt - htlcFee
expectedBandwidth += lnwire.NewMSatFromSatoshis(defaultCommitFee)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// The overflow queue should be empty at this point, as the commitment
// transaction should be full, but not yet overflowed.
if coreLink.overflowQueue.Length() != 0 {
t.Fatalf("wrong overflow queue length: expected %v, got %v", 0,
coreLink.overflowQueue.Length())
}
// At this point, the commitment transaction should now be fully
// saturated. We'll continue adding HTLC's, and asserting that the
// bandwidth accounting is done properly.
const numOverFlowHTLCs = 20
for i := 0; i < numOverFlowHTLCs; i++ {
preImage := addLinkHTLC(htlcID, htlcAmt)
preImages = append(preImages, preImage)
totalHtlcAmt += htlcAmt
htlcID++
}
// No messages should be sent to the remote at this point.
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
time.Sleep(time.Second * 2)
expectedBandwidth -= (numOverFlowHTLCs * htlcAmt)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// With the extra HTLC's added, the overflow queue should now be
// populated with our 20 additional HTLC's.
if coreLink.overflowQueue.Length() != numOverFlowHTLCs {
t.Fatalf("wrong overflow queue length: expected %v, got %v",
numOverFlowHTLCs,
coreLink.overflowQueue.Length())
}
// We trigger a state update to lock in the HTLCs. This should
// not change Alice's bandwidth.
if err := updateState(batchTick, coreLink, bobChannel, true); err != nil {
t.Fatalf("unable to update state: %v", err)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// At this point, we'll now settle enough HTLCs to empty the overflow
// queue. The resulting bandwidth change should be non-existent as this
// will simply transfer over funds to the remote party. However, the
// size of the overflow queue should be decreasing
for i := 0; i < numOverFlowHTLCs; i++ {
err = bobChannel.SettleHTLC(preImages[i], uint64(i), nil, nil, nil)
if err != nil {
t.Fatalf("unable to settle htlc: %v", err)
}
htlcSettle := &lnwire.UpdateFulfillHTLC{
ID: uint64(i),
PaymentPreimage: preImages[i],
}
aliceLink.HandleChannelUpdate(htlcSettle)
time.Sleep(time.Millisecond * 50)
}
time.Sleep(time.Millisecond * 500)
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// We trigger a state update to lock in the Settles.
if err := updateState(batchTick, coreLink, bobChannel, false); err != nil {
t.Fatalf("unable to update state: %v", err)
}
// After the state update is done, Alice should start sending
// HTLCs from the overflow queue.
for i := 0; i < numOverFlowHTLCs; i++ {
var msg lnwire.Message
select {
case msg = <-aliceMsgs:
case <-time.After(15 * time.Second):
t.Fatalf("did not receive message")
}
addHtlc, ok := msg.(*lnwire.UpdateAddHTLC)
if !ok {
t.Fatalf("expected UpdateAddHTLC, got %T", msg)
}
_, err := bobChannel.ReceiveHTLC(addHtlc)
if err != nil {
t.Fatalf("bob failed receiving htlc: %v", err)
}
}
select {
case msg = <-aliceMsgs:
t.Fatalf("unexpected message: %T", msg)
case <-time.After(20 * time.Millisecond):
}
assertLinkBandwidth(t, aliceLink, expectedBandwidth)
// Finally, at this point, the queue itself should be fully empty, as
// enough slots have been drained from the commitment transaction to
// accommodate the queued items.
time.Sleep(time.Millisecond * 500)
if coreLink.overflowQueue.Length() != 0 {
t.Fatalf("wrong overflow queue length: expected %v, got %v", 0,
coreLink.overflowQueue.Length())
}
}
// genAddsAndCircuits creates `numHtlcs` sequential ADD packets and their
// corresponding circuits. The provided `htlc` is used in all test packets.
func genAddsAndCircuits(numHtlcs int, htlc *lnwire.UpdateAddHTLC) (
@ -4503,7 +4246,7 @@ func (h *persistentLinkHarness) restartLink(
// generateHtlc generates a simple payment from Bob to Alice.
func generateHtlc(t *testing.T, coreLink *channelLink,
bobChannel *lnwallet.LightningChannel, id uint64) *lnwire.UpdateAddHTLC {
id uint64) *lnwire.UpdateAddHTLC {
t.Helper()
@ -4579,8 +4322,8 @@ func TestChannelLinkNoMoreUpdates(t *testing.T) {
)
// Add two HTLCs to Alice's registry, that Bob can pay.
htlc1 := generateHtlc(t, coreLink, bobChannel, 0)
htlc2 := generateHtlc(t, coreLink, bobChannel, 1)
htlc1 := generateHtlc(t, coreLink, 0)
htlc2 := generateHtlc(t, coreLink, 1)
ctx := linkTestContext{
t: t,
@ -4661,15 +4404,24 @@ func checkHasPreimages(t *testing.T, coreLink *channelLink,
t.Helper()
for i := range htlcs {
_, ok := coreLink.cfg.PreimageCache.LookupPreimage(
htlcs[i].PaymentHash,
)
if ok != expOk {
t.Fatalf("expected to find witness: %v, "+
err := wait.NoError(func() error {
for i := range htlcs {
_, ok := coreLink.cfg.PreimageCache.LookupPreimage(
htlcs[i].PaymentHash,
)
if ok == expOk {
continue
}
return fmt.Errorf("expected to find witness: %v, "+
"got %v for hash=%x", expOk, ok,
htlcs[i].PaymentHash)
}
return nil
}, 5*time.Second)
if err != nil {
t.Fatalf("unable to find preimages: %v", err)
}
}
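
For context, wait.NoError (from lntest/wait) polls the supplied closure until it returns nil or the timeout elapses. A hedged usage sketch, with conditionMet as a hypothetical predicate:

// Poll until the hypothetical conditionMet() holds or 5s pass.
err := wait.NoError(func() error {
	if !conditionMet() {
		return fmt.Errorf("condition not yet met")
	}
	return nil
}, 5*time.Second)
if err != nil {
	t.Fatalf("timed out: %v", err)
}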
@ -4701,7 +4453,7 @@ func TestChannelLinkWaitForRevocation(t *testing.T) {
numHtlcs := 10
var htlcs []*lnwire.UpdateAddHTLC
for i := 0; i < numHtlcs; i++ {
htlc := generateHtlc(t, coreLink, bobChannel, uint64(i))
htlc := generateHtlc(t, coreLink, uint64(i))
htlcs = append(htlcs, htlc)
}
@ -5044,8 +4796,8 @@ func TestChannelLinkCleanupSpuriousResponses(t *testing.T) {
coreLink.cfg.HodlMask = hodl.ExitSettle.Mask()
// Add two HTLCs to Alice's registry, that Bob can pay.
htlc1 := generateHtlc(t, coreLink, bobChannel, 0)
htlc2 := generateHtlc(t, coreLink, bobChannel, 1)
htlc1 := generateHtlc(t, coreLink, 0)
htlc2 := generateHtlc(t, coreLink, 1)
ctx := linkTestContext{
t: t,
@ -5430,7 +5182,7 @@ func TestChannelLinkFail(t *testing.T) {
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, remoteChannel, 0)
htlc1 := generateHtlc(t, c, 0)
ctx := linkTestContext{
t: t,
aliceLink: c,
@ -5467,7 +5219,7 @@ func TestChannelLinkFail(t *testing.T) {
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, remoteChannel, 0)
htlc1 := generateHtlc(t, c, 0)
ctx := linkTestContext{
t: t,
aliceLink: c,
@ -5689,9 +5441,8 @@ func TestCheckHtlcForward(t *testing.T) {
MaxOutgoingCltvExpiry: DefaultMaxOutgoingCltvExpiry,
HtlcNotifier: &mockHTLCNotifier{},
},
log: log,
channel: testChannel.channel,
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
log: log,
channel: testChannel.channel,
}
var hash [32]byte

View File

@ -32,7 +32,7 @@ type MailBox interface {
// AckPacket removes a packet from the mailbox's in-memory replay
// buffer. This will prevent a packet from being delivered after a link
// restarts if the switch has remained online.
AckPacket(CircuitKey) error
AckPacket(CircuitKey)
// MessageOutBox returns a channel that any new messages ready for
// delivery will be sent on.
@ -50,10 +50,10 @@ type MailBox interface {
// Start starts the mailbox and any goroutines it needs to operate
// properly.
Start() error
Start()
// Stop signals the mailbox and its goroutines for a graceful shutdown.
Stop() error
Stop()
}
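
Since the in-memory implementation below can never fail these calls, dropping the error returns simplifies every call site. A hedged before/after sketch:

// Before: callers had to thread an error that was always nil.
if err := mailBox.Start(); err != nil {
	return err
}

// After: the calls are infallible.
mailBox.Start()
defer mailBox.Stop()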
// memoryMailBox is an implementation of the MailBox struct backed by purely
@ -120,13 +120,12 @@ const (
// Start starts the mailbox and any goroutines it needs to operate properly.
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) Start() error {
func (m *memoryMailBox) Start() {
m.started.Do(func() {
m.wg.Add(2)
go m.mailCourier(wireCourier)
go m.mailCourier(pktCourier)
})
return nil
}
// ResetMessages blocks until all buffered wire messages are cleared.
@ -180,19 +179,17 @@ func (m *memoryMailBox) signalUntilReset(cType courierType,
// queue of packets to be delivered.
//
// NOTE: It is safe to call this method multiple times for the same circuit key.
func (m *memoryMailBox) AckPacket(inKey CircuitKey) error {
func (m *memoryMailBox) AckPacket(inKey CircuitKey) {
m.pktCond.L.Lock()
entry, ok := m.pktIndex[inKey]
if !ok {
m.pktCond.L.Unlock()
return nil
return
}
m.htlcPkts.Remove(entry)
delete(m.pktIndex, inKey)
m.pktCond.L.Unlock()
return nil
}
// HasPacket queries the packets for a circuit key, this is used to drop packets
@ -208,14 +205,13 @@ func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
// Stop signals the mailbox and its goroutines for a graceful shutdown.
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) Stop() error {
func (m *memoryMailBox) Stop() {
m.stopped.Do(func() {
close(m.quit)
m.wireCond.Signal()
m.pktCond.Signal()
})
return nil
}
// mailCourier is a dedicated goroutine whose job is to reliably deliver

View File

@ -1,208 +0,0 @@
package htlcswitch
import (
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// packetQueue is a goroutine-safe queue of htlc packets which overflow the
// current commitment transaction. An HTLC will overflow the current commitment
// transaction if one attempts to add a new HTLC to the state machine which
// already has the max number of pending HTLC's present on the commitment
// transaction. Packets are removed from the queue by the channelLink itself
// as additional slots become available on the commitment transaction itself.
// In order to synchronize properly we use a semaphore to allow the channelLink
// to signal the number of slots available, and a condition variable to allow
// the packetQueue to know when new items have been added to the queue.
type packetQueue struct {
// totalHtlcAmt is the sum of the value of all pending HTLC's currently
// residing within the overflow queue. This value should only be read
// or modified *atomically*.
totalHtlcAmt int64 // To be used atomically.
// queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be used atomically
// as this value is used by internal methods to obtain the length of
// the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send
// with the lock held.
queueLen int32 // To be used atomically.
streamShutdown int32 // To be used atomically.
queue []*htlcPacket
wg sync.WaitGroup
// freeSlots serves as a semaphore whose current value signals the
// number of available slots on the commitment transaction.
freeSlots chan struct{}
queueCond *sync.Cond
queueMtx sync.Mutex
// outgoingPkts is a channel that the channelLink will receive on in
// order to drain the packetQueue as new slots become available on the
// commitment transaction.
outgoingPkts chan *htlcPacket
quit chan struct{}
}
// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLC's that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
p := &packetQueue{
outgoingPkts: make(chan *htlcPacket),
freeSlots: make(chan struct{}, maxFreeSlots),
quit: make(chan struct{}),
}
p.queueCond = sync.NewCond(&p.queueMtx)
return p
}
// Start starts all goroutines that packetQueue needs to perform its normal
// duties.
func (p *packetQueue) Start() {
p.wg.Add(1)
go p.packetCoordinator()
}
// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
close(p.quit)
// Now that we've closed the channel, we'll repeatedly signal the msg
// consumer until we've detected that it has exited.
for atomic.LoadInt32(&p.streamShutdown) == 0 {
p.queueCond.Signal()
time.Sleep(time.Millisecond * 100)
}
}
// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back pressure handling heuristics
// like reg congestion avoidance:
// * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
defer atomic.StoreInt32(&p.streamShutdown, 1)
for {
// First, we'll check our condition. If the queue of packets is
// empty, then we'll wait until a new item is added.
p.queueCond.L.Lock()
for len(p.queue) == 0 {
p.queueCond.Wait()
// If we were woken up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
}
nextPkt := p.queue[0]
p.queueCond.L.Unlock()
// If there aren't any further messages to send (or the link
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case <-p.freeSlots:
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and
// slide down the reference one to re-position
// the head pointer. This will set us up for
// the next iteration. If the queue is empty
// at this point, then we'll block at the top.
p.queueCond.L.Lock()
p.queue[0] = nil
p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1)
atomic.AddInt64(&p.totalHtlcAmt, int64(-nextPkt.amount))
p.queueCond.L.Unlock()
case <-p.quit:
return
}
case <-p.quit:
return
default:
}
}
}
// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue, and increment the internal atomic for tracking
// the queue's length.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1)
atomic.AddInt64(&p.totalHtlcAmt, int64(pkt.amount))
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
p.queueCond.Signal()
}
// SignalFreeSlot signals to the queue that a new slot has opened up within the
// commitment transaction. The max amount of free slots has been defined when
// initially creating the packetQueue itself. This method, combined with AddPkt
// creates the following abstraction: a synchronized queue of infinite length
// which can be added to at will, which flows onto a commitment of fixed
// capacity.
func (p *packetQueue) SignalFreeSlot() {
// We'll only send over a free slot signal if the queue *is not* empty.
// Otherwise, it's possible that we attempt to overfill the free slots
// semaphore and block indefinitely below.
if atomic.LoadInt32(&p.queueLen) == 0 {
return
}
select {
case p.freeSlots <- struct{}{}:
case <-p.quit:
return
}
}
// Length returns the number of pending htlc packets present within the
// overflow queue.
func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen)
}
// TotalHtlcAmount is the total amount (in mSAT) of all HTLC's currently
// residing within the overflow queue.
func (p *packetQueue) TotalHtlcAmount() lnwire.MilliSatoshi {
// TODO(roasbeef): also factor in fee rate?
return lnwire.MilliSatoshi(atomic.LoadInt64(&p.totalHtlcAmt))
}
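
For readers unfamiliar with the pattern the removed queue implemented, here is a minimal, self-contained sketch (illustrative only, not lnd code) of the same idea: producers append to an unbounded, condvar-guarded queue, and a coordinator goroutine forwards one item to the consumer per free-slot token in a channel-based semaphore.

package main

import (
	"fmt"
	"sync"
)

type miniQueue struct {
	mtx   sync.Mutex
	cond  *sync.Cond
	items []int

	// freeSlots is a semaphore: one buffered token per slot the
	// consumer has declared available.
	freeSlots chan struct{}
	out       chan int
	quit      chan struct{}
}

func newMiniQueue(maxSlots int) *miniQueue {
	q := &miniQueue{
		freeSlots: make(chan struct{}, maxSlots),
		out:       make(chan int),
		quit:      make(chan struct{}),
	}
	q.cond = sync.NewCond(&q.mtx)
	go q.coordinator()
	return q
}

// Add appends an item and wakes the coordinator, mirroring AddPkt.
func (q *miniQueue) Add(item int) {
	q.mtx.Lock()
	q.items = append(q.items, item)
	q.mtx.Unlock()
	q.cond.Signal()
}

// SignalFreeSlot hands the coordinator one token, mirroring the
// link's signal that a commitment slot opened up.
func (q *miniQueue) SignalFreeSlot() {
	select {
	case q.freeSlots <- struct{}{}:
	case <-q.quit:
	}
}

func (q *miniQueue) Stop() {
	close(q.quit)
	q.cond.Signal()
}

func (q *miniQueue) coordinator() {
	for {
		// Sleep until the queue is non-empty or we're told to quit.
		q.mtx.Lock()
		for len(q.items) == 0 {
			q.cond.Wait()
			select {
			case <-q.quit:
				q.mtx.Unlock()
				return
			default:
			}
		}
		next := q.items[0]
		q.items = q.items[1:]
		q.mtx.Unlock()

		// Only deliver once the consumer has granted a slot.
		select {
		case <-q.freeSlots:
			select {
			case q.out <- next:
			case <-q.quit:
				return
			}
		case <-q.quit:
			return
		}
	}
}

func main() {
	q := newMiniQueue(2)
	for i := 0; i < 3; i++ {
		q.Add(i)
	}
	for i := 0; i < 3; i++ {
		q.SignalFreeSlot()
		fmt.Println(<-q.out) // 0, 1, 2 in order
	}
	q.Stop()
}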

View File

@ -1,64 +0,0 @@
package htlcswitch
import (
"reflect"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// TestWaitingQueueThreadSafety tests the thread safety properties of the
// waiting queue by executing methods in separate goroutines which operate
// on the same data.
func TestWaitingQueueThreadSafety(t *testing.T) {
t.Parallel()
const numPkts = 1000
q := newPacketQueue(numPkts)
q.Start()
defer q.Stop()
a := make([]uint64, numPkts)
for i := 0; i < numPkts; i++ {
a[i] = uint64(i)
q.AddPkt(&htlcPacket{
incomingHTLCID: a[i],
htlc: &lnwire.UpdateAddHTLC{},
})
}
// The reported length of the queue should be the exact number of
// packets we added above.
queueLength := q.Length()
if queueLength != numPkts {
t.Fatalf("queue has wrong length: expected %v, got %v", numPkts,
queueLength)
}
var b []uint64
for i := 0; i < numPkts; i++ {
q.SignalFreeSlot()
select {
case packet := <-q.outgoingPkts:
b = append(b, packet.incomingHTLCID)
case <-time.After(2 * time.Second):
t.Fatal("timeout")
}
}
// The length of the queue should be zero at this point.
time.Sleep(time.Millisecond * 50)
queueLength = q.Length()
if queueLength != 0 {
t.Fatalf("queue has wrong length: expected %v, got %v", 0,
queueLength)
}
if !reflect.DeepEqual(b, a) {
t.Fatal("wrong order of the objects")
}
}

View File

@ -2895,6 +2895,8 @@ func testHtcNotifier(t *testing.T, testOpts []serverOption, iterations int,
func checkHtlcEvents(t *testing.T, events <-chan interface{},
expectedEvents []interface{}) {
t.Helper()
for _, expected := range expectedEvents {
select {
case event := <-events:
@ -2903,7 +2905,7 @@ func checkHtlcEvents(t *testing.T, events <-chan interface{},
event)
}
case <-time.After(time.Second):
case <-time.After(5 * time.Second):
t.Fatalf("expected event: %v", expected)
}
}

View File

@ -1345,15 +1345,13 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer,
}
// Send payment and expose err channel.
go func() {
err := sender.htlcSwitch.SendHTLC(
firstHop, pid, htlc,
)
if err != nil {
paymentErr <- err
return
}
err = sender.htlcSwitch.SendHTLC(firstHop, pid, htlc)
if err != nil {
paymentErr <- err
return paymentErr
}
go func() {
resultChan, err := sender.htlcSwitch.GetPaymentResult(
pid, rhash, newMockDeobfuscator(),
)
@ -1365,6 +1363,7 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer,
result, ok := <-resultChan
if !ok {
paymentErr <- fmt.Errorf("shutting down")
return
}
if result.Error != nil {
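
The restructured helper now sends the HTLC synchronously, so a send-time failure is reported before any goroutine is spawned, and only result collection happens asynchronously. A condensed, hedged sketch of the resulting shape (assuming the surrounding harness types):

// Hedged outline of the new makeHoldPayment flow.
paymentErr := make(chan error, 1)
err = sender.htlcSwitch.SendHTLC(firstHop, pid, htlc)
if err != nil {
	paymentErr <- err
	return paymentErr
}
go func() {
	resultChan, err := sender.htlcSwitch.GetPaymentResult(
		pid, rhash, newMockDeobfuscator(),
	)
	if err != nil {
		paymentErr <- err
		return
	}
	result, ok := <-resultChan
	if !ok {
		paymentErr <- fmt.Errorf("shutting down")
		return
	}
	paymentErr <- result.Error
}()
return paymentErr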

View File

@ -7,7 +7,6 @@ import (
"sync/atomic"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
@ -503,11 +502,8 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice,
i.Lock()
log.Debugf("Invoice(%v): added %v", paymentHash,
newLogClosure(func() string {
return spew.Sdump(invoice)
}),
)
log.Debugf("Invoice(%v): added with terms %v", paymentHash,
invoice.Terms)
addIndex, err := i.cdb.AddInvoice(invoice, paymentHash)
if err != nil {

View File

@ -10344,9 +10344,7 @@ func testNodeSignVerify(net *lntest.NetworkHarness, t *harnessTest) {
closeChannelAndAssert(ctxt, t, net, net.Alice, aliceBobCh, false)
}
// testAsyncPayments tests the performance of the async payments, and also
// checks that balances of both sides can't become negative under stress
// payment strikes.
// testAsyncPayments tests the performance of the async payments.
func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
@ -10372,18 +10370,16 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to get alice channel info: %v", err)
}
// Calculate the number of invoices. We will deplete the channel
// all the way down to the channel reserve.
chanReserve := channelCapacity / 100
availableBalance := btcutil.Amount(info.LocalBalance) - chanReserve
numInvoices := int(availableBalance / paymentAmt)
// We'll create a number of invoices equal to the max number of HTLCs that
// can be carried in one direction. The number on the commitment will
// likely be lower, but we can't guarantee that any more HTLCs will
// succeed due to the limited path diversity and inability of the router
// to retry via another path.
numInvoices := int(input.MaxHTLCNumber / 2)
bobAmt := int64(numInvoices * paymentAmt)
aliceAmt := info.LocalBalance - bobAmt
// Send one more payment in order to cause insufficient capacity error.
numInvoices++
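
For scale: assuming lnd's input.MaxHTLCNumber is 966 (twice the BOLT-2 per-direction max_accepted_htlcs ceiling of 483), numInvoices works out to 483, one invoice per available one-direction HTLC slot.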
// With the channel open, we'll create invoices for Bob that Alice
// will pay to in order to advance the state of the channel.
bobPayReqs, _, _, err := createPayReqs(
@ -10424,28 +10420,13 @@ func testAsyncPayments(net *lntest.NetworkHarness, t *harnessTest) {
}
}
// We should receive one insufficient capacity error, because we sent
// one more payment than we can actually handle with the current
// channel capacity.
errorReceived := false
// Wait until all the payments have settled.
for i := 0; i < numInvoices; i++ {
if resp, err := alicePayStream.Recv(); err != nil {
if _, err := alicePayStream.Recv(); err != nil {
t.Fatalf("payment stream has been closed: %v", err)
} else if resp.PaymentError != "" {
if errorReceived {
t.Fatalf("redundant payment error: %v",
resp.PaymentError)
}
errorReceived = true
continue
}
}
if !errorReceived {
t.Fatalf("insufficient capacity error haven't been received")
}
// All payments have been sent, mark the finish time.
timeTaken := time.Since(now)
@ -10535,8 +10516,12 @@ func testBidirectionalAsyncPayments(net *lntest.NetworkHarness, t *harnessTest)
t.Fatalf("unable to get alice channel info: %v", err)
}
// Calculate the number of invoices.
numInvoices := int(info.LocalBalance / paymentAmt)
// We'll create a number of invoices equal to the max number of HTLCs that
// can be carried in one direction. The number on the commitment will
// likely be lower, but we can't guarantee that any more HTLCs will
// succeed due to the limited path diversity and inability of the router
// to retry via another path.
numInvoices := int(input.MaxHTLCNumber / 2)
// Nodes should exchange the same amount of money and because of this
// at the end balances should remain the same.