diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index cdd1e778..c393300b 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -139,17 +139,24 @@ func (b *BtcdNotifier) Start() error { return nil } + // Start our concurrent queues before starting the chain connection, to + // ensure onBlockConnected and onRedeemingTx callbacks won't be + // blocked. + b.chainUpdates.Start() + b.txUpdates.Start() + // Connect to btcd, and register for notifications on connected, and // disconnected blocks. if err := b.chainConn.Connect(20); err != nil { - return err - } - if err := b.chainConn.NotifyBlocks(); err != nil { + b.txUpdates.Stop() + b.chainUpdates.Stop() return err } currentHash, currentHeight, err := b.chainConn.GetBestBlock() if err != nil { + b.txUpdates.Stop() + b.chainUpdates.Stop() return err } @@ -163,8 +170,11 @@ func (b *BtcdNotifier) Start() error { Hash: currentHash, } - b.chainUpdates.Start() - b.txUpdates.Start() + if err := b.chainConn.NotifyBlocks(); err != nil { + b.txUpdates.Stop() + b.chainUpdates.Stop() + return err + } b.wg.Add(1) go b.notificationDispatcher() @@ -208,10 +218,14 @@ func (b *BtcdNotifier) Stop() error { func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdates.ChanIn() <- &chainUpdate{ + select { + case b.chainUpdates.ChanIn() <- &chainUpdate{ blockHash: hash, blockHeight: height, connect: true, + }: + case <-b.quit: + return } } @@ -236,10 +250,14 @@ type filteredBlock struct { func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdates.ChanIn() <- &chainUpdate{ + select { + case b.chainUpdates.ChanIn() <- &chainUpdate{ blockHash: hash, blockHeight: height, connect: false, + }: + case <-b.quit: + return } } @@ -247,7 +265,11 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { // Append this new transaction update to the end of the queue of new // chain updates. - b.txUpdates.ChanIn() <- &txUpdate{tx, details} + select { + case b.txUpdates.ChanIn() <- &txUpdate{tx, details}: + case <-b.quit: + return + } } // notificationDispatcher is the primary goroutine which handles client