From 7595bee27c21aece9779d1504a064f5d32551ff6 Mon Sep 17 00:00:00 2001
From: Andrey Samokhvalov
Date: Wed, 24 May 2017 18:27:39 +0300
Subject: [PATCH] htlcswitch: add usage of queue in channel link

This commit adds usage of the pending packet queue in the channel link.
The queue consumes downstream add packets when the state machine returns
an error indicating that the commitment transaction has no remaining
capacity for the HTLC. Upon receiving a settle/fail payment descriptor,
the corresponding add HTLC has been removed, so we release the slot and
process the pending add HTLC requests.
---
 htlcswitch/link.go       | 50 +++++++++++++++++++++++++-
 htlcswitch/link_test.go  | 76 ++++++++++++++++++++++++++++++++++++++++
 htlcswitch/test_utils.go |  2 +-
 3 files changed, 126 insertions(+), 2 deletions(-)

diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index d95ac1ad..4385eb5b 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -9,6 +9,8 @@ import (
 	"io"
 
+	"encoding/hex"
+
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/roasbeef/btcd/btcec"
@@ -82,6 +84,10 @@ type channelLink struct {
 	// which may affect behaviour of the service.
 	cfg *ChannelLinkConfig
 
+	// queue is used to store the htlc add updates which haven't been
+	// processed yet because of a commitment transaction overflow.
+	queue *packetQueue
+
 	// upstream is a channel which responsible for propagating the
 	// received from remote peer messages, with which we have an opened
 	// channel, to handler function.
@@ -115,6 +121,7 @@ func NewChannelLink(cfg *ChannelLinkConfig,
 		downstream:    make(chan *htlcPacket),
 		control:       make(chan interface{}),
 		cancelReasons: make(map[uint64]lnwire.OpaqueReason),
+		queue:         newWaitingQueue(),
 		quit:          make(chan struct{}),
 	}
 }
@@ -248,7 +255,31 @@ out:
 			break out
 		}
 
+		// Previously, this add update was placed in the reprocessing
+		// queue because of a commitment overflow, and now we try
+		// to process it again.
+		case packet := <-l.queue.pending:
+			msg := packet.htlc.(*lnwire.UpdateAddHTLC)
+			log.Infof("Reprocessing downstream add update "+
+				"with payment hash(%v)",
+				hex.EncodeToString(msg.PaymentHash[:]))
+			l.handleDownStreamPkt(packet)
+
 		case pkt := <-l.downstream:
+			// If the processing queue is non-empty, then in
+			// order to preserve the order of add updates,
+			// consume this packet and process it later.
+			htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
+			if ok && l.queue.length() != 0 {
+				log.Infof("Downstream htlc add update with "+
+					"payment hash(%v) has been added to "+
+					"the reprocessing queue, batch: %v",
+					hex.EncodeToString(htlc.PaymentHash[:]),
+					l.batchCounter)
+
+				l.queue.consume(pkt)
+				continue
+			}
 			l.handleDownStreamPkt(pkt)
 
 		case msg := <-l.upstream:
@@ -282,7 +313,15 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 		// chains.
 		htlc.ChanID = l.ChanID()
 		index, err := l.channel.AddHTLC(htlc)
-		if err != nil {
+		if err == lnwallet.ErrMaxHTLCNumber {
+			log.Infof("Downstream htlc add update with "+
+				"payment hash(%v) has been added to "+
+				"the reprocessing queue, batch: %v",
+				hex.EncodeToString(htlc.PaymentHash[:]),
+				l.batchCounter)
+			l.queue.consume(pkt)
+			return
+		} else if err != nil {
 			// TODO: possibly perform fallback/retry logic
 			// depending on type of error
@@ -298,6 +337,10 @@
 				err)
 			return
 		}
+		log.Tracef("Received downstream htlc with payment hash"+
+			"(%v), assigned index: %v, batch: %v",
+			hex.EncodeToString(htlc.PaymentHash[:]),
+			index, l.batchCounter+1)
 
 		htlc.ID = index
 		l.cfg.Peer.SendMessage(htlc)
@@ -381,6 +424,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
 			l.cfg.Peer.Disconnect()
 			return
 		}
+		log.Tracef("Received upstream htlc with payment hash(%v), "+
+			"assigned index: %v",
+			hex.EncodeToString(msg.PaymentHash[:]), index)
 
 		// TODO(roasbeef): perform sanity checks on per-hop payload
 		// * time-lock is sane, fee, chain, etc
@@ -611,6 +657,7 @@ func (l *channelLink) processLockedInHtlcs(
 				&lnwire.UpdateFufillHTLC{
 					PaymentPreimage: pd.RPreimage,
 				}, pd.RHash, pd.Amount))
+			l.queue.release()
 
 		case lnwallet.Fail:
 			opaqueReason := l.cancelReasons[pd.ParentIndex]
@@ -625,6 +672,7 @@ func (l *channelLink) processLockedInHtlcs(
 					Reason: opaqueReason,
 					ChanID: l.ChanID(),
 				}, pd.RHash, pd.Amount))
+			l.queue.release()
 
 		case lnwallet.Add:
 			blob := l.blobs[pd.Index]
diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go
index 45c4c0c0..819ae747 100644
--- a/htlcswitch/link_test.go
+++ b/htlcswitch/link_test.go
@@ -126,6 +126,82 @@ func TestChannelLinkSingleHopPayment(t *testing.T) {
 	}
 }
 
+// TestChannelLinkBidirectionalOneHopPayments tests the ability of the
+// channel link to cope with a larger number of payment updates than the
+// commitment transaction can hold.
+func TestChannelLinkBidirectionalOneHopPayments(t *testing.T) {
+	n := newThreeHopNetwork(t,
+		btcutil.SatoshiPerBitcoin*3,
+		btcutil.SatoshiPerBitcoin*5,
+	)
+	if err := n.start(); err != nil {
+		t.Fatal(err)
+	}
+	defer n.stop()
+
+	bobBandwidthBefore := n.firstBobChannelLink.Bandwidth()
+	aliceBandwidthBefore := n.aliceChannelLink.Bandwidth()
+
+	debug := false
+	if debug {
+		// Log messages that alice receives.
+		n.aliceServer.record(createLogFunc("alice",
+			n.aliceChannelLink.ChanID()))
+
+		// Log messages that bob receives.
+		n.bobServer.record(createLogFunc("bob",
+			n.firstBobChannelLink.ChanID()))
+	}
+
+	// Send the maximum number of payments in both directions, thereby
+	// testing the ability of the channel link to cope with commitment
+	// transaction overflows.
+	errChan := make(chan error)
+	count := 2 * lnwallet.MaxHTLCNumber
+	for i := 0; i < count/2; i++ {
+		go func() {
+			_, err := n.makePayment([]Peer{
+				n.aliceServer,
+				n.bobServer,
+			}, 10)
+			errChan <- err
+		}()
+	}
+
+	for i := 0; i < count/2; i++ {
+		go func() {
+			_, err := n.makePayment([]Peer{
+				n.bobServer,
+				n.aliceServer,
+			}, 10)
+			errChan <- err
+		}()
+	}
+
+	// Wait for all payments to settle, failing the test on the first
+	// error or timeout.
+	for i := 0; i < count; i++ {
+		select {
+		case err := <-errChan:
+			if err != nil {
+				t.Fatalf("unable to make the payment: %v", err)
+			}
+		case <-time.After(4 * time.Second):
+			t.Fatalf("timeout: (%v/%v)", i+1, count)
+		}
+	}
+
+	// In the end, Bob's and Alice's balances should be unchanged,
+	// because they sent equal amounts of money to each other.
+	if aliceBandwidthBefore != n.aliceChannelLink.Bandwidth() {
+		t.Fatal("alice bandwidth shouldn't have changed")
+	}
+
+	if bobBandwidthBefore != n.firstBobChannelLink.Bandwidth() {
+		t.Fatal("bob bandwidth shouldn't have changed")
+	}
+}
+
 // TestChannelLinkMultiHopPayment checks the ability to send payment over two
 // hopes. In this test we send the payment from Carol to Alice over Bob peer.
 // (Carol -> Bob -> Alice) and checking that HTLC was settled properly and
diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go
index b3b5e4cb..c0aebd53 100644
--- a/htlcswitch/test_utils.go
+++ b/htlcswitch/test_utils.go
@@ -359,7 +359,7 @@ func (n *threeHopNetwork) makePayment(peers []Peer,
 	select {
 	case err := <-errChan:
 		return invoice, err
-	case <-time.After(6 * time.Second):
+	case <-time.After(12 * time.Second):
 		return invoice, errors.New("htlc was no settled in time")
 	}
 }
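
The diff calls into a packetQueue (newWaitingQueue, consume, release, length, and the pending channel) whose implementation is not part of this patch. Below is a minimal, illustrative sketch of how such a queue could behave, assuming a FIFO buffer whose redelivery is gated on commitment slots freed by release(). Only the identifier names mirror the diff; the slot counter, condition variable, background loop, and stubbed htlcPacket are assumptions for illustration, not lnd's actual implementation.

// queue_sketch.go: hypothetical sketch of the waiting queue the patch
// relies on. Shutdown handling is omitted for brevity.
package main

import (
	"fmt"
	"sync"
)

// htlcPacket stands in for the package's real htlcPacket, which carries
// the wire message plus routing metadata.
type htlcPacket struct {
	id int
}

type packetQueue struct {
	mtx     sync.Mutex
	cond    *sync.Cond
	packets []*htlcPacket // FIFO buffer of overflowed add packets
	slots   int           // commitment slots freed by release()
	pending chan *htlcPacket
}

func newWaitingQueue() *packetQueue {
	q := &packetQueue{pending: make(chan *htlcPacket)}
	q.cond = sync.NewCond(&q.mtx)
	go q.loop()
	return q
}

// consume buffers an add packet that could not fit into the commitment
// transaction, preserving arrival order.
func (q *packetQueue) consume(pkt *htlcPacket) {
	q.mtx.Lock()
	q.packets = append(q.packets, pkt)
	q.mtx.Unlock()
	q.cond.Signal()
}

// release records that a settle/fail freed one commitment slot, allowing
// the head of the queue to be redelivered on the pending channel.
func (q *packetQueue) release() {
	q.mtx.Lock()
	q.slots++
	q.mtx.Unlock()
	q.cond.Signal()
}

// length reports how many packets are waiting to be reprocessed.
func (q *packetQueue) length() int {
	q.mtx.Lock()
	defer q.mtx.Unlock()
	return len(q.packets)
}

// loop hands the head packet back to the link's event loop whenever both
// a buffered packet and a freed slot are available.
func (q *packetQueue) loop() {
	for {
		q.mtx.Lock()
		for len(q.packets) == 0 || q.slots == 0 {
			q.cond.Wait()
		}
		head := q.packets[0]
		q.packets = q.packets[1:]
		q.slots--
		q.mtx.Unlock()

		q.pending <- head
	}
}

func main() {
	q := newWaitingQueue()

	// Two adds overflow the commitment transaction and are queued.
	q.consume(&htlcPacket{id: 1})
	q.consume(&htlcPacket{id: 2})

	// Each settle/fail releases one slot and redelivers one packet.
	q.release()
	fmt.Println("reprocess packet", (<-q.pending).id) // 1
	q.release()
	fmt.Println("reprocess packet", (<-q.pending).id) // 2
}

Under this assumed design, gating redelivery on release() is what lets the settle/fail branches in processLockedInHtlcs pull the next overflowed add back into the link's main select loop while preserving arrival order.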