diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5956a0fb..bb96a7ec 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -34,7 +34,7 @@ const ( type BitcoindNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. chainConn *chain.BitcoindClient @@ -96,11 +96,41 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params, // Start connects to the running bitcoind node over websockets, registers for // block notifications, and finally launches all related helper goroutines. func (b *BitcoindNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Stop shutsdown the BitcoindNotifier. +func (b *BitcoindNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from bitcoind, + // and cleans up all related resources. + b.chainConn.Stop() + + close(b.quit) + b.wg.Wait() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +func (b *BitcoindNotifier) startNotifier() error { // Connect to bitcoind, and register for notifications on connected, // and disconnected blocks. if err := b.chainConn.Start(); err != nil { @@ -131,33 +161,6 @@ func (b *BitcoindNotifier) Start() error { return nil } -// Stop shutsdown the BitcoindNotifier. -func (b *BitcoindNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from bitcoind, - // and cleans up all related resources. - b.chainConn.Stop() - - close(b.quit) - b.wg.Wait() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() - - return nil -} - // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. func (b *BitcoindNotifier) notificationDispatcher() { diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 7b179209..1ce8c630 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -53,7 +53,7 @@ type txUpdate struct { type BtcdNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. chainConn *rpcclient.Client @@ -134,11 +134,44 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params, // Start connects to the running btcd node over websockets, registers for block // notifications, and finally launches all related helper goroutines. func (b *BtcdNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Stop shutsdown the BtcdNotifier. +func (b *BtcdNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from btcd, and + // cleans up all related resources. + b.chainConn.Shutdown() + + close(b.quit) + b.wg.Wait() + + b.chainUpdates.Stop() + b.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +func (b *BtcdNotifier) startNotifier() error { // Start our concurrent queues before starting the chain connection, to // ensure onBlockConnected and onRedeemingTx callbacks won't be // blocked. @@ -182,36 +215,6 @@ func (b *BtcdNotifier) Start() error { return nil } -// Stop shutsdown the BtcdNotifier. -func (b *BtcdNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from btcd, and - // cleans up all related resources. - b.chainConn.Shutdown() - - close(b.quit) - b.wg.Wait() - - b.chainUpdates.Stop() - b.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() - - return nil -} - // onBlockConnected implements on OnBlockConnected callback for rpcclient. // Ingesting a block updates the wallet's internal utxo state based on the // outputs created and destroyed within each block. diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 771c4b86..be1e8f80 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -39,7 +39,7 @@ const ( type NeutrinoNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once stopped int32 // To be used atomically. bestBlockMtx sync.RWMutex @@ -111,11 +111,40 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, // Start contacts the running neutrino light client and kicks off an initial // empty rescan. func (n *NeutrinoNotifier) Start() error { - // Already started? - if atomic.AddInt32(&n.started, 1) != 1 { + var startErr error + n.start.Do(func() { + startErr = n.startNotifier() + }) + return startErr +} + +// Stop shuts down the NeutrinoNotifier. +func (n *NeutrinoNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&n.stopped, 1) != 1 { return nil } + close(n.quit) + n.wg.Wait() + + n.chainUpdates.Stop() + n.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + n.txNotifier.TearDown() + + return nil +} + +func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. @@ -174,32 +203,6 @@ func (n *NeutrinoNotifier) Start() error { return nil } -// Stop shuts down the NeutrinoNotifier. -func (n *NeutrinoNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&n.stopped, 1) != 1 { - return nil - } - - close(n.quit) - n.wg.Wait() - - n.chainUpdates.Stop() - n.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range n.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - n.txNotifier.TearDown() - - return nil -} - // filteredBlock represents a new block which has been connected to the main // chain. The slice of transactions will only be populated if the block // includes a transaction that confirmed one of our watched txids, or spends