sweeper: avoid deadlock on shutdown
We risked deadlocking on shutdown if a client (in our case a contract resolver) attempted to schedule a sweep of an input after the ChainNotifier had been shut down. This would cause the `collector` goroutine to exit, and not handle incoming requests, causing a deadlock (since the ChainArbitrator is being stopped before the Sweeper in the server). To fix this we could change the order these subsystems are stopped, but this doesn't ensure there aren't other clients that could end up in the same deadlock scenario. So instead we keep handling the incoming requests even after the collector has exited (immediatly returning an error), until the sweeper is signalled to shutdown.
This commit is contained in:
parent
de66d35a5b
commit
77daa3dbe4
@ -154,6 +154,7 @@ type inputCluster struct {
|
||||
// attempting to sweep.
|
||||
type pendingSweepsReq struct {
|
||||
respChan chan map[wire.OutPoint]*PendingInput
|
||||
errChan chan error
|
||||
}
|
||||
|
||||
// PendingInput contains information about an input that is currently being
|
||||
@ -381,6 +382,33 @@ func (s *UtxoSweeper) Start() error {
|
||||
defer s.wg.Done()
|
||||
|
||||
s.collector(blockEpochs.Epochs)
|
||||
|
||||
// The collector exited and won't longer handle incoming
|
||||
// requests. This can happen on shutdown, when the block
|
||||
// notifier shuts down before the sweeper and its clients. In
|
||||
// order to not deadlock the clients waiting for their requests
|
||||
// being handled, we handle them here and immediately return an
|
||||
// error. When the sweeper finally is shut down we can exit as
|
||||
// the clients will be notified.
|
||||
for {
|
||||
select {
|
||||
case inp := <-s.newInputs:
|
||||
inp.resultChan <- Result{
|
||||
Err: ErrSweeperShuttingDown,
|
||||
}
|
||||
|
||||
case req := <-s.pendingSweepsReqs:
|
||||
req.errChan <- ErrSweeperShuttingDown
|
||||
|
||||
case req := <-s.updateReqs:
|
||||
req.responseChan <- &updateResp{
|
||||
err: ErrSweeperShuttingDown,
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
@ -1290,9 +1318,11 @@ func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint,
|
||||
// attempting to sweep.
|
||||
func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
|
||||
respChan := make(chan map[wire.OutPoint]*PendingInput, 1)
|
||||
errChan := make(chan error, 1)
|
||||
select {
|
||||
case s.pendingSweepsReqs <- &pendingSweepsReq{
|
||||
respChan: respChan,
|
||||
errChan: errChan,
|
||||
}:
|
||||
case <-s.quit:
|
||||
return nil, ErrSweeperShuttingDown
|
||||
@ -1301,6 +1331,8 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
|
||||
select {
|
||||
case pendingSweeps := <-respChan:
|
||||
return pendingSweeps, nil
|
||||
case err := <-errChan:
|
||||
return nil, err
|
||||
case <-s.quit:
|
||||
return nil, ErrSweeperShuttingDown
|
||||
}
|
||||
|
@ -2218,3 +2218,44 @@ func TestRequiredTxOuts(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSweeperShutdownHandling tests that we notify callers when the sweeper
|
||||
// cannot handle requests since it's in the process of shutting down.
|
||||
func TestSweeperShutdownHandling(t *testing.T) {
|
||||
ctx := createSweeperTestContext(t)
|
||||
|
||||
// Make the backing notifier break down. This is what happens during
|
||||
// lnd shut down, since the notifier is stopped before the sweeper.
|
||||
require.Len(t, ctx.notifier.epochChan, 1)
|
||||
for epochChan := range ctx.notifier.epochChan {
|
||||
close(epochChan)
|
||||
}
|
||||
|
||||
// Give the collector some time to exit.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
// Now trying to sweep inputs should return an error on the error
|
||||
// channel.
|
||||
resultChan, err := ctx.sweeper.SweepInput(
|
||||
spendableInputs[0], defaultFeePref,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
select {
|
||||
case res := <-resultChan:
|
||||
require.Equal(t, ErrSweeperShuttingDown, res.Err)
|
||||
|
||||
case <-time.After(defaultTestTimeout):
|
||||
t.Fatalf("no result arrived")
|
||||
}
|
||||
|
||||
// Stop the sweeper properly.
|
||||
err = ctx.sweeper.Stop()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now attempting to sweep an input should error out immediately.
|
||||
_, err = ctx.sweeper.SweepInput(
|
||||
spendableInputs[0], defaultFeePref,
|
||||
)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user