wtclient: schedule ForceQuit first during Stop
Currently the ForceQuit call is scheduled after trying to stop the backup queue. In certain cases, the call to stop the queue never finishes, which means the force quit is never scheduled. We rememdy by scheduling this call before any other operations to ensure we can always exit ungracefully if necessary.
This commit is contained in:
parent
7e298f1434
commit
5aa59906b5
@ -462,13 +462,7 @@ func (c *TowerClient) Stop() error {
|
|||||||
c.stopped.Do(func() {
|
c.stopped.Do(func() {
|
||||||
log.Debugf("Stopping watchtower client")
|
log.Debugf("Stopping watchtower client")
|
||||||
|
|
||||||
// 1. Shutdown the backup queue, which will prevent any further
|
// 1. To ensure we don't hang forever on shutdown due to
|
||||||
// updates from being accepted. In practice, the links should be
|
|
||||||
// shutdown before the client has been stopped, so all updates
|
|
||||||
// would have been added prior.
|
|
||||||
c.pipeline.Stop()
|
|
||||||
|
|
||||||
// 2. To ensure we don't hang forever on shutdown due to
|
|
||||||
// unintended failures, we'll delay a call to force quit the
|
// unintended failures, we'll delay a call to force quit the
|
||||||
// pipeline if a ForceQuitDelay is specified. This will have no
|
// pipeline if a ForceQuitDelay is specified. This will have no
|
||||||
// effect if the pipeline shuts down cleanly before the delay
|
// effect if the pipeline shuts down cleanly before the delay
|
||||||
@ -483,6 +477,12 @@ func (c *TowerClient) Stop() error {
|
|||||||
time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
|
time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 2. Shutdown the backup queue, which will prevent any further
|
||||||
|
// updates from being accepted. In practice, the links should be
|
||||||
|
// shutdown before the client has been stopped, so all updates
|
||||||
|
// would have been added prior.
|
||||||
|
c.pipeline.Stop()
|
||||||
|
|
||||||
// 3. Once the backup queue has shutdown, wait for the main
|
// 3. Once the backup queue has shutdown, wait for the main
|
||||||
// dispatcher to exit. The backup queue will signal it's
|
// dispatcher to exit. The backup queue will signal it's
|
||||||
// completion to the dispatcher, which releases the wait group
|
// completion to the dispatcher, which releases the wait group
|
||||||
|
@ -449,7 +449,8 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
|
|||||||
ReadTimeout: timeout,
|
ReadTimeout: timeout,
|
||||||
WriteTimeout: timeout,
|
WriteTimeout: timeout,
|
||||||
MinBackoff: time.Millisecond,
|
MinBackoff: time.Millisecond,
|
||||||
MaxBackoff: 10 * time.Millisecond,
|
MaxBackoff: time.Second,
|
||||||
|
ForceQuitDelay: 10 * time.Second,
|
||||||
}
|
}
|
||||||
client, err := wtclient.New(clientCfg)
|
client, err := wtclient.New(clientCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1475,6 +1476,61 @@ var clientTests = []clientTest{
|
|||||||
h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second)
|
h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// Asserts that the client's force quite delay will properly
|
||||||
|
// shutdown the client if it is unable to completely drain the
|
||||||
|
// task pipeline.
|
||||||
|
name: "force unclean shutdown",
|
||||||
|
cfg: harnessCfg{
|
||||||
|
localBalance: localBalance,
|
||||||
|
remoteBalance: remoteBalance,
|
||||||
|
policy: wtpolicy.Policy{
|
||||||
|
TxPolicy: wtpolicy.TxPolicy{
|
||||||
|
BlobType: blob.TypeAltruistCommit,
|
||||||
|
SweepFeeRate: wtpolicy.DefaultSweepFeeRate,
|
||||||
|
},
|
||||||
|
MaxUpdates: 5,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
fn: func(h *testHarness) {
|
||||||
|
const (
|
||||||
|
chanID = 0
|
||||||
|
numUpdates = 6
|
||||||
|
maxUpdates = 5
|
||||||
|
)
|
||||||
|
|
||||||
|
// Advance the channel to create all states.
|
||||||
|
hints := h.advanceChannelN(chanID, numUpdates)
|
||||||
|
|
||||||
|
// Back up 4 of the 5 states for the negotiated session.
|
||||||
|
h.backupStates(chanID, 0, maxUpdates-1, nil)
|
||||||
|
h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second)
|
||||||
|
|
||||||
|
// Now, restart the tower and prevent it from acking any
|
||||||
|
// new sessions. We do this here as once the last slot
|
||||||
|
// is exhausted the client will attempt to renegotiate.
|
||||||
|
err := h.server.Stop()
|
||||||
|
require.Nil(h.t, err)
|
||||||
|
h.serverCfg.NoAckCreateSession = true
|
||||||
|
h.startServer()
|
||||||
|
|
||||||
|
// Back up the remaining two states. Once the first is
|
||||||
|
// processed, the session will be exhausted but the
|
||||||
|
// client won't be able to regnegotiate a session for
|
||||||
|
// the final state. We'll only wait for the first five
|
||||||
|
// states to arrive at the tower.
|
||||||
|
h.backupStates(chanID, maxUpdates-1, numUpdates, nil)
|
||||||
|
h.waitServerUpdates(hints[:maxUpdates], 5*time.Second)
|
||||||
|
|
||||||
|
// Finally, stop the client which will continue to
|
||||||
|
// attempt session negotiation since it has one more
|
||||||
|
// state to process. After the force quite delay
|
||||||
|
// expires, the client should force quite itself and
|
||||||
|
// allow the test to complete.
|
||||||
|
err = h.client.Stop()
|
||||||
|
require.Nil(h.t, err)
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestClient executes the client test suite, asserting the ability to backup
|
// TestClient executes the client test suite, asserting the ability to backup
|
||||||
|
Loading…
Reference in New Issue
Block a user