diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 71d767ef..62e4ac67 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -462,13 +462,7 @@ func (c *TowerClient) Stop() error { c.stopped.Do(func() { log.Debugf("Stopping watchtower client") - // 1. Shutdown the backup queue, which will prevent any further - // updates from being accepted. In practice, the links should be - // shutdown before the client has been stopped, so all updates - // would have been added prior. - c.pipeline.Stop() - - // 2. To ensure we don't hang forever on shutdown due to + // 1. To ensure we don't hang forever on shutdown due to // unintended failures, we'll delay a call to force quit the // pipeline if a ForceQuitDelay is specified. This will have no // effect if the pipeline shuts down cleanly before the delay @@ -483,6 +477,12 @@ func (c *TowerClient) Stop() error { time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit) } + // 2. Shutdown the backup queue, which will prevent any further + // updates from being accepted. In practice, the links should be + // shutdown before the client has been stopped, so all updates + // would have been added prior. + c.pipeline.Stop() + // 3. Once the backup queue has shutdown, wait for the main // dispatcher to exit. The backup queue will signal it's // completion to the dispatcher, which releases the wait group diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 166ab228..41e01b61 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -446,10 +446,11 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { NewAddress: func() ([]byte, error) { return addrScript, nil }, - ReadTimeout: timeout, - WriteTimeout: timeout, - MinBackoff: time.Millisecond, - MaxBackoff: 10 * time.Millisecond, + ReadTimeout: timeout, + WriteTimeout: timeout, + MinBackoff: time.Millisecond, + MaxBackoff: time.Second, + ForceQuitDelay: 10 * time.Second, } client, err := wtclient.New(clientCfg) if err != nil { @@ -1475,6 +1476,61 @@ var clientTests = []clientTest{ h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second) }, }, + { + // Asserts that the client's force quite delay will properly + // shutdown the client if it is unable to completely drain the + // task pipeline. + name: "force unclean shutdown", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: wtpolicy.TxPolicy{ + BlobType: blob.TypeAltruistCommit, + SweepFeeRate: wtpolicy.DefaultSweepFeeRate, + }, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + chanID = 0 + numUpdates = 6 + maxUpdates = 5 + ) + + // Advance the channel to create all states. + hints := h.advanceChannelN(chanID, numUpdates) + + // Back up 4 of the 5 states for the negotiated session. + h.backupStates(chanID, 0, maxUpdates-1, nil) + h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second) + + // Now, restart the tower and prevent it from acking any + // new sessions. We do this here as once the last slot + // is exhausted the client will attempt to renegotiate. + err := h.server.Stop() + require.Nil(h.t, err) + h.serverCfg.NoAckCreateSession = true + h.startServer() + + // Back up the remaining two states. Once the first is + // processed, the session will be exhausted but the + // client won't be able to regnegotiate a session for + // the final state. We'll only wait for the first five + // states to arrive at the tower. + h.backupStates(chanID, maxUpdates-1, numUpdates, nil) + h.waitServerUpdates(hints[:maxUpdates], 5*time.Second) + + // Finally, stop the client which will continue to + // attempt session negotiation since it has one more + // state to process. After the force quite delay + // expires, the client should force quite itself and + // allow the test to complete. + err = h.client.Stop() + require.Nil(h.t, err) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup