From c325bf8c57c45a1a6f11f38a4eea9a29f60db7e8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 17:29:52 +0200 Subject: [PATCH] htlcswitch: sync link hand-off This commit extends the link with a new synchronous delivery point for local UpdateAddHTLC messages. The switch method SendHTLC is updated to use this delivery point and thereby becomes a synchronous call. For MPP payments, synchronous hand-off is important. Otherwise the next pathfinding round could start without the channel balance updated yet. --- htlcswitch/interfaces.go | 4 ++ htlcswitch/link.go | 74 ++++++++++++++++++++++++---- htlcswitch/mock.go | 5 ++ htlcswitch/switch.go | 2 +- lntest/itest/lnd_test.go | 51 ++++++++++++++++--- lntest/itest/log_error_whitelist.txt | 2 + 6 files changed, 120 insertions(+), 18 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index b28d137f..f51f1b15 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -70,6 +70,10 @@ type ChannelLink interface { // possible). HandleSwitchPacket(*htlcPacket) error + // HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC + // packet. It will be processed synchronously. + HandleLocalAddPacket(*htlcPacket) error + // HandleChannelUpdate handles the htlc requests as settle/add/fail // which sent to us from remote peer we have a channel with. // diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 01a81448..f15b17df 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -286,6 +286,14 @@ type ChannelLinkConfig struct { HtlcNotifier htlcNotifier } +// localUpdateAddMsg contains a locally initiated htlc and a channel that will +// receive the outcome of the link processing. This channel must be buffered to +// prevent the link from blocking. +type localUpdateAddMsg struct { + pkt *htlcPacket + err chan error +} + // channelLink is the service which drives a channel's commitment update // state-machine. In the event that an HTLC needs to be propagated to another // link, the forward handler from config is used which sends HTLC to the @@ -346,6 +354,10 @@ type channelLink struct { // by the HTLC switch. downstream chan *htlcPacket + // localUpdateAdd is a channel to which locally initiated HTLCs are + // sent across. + localUpdateAdd chan *localUpdateAddMsg + // htlcUpdates is a channel that we'll use to update outside // sub-systems with the latest set of active HTLC's on our channel. htlcUpdates chan *contractcourt.ContractUpdate @@ -395,11 +407,12 @@ func NewChannelLink(cfg ChannelLinkConfig, channel: channel, shortChanID: channel.ShortChanID(), // TODO(roasbeef): just do reserve here? - htlcUpdates: make(chan *contractcourt.ContractUpdate), - hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), - hodlQueue: queue.NewConcurrentQueue(10), - log: build.NewPrefixLog(logPrefix, log), - quit: make(chan struct{}), + htlcUpdates: make(chan *contractcourt.ContractUpdate), + hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), + hodlQueue: queue.NewConcurrentQueue(10), + log: build.NewPrefixLog(logPrefix, log), + quit: make(chan struct{}), + localUpdateAdd: make(chan *localUpdateAddMsg), } } @@ -1112,6 +1125,10 @@ func (l *channelLink) htlcManager() { case pkt := <-l.downstream: l.handleDownstreamPkt(pkt) + // A message containing a locally initiated add was received. + case msg := <-l.localUpdateAdd: + msg.err <- l.handleDownstreamUpdateAdd(msg.pkt) + // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly // for us, or part of a multi-hop HTLC circuit. @@ -1256,8 +1273,11 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the // downstream HTLC Switch. -func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { - htlc := pkt.htlc.(*lnwire.UpdateAddHTLC) +func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) + if !ok { + return errors.New("not an UpdateAddHTLC packet") + } // If hodl.AddOutgoing mode is active, we exit early to simulate // arbitrary delays between the switch adding an ADD to the @@ -1265,7 +1285,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { if l.cfg.HodlMask.Active(hodl.AddOutgoing) { l.log.Warnf(hodl.AddOutgoing.Warning()) l.mailBox.AckPacket(pkt.inKey()) - return + return nil } // A new payment has been initiated via the downstream channel, @@ -1291,7 +1311,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { // unacknowledged. l.mailBox.FailAdd(pkt) - return + return NewDetailedLinkError( + lnwire.NewTemporaryChannelFailure(nil), + OutgoingFailureDownstreamHtlcAdd, + ) } l.log.Tracef("received downstream htlc: payment_hash=%x, "+ @@ -1324,6 +1347,8 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { ) l.tryBatchUpdateCommitTx() + + return nil } // handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC @@ -1335,7 +1360,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: - l.handleDownstreamUpdateAdd(pkt) + // Handle add message. The returned error can be ignored, + // because it is also sent through the mailbox. + _ = l.handleDownstreamUpdateAdd(pkt) case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to @@ -2329,6 +2356,33 @@ func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error { return l.mailBox.AddPacket(pkt) } +// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It +// will be processed synchronously. +// +// NOTE: Part of the ChannelLink interface. +func (l *channelLink) HandleLocalAddPacket(pkt *htlcPacket) error { + l.log.Tracef("received switch packet outkey=%v", pkt.outKey()) + + // Create a buffered result channel to prevent the link from blocking. + errChan := make(chan error, 1) + + select { + case l.localUpdateAdd <- &localUpdateAddMsg{ + pkt: pkt, + err: errChan, + }: + case <-l.quit: + return ErrLinkShuttingDown + } + + select { + case err := <-errChan: + return err + case <-l.quit: + return ErrLinkShuttingDown + } +} + // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent // to us from remote peer we have a channel with. // diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e9a2a1ef..5e74756e 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -702,6 +702,11 @@ func (f *mockChannelLink) HandleSwitchPacket(pkt *htlcPacket) error { return nil } +func (f *mockChannelLink) HandleLocalAddPacket(pkt *htlcPacket) error { + _ = f.mailBox.AddPacket(pkt) + return nil +} + func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) { } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 6fabb3ef..1d232374 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -491,7 +491,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, return linkErr } - return link.HandleSwitchPacket(packet) + return link.HandleLocalAddPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 475507ba..263b9471 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -4182,6 +4182,34 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { }, ) + // Next, we'll create Fred who is going to initiate the payment and + // establish a channel to from him to Carol. We can't perform this test + // by paying from Carol directly to Dave, because the '--unsafe-replay' + // setup doesn't apply to locally added htlcs. In that case, the + // mailbox, that is responsible for generating the replay, is bypassed. + fred, err := net.NewNode("Fred", nil) + if err != nil { + t.Fatalf("unable to create new nodes: %v", err) + } + defer shutdownAndAssert(net, t, fred) + + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + if err := net.ConnectNodes(ctxt, fred, carol); err != nil { + t.Fatalf("unable to connect fred to carol: %v", err) + } + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + err = net.SendCoins(ctxt, btcutil.SatoshiPerBitcoin, fred) + if err != nil { + t.Fatalf("unable to send coins to fred: %v", err) + } + ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout) + chanPointFC := openChannelAndAssert( + ctxt, t, net, fred, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + // Now that the channel is open, create an invoice for Dave which // expects a payment of 1000 satoshis from Carol paid via a particular // preimage. @@ -4198,8 +4226,7 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to add invoice: %v", err) } - // Wait for Carol to recognize and advertise the new channel generated - // above. + // Wait for all channels to be recognized and advertized. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint) if err != nil { @@ -4211,13 +4238,23 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("bob didn't advertise channel before "+ "timeout: %v", err) } + err = carol.WaitForNetworkChannelOpen(ctxt, chanPointFC) + if err != nil { + t.Fatalf("alice didn't advertise channel before "+ + "timeout: %v", err) + } + err = fred.WaitForNetworkChannelOpen(ctxt, chanPointFC) + if err != nil { + t.Fatalf("bob didn't advertise channel before "+ + "timeout: %v", err) + } - // With the invoice for Dave added, send a payment from Carol paying + // With the invoice for Dave added, send a payment from Fred paying // to the above generated invoice. ctx, cancel := context.WithCancel(ctxb) defer cancel() - payStream, err := carol.SendPayment(ctx) + payStream, err := fred.SendPayment(ctx) if err != nil { t.Fatalf("unable to open payment stream: %v", err) } @@ -4270,12 +4307,12 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to receive payment response: %v", err) } - // Construct the response we expect after sending a duplicate packet - // that fails due to sphinx replay detection. + // Assert that Fred receives the expected failure after Carol sent a + // duplicate packet that fails due to sphinx replay detection. if resp.PaymentError == "" { t.Fatalf("expected payment error") } - assertLastHTLCError(t, carol, lnrpc.Failure_INVALID_ONION_KEY) + assertLastHTLCError(t, fred, lnrpc.Failure_INVALID_ONION_KEY) // Since the payment failed, the balance should still be left // unaltered. diff --git a/lntest/itest/log_error_whitelist.txt b/lntest/itest/log_error_whitelist.txt index fcd65cc5..ae12297b 100644 --- a/lntest/itest/log_error_whitelist.txt +++ b/lntest/itest/log_error_whitelist.txt @@ -24,6 +24,7 @@