From 5aa59906b56ace20d180f70ecf8531ee94c46b0e Mon Sep 17 00:00:00 2001
From: Conner Fromknecht
Date: Mon, 30 Nov 2020 13:08:04 -0800
Subject: [PATCH] wtclient: schedule ForceQuit first during Stop

Currently the ForceQuit call is scheduled after trying to stop the
backup queue. In certain cases, the call to stop the queue never
finishes, which means the force quit is never scheduled. We remedy this
by scheduling the call before any other operations to ensure we can
always exit ungracefully if necessary.
---
 watchtower/wtclient/client.go      | 14 +++----
 watchtower/wtclient/client_test.go | 64 ++++++++++++++++++++++++++++--
 2 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go
index 71d767ef..62e4ac67 100644
--- a/watchtower/wtclient/client.go
+++ b/watchtower/wtclient/client.go
@@ -462,13 +462,7 @@ func (c *TowerClient) Stop() error {
 	c.stopped.Do(func() {
 		log.Debugf("Stopping watchtower client")
 
-		// 1. Shutdown the backup queue, which will prevent any further
-		// updates from being accepted. In practice, the links should be
-		// shutdown before the client has been stopped, so all updates
-		// would have been added prior.
-		c.pipeline.Stop()
-
-		// 2. To ensure we don't hang forever on shutdown due to
+		// 1. To ensure we don't hang forever on shutdown due to
 		// unintended failures, we'll delay a call to force quit the
 		// pipeline if a ForceQuitDelay is specified. This will have no
 		// effect if the pipeline shuts down cleanly before the delay
@@ -483,6 +477,12 @@ func (c *TowerClient) Stop() error {
 			time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
 		}
 
+		// 2. Shutdown the backup queue, which will prevent any further
+		// updates from being accepted. In practice, the links should be
+		// shutdown before the client has been stopped, so all updates
+		// would have been added prior.
+		c.pipeline.Stop()
+
 		// 3. Once the backup queue has shutdown, wait for the main
 		// dispatcher to exit. The backup queue will signal it's
 		// completion to the dispatcher, which releases the wait group
diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go
index 166ab228..41e01b61 100644
--- a/watchtower/wtclient/client_test.go
+++ b/watchtower/wtclient/client_test.go
@@ -446,10 +446,11 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
 		NewAddress: func() ([]byte, error) {
 			return addrScript, nil
 		},
-		ReadTimeout:  timeout,
-		WriteTimeout: timeout,
-		MinBackoff:   time.Millisecond,
-		MaxBackoff:   10 * time.Millisecond,
+		ReadTimeout:    timeout,
+		WriteTimeout:   timeout,
+		MinBackoff:     time.Millisecond,
+		MaxBackoff:     time.Second,
+		ForceQuitDelay: 10 * time.Second,
 	}
 	client, err := wtclient.New(clientCfg)
 	if err != nil {
@@ -1475,6 +1476,61 @@ var clientTests = []clientTest{
 			h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second)
 		},
 	},
+	{
+		// Asserts that the client's force quit delay will properly
+		// shut down the client if it is unable to completely drain
+		// the task pipeline.
+		name: "force unclean shutdown",
+		cfg: harnessCfg{
+			localBalance:  localBalance,
+			remoteBalance: remoteBalance,
+			policy: wtpolicy.Policy{
+				TxPolicy: wtpolicy.TxPolicy{
+					BlobType:     blob.TypeAltruistCommit,
+					SweepFeeRate: wtpolicy.DefaultSweepFeeRate,
+				},
+				MaxUpdates: 5,
+			},
+		},
+		fn: func(h *testHarness) {
+			const (
+				chanID     = 0
+				numUpdates = 6
+				maxUpdates = 5
+			)
+
+			// Advance the channel to create all states.
+			hints := h.advanceChannelN(chanID, numUpdates)
+
+			// Back up 4 of the 5 states for the negotiated session.
+			h.backupStates(chanID, 0, maxUpdates-1, nil)
+			h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second)
+
+			// Now, restart the tower and prevent it from acking any
+			// new sessions. We do this here as once the last slot
+			// is exhausted the client will attempt to renegotiate.
+			err := h.server.Stop()
+			require.Nil(h.t, err)
+			h.serverCfg.NoAckCreateSession = true
+			h.startServer()
+
+			// Back up the remaining two states. Once the first is
+			// processed, the session will be exhausted, but the
+			// client won't be able to renegotiate a session for
+			// the final state. We'll only wait for the first five
+			// states to arrive at the tower.
+			h.backupStates(chanID, maxUpdates-1, numUpdates, nil)
+			h.waitServerUpdates(hints[:maxUpdates], 5*time.Second)
+
+			// Finally, stop the client, which will continue to
+			// attempt session negotiation since it has one more
+			// state to process. After the force quit delay
+			// expires, the client should force quit itself and
+			// allow the test to complete.
+			err = h.client.Stop()
+			require.Nil(h.t, err)
+		},
+	},
 }
 
 // TestClient executes the client test suite, asserting the ability to backup
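
For reference, the shutdown ordering this patch establishes can be distilled
into the standalone sketch below. It is a simplified approximation, not the
actual TowerClient: the client struct, its fields, and the stopPipeline
stand-in are invented for illustration, with stopPipeline modeling a drain
that never completes on its own.

package main

import (
	"fmt"
	"sync"
	"time"
)

// client is a hypothetical stand-in for wtclient.TowerClient; only the
// pieces relevant to shutdown ordering are modeled.
type client struct {
	stopped        sync.Once
	forced         sync.Once
	forceQuit      chan struct{}
	pipelineDone   chan struct{}
	forceQuitDelay time.Duration
}

// Stop mirrors the ordering this patch establishes: the force-quit timer
// is armed *before* attempting to drain the pipeline, so a stuck pipeline
// can no longer prevent the timer from ever being scheduled.
func (c *client) Stop() {
	c.stopped.Do(func() {
		// 1. Arm the escape hatch first.
		if c.forceQuitDelay > 0 {
			time.AfterFunc(c.forceQuitDelay, c.ForceQuit)
		}

		// 2. Attempt the graceful path, which may block indefinitely.
		c.stopPipeline()
	})
}

// ForceQuit tears the client down ungracefully by closing the quit signal.
func (c *client) ForceQuit() {
	c.forced.Do(func() {
		close(c.forceQuit)
	})
}

// stopPipeline returns only once the pipeline finishes draining or the
// force-quit signal fires, modeling a drain that can hang forever.
func (c *client) stopPipeline() {
	select {
	case <-c.pipelineDone: // graceful: backup queue drained
	case <-c.forceQuit: // ungraceful: force-quit timer fired
	}
}

func main() {
	c := &client{
		forceQuit:      make(chan struct{}),
		pipelineDone:   make(chan struct{}), // never closed: simulates a hang
		forceQuitDelay: 100 * time.Millisecond,
	}

	start := time.Now()
	c.Stop()
	fmt.Printf("shutdown completed after %v\n", time.Since(start))
}

Running the sketch prints a shutdown time of roughly the configured
forceQuitDelay, showing that Stop returns even though the pipeline never
drains. With the pre-patch ordering, the equivalent of stopPipeline would
run before the timer was armed, and the program would hang forever.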