diff --git a/htlcswitch/link.go b/htlcswitch/link.go index f15b17df..9276696c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -135,10 +135,10 @@ type ChannelLinkConfig struct { Switch *Switch // ForwardPackets attempts to forward the batch of htlcs through the - // switch, any failed packets will be returned to the provided - // ChannelLink. The link's quit signal should be provided to allow + // switch. The function returns and error in case it fails to send one or + // more packets. The link's quit signal should be provided to allow // cancellation of forwarding during link shutdown. - ForwardPackets func(chan struct{}, ...*htlcPacket) chan error + ForwardPackets func(chan struct{}, ...*htlcPacket) error // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion // blobs, which are then used to inform how to forward an HTLC. @@ -2971,8 +2971,10 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...) - go handleBatchFwdErrs(errChan, l.log) + if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil { + log.Errorf("Unhandled error while reforwarding htlc "+ + "settle/fail over htlcswitch: %v", err) + } } // sendHTLCError functions cancels HTLC and send cancel message back to the diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 95d88763..b082d9ba 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -86,7 +86,7 @@ type mailBoxConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) chan error + forwardPackets func(chan struct{}, ...*htlcPacket) error // clock is a time source for the mailbox. clock clock.Clock @@ -680,8 +680,10 @@ func (m *memoryMailBox) FailAdd(pkt *htlcPacket) { }, } - errChan := m.cfg.forwardPackets(m.quit, failPkt) - go handleBatchFwdErrs(errChan, log) + if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil { + log.Errorf("Unhandled error while reforwarding packets "+ + "settle/fail over htlcswitch: %v", err) + } } // MessageOutBox returns a channel that any new messages ready for delivery @@ -734,7 +736,7 @@ type mailOrchConfig struct { // forwardPackets send a varidic number of htlcPackets to the switch to // be routed. A quit channel should be provided so that the call can // properly exit during shutdown. - forwardPackets func(chan struct{}, ...*htlcPacket) chan error + forwardPackets func(chan struct{}, ...*htlcPacket) error // fetchUpdate retreives the most recent channel update for the channel // this mailbox belongs to. diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go index 7e15eb04..ef199467 100644 --- a/htlcswitch/mailbox_test.go +++ b/htlcswitch/mailbox_test.go @@ -218,16 +218,13 @@ func newMailboxContext(t *testing.T, startTime time.Time, } func (c *mailboxContext) forward(_ chan struct{}, - pkts ...*htlcPacket) chan error { + pkts ...*htlcPacket) error { for _, pkt := range pkts { c.forwards <- pkt } - errChan := make(chan error) - close(errChan) - - return errChan + return nil } func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket { @@ -555,12 +552,8 @@ func TestMailOrchestrator(t *testing.T) { }, nil }, forwardPackets: func(_ chan struct{}, - pkts ...*htlcPacket) chan error { - // Close the channel immediately so the goroutine - // logging errors can exit. - errChan := make(chan error) - close(errChan) - return errChan + pkts ...*htlcPacket) error { + return nil }, clock: clock.NewTestClock(time.Now()), expiry: testExpiry, diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index fd62504d..0eb7ffe8 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -9,7 +9,6 @@ import ( "time" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btclog" "github.com/btcsuite/btcutil" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" @@ -548,24 +547,14 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID, // given to forward them through the router. The sending link's quit channel is // used to prevent deadlocks when the switch stops a link in the midst of // forwarding. -// -// NOTE: This method guarantees that the returned err chan will eventually be -// closed. The receiver should read on the channel until receiving such a -// signal. func (s *Switch) ForwardPackets(linkQuit chan struct{}, - packets ...*htlcPacket) chan error { + packets ...*htlcPacket) error { var ( // fwdChan is a buffered channel used to receive err msgs from // the htlcPlex when forwarding this batch. fwdChan = make(chan error, len(packets)) - // errChan is a buffered channel returned to the caller, that is - // proxied by the fwdChan. This method guarantees that errChan - // will be closed eventually to alert the receiver that it can - // stop reading from the channel. - errChan = make(chan error, len(packets)) - // numSent keeps a running count of how many packets are // forwarded to the switch, which determines how many responses // we will wait for on the fwdChan.. @@ -574,8 +563,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // No packets, nothing to do. if len(packets) == 0 { - close(errChan) - return errChan + return nil } // Setup a barrier to prevent the background tasks from processing @@ -590,18 +578,13 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // it is already in the process of shutting down. select { case <-linkQuit: - close(errChan) - return errChan + return nil case <-s.quit: - close(errChan) - return errChan + return nil default: - // Spawn a goroutine the proxy the errs back to the returned err - // chan. This is done to ensure the err chan returned to the - // caller closed properly, alerting the receiver of completion - // or shutdown. + // Spawn a goroutine to log the errors returned from failed packets. s.wg.Add(1) - go s.proxyFwdErrs(&numSent, &wg, fwdChan, errChan) + go s.logFwdErrs(&numSent, &wg, fwdChan) } // Make a first pass over the packets, forwarding any settles or fails. @@ -619,7 +602,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, default: err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { - return errChan + return fmt.Errorf("failed to forward packet %v", err) } numSent++ } @@ -628,7 +611,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, // If this batch did not contain any circuits to commit, we can return // early. if len(circuits) == 0 { - return errChan + return nil } // Write any circuits that we found to disk. @@ -664,7 +647,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, for _, packet := range addedPackets { err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { - return errChan + return fmt.Errorf("failed to forward packet %v", err) } numSent++ } @@ -693,21 +676,12 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, } } - return errChan + return nil } -// proxyFwdErrs transmits any errors received on `fwdChan` back to `errChan`, -// and guarantees that the `errChan` will be closed after 1) all errors have -// been sent, or 2) the switch has received a shutdown. The `errChan` should be -// buffered with at least the value of `num` after the barrier has been -// released. -// -// NOTE: The receiver of `errChan` should read until the channel closed, since -// this proxying guarantees that the close will happen. -func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup, - fwdChan, errChan chan error) { +// logFwdErrs logs any errors received on `fwdChan` +func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { defer s.wg.Done() - defer close(errChan) // Wait here until the outer function has finished persisting // and routing the packets. This guarantees we don't read from num until @@ -718,7 +692,10 @@ func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup, for i := 0; i < numSent; i++ { select { case err := <-fwdChan: - errChan <- err + if err != nil { + log.Errorf("Unhandled error while reforwarding htlc "+ + "settle/fail over htlcswitch: %v", err) + } case <-s.quit: log.Errorf("unable to forward htlc packet " + "htlc switch was stopped") @@ -1925,28 +1902,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { // Since this send isn't tied to a specific link, we pass a nil // link quit channel, meaning the send will fail only if the // switch receives a shutdown request. - errChan := s.ForwardPackets(nil, switchPackets...) - go handleBatchFwdErrs(errChan, log) - } -} - -// handleBatchFwdErrs waits on the given errChan until it is closed, logging the -// errors returned from any unsuccessful forwarding attempts. -func handleBatchFwdErrs(errChan chan error, l btclog.Logger) { - for { - err, ok := <-errChan - if !ok { - // Err chan has been drained or switch is shutting down. - // Either way, return. - return + if err := s.ForwardPackets(nil, switchPackets...); err != nil { + log.Errorf("Unhandled error while reforwarding packets "+ + "settle/fail over htlcswitch: %v", err) } - - if err == nil { - continue - } - - l.Errorf("Unhandled error while reforwarding htlc "+ - "settle/fail over htlcswitch: %v", err) } } diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index b4580aeb..d2977f7d 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -172,6 +172,13 @@ func TestSwitchSendPending(t *testing.T) { t.Fatalf("unable to create alice server: %v", err) } + bobPeer, err := newMockServer( + t, "bob", testStartingHeight, nil, testDefaultDelta, + ) + if err != nil { + t.Fatalf("unable to create bob server: %v", err) + } + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) @@ -181,7 +188,7 @@ func TestSwitchSendPending(t *testing.T) { } defer s.Stop() - chanID1, _, aliceChanID, bobChanID := genIDs() + chanID1, chanID2, aliceChanID, bobChanID := genIDs() pendingChanID := lnwire.ShortChannelID{} @@ -192,6 +199,13 @@ func TestSwitchSendPending(t *testing.T) { t.Fatalf("unable to add alice link: %v", err) } + bobChannelLink := newMockChannelLink( + s, chanID2, bobChanID, bobPeer, true, + ) + if err := s.AddLink(bobChannelLink); err != nil { + t.Fatalf("unable to add bob link: %v", err) + } + // Create request which should is being forwarded from Bob channel // link to Alice channel link. preimage, err := genPreimage() @@ -212,7 +226,17 @@ func TestSwitchSendPending(t *testing.T) { // Send the ADD packet, this should not be forwarded out to the link // since there are no eligible links. - err = forwardPackets(t, s, packet) + if err = s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + select { + case p := <-bobChannelLink.packets: + if p.linkFailure != nil { + err = p.linkFailure + } + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } linkErr, ok := err.(*LinkError) if !ok { t.Fatalf("expected link error, got: %T", err) @@ -248,7 +272,7 @@ func TestSwitchSendPending(t *testing.T) { packet.incomingHTLCID++ // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("unexpected forward failure: %v", err) } @@ -321,7 +345,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatal(err) } @@ -355,7 +379,7 @@ func TestSwitchForward(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatal(err) } @@ -450,7 +474,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -538,7 +562,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, fail); err != nil { + if err := s2.ForwardPackets(nil, fail); err != nil { t.Fatalf(err.Error()) } @@ -562,9 +586,13 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) { } // Send the fail packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, fail); err == nil { - t.Fatalf("expected failure when sending duplicate fail " + - "with no pending circuit") + if err := s.ForwardPackets(nil, fail); err != nil { + t.Fatal(err) + } + select { + case <-aliceChannelLink.packets: + t.Fatalf("expected duplicate fail to not arrive at the destination") + case <-time.After(time.Second): } } @@ -645,7 +673,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -735,7 +763,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { } // Send the settle packet from the remote peer through the switch. - if err := <-s2.ForwardPackets(nil, settle); err != nil { + if err := s2.ForwardPackets(nil, settle); err != nil { t.Fatalf(err.Error()) } @@ -759,10 +787,14 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) { t.Fatalf("wrong amount of circuits") } - // Send the settle packet again, which should fail. - if err := <-s2.ForwardPackets(nil, settle); err != nil { - t.Fatalf("expected success when sending duplicate settle " + - "with no pending circuit") + // Send the settle packet again, which not arrive at destination. + if err := s2.ForwardPackets(nil, settle); err != nil { + t.Fatal(err) + } + select { + case <-bobChannelLink.packets: + t.Fatalf("expected duplicate fail to not arrive at the destination") + case <-time.After(time.Second): } } @@ -843,7 +875,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -916,7 +948,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) { // Resend the failed htlc. The packet will be dropped silently since the // switch will detect that it has been half added previously. - if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { + if err := s2.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1008,7 +1040,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1076,7 +1108,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) { // Resend the failed htlc, it should be returned to alice since the // switch will detect that it has been half added previously. - err = <-s2.ForwardPackets(nil, ogPacket) + err = s2.ForwardPackets(nil, ogPacket) if err != nil { t.Fatal(err) } @@ -1174,7 +1206,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, ogPacket); err != nil { + if err := s.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1264,7 +1296,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := <-s2.ForwardPackets(nil, ogPacket); err != nil { + if err := s2.ForwardPackets(nil, ogPacket); err != nil { t.Fatal(err) } @@ -1414,7 +1446,17 @@ func TestCircularForwards(t *testing.T) { // Attempt to forward the packet and check for the expected // error. - err = forwardPackets(t, s, packet) + if err = s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + select { + case p := <-aliceChannelLink.packets: + if p.linkFailure != nil { + err = p.linkFailure + } + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } if !reflect.DeepEqual(err, test.expectedErr) { t.Fatalf("expected: %v, got: %v", test.expectedErr, err) @@ -1634,18 +1676,30 @@ func testSkipIneligibleLinksMultiHopForward(t *testing.T, } // The request to forward should fail as - err = forwardPackets(t, s, packet) - + if err := s.ForwardPackets(nil, packet); err != nil { + t.Fatal(err) + } + var linkError *LinkError + select { + case p := <-aliceChannelLink.packets: + linkError = p.linkFailure + case p := <-bobChannelLink1.packets: + linkError = p.linkFailure + case p := <-bobChannelLink2.packets: + linkError = p.linkFailure + case <-time.After(time.Second): + t.Fatal("no timely reply from switch") + } failure := obfuscator.(*mockObfuscator).failure if testCase.expectedReply == lnwire.CodeNone { - if err != nil { + if linkError != nil { t.Fatalf("forwarding should have succeeded") } if failure != nil { t.Fatalf("unexpected failure %T", failure) } } else { - if err == nil { + if linkError == nil { t.Fatalf("forwarding should have failed due to " + "inactive link") } @@ -1793,7 +1847,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1825,7 +1879,7 @@ func TestSwitchCancel(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1908,7 +1962,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1938,7 +1992,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that bob channel link received it. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1967,7 +2021,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -1993,7 +2047,7 @@ func TestSwitchAddSamePayment(t *testing.T) { } // Handle the request and checks that payment circuit works properly. - if err := forwardPackets(t, s, request); err != nil { + if err := s.ForwardPackets(nil, request); err != nil { t.Fatal(err) } @@ -2136,7 +2190,7 @@ func TestSwitchSendPayment(t *testing.T) { }, } - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -2631,7 +2685,7 @@ func TestInvalidFailure(t *testing.T) { }, } - if err := forwardPackets(t, s, packet); err != nil { + if err := s.ForwardPackets(nil, packet); err != nil { t.Fatalf("can't forward htlc packet: %v", err) } @@ -3057,17 +3111,3 @@ func getThreeHopEvents(channels *clusterChannels, htlcID uint64, return aliceEvents, bobEvents, carolEvents } - -// forwardPackets forwards packets to the switch and enforces a timeout on the -// reply. -func forwardPackets(t *testing.T, s *Switch, packets ...*htlcPacket) error { - - select { - case err := <-s.ForwardPackets(nil, packets...): - return err - - case <-time.After(time.Second): - t.Fatal("no timely reply from switch") - return nil - } -} diff --git a/lntest/itest/log_error_whitelist.txt b/lntest/itest/log_error_whitelist.txt index 6541cab0..bf808e76 100644 --- a/lntest/itest/log_error_whitelist.txt +++ b/lntest/itest/log_error_whitelist.txt @@ -89,13 +89,13 @@