From 1113684dc30730f74e18c592602531b9defbde1a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 12 Jul 2018 15:59:00 -0700 Subject: [PATCH 01/11] Revert "htlcswitch: in event of duplicate link add, prefer newer link" This reverts commit e60d2b774a35e446081f9be2d4901925cac05379. --- htlcswitch/switch.go | 6 ++-- htlcswitch/switch_test.go | 59 +++++++++++++++++++++++++++++++++++++++ peer.go | 4 +-- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index e2ee14b0..e0566623 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1776,11 +1776,11 @@ func (s *Switch) AddLink(link ChannelLink) error { chanID := link.ChanID() - // If a link already exists, then remove the prior one so we can - // replace it with this fresh instance. + // First, ensure that this link is not already active in the switch. _, err := s.getLink(chanID) if err == nil { - s.removeLink(chanID) + return fmt.Errorf("unable to add ChannelLink(%v), already "+ + "active", chanID) } // Get and attach the mailbox for this link, which buffers packets in diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 904113e4..5b37c448 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -25,6 +25,65 @@ func genPreimage() ([32]byte, error) { return preimage, nil } +// TestSwitchAddDuplicateLink tests that the switch will reject duplicate links +// for both pending and live links. It also tests that we can successfully +// add a link after having removed it. +func TestSwitchAddDuplicateLink(t *testing.T) { + t.Parallel() + + alicePeer, err := newMockServer(t, "alice", nil) + if err != nil { + t.Fatalf("unable to create alice server: %v", err) + } + + s, err := initSwitchWithDB(nil) + if err != nil { + t.Fatalf("unable to init switch: %v", err) + } + if err := s.Start(); err != nil { + t.Fatalf("unable to start switch: %v", err) + } + defer s.Stop() + + chanID1, _, aliceChanID, _ := genIDs() + + pendingChanID := lnwire.ShortChannelID{} + + aliceChannelLink := newMockChannelLink( + s, chanID1, pendingChanID, alicePeer, false, + ) + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } + + // Alice should have a pending link, adding again should fail. + if err := s.AddLink(aliceChannelLink); err == nil { + t.Fatalf("adding duplicate link should have failed") + } + + // Update the short chan id of the channel, so that the link goes live. + aliceChannelLink.setLiveShortChanID(aliceChanID) + err = s.UpdateShortChanID(chanID1) + if err != nil { + t.Fatalf("unable to update alice short_chan_id: %v", err) + } + + // Alice should have a live link, adding again should fail. + if err := s.AddLink(aliceChannelLink); err == nil { + t.Fatalf("adding duplicate link should have failed") + } + + // Remove the live link to ensure the indexes are cleared. + if err := s.RemoveLink(chanID1); err != nil { + t.Fatalf("unable to remove alice link: %v", err) + } + + // Alice has no links, adding should succeed. + if err := s.AddLink(aliceChannelLink); err != nil { + t.Fatalf("unable to add alice link: %v", err) + } +} + // TestSwitchSendPending checks the inability of htlc switch to forward adds // over pending links, and the UpdateShortChanID makes a pending link live. func TestSwitchSendPending(t *testing.T) { diff --git a/peer.go b/peer.go index a9911dda..6d1a7c3e 100644 --- a/peer.go +++ b/peer.go @@ -1526,8 +1526,8 @@ out: ) if err != nil { peerLog.Errorf("can't register new channel "+ - "link(%v) with NodeKey(%x): %v", chanPoint, - p.PubKey(), err) + "link(%v) with NodeKey(%x)", chanPoint, + p.PubKey()) } close(newChanReq.done) From 258019eb242eb027b7ccefb0403b204c3e5410f2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 12 Jul 2018 17:41:12 -0700 Subject: [PATCH 02/11] htlcswitch/switch_test: update reverted test to use... current initialization methods --- htlcswitch/switch_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go index 5b37c448..ca607e82 100644 --- a/htlcswitch/switch_test.go +++ b/htlcswitch/switch_test.go @@ -31,12 +31,12 @@ func genPreimage() ([32]byte, error) { func TestSwitchAddDuplicateLink(t *testing.T) { t.Parallel() - alicePeer, err := newMockServer(t, "alice", nil) + alicePeer, err := newMockServer(t, "alice", testStartingHeight, nil, 6) if err != nil { t.Fatalf("unable to create alice server: %v", err) } - s, err := initSwitchWithDB(nil) + s, err := initSwitchWithDB(testStartingHeight, nil) if err != nil { t.Fatalf("unable to init switch: %v", err) } @@ -74,9 +74,7 @@ func TestSwitchAddDuplicateLink(t *testing.T) { } // Remove the live link to ensure the indexes are cleared. - if err := s.RemoveLink(chanID1); err != nil { - t.Fatalf("unable to remove alice link: %v", err) - } + s.RemoveLink(chanID1) // Alice has no links, adding should succeed. if err := s.AddLink(aliceChannelLink); err != nil { From c78e81d32b46af7b259026f832f11a24dc258da8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 12 Jul 2018 17:39:27 -0700 Subject: [PATCH 03/11] htlcswitch/switch: synchronous stop of links after removal --- htlcswitch/switch.go | 47 +++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index e0566623..1033f1b6 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -1380,21 +1380,34 @@ func (s *Switch) htlcForwarder() { s.blockEpochStream.Cancel() // Remove all links once we've been signalled for shutdown. + var linksToStop []ChannelLink s.indexMtx.Lock() for _, link := range s.linkIndex { - if err := s.removeLink(link.ChanID()); err != nil { - log.Errorf("unable to remove "+ - "channel link on stop: %v", err) + activeLink := s.removeLink(link.ChanID()) + if activeLink == nil { + log.Errorf("unable to remove ChannelLink(%v) "+ + "on stop", link.ChanID()) + continue } + linksToStop = append(linksToStop, activeLink) } for _, link := range s.pendingLinkIndex { - if err := s.removeLink(link.ChanID()); err != nil { - log.Errorf("unable to remove pending "+ - "channel link on stop: %v", err) + pendingLink := s.removeLink(link.ChanID()) + if pendingLink == nil { + log.Errorf("unable to remove ChannelLink(%v) "+ + "on stop", link.ChanID()) + continue } + linksToStop = append(linksToStop, pendingLink) } s.indexMtx.Unlock() + // Now that all pending and live links have been removed from + // the forwarding indexes, stop each one before shutting down. + for _, link := range linksToStop { + link.Stop() + } + // Before we exit fully, we'll attempt to flush out any // forwarding events that may still be lingering since the last // batch flush. @@ -1868,24 +1881,28 @@ func (s *Switch) getLinkByShortID(chanID lnwire.ShortChannelID) (ChannelLink, er return link, nil } -// RemoveLink is used to initiate the handling of the remove link command. The -// request will be propagated/handled to/in the main goroutine. -func (s *Switch) RemoveLink(chanID lnwire.ChannelID) error { +// RemoveLink purges the switch of any link associated with chanID. If a pending +// or active link is not found, this method does nothing. Otherwise, the method +// returns after the link has been completely shutdown. +func (s *Switch) RemoveLink(chanID lnwire.ChannelID) { s.indexMtx.Lock() - defer s.indexMtx.Unlock() + link := s.removeLink(chanID) + s.indexMtx.Unlock() - return s.removeLink(chanID) + if link != nil { + link.Stop() + } } // removeLink is used to remove and stop the channel link. // // NOTE: This MUST be called with the indexMtx held. -func (s *Switch) removeLink(chanID lnwire.ChannelID) error { +func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink { log.Infof("Removing channel link with ChannelID(%v)", chanID) link, err := s.getLink(chanID) if err != nil { - return err + return nil } // Remove the channel from live link indexes. @@ -1906,9 +1923,7 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error { } } - go link.Stop() - - return nil + return link } // UpdateShortChanID updates the short chan ID for an existing channel. This is From a6e7b358726f4f7e4beab167abf00b0d9cc9ae8c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 12 Jul 2018 17:39:59 -0700 Subject: [PATCH 04/11] server: use blocking RemoveLink to shutdown links --- server.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index 565f66d6..2729fa76 100644 --- a/server.go +++ b/server.go @@ -646,7 +646,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ChainIO: cc.chainIO, MarkLinkInactive: func(chanPoint wire.OutPoint) error { chanID := lnwire.NewChanIDFromOutPoint(&chanPoint) - return s.htlcSwitch.RemoveLink(chanID) + s.htlcSwitch.RemoveLink(chanID) + return nil }, IsOurAddress: func(addr btcutil.Address) bool { _, err := cc.wallet.GetPrivKey(addr) @@ -1960,11 +1961,7 @@ func (s *server) peerTerminationWatcher(p *peer) { } for _, link := range links { - err := p.server.htlcSwitch.RemoveLink(link.ChanID()) - if err != nil { - srvrLog.Errorf("unable to remove channel link: %v", - err) - } + p.server.htlcSwitch.RemoveLink(link.ChanID()) } s.mu.Lock() From 5f90973f76d122ccad17e431eafa4d7139c9f775 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 2 Aug 2018 02:34:23 -0700 Subject: [PATCH 05/11] peer: always purge link before adding new one --- peer.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/peer.go b/peer.go index 6d1a7c3e..52182dad 100644 --- a/peer.go +++ b/peer.go @@ -469,11 +469,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, // mailboxes such that we can safely force close // without the link being added again and updates being // applied. - err := p.server.htlcSwitch.RemoveLink(chanID) - if err != nil { - peerLog.Errorf("unable to stop link(%v): %v", - shortChanID, err) - } + p.server.htlcSwitch.RemoveLink(chanID) // If the error encountered was severe enough, we'll // now force close the channel. @@ -557,6 +553,12 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, link := htlcswitch.NewChannelLink(linkCfg, lnChan) + // Before adding our new link, purge the switch of any pending or live + // links going by the same channel id. If one is found, we'll shut it + // down to ensure that the mailboxes are only ever under the control of + // one link. + p.server.htlcSwitch.RemoveLink(link.ChanID()) + // With the channel link created, we'll now notify the htlc switch so // this channel can be used to dispatch local payments and also // passively forward payments. @@ -1922,14 +1924,7 @@ func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error { // Instruct the HtlcSwitch to close this link as the channel is no // longer active. - if err := p.server.htlcSwitch.RemoveLink(chanID); err != nil { - if err == htlcswitch.ErrChannelLinkNotFound { - peerLog.Warnf("unable remove channel link with "+ - "ChannelPoint(%v): %v", chanID, err) - return nil - } - return err - } + p.server.htlcSwitch.RemoveLink(chanID) return nil } From b507d265b4ad2aa9ac42e61a23110b69a8303cb7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 12 Jul 2018 18:16:32 -0700 Subject: [PATCH 06/11] htlcswitch/link: only update contract signals on start if live link --- htlcswitch/link.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 8697624c..2ba04f55 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -359,21 +359,6 @@ func (l *channelLink) Start() error { log.Infof("ChannelLink(%v) is starting", l) - // Before we start the link, we'll update the ChainArbitrator with the - // set of new channel signals for this channel. - // - // TODO(roasbeef): split goroutines within channel arb to avoid - go func() { - err := l.cfg.UpdateContractSignals(&contractcourt.ContractSignals{ - HtlcUpdates: l.htlcUpdates, - ShortChanID: l.channel.ShortChanID(), - }) - if err != nil { - log.Errorf("Unable to update signals for "+ - "ChannelLink(%v)", l) - } - }() - l.mailBox.ResetMessages() l.overflowQueue.Start() @@ -401,6 +386,24 @@ func (l *channelLink) Start() error { return fmt.Errorf("unable to trim circuits above "+ "local htlc index %d: %v", localHtlcIndex, err) } + + // Since the link is live, before we start the link we'll update + // the ChainArbitrator with the set of new channel signals for + // this channel. + // + // TODO(roasbeef): split goroutines within channel arb to avoid + go func() { + signals := &contractcourt.ContractSignals{ + HtlcUpdates: l.htlcUpdates, + ShortChanID: l.channel.ShortChanID(), + } + + err := l.cfg.UpdateContractSignals(signals) + if err != nil { + log.Errorf("Unable to update signals for "+ + "ChannelLink(%v)", l) + } + }() } l.updateFeeTimer = time.NewTimer(l.randomFeeUpdateTimeout()) From 11f02de3b35c2dc998820232d615e8aba909513a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 16 Jul 2018 18:27:10 -0700 Subject: [PATCH 07/11] htlcswitch/link_test: removes unnecessary WaitForShutdown The new RemoveLink method blocks until the link has been fully stopped, so we no longer need to wait for it explicitly. --- htlcswitch/link_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 540dd937..6b87d445 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -3709,7 +3709,6 @@ func (h *persistentLinkHarness) restart(restartSwitch bool, // First, remove the link from the switch. h.coreLink.cfg.Switch.RemoveLink(h.link.ChanID()) - h.coreLink.WaitForShutdown() var htlcSwitch *Switch if restartSwitch { From ad68009ec5af938f6260cdd07d7c1c80077142d7 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 18 Jul 2018 14:26:33 -0700 Subject: [PATCH 08/11] chancloser: remove error returned from unregisterChannel --- chancloser.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chancloser.go b/chancloser.go index bce0e8a5..903d1fa9 100644 --- a/chancloser.go +++ b/chancloser.go @@ -73,7 +73,7 @@ type chanCloseCfg struct { // unregisterChannel is a function closure that allows the // channelCloser to re-register a channel. Once this has been done, no // further HTLC's should be routed through the channel. - unregisterChannel func(lnwire.ChannelID) error + unregisterChannel func(lnwire.ChannelID) // broadcastTx broadcasts the passed transaction to the network. broadcastTx func(*wire.MsgTx) error From d08003b2850dad69e10d354358fabf6978d7a245 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 31 Jul 2018 00:09:25 -0700 Subject: [PATCH 09/11] htlcswitch/link_test: adds TestChannelLinkShutdownDuringForward This commit adds a test that verifies Stop does not block if the link is concurrently forwarding incoming Adds to the switch. This test fails prior to the commits that thread through the link's quit channel. --- htlcswitch/link_test.go | 130 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6b87d445..d261d8f6 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -3396,6 +3396,136 @@ func TestShouldAdjustCommitFee(t *testing.T) { } } +// TestChannelLinkShutdownDuringForward asserts that a link can be fully +// stopped when it is trying to send synchronously through the switch. The +// specific case this can occur is when a link forwards incoming Adds. We test +// this by forcing the switch into a state where it will not accept new packets, +// and then killing the link, which can only succeed if forwarding can be +// canceled by a call to Stop. +func TestChannelLinkShutdownDuringForward(t *testing.T) { + t.Parallel() + + // First, we'll create our traditional three hop network. We're + // interested in testing the ability to stop the link when it is + // synchronously forwarding to the switch, which happens when an + // incoming link forwards Adds. Thus, the test will be performed + // against Bob's first link. + channels, cleanUp, _, err := createClusterChannels( + btcutil.SatoshiPerBitcoin*3, + btcutil.SatoshiPerBitcoin*5) + if err != nil { + t.Fatalf("unable to create channel: %v", err) + } + defer cleanUp() + + n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, + channels.bobToCarol, channels.carolToBob, testStartingHeight) + + if err := n.start(); err != nil { + t.Fatal(err) + } + defer n.stop() + defer n.feeEstimator.Stop() + + // Define a helper method that strobes the switch's log ticker, and + // unblocks after nothing has been pulled for two seconds. + waitForBobsSwitchToBlock := func() { + bobSwitch := n.firstBobChannelLink.cfg.Switch + ticker := bobSwitch.cfg.LogEventTicker.(*ticker.Mock) + timeout := time.After(15 * time.Second) + for { + time.Sleep(50 * time.Millisecond) + select { + case ticker.Force <- time.Now(): + + case <-time.After(2 * time.Second): + return + + case <-timeout: + t.Fatalf("switch did not block") + } + } + } + + // Define a helper method that strobes the link's batch ticker, and + // unblocks after nothing has been pulled for two seconds. + waitForBobsIncomingLinkToBlock := func() { + ticker := n.firstBobChannelLink.cfg.BatchTicker.(*ticker.Mock) + timeout := time.After(15 * time.Second) + for { + time.Sleep(50 * time.Millisecond) + select { + case ticker.Force <- time.Now(): + + case <-time.After(2 * time.Second): + // We'll give a little extra time here, to + // ensure that the packet is being pressed + // against the htlcPlex. + time.Sleep(50 * time.Millisecond) + return + + case <-timeout: + t.Fatalf("link did not block") + } + } + } + + // To test that the cancellation is happening properly, we will set the + // switch's htlcPlex to nil, so that calls to routeAsync block, and can + // only exit if the link (or switch) is exiting. We will only be testing + // the link here. + // + // In order to avoid data races, we need to ensure the switch isn't + // selecting on that channel in the meantime. We'll prevent this by + // first acquiring the index mutex and forcing a log event so that the + // htlcForwarder is blocked inside the logTicker case, which also needs + // the indexMtx. + n.firstBobChannelLink.cfg.Switch.indexMtx.Lock() + + // Strobe the log ticker, and wait for switch to stop accepting any more + // log ticks. + waitForBobsSwitchToBlock() + + // While the htlcForwarder is blocked, swap out the htlcPlex with a nil + // channel, and unlock the indexMtx to allow return to the + // htlcForwarder's main select. After this, any attempt to forward + // through the switch will block. + n.firstBobChannelLink.cfg.Switch.htlcPlex = nil + n.firstBobChannelLink.cfg.Switch.indexMtx.Unlock() + + // Now, make a payment from Alice to Carol, which should cause Bob's + // incoming link to block when it tries to submit the packet to the nil + // htlcPlex. + amount := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + htlcAmt, totalTimelock, hops := generateHops( + amount, testStartingHeight, + n.firstBobChannelLink, n.carolChannelLink, + ) + + n.makePayment( + n.aliceServer, n.carolServer, n.bobServer.PubKey(), + hops, amount, htlcAmt, totalTimelock, + ) + + // Strobe the batch ticker of Bob's incoming link, waiting for it to + // become fully blocked. + waitForBobsIncomingLinkToBlock() + + // Finally, stop the link to test that it can exit while synchronously + // forwarding Adds to the switch. + done := make(chan struct{}) + go func() { + n.firstBobChannelLink.Stop() + close(done) + }() + + select { + case <-time.After(3 * time.Second): + t.Fatalf("unable to shutdown link while fwding incoming Adds") + case <-done: + } +} + // TestChannelLinkUpdateCommitFee tests that when a new block comes in, the // channel link properly checks to see if it should update the commitment fee. func TestChannelLinkUpdateCommitFee(t *testing.T) { From 0fef1c71fe89e9d94558084c16d401e88d60cd4f Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 30 Jul 2018 13:11:11 -0700 Subject: [PATCH 10/11] htlcswitch/link: pass link quit to ForwardPackets --- htlcswitch/link.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 2ba04f55..3112b8ed 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -117,9 +117,10 @@ type ChannelLinkConfig struct { Switch *Switch // ForwardPackets attempts to forward the batch of htlcs through the - // switch. Any failed packets will be returned to the provided - // ChannelLink. - ForwardPackets func(...*htlcPacket) chan error + // switch, any failed packets will be returned to the provided + // ChannelLink. The link's quit signal should be provided to allow + // cancellation of forwarding during link shutdown. + ForwardPackets func(chan struct{}, ...*htlcPacket) chan error // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion // blobs, which are then used to inform how to forward an HTLC. @@ -2542,7 +2543,7 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) { filteredPkts = append(filteredPkts, pkt) } - errChan := l.cfg.ForwardPackets(filteredPkts...) + errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...) go l.handleBatchFwdErrs(errChan) } From f84cd14b12d78c29a4395088768d8cd6d4cf875d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 30 Jul 2018 13:17:39 -0700 Subject: [PATCH 11/11] htlcswitch/switch: permit link shutdown mid-forwarding In this commit, we thread through a link's quit channel into routeAsync, the primary helper method allowing links to send htlcPackets through the switch. This is intended to remove deadlocks from happening, where the link is synchronously blocking on forwarding packets to the switch, but also needs to shutdown. --- htlcswitch/switch.go | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 1033f1b6..4531991c 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -527,12 +527,15 @@ func (s *Switch) forward(packet *htlcPacket) error { // ForwardPackets adds a list of packets to the switch for processing. Fails // and settles are added on a first past, simultaneously constructing circuits // for any adds. After persisting the circuits, another pass of the adds is -// given to forward them through the router. +// given to forward them through the router. The sending link's quit channel is +// used to prevent deadlocks when the switch stops a link in the midst of +// forwarding. // // NOTE: This method guarantees that the returned err chan will eventually be // closed. The receiver should read on the channel until receiving such a // signal. -func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error { +func (s *Switch) ForwardPackets(linkQuit chan struct{}, + packets ...*htlcPacket) chan error { var ( // fwdChan is a buffered channel used to receive err msgs from @@ -568,6 +571,9 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error { // so, we exit early to avoid incrementing the switch's waitgroup while // it is already in the process of shutting down. select { + case <-linkQuit: + close(errChan) + return errChan case <-s.quit: close(errChan) return errChan @@ -593,7 +599,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error { circuits = append(circuits, circuit) addBatch = append(addBatch, packet) default: - s.routeAsync(packet, fwdChan) + err := s.routeAsync(packet, fwdChan, linkQuit) + if err != nil { + return errChan + } numSent++ } } @@ -635,7 +644,10 @@ func (s *Switch) ForwardPackets(packets ...*htlcPacket) chan error { // Now, forward any packets for circuits that were successfully added to // the switch's circuit map. for _, packet := range addedPackets { - s.routeAsync(packet, fwdChan) + err := s.routeAsync(packet, fwdChan, linkQuit) + if err != nil { + return errChan + } numSent++ } @@ -722,9 +734,13 @@ func (s *Switch) route(packet *htlcPacket) error { } // routeAsync sends a packet through the htlc switch, using the provided err -// chan to propagate errors back to the caller. This method does not wait for -// a response before returning. -func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error { +// chan to propagate errors back to the caller. The link's quit channel is +// provided so that the send can be canceled if either the link or the switch +// receive a shutdown requuest. This method does not wait for a response from +// the htlcForwarder before returning. +func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error, + linkQuit chan struct{}) error { + command := &plexPacket{ pkt: packet, err: errChan, @@ -733,6 +749,8 @@ func (s *Switch) routeAsync(packet *htlcPacket, errChan chan error) error { select { case s.htlcPlex <- command: return nil + case <-linkQuit: + return ErrLinkShuttingDown case <-s.quit: return errors.New("Htlc Switch was stopped") } @@ -1734,7 +1752,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) { } } - errChan := s.ForwardPackets(switchPackets...) + // Since this send isn't tied to a specific link, we pass a nil + // link quit channel, meaning the send will fail only if the + // switch receives a shutdown request. + errChan := s.ForwardPackets(nil, switchPackets...) go handleBatchFwdErrs(errChan) } }