contractcourt/chain_arbitrator: parallel chain watcher start

This commit restructures the initialization procedure
for chain watchers such that they can proceed in parallel.
This is primarily to help nodes running with the neutrino
backend, which otherwise forces a serial rescan for each
active channel to check for spentness.

Doing so allows the rescans to take advantage of batch
scheduling in registering for the spend notifications,
ensuring that only one or two passes are made, as opposed
to one for each channel.

Lastly, this commit ensures that the chain arb is properly
shutdown if any of it's chain watchers or channel arbs
fails to start, so as to cancel their goroutines before
exiting.
This commit is contained in:
Conner Fromknecht 2018-09-11 16:05:26 -07:00
parent d050cedd02
commit 0c74575c44
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -1,6 +1,7 @@
package contractcourt package contractcourt
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -14,6 +15,9 @@ import (
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an // ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we // outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming // resolve the outgoing contract, we'll also need to ensure that the incoming
@ -423,15 +427,49 @@ func (c *ChainArbitrator) Start() error {
) )
} }
// Finally, we'll launch all the goroutines for each watcher and // Now, we'll start all chain watchers in parallel to shorten start up
// arbitrator so they can carry out their duties. // duration. In neutrino mode, this allows spend registrations to take
// advantage of batch spend reporting, instead of doing a single rescan
// per chain watcher.
//
// NOTE: After this point, we Stop the chain arb to ensure that any
// lingering goroutines are cleaned up before exiting.
watcherErrs := make(chan error, len(c.activeWatchers))
var wg sync.WaitGroup
for _, watcher := range c.activeWatchers { for _, watcher := range c.activeWatchers {
if err := watcher.Start(); err != nil { wg.Add(1)
go func(w *chainWatcher) {
defer wg.Done()
select {
case watcherErrs <- w.Start():
case <-c.quit:
watcherErrs <- ErrChainArbExiting
}
}(watcher)
}
// Once all chain watchers have been started, seal the err chan to
// signal the end of the err stream.
go func() {
wg.Wait()
close(watcherErrs)
}()
// Handle all errors returned from spawning our chain watchers. If any
// of them failed, we will stop the chain arb to shutdown any active
// goroutines.
for err := range watcherErrs {
if err != nil {
c.Stop()
return err return err
} }
} }
// Finally, we'll launch all the goroutines for each arbitrator so they
// can carry out their duties.
for _, arbitrator := range c.activeChannels { for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil { if err := arbitrator.Start(); err != nil {
c.Stop()
return err return err
} }
} }
@ -573,7 +611,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
closeTx: respChan, closeTx: respChan,
}: }:
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
// We'll await two responses: the error response, and the transaction // We'll await two responses: the error response, and the transaction
@ -584,14 +622,14 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
return nil, err return nil, err
} }
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
var closeTx *wire.MsgTx var closeTx *wire.MsgTx
select { select {
case closeTx = <-respChan: case closeTx = <-respChan:
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
// We'll attempt to disable the channel in the background to // We'll attempt to disable the channel in the background to