From 0c74575c44f9772d4226c72ed44cb6e5236878d1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 11 Sep 2018 16:05:26 -0700 Subject: [PATCH] contractcourt/chain_arbitrator: parallel chain watcher start This commit restructures the initialization procedure for chain watchers such that they can proceed in parallel. This is primarily to help nodes running with the neutrino backend, which otherwise forces a serial rescan for each active channel to check for spentness. Doing so allows the rescans to take advantage of batch scheduling in registering for the spend notifications, ensuring that only one or two passes are made, as opposed to one for each channel. Lastly, this commit ensures that the chain arb is properly shut down if any of its chain watchers or channel arbs fails to start, so as to cancel their goroutines before exiting. --- contractcourt/chain_arbitrator.go | 50 +++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index b8491b06..3bfbafa1 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -1,6 +1,7 @@ package contractcourt import ( + "errors" "fmt" "sync" "sync/atomic" @@ -14,6 +15,9 @@ import ( "github.com/lightningnetwork/lnd/lnwire" ) +// ErrChainArbExiting signals that the chain arbitrator is shutting down. +var ErrChainArbExiting = errors.New("ChainArbitrator exiting") + // ResolutionMsg is a message sent by resolvers to outside sub-systems once an // outgoing contract has been fully resolved. For multi-hop contracts, if we // resolve the outgoing contract, we'll also need to ensure that the incoming @@ -423,15 +427,49 @@ func (c *ChainArbitrator) Start() error { ) } - // Finally, we'll launch all the goroutines for each watcher and - // arbitrator so they can carry out their duties. + // Now, we'll start all chain watchers in parallel to shorten start up + // duration.
In neutrino mode, this allows spend registrations to take + // advantage of batch spend reporting, instead of doing a single rescan + // per chain watcher. + // + // NOTE: After this point, we Stop the chain arb to ensure that any + // lingering goroutines are cleaned up before exiting. + watcherErrs := make(chan error, len(c.activeWatchers)) + var wg sync.WaitGroup for _, watcher := range c.activeWatchers { - if err := watcher.Start(); err != nil { + wg.Add(1) + go func(w *chainWatcher) { + defer wg.Done() + select { + case watcherErrs <- w.Start(): + case <-c.quit: + watcherErrs <- ErrChainArbExiting + } + }(watcher) + } + + // Once all chain watchers have been started, seal the err chan to + // signal the end of the err stream. + go func() { + wg.Wait() + close(watcherErrs) + }() + + // Handle all errors returned from spawning our chain watchers. If any + // of them failed, we will stop the chain arb to shutdown any active + // goroutines. + for err := range watcherErrs { + if err != nil { + c.Stop() return err } } + + // Finally, we'll launch all the goroutines for each arbitrator so they + // can carry out their duties. 
for _, arbitrator := range c.activeChannels { if err := arbitrator.Start(); err != nil { + c.Stop() return err } } @@ -573,7 +611,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg closeTx: respChan, }: case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } // We'll await two responses: the error response, and the transaction @@ -584,14 +622,14 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg return nil, err } case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } var closeTx *wire.MsgTx select { case closeTx = <-respChan: case <-c.quit: - return nil, fmt.Errorf("ChainArbitrator shutting down") + return nil, ErrChainArbExiting } // We'll attempt to disable the channel in the background to