chainntnfs: use sync.Once to start notifiers.

This commit is contained in:
Roei Erez 2020-04-30 12:34:47 +03:00
parent 0cf63ae898
commit cfe0babd78
3 changed files with 101 additions and 92 deletions

@ -34,7 +34,7 @@ const (
type BitcoindNotifier struct { type BitcoindNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *chain.BitcoindClient chainConn *chain.BitcoindClient
@ -96,11 +96,41 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params,
// Start connects to the running bitcoind node over websockets, registers for // Start connects to the running bitcoind node over websockets, registers for
// block notifications, and finally launches all related helper goroutines. // block notifications, and finally launches all related helper goroutines.
func (b *BitcoindNotifier) Start() error { func (b *BitcoindNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&b.started, 1) != 1 { b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
// Stop shutsdown the BitcoindNotifier.
func (b *BitcoindNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil return nil
} }
// Shutdown the rpc client, this gracefully disconnects from bitcoind,
// and cleans up all related resources.
b.chainConn.Stop()
close(b.quit)
b.wg.Wait()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
func (b *BitcoindNotifier) startNotifier() error {
// Connect to bitcoind, and register for notifications on connected, // Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks. // and disconnected blocks.
if err := b.chainConn.Start(); err != nil { if err := b.chainConn.Start(); err != nil {
@ -131,33 +161,6 @@ func (b *BitcoindNotifier) Start() error {
return nil return nil
} }
// Stop shutsdown the BitcoindNotifier.
func (b *BitcoindNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from bitcoind,
// and cleans up all related resources.
b.chainConn.Stop()
close(b.quit)
b.wg.Wait()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches. // notification registrations, as well as notification dispatches.
func (b *BitcoindNotifier) notificationDispatcher() { func (b *BitcoindNotifier) notificationDispatcher() {

@ -53,7 +53,7 @@ type txUpdate struct {
type BtcdNotifier struct { type BtcdNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *rpcclient.Client chainConn *rpcclient.Client
@ -134,11 +134,44 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params,
// Start connects to the running btcd node over websockets, registers for block // Start connects to the running btcd node over websockets, registers for block
// notifications, and finally launches all related helper goroutines. // notifications, and finally launches all related helper goroutines.
func (b *BtcdNotifier) Start() error { func (b *BtcdNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&b.started, 1) != 1 { b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
// Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil return nil
} }
// Shutdown the rpc client, this gracefully disconnects from btcd, and
// cleans up all related resources.
b.chainConn.Shutdown()
close(b.quit)
b.wg.Wait()
b.chainUpdates.Stop()
b.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
func (b *BtcdNotifier) startNotifier() error {
// Start our concurrent queues before starting the chain connection, to // Start our concurrent queues before starting the chain connection, to
// ensure onBlockConnected and onRedeemingTx callbacks won't be // ensure onBlockConnected and onRedeemingTx callbacks won't be
// blocked. // blocked.
@ -182,36 +215,6 @@ func (b *BtcdNotifier) Start() error {
return nil return nil
} }
// Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from btcd, and
// cleans up all related resources.
b.chainConn.Shutdown()
close(b.quit)
b.wg.Wait()
b.chainUpdates.Stop()
b.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
// onBlockConnected implements on OnBlockConnected callback for rpcclient. // onBlockConnected implements on OnBlockConnected callback for rpcclient.
// Ingesting a block updates the wallet's internal utxo state based on the // Ingesting a block updates the wallet's internal utxo state based on the
// outputs created and destroyed within each block. // outputs created and destroyed within each block.

@ -39,7 +39,7 @@ const (
type NeutrinoNotifier struct { type NeutrinoNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
bestBlockMtx sync.RWMutex bestBlockMtx sync.RWMutex
@ -111,11 +111,40 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
// Start contacts the running neutrino light client and kicks off an initial // Start contacts the running neutrino light client and kicks off an initial
// empty rescan. // empty rescan.
func (n *NeutrinoNotifier) Start() error { func (n *NeutrinoNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&n.started, 1) != 1 { n.start.Do(func() {
startErr = n.startNotifier()
})
return startErr
}
// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&n.stopped, 1) != 1 {
return nil return nil
} }
close(n.quit)
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txNotifier.TearDown()
return nil
}
func (n *NeutrinoNotifier) startNotifier() error {
// Start our concurrent queues before starting the rescan, to ensure // Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelavantTx callbacks won't be // onFilteredBlockConnected and onRelavantTx callbacks won't be
// blocked. // blocked.
@ -174,32 +203,6 @@ func (n *NeutrinoNotifier) Start() error {
return nil return nil
} }
// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&n.stopped, 1) != 1 {
return nil
}
close(n.quit)
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txNotifier.TearDown()
return nil
}
// filteredBlock represents a new block which has been connected to the main // filteredBlock represents a new block which has been connected to the main
// chain. The slice of transactions will only be populated if the block // chain. The slice of transactions will only be populated if the block
// includes a transaction that confirmed one of our watched txids, or spends // includes a transaction that confirmed one of our watched txids, or spends