Merge pull request #4789 from cfromknecht/wt-clean-shutdown
wtclient: schedule ForceQuit first during Stop
commit 39d2ea752e
@@ -462,13 +462,7 @@ func (c *TowerClient) Stop() error {
     c.stopped.Do(func() {
         log.Debugf("Stopping watchtower client")

-        // 1. Shutdown the backup queue, which will prevent any further
-        // updates from being accepted. In practice, the links should be
-        // shutdown before the client has been stopped, so all updates
-        // would have been added prior.
-        c.pipeline.Stop()
-
-        // 2. To ensure we don't hang forever on shutdown due to
+        // 1. To ensure we don't hang forever on shutdown due to
         // unintended failures, we'll delay a call to force quit the
         // pipeline if a ForceQuitDelay is specified. This will have no
         // effect if the pipeline shuts down cleanly before the delay
@@ -483,6 +477,12 @@ func (c *TowerClient) Stop() error {
             time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit)
         }

+        // 2. Shutdown the backup queue, which will prevent any further
+        // updates from being accepted. In practice, the links should be
+        // shutdown before the client has been stopped, so all updates
+        // would have been added prior.
+        c.pipeline.Stop()
+
         // 3. Once the backup queue has shutdown, wait for the main
         // dispatcher to exit. The backup queue will signal it's
         // completion to the dispatcher, which releases the wait group
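The two hunks above reorder TowerClient.Stop: the delayed ForceQuit timer is now armed before the task pipeline is drained, so shutdown is bounded by ForceQuitDelay even if the pipeline can never drain cleanly. The following is a minimal, standalone sketch of that ordering using hypothetical names (client, drainQueue, forceQuit); it illustrates the pattern only and is not the lnd implementation.

package main

import (
    "log"
    "sync"
    "time"
)

type client struct {
    stopped        sync.Once
    forced         sync.Once
    forceQuitDelay time.Duration
    quit           chan struct{}
}

// forceQuit is idempotent: only the first call closes the quit channel.
func (c *client) forceQuit() {
    c.forced.Do(func() { close(c.quit) })
}

// drainQueue stands in for pipeline.Stop(): it blocks until pending work
// drains or until the force quit fires, whichever comes first.
func (c *client) drainQueue() {
    select {
    case <-time.After(50 * time.Millisecond): // pretend the queue drains
    case <-c.quit: // force quit fired first
    }
}

func (c *client) Stop() {
    c.stopped.Do(func() {
        // 1. Arm the force-quit timer first so shutdown is bounded.
        if c.forceQuitDelay > 0 {
            time.AfterFunc(c.forceQuitDelay, c.forceQuit)
        }

        // 2. Now drain; a clean drain returns before the timer fires.
        c.drainQueue()

        log.Println("client stopped")
    })
}

func main() {
    c := &client{forceQuitDelay: time.Second, quit: make(chan struct{})}
    c.Stop()
}

In this sketch, the force-quit path is guarded by its own sync.Once, so the timer firing after a clean shutdown is harmless.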
@@ -446,10 +446,11 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
         NewAddress: func() ([]byte, error) {
             return addrScript, nil
         },
         ReadTimeout:    timeout,
         WriteTimeout:   timeout,
         MinBackoff:     time.Millisecond,
-        MaxBackoff:     10 * time.Millisecond,
+        MaxBackoff:     time.Second,
+        ForceQuitDelay: 10 * time.Second,
     }
     client, err := wtclient.New(clientCfg)
     if err != nil {
@@ -1475,6 +1476,61 @@ var clientTests = []clientTest{
             h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second)
         },
     },
+    {
+        // Asserts that the client's force quit delay will properly
+        // shutdown the client if it is unable to completely drain the
+        // task pipeline.
+        name: "force unclean shutdown",
+        cfg: harnessCfg{
+            localBalance:  localBalance,
+            remoteBalance: remoteBalance,
+            policy: wtpolicy.Policy{
+                TxPolicy: wtpolicy.TxPolicy{
+                    BlobType:     blob.TypeAltruistCommit,
+                    SweepFeeRate: wtpolicy.DefaultSweepFeeRate,
+                },
+                MaxUpdates: 5,
+            },
+        },
+        fn: func(h *testHarness) {
+            const (
+                chanID     = 0
+                numUpdates = 6
+                maxUpdates = 5
+            )
+
+            // Advance the channel to create all states.
+            hints := h.advanceChannelN(chanID, numUpdates)
+
+            // Back up 4 of the 5 states for the negotiated session.
+            h.backupStates(chanID, 0, maxUpdates-1, nil)
+            h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second)
+
+            // Now, restart the tower and prevent it from acking any
+            // new sessions. We do this here as once the last slot
+            // is exhausted the client will attempt to renegotiate.
+            err := h.server.Stop()
+            require.Nil(h.t, err)
+            h.serverCfg.NoAckCreateSession = true
+            h.startServer()
+
+            // Back up the remaining two states. Once the first is
+            // processed, the session will be exhausted but the
+            // client won't be able to renegotiate a session for
+            // the final state. We'll only wait for the first five
+            // states to arrive at the tower.
+            h.backupStates(chanID, maxUpdates-1, numUpdates, nil)
+            h.waitServerUpdates(hints[:maxUpdates], 5*time.Second)
+
+            // Finally, stop the client which will continue to
+            // attempt session negotiation since it has one more
+            // state to process. After the force quit delay
+            // expires, the client should force quit itself and
+            // allow the test to complete.
+            err = h.client.Stop()
+            require.Nil(h.t, err)
+        },
+    },
 }

 // TestClient executes the client test suite, asserting the ability to backup
@@ -231,6 +231,19 @@ func (n *sessionNegotiator) negotiate() {
     // backoff.
     var backoff time.Duration

+    // Create a closure to update the backoff upon failure such that it
+    // stays within our min and max backoff parameters.
+    updateBackoff := func() {
+        if backoff == 0 {
+            backoff = n.cfg.MinBackoff
+        } else {
+            backoff *= 2
+            if backoff > n.cfg.MaxBackoff {
+                backoff = n.cfg.MaxBackoff
+            }
+        }
+    }
+
 retryWithBackoff:
     // If we are retrying, wait out the delay before continuing.
     if backoff > 0 {
@@ -251,16 +264,8 @@ retryWithBackoff:
     // Pull the next candidate from our list of addresses.
     tower, err := n.cfg.Candidates.Next()
     if err != nil {
-        if backoff == 0 {
-            backoff = n.cfg.MinBackoff
-        } else {
-            // We've run out of addresses, double and clamp
-            // backoff.
-            backoff *= 2
-            if backoff > n.cfg.MaxBackoff {
-                backoff = n.cfg.MaxBackoff
-            }
-        }
+        // We've run out of addresses, update our backoff.
+        updateBackoff()

         log.Debugf("Unable to get new tower candidate, "+
             "retrying after %v -- reason: %v", backoff, err)
@@ -293,10 +298,14 @@ retryWithBackoff:
     // get a new session, trying all addresses if necessary.
     err = n.createSession(tower, keyIndex)
     if err != nil {
+        // An unexpected error occurred, update our backoff.
+        updateBackoff()
+
         log.Debugf("Session negotiation with tower=%x "+
             "failed, trying again -- reason: %v",
             tower.IdentityKey.SerializeCompressed(), err)
-        continue
+
+        goto retryWithBackoff
     }

     // Success.
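The updateBackoff closure introduced above centralizes the retry delay used both when the candidate iterator is exhausted and when createSession fails: start at MinBackoff, double on every subsequent failure, and clamp at MaxBackoff. Below is a standalone sketch of that policy; the min/max values are assumptions that mirror the test harness config (MinBackoff of 1ms, MaxBackoff of 1s) and are otherwise illustrative.

package main

import (
    "fmt"
    "time"
)

func main() {
    const (
        minBackoff = time.Millisecond // assumed, matches the harness MinBackoff
        maxBackoff = time.Second      // assumed, matches the new harness MaxBackoff
    )

    var backoff time.Duration
    updateBackoff := func() {
        if backoff == 0 {
            backoff = minBackoff
        } else {
            backoff *= 2
            if backoff > maxBackoff {
                backoff = maxBackoff
            }
        }
    }

    // Successive failures yield 1ms, 2ms, 4ms, ... and cap at 1s.
    for i := 0; i < 12; i++ {
        updateBackoff()
        fmt.Println(backoff)
    }
}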