diff --git a/sweep/sweeper.go b/sweep/sweeper.go index a1b3cf16..b7f20274 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -154,6 +154,7 @@ type inputCluster struct { // attempting to sweep. type pendingSweepsReq struct { respChan chan map[wire.OutPoint]*PendingInput + errChan chan error } // PendingInput contains information about an input that is currently being @@ -381,6 +382,33 @@ func (s *UtxoSweeper) Start() error { defer s.wg.Done() s.collector(blockEpochs.Epochs) + + // The collector exited and won't longer handle incoming + // requests. This can happen on shutdown, when the block + // notifier shuts down before the sweeper and its clients. In + // order to not deadlock the clients waiting for their requests + // being handled, we handle them here and immediately return an + // error. When the sweeper finally is shut down we can exit as + // the clients will be notified. + for { + select { + case inp := <-s.newInputs: + inp.resultChan <- Result{ + Err: ErrSweeperShuttingDown, + } + + case req := <-s.pendingSweepsReqs: + req.errChan <- ErrSweeperShuttingDown + + case req := <-s.updateReqs: + req.responseChan <- &updateResp{ + err: ErrSweeperShuttingDown, + } + + case <-s.quit: + return + } + } }() return nil @@ -1290,9 +1318,11 @@ func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint, // attempting to sweep. func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { respChan := make(chan map[wire.OutPoint]*PendingInput, 1) + errChan := make(chan error, 1) select { case s.pendingSweepsReqs <- &pendingSweepsReq{ respChan: respChan, + errChan: errChan, }: case <-s.quit: return nil, ErrSweeperShuttingDown @@ -1301,6 +1331,8 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) { select { case pendingSweeps := <-respChan: return pendingSweeps, nil + case err := <-errChan: + return nil, err case <-s.quit: return nil, ErrSweeperShuttingDown } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 54d622d5..10cbd05a 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2218,3 +2218,44 @@ func TestRequiredTxOuts(t *testing.T) { }) } } + +// TestSweeperShutdownHandling tests that we notify callers when the sweeper +// cannot handle requests since it's in the process of shutting down. +func TestSweeperShutdownHandling(t *testing.T) { + ctx := createSweeperTestContext(t) + + // Make the backing notifier break down. This is what happens during + // lnd shut down, since the notifier is stopped before the sweeper. + require.Len(t, ctx.notifier.epochChan, 1) + for epochChan := range ctx.notifier.epochChan { + close(epochChan) + } + + // Give the collector some time to exit. + time.Sleep(50 * time.Millisecond) + + // Now trying to sweep inputs should return an error on the error + // channel. + resultChan, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) + require.NoError(t, err) + + select { + case res := <-resultChan: + require.Equal(t, ErrSweeperShuttingDown, res.Err) + + case <-time.After(defaultTestTimeout): + t.Fatalf("no result arrived") + } + + // Stop the sweeper properly. + err = ctx.sweeper.Stop() + require.NoError(t, err) + + // Now attempting to sweep an input should error out immediately. + _, err = ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) + require.Error(t, err) +}