From 341308327e726961d1e960c820036e1c90960992 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 13:28:52 +0200 Subject: [PATCH 01/10] htlcswitch: refactor handleLocalDispatch Move creation of the goroutine as a preparation for sync local routing --- htlcswitch/switch.go | 74 +++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 42 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index a299a812..f88f16f6 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -774,49 +774,37 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, } } -// handleLocalDispatch is used at the start/end of the htlc update life cycle. -// At the start (1) it is used to send the htlc to the channel link without -// creation of circuit. At the end (2) it is used to notify the user about the -// result of his payment is it was successful or not. -// -// Alice Bob Carol -// o --add----> o ---add----> o -// (1) -// -// (2) -// o <-settle-- o <--settle-- o -// Alice Bob Carol -// -func (s *Switch) handleLocalDispatch(pkt *htlcPacket) error { - // User have created the htlc update therefore we should find the - // appropriate channel link and send the payment over this link. - if htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC); ok { - link, err := s.handleLocalAddHTLC(pkt, htlc) - if err != nil { - // Notify the htlc notifier of a link failure on our - // outgoing link. Incoming timelock/amount values are - // not set because they are not present for local sends. - s.cfg.HtlcNotifier.NotifyLinkFailEvent( - newHtlcKey(pkt), - HtlcInfo{ - OutgoingTimeLock: htlc.Expiry, - OutgoingAmt: htlc.Amount, - }, - HtlcEventTypeSend, - err, - false, - ) - - return err - } - - return link.HandleSwitchPacket(pkt) +// handleLocalUpdateAddDispatch is used at the start of the htlc update life +// cycle. It is used to send the htlc to the channel link without creation of +// circuit. +func (s *Switch) handleLocalUpdateAddDispatch(pkt *htlcPacket) error { + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) + if !ok { + return errors.New("not an UpdateAdd packet") } - s.wg.Add(1) - go s.handleLocalResponse(pkt) + // User have created the htlc update therefore we should find the + // appropriate channel link and send the payment over this link. + link, err := s.handleLocalAddHTLC(pkt, htlc) + if err != nil { + // Notify the htlc notifier of a link failure on our + // outgoing link. Incoming timelock/amount values are + // not set because they are not present for local sends. + s.cfg.HtlcNotifier.NotifyLinkFailEvent( + newHtlcKey(pkt), + HtlcInfo{ + OutgoingTimeLock: htlc.Expiry, + OutgoingAmt: htlc.Amount, + }, + HtlcEventTypeSend, + err, + false, + ) - return nil + return err + } + + return link.HandleSwitchPacket(pkt) } // handleLocalAddHTLC handles the addition of a htlc for a send that @@ -1065,7 +1053,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { if packet.incomingChanID == hop.Source { // A blank incomingChanID indicates that this is // a pending user-initiated payment. - return s.handleLocalDispatch(packet) + return s.handleLocalUpdateAddDispatch(packet) } // Before we attempt to find a non-strict forwarding path for @@ -1268,7 +1256,9 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { // A blank IncomingChanID in a circuit indicates that it is a pending // user-initiated payment. if packet.incomingChanID == hop.Source { - return s.handleLocalDispatch(packet) + s.wg.Add(1) + go s.handleLocalResponse(packet) + return nil } // Check to see that the source link is online before removing From babb0a36b4f479009fab6a89d003154e163fda7a Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 14:23:36 +0200 Subject: [PATCH 02/10] switch/test: use external interface for testing Previously the forward(...) method was used in forwarding tests, while that code path isn't used for forwards in reality. --- htlcswitch/switch_test.go | 88 ++++++++++++++++++++++----------------- 1 file changed, 50 insertions(+), 38 deletions(-) diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index de905961..60604c42 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -213,7 +213,7 @@ func TestSwitchSendPending(t *testing.T) { // Send the ADD packet, this should not be forwarded out to the link // since there are no eligible links. - err = s.forward(packet) + err = forwardPackets(t, s, packet) linkErr, ok := err.(*LinkError) if !ok { t.Fatalf("expected link error, got: %T", err) @@ -249,7 +249,7 @@ func TestSwitchSendPending(t *testing.T) { packet.incomingHTLCID++ // Handle the request and checks that bob channel link received it. - if err := s.forward(packet); err != nil { + if err := forwardPackets(t, s, packet); err != nil { t.Fatalf("unexpected forward failure: %v", err) } @@ -322,7 +322,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(packet); err != nil { + if err := forwardPackets(t, s, packet); err != nil { t.Fatal(err) } @@ -356,7 +356,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := s.forward(packet); err != nil { + if err := forwardPackets(t, s, packet); err != nil { t.Fatal(err) } @@ -451,7 +451,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(ogPacket); err != nil { + if err := forwardPackets(t, s, ogPacket); err != nil { t.Fatal(err) } @@ -539,7 +539,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := s2.forward(fail); err != nil { + if err := <-s2.ForwardPackets(nil, fail); err != nil { t.Fatalf(err.Error()) } @@ -563,7 +563,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := s2.forward(fail); err == nil { + if err := <-s2.ForwardPackets(nil, fail); err == nil { t.Fatalf("expected failure when sending duplicate fail " + "with no pending circuit") } @@ -646,7 +646,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(ogPacket); err != nil { + if err := forwardPackets(t, s, ogPacket); err != nil { t.Fatal(err) } @@ -736,7 +736,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Send the settle packet from the remote peer through the switch. - if err := s2.forward(settle); err != nil { + if err := <-s2.ForwardPackets(nil, settle); err != nil { t.Fatalf(err.Error()) } @@ -761,7 +761,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Send the settle packet again, which should fail. - if err := s2.forward(settle); err != nil { + if err := <-s2.ForwardPackets(nil, settle); err != nil { t.Fatalf("expected success when sending duplicate settle " + "with no pending circuit") } @@ -844,7 +844,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(ogPacket); err != nil { + if err := forwardPackets(t, s, ogPacket); err != nil { t.Fatal(err) } @@ -915,12 +915,10 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { t.Fatalf("wrong amount of half circuits") } - // Resend the failed htlc, it should be returned to alice since the + // Resend the failed htlc. The packet will be dropped silently since the // switch will detect that it has been half added previously. - err = s2.forward(ogPacket) - if err != ErrDuplicateAdd { - t.Fatal("unexpected error when reforwarding a "+ - "failed packet", err) + if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { + t.Fatal(err) } // After detecting an incomplete forward, the fail packet should have @@ -1011,7 +1009,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(ogPacket); err != nil { + if err := forwardPackets(t, s, ogPacket); err != nil { t.Fatal(err) } @@ -1079,20 +1077,20 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { // Resend the failed htlc, it should be returned to alice since the // switch will detect that it has been half added previously. - err = s2.forward(ogPacket) - linkErr, ok := err.(*LinkError) - if !ok { - t.Fatalf("expected link error, got: %T", err) - } - if linkErr.FailureDetail != OutgoingFailureIncompleteForward { - t.Fatalf("expected incomplete forward, got: %v", - linkErr.FailureDetail) + err = <-s2.ForwardPackets(nil, ogPacket) + if err != nil { + t.Fatal(err) } // After detecting an incomplete forward, the fail packet should have // been returned to the sender. select { - case <-aliceChannelLink.packets: + case pkt := <-aliceChannelLink.packets: + linkErr := pkt.linkFailure + if linkErr.FailureDetail != OutgoingFailureIncompleteForward { + t.Fatalf("expected incomplete forward, got: %v", + linkErr.FailureDetail) + } case <-time.After(time.Second): t.Fatal("request was not propagated to destination") } @@ -1177,7 +1175,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(ogPacket); err != nil { + if err := forwardPackets(t, s, ogPacket); err != nil { t.Fatal(err) } @@ -1267,7 +1265,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := s2.forward(ogPacket); err != nil { + if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1417,7 +1415,7 @@ func TestCircularForwards(t *testing.T) { // Attempt to forward the packet and check for the expected // error. - err = s.forward(packet) + err = forwardPackets(t, s, packet) if !reflect.DeepEqual(err, test.expectedErr) { t.Fatalf("expected: %v, got: %v", test.expectedErr, err) @@ -1637,7 +1635,7 @@ func testSkipIneligibleLinksMultiHopForward(t *testing.T, } // The request to forward should fail as - err = s.forward(packet) + err = forwardPackets(t, s, packet) failure := obfuscator.(*mockObfuscator).failure if testCase.expectedReply == lnwire.CodeNone { @@ -1796,7 +1794,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -1828,7 +1826,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -1911,7 +1909,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -1941,7 +1939,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -1970,7 +1968,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -1996,7 +1994,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := s.forward(request); err != nil { + if err := forwardPackets(t, s, request); err != nil { t.Fatal(err) } @@ -2139,7 +2137,7 @@ func TestSwitchSendPayment(t *testing.T) { }, } - if err := s.forward(packet); err != nil { + if err := forwardPackets(t, s, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -2634,7 +2632,7 @@ func TestInvalidFailure(t *testing.T) { }, } - if err := s.forward(packet); err != nil { + if err := forwardPackets(t, s, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -3060,3 +3058,17 @@ func getThreeHopEvents(channels *clusterChannels, htlcID uint64, return aliceEvents, bobEvents, carolEvents } + +// forwardPackets forwards packets to the switch and enforces a timeout on the +// reply. +func forwardPackets(t *testing.T, s *Switch, packets ...*htlcPacket) error { + + select { + case err := <-s.ForwardPackets(nil, packets...): + return err + + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + return nil + } +} From fe35d35458caea48cb8b795d66f65989836e146e Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 14:27:20 +0200 Subject: [PATCH 03/10] htlcswitch: simplify local sends Embed forward method into SendHTLC and remove redundant type check. --- htlcswitch/switch.go | 67 +++++++++++++------------------------------- 1 file changed, 20 insertions(+), 47 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index f88f16f6..659db5b6 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -451,7 +451,26 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, htlc: htlc, } - return s.forward(packet) + circuit := newPaymentCircuit(&htlc.PaymentHash, packet) + actions, err := s.circuits.CommitCircuits(circuit) + if err != nil { + log.Errorf("unable to commit circuit in switch: %v", err) + return err + } + + // Drop duplicate packet if it has already been seen. + switch { + case len(actions.Drops) == 1: + return ErrDuplicateAdd + + case len(actions.Fails) == 1: + return err + } + + // Send packet to link. + packet.circuit = circuit + + return s.route(packet) } // UpdateForwardingPolicies sends a message to the switch to update the @@ -498,52 +517,6 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, return circuit != nil && circuit.Incoming.ChanID != hop.Source } -// forward is used in order to find next channel link and apply htlc update. -// Also this function is used by channel links itself in order to forward the -// update after it has been included in the channel. -func (s *Switch) forward(packet *htlcPacket) error { - switch htlc := packet.htlc.(type) { - case *lnwire.UpdateAddHTLC: - circuit := newPaymentCircuit(&htlc.PaymentHash, packet) - actions, err := s.circuits.CommitCircuits(circuit) - if err != nil { - log.Errorf("unable to commit circuit in switch: %v", err) - return err - } - - // Drop duplicate packet if it has already been seen. - switch { - case len(actions.Drops) == 1: - return ErrDuplicateAdd - - case len(actions.Fails) == 1: - if packet.incomingChanID == hop.Source { - return err - } - - var failure lnwire.FailureMessage - update, err := s.cfg.FetchLastChannelUpdate( - packet.incomingChanID, - ) - if err != nil { - failure = &lnwire.FailTemporaryNodeFailure{} - } else { - failure = lnwire.NewTemporaryChannelFailure(update) - } - - linkError := NewDetailedLinkError( - failure, OutgoingFailureIncompleteForward, - ) - - return s.failAddPacket(packet, linkError) - } - - packet.circuit = circuit - } - - return s.route(packet) -} - // ForwardPackets adds a list of packets to the switch for processing. Fails // and settles are added on a first past, simultaneously constructing circuits // for any adds. After persisting the circuits, another pass of the adds is From 7afb43a012c933abe6aa93058254a785ac8b042f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 14:33:44 +0200 Subject: [PATCH 04/10] htlcswitch: bypass main loop for local sends There is no concurrent access in this code path, so there is no need to pass the call through the main event loop. --- htlcswitch/switch.go | 86 ++++++++++++-------------------------------- 1 file changed, 23 insertions(+), 63 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 659db5b6..6fabb3ef 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -470,7 +470,28 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, // Send packet to link. packet.circuit = circuit - return s.route(packet) + // User has created the htlc update therefore we should find the + // appropriate channel link and send the payment over this link. + link, linkErr := s.handleLocalAddHTLC(packet, htlc) + if linkErr != nil { + // Notify the htlc notifier of a link failure on our + // outgoing link. Incoming timelock/amount values are + // not set because they are not present for local sends. + s.cfg.HtlcNotifier.NotifyLinkFailEvent( + newHtlcKey(packet), + HtlcInfo{ + OutgoingTimeLock: htlc.Expiry, + OutgoingAmt: htlc.Amount, + }, + HtlcEventTypeSend, + linkErr, + false, + ) + + return linkErr + } + + return link.HandleSwitchPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the @@ -702,28 +723,6 @@ func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup, } } -// route sends a single htlcPacket through the switch and synchronously awaits a -// response. -func (s *Switch) route(packet *htlcPacket) error { - command := &plexPacket{ - pkt: packet, - err: make(chan error, 1), - } - - select { - case s.htlcPlex <- command: - case <-s.quit: - return ErrSwitchExiting - } - - select { - case err := <-command.err: - return err - case <-s.quit: - return ErrSwitchExiting - } -} - // routeAsync sends a packet through the htlc switch, using the provided err // chan to propagate errors back to the caller. The link's quit channel is // provided so that the send can be canceled if either the link or the switch @@ -747,39 +746,6 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, } } -// handleLocalUpdateAddDispatch is used at the start of the htlc update life -// cycle. It is used to send the htlc to the channel link without creation of -// circuit. -func (s *Switch) handleLocalUpdateAddDispatch(pkt *htlcPacket) error { - htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) - if !ok { - return errors.New("not an UpdateAdd packet") - } - - // User have created the htlc update therefore we should find the - // appropriate channel link and send the payment over this link. - link, err := s.handleLocalAddHTLC(pkt, htlc) - if err != nil { - // Notify the htlc notifier of a link failure on our - // outgoing link. Incoming timelock/amount values are - // not set because they are not present for local sends. - s.cfg.HtlcNotifier.NotifyLinkFailEvent( - newHtlcKey(pkt), - HtlcInfo{ - OutgoingTimeLock: htlc.Expiry, - OutgoingAmt: htlc.Amount, - }, - HtlcEventTypeSend, - err, - false, - ) - - return err - } - - return link.HandleSwitchPacket(pkt) -} - // handleLocalAddHTLC handles the addition of a htlc for a send that // originates from our node. It returns the link that the htlc should // be forwarded outwards on, and a link error if the htlc cannot be @@ -1014,7 +980,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { case *lnwire.UpdateAddHTLC: // Check if the node is set to reject all onward HTLCs and also make // sure that HTLC is not from the source node. - if s.cfg.RejectHTLC && packet.incomingChanID != hop.Source { + if s.cfg.RejectHTLC { failure := NewDetailedLinkError( &lnwire.FailChannelDisabled{}, OutgoingFailureForwardsDisabled, @@ -1023,12 +989,6 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return s.failAddPacket(packet, failure) } - if packet.incomingChanID == hop.Source { - // A blank incomingChanID indicates that this is - // a pending user-initiated payment. - return s.handleLocalUpdateAddDispatch(packet) - } - // Before we attempt to find a non-strict forwarding path for // this htlc, check whether the htlc is being routed over the // same incoming and outgoing channel. If our node does not From b559811bf5e93f3200b68475bb748792acb5b5f9 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2020 15:22:56 +0200 Subject: [PATCH 05/10] htlcswitch: extract updateCommitTxOrFail in link Deduplicate code and prepare for further split of handleDownstreamPkt. --- htlcswitch/link.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 019ba007..db7f15ff 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1097,9 +1097,7 @@ func (l *channelLink) htlcManager() { // including all the currently pending entries. If the // send was unsuccessful, then abandon the update, // waiting for the revocation window to open up. - if err := l.updateCommitTx(); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to update commitment: %v", err) + if !l.updateCommitTxOrFail() { return } @@ -1476,9 +1474,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { if l.channel.PendingLocalUpdateCount() >= uint64(l.cfg.BatchSize) || isSettle { - if err := l.updateCommitTx(); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to update commitment: %v", err) + if !l.updateCommitTxOrFail() { return } } @@ -1753,9 +1749,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // Otherwise, the remote party initiated the state transition, // so we'll reply with a signature to provide them with their // version of the latest commitment. - if err := l.updateCommitTx(); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to update commitment: %v", err) + if !l.updateCommitTxOrFail() { return } @@ -1832,9 +1826,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // but there are still remote updates that are not in the remote // commit tx yet, send out an update. if l.channel.OweCommitment(true) { - if err := l.updateCommitTx(); err != nil { - l.fail(LinkFailureError{code: ErrInternalError}, - "unable to update commitment: %v", err) + if !l.updateCommitTxOrFail() { return } } @@ -1918,6 +1910,18 @@ func (l *channelLink) ackDownStreamPackets() error { return nil } +// updateCommitTxOrFail updates the commitment tx and if that fails, it fails +// the link. +func (l *channelLink) updateCommitTxOrFail() bool { + if err := l.updateCommitTx(); err != nil { + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) + return false + } + + return true +} + // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. From 55930df70d57e1a89fa837d5bfa8d44968b348de Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2020 15:26:41 +0200 Subject: [PATCH 06/10] htlcswitch: update commit tx per downstream msg type Unroll common code to allow splitting in separate handlers per message type. --- htlcswitch/link.go | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index db7f15ff..c5c6335e 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1261,7 +1261,6 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { // // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view? func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { - var isSettle bool switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: // If hodl.AddOutgoing mode is active, we exit early to simulate @@ -1328,6 +1327,8 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { getEventType(pkt), ) + l.tryBatchUpdateCommitTx() + case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to // simulate arbitrary delays between the switch adding the @@ -1384,7 +1385,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { // Then we send the HTLC settle message to the connected peer // so we can continue the propagation of the settle message. l.cfg.Peer.SendMessage(false, htlc) - isSettle = true // Send a settle event notification to htlcNotifier. l.cfg.HtlcNotifier.NotifySettleEvent( @@ -1392,6 +1392,9 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { getEventType(pkt), ) + // Immediately update the commitment tx to minimize latency. + l.updateCommitTxOrFail() + case *lnwire.UpdateFailHTLC: // If hodl.FailOutgoing mode is active, we exit early to // simulate arbitrary delays between the switch adding a FAIL to @@ -1448,7 +1451,6 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { // We send the HTLC message to the peer which initially created // the HTLC. l.cfg.Peer.SendMessage(false, htlc) - isSettle = true // If the packet does not have a link failure set, it failed // further down the route so we notify a forwarding failure. @@ -1467,17 +1469,20 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { newHtlcKey(pkt), getEventType(pkt), ) } + + // Immediately update the commitment tx to minimize latency. + l.updateCommitTxOrFail() + } +} + +// tryBatchUpdateCommitTx updates the commitment transaction if the batch is +// full. +func (l *channelLink) tryBatchUpdateCommitTx() { + if l.channel.PendingLocalUpdateCount() < uint64(l.cfg.BatchSize) { + return } - // If this newly added update exceeds the min batch size for adds, or - // this is a settle request, then initiate an update. - if l.channel.PendingLocalUpdateCount() >= uint64(l.cfg.BatchSize) || - isSettle { - - if !l.updateCommitTxOrFail() { - return - } - } + l.updateCommitTxOrFail() } // cleanupSpuriousResponse attempts to ack any AddRef or SettleFailRef From de2df5606a5b3a7acae8ccc31d28e7dd77e907eb Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2020 15:29:31 +0200 Subject: [PATCH 07/10] htlcswitch: extract handleDownstreamUpdateAdd in link To be able to call just the UpdateAdd logic for synchronously handled local adds in a later commit. --- htlcswitch/link.go | 138 ++++++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 65 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index c5c6335e..01a81448 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -1254,6 +1254,78 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { return time.Duration(prand.Int63n(upper-lower) + lower) } +// handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the +// downstream HTLC Switch. +func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { + htlc := pkt.htlc.(*lnwire.UpdateAddHTLC) + + // If hodl.AddOutgoing mode is active, we exit early to simulate + // arbitrary delays between the switch adding an ADD to the + // mailbox, and the HTLC being added to the commitment state. + if l.cfg.HodlMask.Active(hodl.AddOutgoing) { + l.log.Warnf(hodl.AddOutgoing.Warning()) + l.mailBox.AckPacket(pkt.inKey()) + return + } + + // A new payment has been initiated via the downstream channel, + // so we add the new HTLC to our local log, then update the + // commitment chains. + htlc.ChanID = l.ChanID() + openCircuitRef := pkt.inKey() + index, err := l.channel.AddHTLC(htlc, &openCircuitRef) + if err != nil { + // The HTLC was unable to be added to the state machine, + // as a result, we'll signal the switch to cancel the + // pending payment. + l.log.Warnf("Unable to handle downstream add HTLC: %v", + err) + + // Remove this packet from the link's mailbox, this + // prevents it from being reprocessed if the link + // restarts and resets it mailbox. If this response + // doesn't make it back to the originating link, it will + // be rejected upon attempting to reforward the Add to + // the switch, since the circuit was never fully opened, + // and the forwarding package shows it as + // unacknowledged. + l.mailBox.FailAdd(pkt) + + return + } + + l.log.Tracef("received downstream htlc: payment_hash=%x, "+ + "local_log_index=%v, pend_updates=%v", + htlc.PaymentHash[:], index, + l.channel.PendingLocalUpdateCount()) + + pkt.outgoingChanID = l.ShortChanID() + pkt.outgoingHTLCID = index + htlc.ID = index + + l.log.Debugf("queueing keystone of ADD open circuit: %s->%s", + pkt.inKey(), pkt.outKey()) + + l.openedCircuits = append(l.openedCircuits, pkt.inKey()) + l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) + + _ = l.cfg.Peer.SendMessage(false, htlc) + + // Send a forward event notification to htlcNotifier. + l.cfg.HtlcNotifier.NotifyForwardingEvent( + newHtlcKey(pkt), + HtlcInfo{ + IncomingTimeLock: pkt.incomingTimeout, + IncomingAmt: pkt.incomingAmount, + OutgoingTimeLock: htlc.Expiry, + OutgoingAmt: htlc.Amount, + }, + getEventType(pkt), + ) + + l.tryBatchUpdateCommitTx() +} + // handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC // Switch. Possible messages sent by the switch include requests to forward new // HTLCs, timeout previously cleared HTLCs, and finally to settle currently @@ -1263,71 +1335,7 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: - // If hodl.AddOutgoing mode is active, we exit early to simulate - // arbitrary delays between the switch adding an ADD to the - // mailbox, and the HTLC being added to the commitment state. - if l.cfg.HodlMask.Active(hodl.AddOutgoing) { - l.log.Warnf(hodl.AddOutgoing.Warning()) - l.mailBox.AckPacket(pkt.inKey()) - return - } - - // A new payment has been initiated via the downstream channel, - // so we add the new HTLC to our local log, then update the - // commitment chains. - htlc.ChanID = l.ChanID() - openCircuitRef := pkt.inKey() - index, err := l.channel.AddHTLC(htlc, &openCircuitRef) - if err != nil { - // The HTLC was unable to be added to the state machine, - // as a result, we'll signal the switch to cancel the - // pending payment. - l.log.Warnf("Unable to handle downstream add HTLC: %v", - err) - - // Remove this packet from the link's mailbox, this - // prevents it from being reprocessed if the link - // restarts and resets it mailbox. If this response - // doesn't make it back to the originating link, it will - // be rejected upon attempting to reforward the Add to - // the switch, since the circuit was never fully opened, - // and the forwarding package shows it as - // unacknowledged. - l.mailBox.FailAdd(pkt) - - return - } - - l.log.Tracef("received downstream htlc: payment_hash=%x, "+ - "local_log_index=%v, pend_updates=%v", - htlc.PaymentHash[:], index, - l.channel.PendingLocalUpdateCount()) - - pkt.outgoingChanID = l.ShortChanID() - pkt.outgoingHTLCID = index - htlc.ID = index - - l.log.Debugf("queueing keystone of ADD open circuit: %s->%s", - pkt.inKey(), pkt.outKey()) - - l.openedCircuits = append(l.openedCircuits, pkt.inKey()) - l.keystoneBatch = append(l.keystoneBatch, pkt.keystone()) - - l.cfg.Peer.SendMessage(false, htlc) - - // Send a forward event notification to htlcNotifier. - l.cfg.HtlcNotifier.NotifyForwardingEvent( - newHtlcKey(pkt), - HtlcInfo{ - IncomingTimeLock: pkt.incomingTimeout, - IncomingAmt: pkt.incomingAmount, - OutgoingTimeLock: htlc.Expiry, - OutgoingAmt: htlc.Amount, - }, - getEventType(pkt), - ) - - l.tryBatchUpdateCommitTx() + l.handleDownstreamUpdateAdd(pkt) case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to From c325bf8c57c45a1a6f11f38a4eea9a29f60db7e8 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 13 Apr 2020 17:29:52 +0200 Subject: [PATCH 08/10] htlcswitch: sync link hand-off This commit extends the link with a new synchronous delivery point for local UpdateAddHTLC messages. The switch method SendHTLC is updated to use this delivery point and thereby becomes a synchronous call. For MPP payments, synchronous hand-off is important. Otherwise the next pathfinding round could start without the channel balance updated yet. --- htlcswitch/interfaces.go | 4 ++ htlcswitch/link.go | 74 ++++++++++++++++++++++++---- htlcswitch/mock.go | 5 ++ htlcswitch/switch.go | 2 +- lntest/itest/lnd_test.go | 51 ++++++++++++++++--- lntest/itest/log_error_whitelist.txt | 2 + 6 files changed, 120 insertions(+), 18 deletions(-) diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index b28d137f..f51f1b15 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -70,6 +70,10 @@ type ChannelLink interface { // possible). HandleSwitchPacket(*htlcPacket) error + // HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC + // packet. It will be processed synchronously. + HandleLocalAddPacket(*htlcPacket) error + // HandleChannelUpdate handles the htlc requests as settle/add/fail // which sent to us from remote peer we have a channel with. // diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 01a81448..f15b17df 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -286,6 +286,14 @@ type ChannelLinkConfig struct { HtlcNotifier htlcNotifier } +// localUpdateAddMsg contains a locally initiated htlc and a channel that will +// receive the outcome of the link processing. This channel must be buffered to +// prevent the link from blocking. +type localUpdateAddMsg struct { + pkt *htlcPacket + err chan error +} + // channelLink is the service which drives a channel's commitment update // state-machine. In the event that an HTLC needs to be propagated to another // link, the forward handler from config is used which sends HTLC to the @@ -346,6 +354,10 @@ type channelLink struct { // by the HTLC switch. downstream chan *htlcPacket + // localUpdateAdd is a channel to which locally initiated HTLCs are + // sent across. + localUpdateAdd chan *localUpdateAddMsg + // htlcUpdates is a channel that we'll use to update outside // sub-systems with the latest set of active HTLC's on our channel. htlcUpdates chan *contractcourt.ContractUpdate @@ -395,11 +407,12 @@ func NewChannelLink(cfg ChannelLinkConfig, channel: channel, shortChanID: channel.ShortChanID(), // TODO(roasbeef): just do reserve here? - htlcUpdates: make(chan *contractcourt.ContractUpdate), - hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), - hodlQueue: queue.NewConcurrentQueue(10), - log: build.NewPrefixLog(logPrefix, log), - quit: make(chan struct{}), + htlcUpdates: make(chan *contractcourt.ContractUpdate), + hodlMap: make(map[channeldb.CircuitKey]hodlHtlc), + hodlQueue: queue.NewConcurrentQueue(10), + log: build.NewPrefixLog(logPrefix, log), + quit: make(chan struct{}), + localUpdateAdd: make(chan *localUpdateAddMsg), } } @@ -1112,6 +1125,10 @@ func (l *channelLink) htlcManager() { case pkt := <-l.downstream: l.handleDownstreamPkt(pkt) + // A message containing a locally initiated add was received. + case msg := <-l.localUpdateAdd: + msg.err <- l.handleDownstreamUpdateAdd(msg.pkt) + // A message from the connected peer was just received. This // indicates that we have a new incoming HTLC, either directly // for us, or part of a multi-hop HTLC circuit. @@ -1256,8 +1273,11 @@ func (l *channelLink) randomFeeUpdateTimeout() time.Duration { // handleDownstreamUpdateAdd processes an UpdateAddHTLC packet sent from the // downstream HTLC Switch. -func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { - htlc := pkt.htlc.(*lnwire.UpdateAddHTLC) +func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { + htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) + if !ok { + return errors.New("not an UpdateAddHTLC packet") + } // If hodl.AddOutgoing mode is active, we exit early to simulate // arbitrary delays between the switch adding an ADD to the @@ -1265,7 +1285,7 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { if l.cfg.HodlMask.Active(hodl.AddOutgoing) { l.log.Warnf(hodl.AddOutgoing.Warning()) l.mailBox.AckPacket(pkt.inKey()) - return + return nil } // A new payment has been initiated via the downstream channel, @@ -1291,7 +1311,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { // unacknowledged. l.mailBox.FailAdd(pkt) - return + return NewDetailedLinkError( + lnwire.NewTemporaryChannelFailure(nil), + OutgoingFailureDownstreamHtlcAdd, + ) } l.log.Tracef("received downstream htlc: payment_hash=%x, "+ @@ -1324,6 +1347,8 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { ) l.tryBatchUpdateCommitTx() + + return nil } // handleDownstreamPkt processes an HTLC packet sent from the downstream HTLC @@ -1335,7 +1360,9 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) { func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: - l.handleDownstreamUpdateAdd(pkt) + // Handle add message. The returned error can be ignored, + // because it is also sent through the mailbox. + _ = l.handleDownstreamUpdateAdd(pkt) case *lnwire.UpdateFulfillHTLC: // If hodl.SettleOutgoing mode is active, we exit early to @@ -2329,6 +2356,33 @@ func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error { return l.mailBox.AddPacket(pkt) } +// HandleLocalAddPacket handles a locally-initiated UpdateAddHTLC packet. It +// will be processed synchronously. +// +// NOTE: Part of the ChannelLink interface. +func (l *channelLink) HandleLocalAddPacket(pkt *htlcPacket) error { + l.log.Tracef("received switch packet outkey=%v", pkt.outKey()) + + // Create a buffered result channel to prevent the link from blocking. + errChan := make(chan error, 1) + + select { + case l.localUpdateAdd <- &localUpdateAddMsg{ + pkt: pkt, + err: errChan, + }: + case <-l.quit: + return ErrLinkShuttingDown + } + + select { + case err := <-errChan: + return err + case <-l.quit: + return ErrLinkShuttingDown + } +} + // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent // to us from remote peer we have a channel with. // diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e9a2a1ef..5e74756e 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -702,6 +702,11 @@ func (f *mockChannelLink) HandleSwitchPacket(pkt *htlcPacket) error { return nil } +func (f *mockChannelLink) HandleLocalAddPacket(pkt *htlcPacket) error { + _ = f.mailBox.AddPacket(pkt) + return nil +} + func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) { } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 6fabb3ef..1d232374 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -491,7 +491,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64, return linkErr } - return link.HandleSwitchPacket(packet) + return link.HandleLocalAddPacket(packet) } // UpdateForwardingPolicies sends a message to the switch to update the diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 475507ba..263b9471 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -4182,6 +4182,34 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { }, ) + // Next, we'll create Fred who is going to initiate the payment and + // establish a channel to from him to Carol. We can't perform this test + // by paying from Carol directly to Dave, because the '--unsafe-replay' + // setup doesn't apply to locally added htlcs. In that case, the + // mailbox, that is responsible for generating the replay, is bypassed. + fred, err := net.NewNode("Fred", nil) + if err != nil { + t.Fatalf("unable to create new nodes: %v", err) + } + defer shutdownAndAssert(net, t, fred) + + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + if err := net.ConnectNodes(ctxt, fred, carol); err != nil { + t.Fatalf("unable to connect fred to carol: %v", err) + } + ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + err = net.SendCoins(ctxt, btcutil.SatoshiPerBitcoin, fred) + if err != nil { + t.Fatalf("unable to send coins to fred: %v", err) + } + ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout) + chanPointFC := openChannelAndAssert( + ctxt, t, net, fred, carol, + lntest.OpenChannelParams{ + Amt: chanAmt, + }, + ) + // Now that the channel is open, create an invoice for Dave which // expects a payment of 1000 satoshis from Carol paid via a particular // preimage. @@ -4198,8 +4226,7 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to add invoice: %v", err) } - // Wait for Carol to recognize and advertise the new channel generated - // above. + // Wait for all channels to be recognized and advertized. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) err = carol.WaitForNetworkChannelOpen(ctxt, chanPoint) if err != nil { @@ -4211,13 +4238,23 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("bob didn't advertise channel before "+ "timeout: %v", err) } + err = carol.WaitForNetworkChannelOpen(ctxt, chanPointFC) + if err != nil { + t.Fatalf("alice didn't advertise channel before "+ + "timeout: %v", err) + } + err = fred.WaitForNetworkChannelOpen(ctxt, chanPointFC) + if err != nil { + t.Fatalf("bob didn't advertise channel before "+ + "timeout: %v", err) + } - // With the invoice for Dave added, send a payment from Carol paying + // With the invoice for Dave added, send a payment from Fred paying // to the above generated invoice. ctx, cancel := context.WithCancel(ctxb) defer cancel() - payStream, err := carol.SendPayment(ctx) + payStream, err := fred.SendPayment(ctx) if err != nil { t.Fatalf("unable to open payment stream: %v", err) } @@ -4270,12 +4307,12 @@ func testSphinxReplayPersistence(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to receive payment response: %v", err) } - // Construct the response we expect after sending a duplicate packet - // that fails due to sphinx replay detection. + // Assert that Fred receives the expected failure after Carol sent a + // duplicate packet that fails due to sphinx replay detection. if resp.PaymentError == "" { t.Fatalf("expected payment error") } - assertLastHTLCError(t, carol, lnrpc.Failure_INVALID_ONION_KEY) + assertLastHTLCError(t, fred, lnrpc.Failure_INVALID_ONION_KEY) // Since the payment failed, the balance should still be left // unaltered. diff --git a/lntest/itest/log_error_whitelist.txt b/lntest/itest/log_error_whitelist.txt index fcd65cc5..ae12297b 100644 --- a/lntest/itest/log_error_whitelist.txt +++ b/lntest/itest/log_error_whitelist.txt @@ -24,6 +24,7 @@