From 1a6701122cd22824e79a87251f2366334f036a21 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 19 May 2020 12:13:02 +0300 Subject: [PATCH] htlcswitch: change ForwardPackets to return error As part of the preparation to the switch interceptor feature, this function is changed to return error instead of error channel that is closed automatically. Returning an error channel has become complex to maintain and implement when adding more asynchronous flows to the switch. The change doesn't affect the current behavior which logs the errors as before. --- htlcswitch/link.go | 12 ++- htlcswitch/mailbox.go | 10 +- htlcswitch/mailbox_test.go | 15 +-- htlcswitch/switch.go | 79 ++++----------- htlcswitch/switch_test.go | 138 +++++++++++++++++---------- lntest/itest/log_error_whitelist.txt | 12 +-- 6 files changed, 131 insertions(+), 135 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index f15b17df..9276696c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -135,10 +135,10 @@ type ChannelLinkConfig struct { Switch *Switch // ForwardPackets attempts to forward the batch of htlcs through the - // switch, any failed packets will be returned to the provided - // ChannelLink. The link's quit signal should be provided to allow + // switch. The function returns and error in case it fails to send one or + // more packets. The link's quit signal should be provided to allow // cancellation of forwarding during link shutdown. - ForwardPackets func(chan struct{}, ...*htlcPacket) chan error + ForwardPackets func(chan struct{}, ...*htlcPacket) error // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion // blobs, which are then used to inform how to forward an HTLC. @@ -2971,8 +2971,10 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...) - go handleBatchFwdErrs(errChan, l.log) + if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil { + log.Errorf("Unhandled error while reforwarding htlc "+ + "settle/fail over htlcswitch: %v", err) + } } // sendHTLCError functions cancels HTLC and send cancel message back to the diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 95d88763..b082d9ba 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -86,7 +86,7 @@ type mailBoxConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) chan error + forwardPackets func(chan struct{}, ...*htlcPacket) error // clock is a time source for the mailbox. clock clock.Clock @@ -680,8 +680,10 @@ func (m *memoryMailBox) FailAdd(pkt *htlcPacket) { }, } - errChan := m.cfg.forwardPackets(m.quit, failPkt) - go handleBatchFwdErrs(errChan, log) + if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil { + log.Errorf("Unhandled error while reforwarding packets "+ + "settle/fail over htlcswitch: %v", err) + } } // MessageOutBox returns a channel that any new messages ready for delivery @@ -734,7 +736,7 @@ type mailOrchConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) chan error + forwardPackets func(chan struct{}, ...*htlcPacket) error // fetchUpdate retreives the most recent channel update for the channel // this mailbox belongs to. diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index 7e15eb04..ef199467 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -218,16 +218,13 @@ func newMailboxContext(t *testing.T, startTime time.Time, } func (c *mailboxContext) forward(_ chan struct{}, - pkts ...*htlcPacket) chan error { + pkts ...*htlcPacket) error { for _, pkt := range pkts { c.forwards <- pkt } - errChan := make(chan error) - close(errChan) - - return errChan + return nil } func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket { @@ -555,12 +552,8 @@ func TestMailOrchestrator(t *testing.T) { }, nil }, forwardPackets: func(_ chan struct{}, - pkts ...*htlcPacket) chan error { - // Close the channel immediately so the goroutine - // logging errors can exit. - errChan := make(chan error) - close(errChan) - return errChan + pkts ...*htlcPacket) error { + return nil }, clock: clock.NewTestClock(time.Now()), expiry: testExpiry, diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index fd62504d..0eb7ffe8 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -9,7 +9,6 @@ import ( "time" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btclog" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" @@ -548,24 +547,14 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, // given to forward them through the router. The sending link's quit channel is // used to prevent deadlocks when the switch stops a link in the midst of // forwarding. -// -// NOTE: This method guarantees that the returned err chan will eventually be -// closed. The receiver should read on the channel until receiving such a -// signal. func (s *Switch) ForwardPackets(linkQuit chan struct{}, - packets ...*htlcPacket) chan error { + packets ...*htlcPacket) error { var ( // fwdChan is a buffered channel used to receive err msgs from // the htlcPlex when forwarding this batch. fwdChan = make(chan error, len(packets)) - // errChan is a buffered channel returned to the caller, that is - // proxied by the fwdChan. This method guarantees that errChan - // will be closed eventually to alert the receiver that it can - // stop reading from the channel. - errChan = make(chan error, len(packets)) - // numSent keeps a running count of how many packets are // forwarded to the switch, which determines how many responses // we will wait for on the fwdChan.. @@ -574,8 +563,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // No packets, nothing to do. if len(packets) == 0 { - close(errChan) - return errChan + return nil } // Setup a barrier to prevent the background tasks from processing @@ -590,18 +578,13 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // it is already in the process of shutting down. select { case <-linkQuit: - close(errChan) - return errChan + return nil case <-s.quit: - close(errChan) - return errChan + return nil default: - // Spawn a goroutine the proxy the errs back to the returned err - // chan. This is done to ensure the err chan returned to the - // caller closed properly, alerting the receiver of completion - // or shutdown. + // Spawn a goroutine to log the errors returned from failed packets. s.wg.Add(1) - go s.proxyFwdErrs(&numSent, &wg, fwdChan, errChan) + go s.logFwdErrs(&numSent, &wg, fwdChan) } // Make a first pass over the packets, forwarding any settles or fails. @@ -619,7 +602,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, default: err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { - return errChan + return fmt.Errorf("failed to forward packet %v", err) } numSent++ } @@ -628,7 +611,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // If this batch did not contain any circuits to commit, we can return // early. if len(circuits) == 0 { - return errChan + return nil } // Write any circuits that we found to disk. @@ -664,7 +647,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, for _, packet := range addedPackets { err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { - return errChan + return fmt.Errorf("failed to forward packet %v", err) } numSent++ } @@ -693,21 +676,12 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, } } - return errChan + return nil } -// proxyFwdErrs transmits any errors received on `fwdChan` back to `errChan`, -// and guarantees that the `errChan` will be closed after 1) all errors have -// been sent, or 2) the switch has received a shutdown. The `errChan` should be -// buffered with at least the value of `num` after the barrier has been -// released. -// -// NOTE: The receiver of `errChan` should read until the channel closed, since -// this proxying guarantees that the close will happen. -func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup, - fwdChan, errChan chan error) { +// logFwdErrs logs any errors received on `fwdChan` +func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { defer s.wg.Done() - defer close(errChan) // Wait here until the outer function has finished persisting // and routing the packets. This guarantees we don't read from num until @@ -718,7 +692,10 @@ func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup, for i := 0; i < numSent; i++ { select { case err := <-fwdChan: - errChan <- err + if err != nil { + log.Errorf("Unhandled error while reforwarding htlc "+ + "settle/fail over htlcswitch: %v", err) + } case <-s.quit: log.Errorf("unable to forward htlc packet " + "htlc switch was stopped") @@ -1925,28 +1902,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { // Since this send isn't tied to a specific link, we pass a nil // link quit channel, meaning the send will fail only if the // switch receives a shutdown request. - errChan := s.ForwardPackets(nil, switchPackets...) - go handleBatchFwdErrs(errChan, log) - } -} - -// handleBatchFwdErrs waits on the given errChan until it is closed, logging the -// errors returned from any unsuccessful forwarding attempts. -func handleBatchFwdErrs(errChan chan error, l btclog.Logger) { - for { - err, ok := <-errChan - if !ok { - // Err chan has been drained or switch is shutting down. - // Either way, return. - return + if err := s.ForwardPackets(nil, switchPackets...); err != nil { + log.Errorf("Unhandled error while reforwarding packets "+ + "settle/fail over htlcswitch: %v", err) } - - if err == nil { - continue - } - - l.Errorf("Unhandled error while reforwarding htlc "+ - "settle/fail over htlcswitch: %v", err) } } diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index b4580aeb..d2977f7d 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -172,6 +172,13 @@ func TestSwitchSendPending(t *testing.T) { t.Fatalf("unable to create alice server: %v", err) } + bobPeer, err := newMockServer( + t, "bob", testStartingHeight, nil, testDefaultDelta, + ) + if err != nil { + t.Fatalf("unable to create bob server: %v", err) + } + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) @@ -181,7 +188,7 @@ func TestSwitchSendPending(t *testing.T) { } defer s.Stop() - chanID1, _, aliceChanID, bobChanID := genIDs() + chanID1, chanID2, aliceChanID, bobChanID := genIDs() pendingChanID := lnwire.ShortChannelID{} @@ -192,6 +199,13 @@ func TestSwitchSendPending(t *testing.T) { t.Fatalf("unable to add alice link: %v", err) } + bobChannelLink := newMockChannelLink( + s, chanID2, bobChanID, bobPeer, true, + ) + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + // Create request which should is being forwarded from Bob channel // link to Alice channel link. preimage, err := genPreimage() @@ -212,7 +226,17 @@ func TestSwitchSendPending(t *testing.T) { // Send the ADD packet, this should not be forwarded out to the link // since there are no eligible links. - err = forwardPackets(t, s, packet) + if err = s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + select { + case p := <-bobChannelLink.packets: + if p.linkFailure != nil { + err = p.linkFailure + } + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } linkErr, ok := err.(*LinkError) if !ok { t.Fatalf("expected link error, got: %T", err) @@ -248,7 +272,7 @@ func TestSwitchSendPending(t *testing.T) { packet.incomingHTLCID++ // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("unexpected forward failure: %v", err) } @@ -321,7 +345,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatal(err) } @@ -355,7 +379,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatal(err) } @@ -450,7 +474,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -538,7 +562,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, fail); err != nil { + if err := s2.ForwardPackets(nil, fail); err != nil { t.Fatalf(err.Error()) } @@ -562,9 +586,13 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, fail); err == nil { - t.Fatalf("expected failure when sending duplicate fail " + - "with no pending circuit") + if err := s.ForwardPackets(nil, fail); err != nil { + t.Fatal(err) + } + select { + case <-aliceChannelLink.packets: + t.Fatalf("expected duplicate fail to not arrive at the destination") + case <-time.After(time.Second): } } @@ -645,7 +673,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -735,7 +763,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Send the settle packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, settle); err != nil { + if err := s2.ForwardPackets(nil, settle); err != nil { t.Fatalf(err.Error()) } @@ -759,10 +787,14 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { t.Fatalf("wrong amount of circuits") } - // Send the settle packet again, which should fail. - if err := <-s2.ForwardPackets(nil, settle); err != nil { - t.Fatalf("expected success when sending duplicate settle " + - "with no pending circuit") + // Send the settle packet again, which not arrive at destination. + if err := s2.ForwardPackets(nil, settle); err != nil { + t.Fatal(err) + } + select { + case <-bobChannelLink.packets: + t.Fatalf("expected duplicate fail to not arrive at the destination") + case <-time.After(time.Second): } } @@ -843,7 +875,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -916,7 +948,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { // Resend the failed htlc. The packet will be dropped silently since the // switch will detect that it has been half added previously. - if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { + if err := s2.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1008,7 +1040,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1076,7 +1108,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { // Resend the failed htlc, it should be returned to alice since the // switch will detect that it has been half added previously. - err = <-s2.ForwardPackets(nil, ogPacket) + err = s2.ForwardPackets(nil, ogPacket) if err != nil { t.Fatal(err) } @@ -1174,7 +1206,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1264,7 +1296,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { + if err := s2.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1414,7 +1446,17 @@ func TestCircularForwards(t *testing.T) { // Attempt to forward the packet and check for the expected // error. - err = forwardPackets(t, s, packet) + if err = s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + select { + case p := <-aliceChannelLink.packets: + if p.linkFailure != nil { + err = p.linkFailure + } + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } if !reflect.DeepEqual(err, test.expectedErr) { t.Fatalf("expected: %v, got: %v", test.expectedErr, err) @@ -1634,18 +1676,30 @@ func testSkipIneligibleLinksMultiHopForward(t *testing.T, } // The request to forward should fail as - err = forwardPackets(t, s, packet) - + if err := s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + var linkError *LinkError + select { + case p := <-aliceChannelLink.packets: + linkError = p.linkFailure + case p := <-bobChannelLink1.packets: + linkError = p.linkFailure + case p := <-bobChannelLink2.packets: + linkError = p.linkFailure + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } failure := obfuscator.(*mockObfuscator).failure if testCase.expectedReply == lnwire.CodeNone { - if err != nil { + if linkError != nil { t.Fatalf("forwarding should have succeeded") } if failure != nil { t.Fatalf("unexpected failure %T", failure) } } else { - if err == nil { + if linkError == nil { t.Fatalf("forwarding should have failed due to " + "inactive link") } @@ -1793,7 +1847,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1825,7 +1879,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1908,7 +1962,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1938,7 +1992,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1967,7 +2021,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1993,7 +2047,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -2136,7 +2190,7 @@ func TestSwitchSendPayment(t *testing.T) { }, } - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -2631,7 +2685,7 @@ func TestInvalidFailure(t *testing.T) { }, } - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -3057,17 +3111,3 @@ func getThreeHopEvents(channels *clusterChannels, htlcID uint64, return aliceEvents, bobEvents, carolEvents } - -// forwardPackets forwards packets to the switch and enforces a timeout on the -// reply. -func forwardPackets(t *testing.T, s *Switch, packets ...*htlcPacket) error { - - select { - case err := <-s.ForwardPackets(nil, packets...): - return err - - case <-time.After(time.Second): - t.Fatal("no timely reply from switch") - return nil - } -} diff --git a/lntest/itest/log_error_whitelist.txt b/lntest/itest/log_error_whitelist.txt index 6541cab0..bf808e76 100644 --- a/lntest/itest/log_error_whitelist.txt +++ b/lntest/itest/log_error_whitelist.txt @@ -89,13 +89,13 @@