Merge pull request #1887 from cfromknecht/parallel-chain-watcher-start

contractcourt/chain_arbitrator: parallel chain watcher start
This commit is contained in:
Olaoluwa Osuntokun 2018-09-11 19:38:40 -07:00 committed by GitHub
commit 55e9ef0157
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,6 +1,7 @@
package contractcourt package contractcourt
import ( import (
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -14,6 +15,9 @@ import (
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// ErrChainArbExiting signals that the chain arbitrator is shutting down.
var ErrChainArbExiting = errors.New("ChainArbitrator exiting")
// ResolutionMsg is a message sent by resolvers to outside sub-systems once an // ResolutionMsg is a message sent by resolvers to outside sub-systems once an
// outgoing contract has been fully resolved. For multi-hop contracts, if we // outgoing contract has been fully resolved. For multi-hop contracts, if we
// resolve the outgoing contract, we'll also need to ensure that the incoming // resolve the outgoing contract, we'll also need to ensure that the incoming
@ -423,15 +427,49 @@ func (c *ChainArbitrator) Start() error {
) )
} }
// Finally, we'll launch all the goroutines for each watcher and // Now, we'll start all chain watchers in parallel to shorten start up
// arbitrator so they can carry out their duties. // duration. In neutrino mode, this allows spend registrations to take
// advantage of batch spend reporting, instead of doing a single rescan
// per chain watcher.
//
// NOTE: After this point, we Stop the chain arb to ensure that any
// lingering goroutines are cleaned up before exiting.
watcherErrs := make(chan error, len(c.activeWatchers))
var wg sync.WaitGroup
for _, watcher := range c.activeWatchers { for _, watcher := range c.activeWatchers {
if err := watcher.Start(); err != nil { wg.Add(1)
go func(w *chainWatcher) {
defer wg.Done()
select {
case watcherErrs <- w.Start():
case <-c.quit:
watcherErrs <- ErrChainArbExiting
}
}(watcher)
}
// Once all chain watchers have been started, seal the err chan to
// signal the end of the err stream.
go func() {
wg.Wait()
close(watcherErrs)
}()
// Handle all errors returned from spawning our chain watchers. If any
// of them failed, we will stop the chain arb to shutdown any active
// goroutines.
for err := range watcherErrs {
if err != nil {
c.Stop()
return err return err
} }
} }
// Finally, we'll launch all the goroutines for each arbitrator so they
// can carry out their duties.
for _, arbitrator := range c.activeChannels { for _, arbitrator := range c.activeChannels {
if err := arbitrator.Start(); err != nil { if err := arbitrator.Start(); err != nil {
c.Stop()
return err return err
} }
} }
@ -573,7 +611,7 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
closeTx: respChan, closeTx: respChan,
}: }:
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
// We'll await two responses: the error response, and the transaction // We'll await two responses: the error response, and the transaction
@ -584,14 +622,14 @@ func (c *ChainArbitrator) ForceCloseContract(chanPoint wire.OutPoint) (*wire.Msg
return nil, err return nil, err
} }
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
var closeTx *wire.MsgTx var closeTx *wire.MsgTx
select { select {
case closeTx = <-respChan: case closeTx = <-respChan:
case <-c.quit: case <-c.quit:
return nil, fmt.Errorf("ChainArbitrator shutting down") return nil, ErrChainArbExiting
} }
// We'll attempt to disable the channel in the background to // We'll attempt to disable the channel in the background to