lnd: Ensure networkWatcher goroutine exits when harness is stopped.

This commit is contained in:
Jim Posen 2017-10-24 19:54:18 -07:00 committed by Olaoluwa Osuntokun
parent 923dd9ac30
commit 7d65ad4302

@ -130,6 +130,8 @@ func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*li
} }
nodeNum := numActiveNodes nodeNum := numActiveNodes
numActiveNodes++
cfg.DataDir, err = ioutil.TempDir("", "lndtest-data") cfg.DataDir, err = ioutil.TempDir("", "lndtest-data")
if err != nil { if err != nil {
return nil, err return nil, err
@ -145,8 +147,6 @@ func newLightningNode(btcrpcConfig *rpcclient.ConnConfig, lndArgs []string) (*li
cfg.PeerPort, cfg.RPCPort = generateListeningPorts() cfg.PeerPort, cfg.RPCPort = generateListeningPorts()
numActiveNodes++
lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+ lndArgs = append(lndArgs, "--externalip=127.0.0.1:"+
strconv.Itoa(cfg.PeerPort)) strconv.Itoa(cfg.PeerPort))
lndArgs = append(lndArgs, "--noencryptwallet") lndArgs = append(lndArgs, "--noencryptwallet")
@ -198,7 +198,7 @@ func (l *lightningNode) genArgs() []string {
// Start launches a new process running lnd. Additionally, the PID of the // Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly // launched process is saved in order to possibly kill the process forcibly
// later. // later.
func (l *lightningNode) Start(lndError chan error) error { func (l *lightningNode) Start(lndError chan<- error) error {
args := l.genArgs() args := l.genArgs()
l.cmd = exec.Command("lnd", args...) l.cmd = exec.Command("lnd", args...)
@ -346,8 +346,9 @@ func (l *lightningNode) cleanup() error {
var err error var err error
for _, dir := range dirs { for _, dir := range dirs {
if err = os.RemoveAll(dir); err != nil { if removeErr := os.RemoveAll(dir); removeErr != nil {
log.Printf("Cannot remove dir %s: %v", dir, err) log.Printf("Cannot remove dir %s: %v", dir, removeErr)
err = removeErr
} }
} }
return err return err
@ -388,7 +389,7 @@ func (l *lightningNode) Stop() error {
// process has been started up again. // process has been started up again.
func (l *lightningNode) Restart(errChan chan error, callback func() error) error { func (l *lightningNode) Restart(errChan chan error, callback func() error) error {
if err := l.Stop(); err != nil { if err := l.Stop(); err != nil {
return nil return err
} }
<-l.processExit <-l.processExit
@ -705,6 +706,8 @@ type networkHarness struct {
// to main process. // to main process.
lndErrorChan chan error lndErrorChan chan error
quit chan struct{}
sync.Mutex sync.Mutex
} }
@ -718,10 +721,11 @@ func newNetworkHarness() (*networkHarness, error) {
seenTxns: make(chan chainhash.Hash), seenTxns: make(chan chainhash.Hash),
bitcoinWatchRequests: make(chan *txWatchRequest), bitcoinWatchRequests: make(chan *txWatchRequest),
lndErrorChan: make(chan error), lndErrorChan: make(chan error),
quit: make(chan struct{}),
}, nil }, nil
} }
// InitializeSeedNodes initialized alice and bob nodes given an already // InitializeSeedNodes initializes alice and bob nodes given an already
// running instance of btcd's rpctest harness and extra command line flags, // running instance of btcd's rpctest harness and extra command line flags,
// which should be formatted properly - "--arg=value". // which should be formatted properly - "--arg=value".
func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error { func (n *networkHarness) InitializeSeedNodes(r *rpctest.Harness, lndArgs []string) error {
@ -780,19 +784,15 @@ func (n *networkHarness) SetUp() error {
errChan := make(chan error, 2) errChan := make(chan error, 2)
wg.Add(2) wg.Add(2)
go func() { go func() {
var err error
defer wg.Done() defer wg.Done()
if err = n.Alice.Start(n.lndErrorChan); err != nil { if err := n.Alice.Start(n.lndErrorChan); err != nil {
errChan <- err errChan <- err
return
} }
}() }()
go func() { go func() {
var err error
defer wg.Done() defer wg.Done()
if err = n.Bob.Start(n.lndErrorChan); err != nil { if err := n.Bob.Start(n.lndErrorChan); err != nil {
errChan <- err errChan <- err
return
} }
}() }()
wg.Wait() wg.Wait()
@ -888,6 +888,8 @@ func (n *networkHarness) TearDownAll() error {
} }
close(n.lndErrorChan) close(n.lndErrorChan)
close(n.quit)
return nil return nil
} }
@ -1016,6 +1018,9 @@ func (n *networkHarness) networkWatcher() {
for { for {
select { select {
case <-n.quit:
return
case req := <-n.bitcoinWatchRequests: case req := <-n.bitcoinWatchRequests:
// If we've already seen this transaction, then // If we've already seen this transaction, then
// immediately dispatch the request. Otherwise, append // immediately dispatch the request. Otherwise, append
@ -1053,6 +1058,13 @@ func (n *networkHarness) networkWatcher() {
// OnTxAccepted is a callback to be called each time a new transaction has been // OnTxAccepted is a callback to be called each time a new transaction has been
// broadcast on the network. // broadcast on the network.
func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) { func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount) {
// Return immediately if harness has been torn down.
select {
case <-n.quit:
return
default:
}
go func() { go func() {
n.seenTxns <- *hash n.seenTxns <- *hash
}() }()
@ -1063,6 +1075,13 @@ func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount)
// then an error is returned. // then an error is returned.
// TODO(roasbeef): add another method which creates queue of all seen transactions // TODO(roasbeef): add another method which creates queue of all seen transactions
func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error { func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error {
// Return immediately if harness has been torn down.
select {
case <-n.quit:
return fmt.Errorf("networkHarness has been torn down")
default:
}
eventChan := make(chan struct{}) eventChan := make(chan struct{})
n.bitcoinWatchRequests <- &txWatchRequest{ n.bitcoinWatchRequests <- &txWatchRequest{
@ -1073,6 +1092,8 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.
select { select {
case <-eventChan: case <-eventChan:
return nil return nil
case <-n.quit:
return fmt.Errorf("networkHarness has been torn down")
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("tx not seen before context timeout") return fmt.Errorf("tx not seen before context timeout")
} }