From 77daa3dbe430d82a94adfc0f84fdc252a6667b82 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 9 Dec 2020 12:24:01 +0100 Subject: [PATCH] sweeper: avoid deadlock on shutdown We risked deadlocking on shutdown if a client (in our case a contract resolver) attempted to schedule a sweep of an input after the ChainNotifier had been shut down. This would cause the `collector` goroutine to exit, and not handle incoming requests, causing a deadlock (since the ChainArbitrator is being stopped before the Sweeper in the server). To fix this we could change the order these subsystems are stopped, but this doesn't ensure there aren't other clients that could end up in the same deadlock scenario. So instead we keep handling the incoming requests even after the collector has exited (immediatly returning an error), until the sweeper is signalled to shutdown. --- sweep/sweeper.go | 32 ++++++++++++++++++++++++++++++++ sweep/sweeper_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/sweep/sweeper.go b/sweep/sweeper.go index a1b3cf16..b7f20274 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -154,6 +154,7 @@ type inputCluster struct { // attempting to sweep. type pendingSweepsReq struct { respChan chan map[wire.OutPoint]*PendingInput + errChan chan error } // PendingInput contains information about an input that is currently being @@ -381,6 +382,33 @@ func (s *UtxoSweeper) Start() error { defer s.wg.Done() s.collector(blockEpochs.Epochs) + + // The collector exited and won't longer handle incoming + // requests. This can happen on shutdown, when the block + // notifier shuts down before the sweeper and its clients. In + // order to not deadlock the clients waiting for their requests + // being handled, we handle them here and immediately return an + // error. When the sweeper finally is shut down we can exit as + // the clients will be notified. + for { + select { + case inp := <-s.newInputs: + inp.resultChan <- Result{ + Err: ErrSweeperShuttingDown, + } + + case req := <-s.pendingSweepsReqs: + req.errChan <- ErrSweeperShuttingDown + + case req := <-s.updateReqs: + req.responseChan <- &updateResp{ + err: ErrSweeperShuttingDown, + } + + case <-s.quit: + return + } + } }() return nil @@ -1290,9 +1318,11 @@ func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint, // attempting to sweep. func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { respChan := make(chan map[wire.OutPoint]*PendingInput, 1) + errChan := make(chan error, 1) select { case s.pendingSweepsReqs <- &pendingSweepsReq{ respChan: respChan, + errChan: errChan, }: case <-s.quit: return nil, ErrSweeperShuttingDown @@ -1301,6 +1331,8 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { select { case pendingSweeps := <-respChan: return pendingSweeps, nil + case err := <-errChan: + return nil, err case <-s.quit: return nil, ErrSweeperShuttingDown } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 54d622d5..10cbd05a 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2218,3 +2218,44 @@ func TestRequiredTxOuts(t *testing.T) { }) } } + +// TestSweeperShutdownHandling tests that we notify callers when the sweeper +// cannot handle requests since it's in the process of shutting down. +func TestSweeperShutdownHandling(t *testing.T) { + ctx := createSweeperTestContext(t) + + // Make the backing notifier break down. This is what happens during + // lnd shut down, since the notifier is stopped before the sweeper. + require.Len(t, ctx.notifier.epochChan, 1) + for epochChan := range ctx.notifier.epochChan { + close(epochChan) + } + + // Give the collector some time to exit. + time.Sleep(50 * time.Millisecond) + + // Now trying to sweep inputs should return an error on the error + // channel. + resultChan, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) + require.NoError(t, err) + + select { + case res := <-resultChan: + require.Equal(t, ErrSweeperShuttingDown, res.Err) + + case <-time.After(defaultTestTimeout): + t.Fatalf("no result arrived") + } + + // Stop the sweeper properly. + err = ctx.sweeper.Stop() + require.NoError(t, err) + + // Now attempting to sweep an input should error out immediately. + _, err = ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) + require.Error(t, err) +}