From 5aa59906b56ace20d180f70ecf8531ee94c46b0e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 30 Nov 2020 13:08:04 -0800 Subject: [PATCH 1/2] wtclient: schedule ForceQuit first during Stop Currently the ForceQuit call is scheduled after trying to stop the backup queue. In certain cases, the call to stop the queue never finishes, which means the force quit is never scheduled. We rememdy by scheduling this call before any other operations to ensure we can always exit ungracefully if necessary. --- watchtower/wtclient/client.go | 14 +++---- watchtower/wtclient/client_test.go | 64 ++++++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 71d767ef..62e4ac67 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -462,13 +462,7 @@ func (c *TowerClient) Stop() error { c.stopped.Do(func() { log.Debugf("Stopping watchtower client") - // 1. Shutdown the backup queue, which will prevent any further - // updates from being accepted. In practice, the links should be - // shutdown before the client has been stopped, so all updates - // would have been added prior. - c.pipeline.Stop() - - // 2. To ensure we don't hang forever on shutdown due to + // 1. To ensure we don't hang forever on shutdown due to // unintended failures, we'll delay a call to force quit the // pipeline if a ForceQuitDelay is specified. This will have no // effect if the pipeline shuts down cleanly before the delay @@ -483,6 +477,12 @@ func (c *TowerClient) Stop() error { time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit) } + // 2. Shutdown the backup queue, which will prevent any further + // updates from being accepted. In practice, the links should be + // shutdown before the client has been stopped, so all updates + // would have been added prior. + c.pipeline.Stop() + // 3. Once the backup queue has shutdown, wait for the main // dispatcher to exit. The backup queue will signal it's // completion to the dispatcher, which releases the wait group diff --git a/watchtower/wtclient/client_test.go b/watchtower/wtclient/client_test.go index 166ab228..41e01b61 100644 --- a/watchtower/wtclient/client_test.go +++ b/watchtower/wtclient/client_test.go @@ -446,10 +446,11 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness { NewAddress: func() ([]byte, error) { return addrScript, nil }, - ReadTimeout: timeout, - WriteTimeout: timeout, - MinBackoff: time.Millisecond, - MaxBackoff: 10 * time.Millisecond, + ReadTimeout: timeout, + WriteTimeout: timeout, + MinBackoff: time.Millisecond, + MaxBackoff: time.Second, + ForceQuitDelay: 10 * time.Second, } client, err := wtclient.New(clientCfg) if err != nil { @@ -1475,6 +1476,61 @@ var clientTests = []clientTest{ h.waitServerUpdates(hints[numUpdates/2:], 5*time.Second) }, }, + { + // Asserts that the client's force quite delay will properly + // shutdown the client if it is unable to completely drain the + // task pipeline. + name: "force unclean shutdown", + cfg: harnessCfg{ + localBalance: localBalance, + remoteBalance: remoteBalance, + policy: wtpolicy.Policy{ + TxPolicy: wtpolicy.TxPolicy{ + BlobType: blob.TypeAltruistCommit, + SweepFeeRate: wtpolicy.DefaultSweepFeeRate, + }, + MaxUpdates: 5, + }, + }, + fn: func(h *testHarness) { + const ( + chanID = 0 + numUpdates = 6 + maxUpdates = 5 + ) + + // Advance the channel to create all states. + hints := h.advanceChannelN(chanID, numUpdates) + + // Back up 4 of the 5 states for the negotiated session. + h.backupStates(chanID, 0, maxUpdates-1, nil) + h.waitServerUpdates(hints[:maxUpdates-1], 5*time.Second) + + // Now, restart the tower and prevent it from acking any + // new sessions. We do this here as once the last slot + // is exhausted the client will attempt to renegotiate. + err := h.server.Stop() + require.Nil(h.t, err) + h.serverCfg.NoAckCreateSession = true + h.startServer() + + // Back up the remaining two states. Once the first is + // processed, the session will be exhausted but the + // client won't be able to regnegotiate a session for + // the final state. We'll only wait for the first five + // states to arrive at the tower. + h.backupStates(chanID, maxUpdates-1, numUpdates, nil) + h.waitServerUpdates(hints[:maxUpdates], 5*time.Second) + + // Finally, stop the client which will continue to + // attempt session negotiation since it has one more + // state to process. After the force quite delay + // expires, the client should force quite itself and + // allow the test to complete. + err = h.client.Stop() + require.Nil(h.t, err) + }, + }, } // TestClient executes the client test suite, asserting the ability to backup From 1f86526250f66c300f916f105512dc3ef311fd9c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 30 Nov 2020 13:08:18 -0800 Subject: [PATCH 2/2] wtclient: add backoff in negotiation when createSession() fails Currently if the tower hangs up during session negotiation there is no backoff applied. We add backoff here to avoid excessive CPU/network utilization during unexpected failures. --- watchtower/wtclient/session_negotiator.go | 31 +++++++++++++++-------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/watchtower/wtclient/session_negotiator.go b/watchtower/wtclient/session_negotiator.go index 67be526d..c782deee 100644 --- a/watchtower/wtclient/session_negotiator.go +++ b/watchtower/wtclient/session_negotiator.go @@ -231,6 +231,19 @@ func (n *sessionNegotiator) negotiate() { // backoff. var backoff time.Duration + // Create a closure to update the backoff upon failure such that it + // stays within our min and max backoff parameters. + updateBackoff := func() { + if backoff == 0 { + backoff = n.cfg.MinBackoff + } else { + backoff *= 2 + if backoff > n.cfg.MaxBackoff { + backoff = n.cfg.MaxBackoff + } + } + } + retryWithBackoff: // If we are retrying, wait out the delay before continuing. if backoff > 0 { @@ -251,16 +264,8 @@ retryWithBackoff: // Pull the next candidate from our list of addresses. tower, err := n.cfg.Candidates.Next() if err != nil { - if backoff == 0 { - backoff = n.cfg.MinBackoff - } else { - // We've run out of addresses, double and clamp - // backoff. - backoff *= 2 - if backoff > n.cfg.MaxBackoff { - backoff = n.cfg.MaxBackoff - } - } + // We've run out of addresses, update our backoff. + updateBackoff() log.Debugf("Unable to get new tower candidate, "+ "retrying after %v -- reason: %v", backoff, err) @@ -293,10 +298,14 @@ retryWithBackoff: // get a new session, trying all addresses if necessary. err = n.createSession(tower, keyIndex) if err != nil { + // An unexpected error occurred, updpate our backoff. + updateBackoff() + log.Debugf("Session negotiation with tower=%x "+ "failed, trying again -- reason: %v", tower.IdentityKey.SerializeCompressed(), err) - continue + + goto retryWithBackoff } // Success.