diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index b8491b06..3bfbafa1 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -1,6 +1,7 @@ package contractcourt import ( + "errors" "fmt" "sync" "sync/atomic" @@ -14,6 +15,9 @@ import ( "github.com/lightningnetwork/lnd/lnwire" ) +// ErrChainArbExiting signals that the chain arbitrator is shutting down. +var ErrChainArbExiting = errors.New("ChainArbitrator exiting") + // ResolutionMsg is a message sent by resolvers to outside sub-systems once an // outgoing contract has been fully resolved. For multi-hop contracts, if we // resolve the outgoing contract, we'll also need to ensure that the incoming @@ -423,15 +427,49 @@ func (c *ChainArbitrator) Start() error { ) } - // Finally, we'll launch all the goroutines for each watcher and - // arbitrator so they can carry out their duties. + // Now, we'll start all chain watchers in parallel to shorten start up + // duration. In neutrino mode, this allows spend registrations to take + // advantage of batch spend reporting, instead of doing a single rescan + // per chain watcher. + // + // NOTE: After this point, we Stop the chain arb to ensure that any + // lingering goroutines are cleaned up before exiting. + watcherErrs := make(chan error, len(c.activeWatchers)) + var wg sync.WaitGroup for _, watcher := range c.activeWatchers { - if err := watcher.Start(); err != nil { + wg.Add(1) + go func(w *chainWatcher) { + defer wg.Done() + select { + case watcherErrs <- w.Start(): + case <-c.quit: + watcherErrs <- ErrChainArbExiting + } + }(watcher) + } + + // Once all chain watchers have been started, seal the err chan to + // signal the end of the err stream. + go func() { + wg.Wait() + close(watcherErrs) + }() + + // Handle all errors returned from spawning our chain watchers. If any + // of them failed, we will stop the chain arb to shutdown any active + // goroutines. + for err := range watcherErrs { + if err != nil { + c.Stop() return err } } + + // Finally, we'll launch all the goroutines for each arbitrator so they + // can carry out their duties. for _, arbitrator := range c.activeChannels { if err := arbitrator.Start(); err != nil { + c.Stop() return err } } @@ -573,7 +611,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg closeTx: respChan, }: case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } // We'll await two responses: the error response, and the transaction @@ -584,14 +622,14 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg return nil, err } case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } var closeTx *wire.MsgTx select { case closeTx = <-respChan: case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } // We'll attempt to disable the channel in the background to