diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 6b87d445..d261d8f6 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -3396,6 +3396,136 @@ func TestShouldAdjustCommitFee(t *testing.T) { } } +// TestChannelLinkShutdownDuringForward asserts that a link can be fully +// stopped when it is trying to send synchronously through the switch. The +// specific case this can occur is when a link forwards incoming Adds. We test +// this by forcing the switch into a state where it will not accept new packets, +// and then killing the link, which can only succeed if forwarding can be +// canceled by a call to Stop. +func TestChannelLinkShutdownDuringForward(t *testing.T) { + t.Parallel() + + // First, we'll create our traditional three hop network. We're + // interested in testing the ability to stop the link when it is + // synchronously forwarding to the switch, which happens when an + // incoming link forwards Adds. Thus, the test will be performed + // against Bob's first link. + channels, cleanUp, _, err := createClusterChannels( + btcutil.SatoshiPerBitcoin*3, + btcutil.SatoshiPerBitcoin*5) + if err != nil { + t.Fatalf("unable to create channel: %v", err) + } + defer cleanUp() + + n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, + channels.bobToCarol, channels.carolToBob, testStartingHeight) + + if err := n.start(); err != nil { + t.Fatal(err) + } + defer n.stop() + defer n.feeEstimator.Stop() + + // Define a helper method that strobes the switch's log ticker, and + // unblocks after nothing has been pulled for two seconds. + waitForBobsSwitchToBlock := func() { + bobSwitch := n.firstBobChannelLink.cfg.Switch + ticker := bobSwitch.cfg.LogEventTicker.(*ticker.Mock) + timeout := time.After(15 * time.Second) + for { + time.Sleep(50 * time.Millisecond) + select { + case ticker.Force <- time.Now(): + + case <-time.After(2 * time.Second): + return + + case <-timeout: + t.Fatalf("switch did not block") + } + } + } + + // Define a helper method that strobes the link's batch ticker, and + // unblocks after nothing has been pulled for two seconds. + waitForBobsIncomingLinkToBlock := func() { + ticker := n.firstBobChannelLink.cfg.BatchTicker.(*ticker.Mock) + timeout := time.After(15 * time.Second) + for { + time.Sleep(50 * time.Millisecond) + select { + case ticker.Force <- time.Now(): + + case <-time.After(2 * time.Second): + // We'll give a little extra time here, to + // ensure that the packet is being pressed + // against the htlcPlex. + time.Sleep(50 * time.Millisecond) + return + + case <-timeout: + t.Fatalf("link did not block") + } + } + } + + // To test that the cancellation is happening properly, we will set the + // switch's htlcPlex to nil, so that calls to routeAsync block, and can + // only exit if the link (or switch) is exiting. We will only be testing + // the link here. + // + // In order to avoid data races, we need to ensure the switch isn't + // selecting on that channel in the meantime. We'll prevent this by + // first acquiring the index mutex and forcing a log event so that the + // htlcForwarder is blocked inside the logTicker case, which also needs + // the indexMtx. + n.firstBobChannelLink.cfg.Switch.indexMtx.Lock() + + // Strobe the log ticker, and wait for switch to stop accepting any more + // log ticks. + waitForBobsSwitchToBlock() + + // While the htlcForwarder is blocked, swap out the htlcPlex with a nil + // channel, and unlock the indexMtx to allow return to the + // htlcForwarder's main select. After this, any attempt to forward + // through the switch will block. + n.firstBobChannelLink.cfg.Switch.htlcPlex = nil + n.firstBobChannelLink.cfg.Switch.indexMtx.Unlock() + + // Now, make a payment from Alice to Carol, which should cause Bob's + // incoming link to block when it tries to submit the packet to the nil + // htlcPlex. + amount := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + htlcAmt, totalTimelock, hops := generateHops( + amount, testStartingHeight, + n.firstBobChannelLink, n.carolChannelLink, + ) + + n.makePayment( + n.aliceServer, n.carolServer, n.bobServer.PubKey(), + hops, amount, htlcAmt, totalTimelock, + ) + + // Strobe the batch ticker of Bob's incoming link, waiting for it to + // become fully blocked. + waitForBobsIncomingLinkToBlock() + + // Finally, stop the link to test that it can exit while synchronously + // forwarding Adds to the switch. + done := make(chan struct{}) + go func() { + n.firstBobChannelLink.Stop() + close(done) + }() + + select { + case <-time.After(3 * time.Second): + t.Fatalf("unable to shutdown link while fwding incoming Adds") + case <-done: + } +} + // TestChannelLinkUpdateCommitFee tests that when a new block comes in, the // channel link properly checks to see if it should update the commitment fee. func TestChannelLinkUpdateCommitFee(t *testing.T) {