chainntnfs/btcd: start concurrent queues prior to connection
We would establish the connection to the chain backend and start getting block notifications before we had started the concurrent queues, which would lead to the OnBlockConnected call being blocked, and a deadlock (since GetBestBlock would never return). Instead we make sure to start the queues before establishing the connection, consuming the notifications right away.
This commit is contained in:
parent
35027e52fc
commit
6b6beb4d7d
@ -139,17 +139,24 @@ func (b *BtcdNotifier) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start our concurrent queues before starting the chain connection, to
|
||||||
|
// ensure onBlockConnected and onRedeemingTx callbacks won't be
|
||||||
|
// blocked.
|
||||||
|
b.chainUpdates.Start()
|
||||||
|
b.txUpdates.Start()
|
||||||
|
|
||||||
// Connect to btcd, and register for notifications on connected, and
|
// Connect to btcd, and register for notifications on connected, and
|
||||||
// disconnected blocks.
|
// disconnected blocks.
|
||||||
if err := b.chainConn.Connect(20); err != nil {
|
if err := b.chainConn.Connect(20); err != nil {
|
||||||
return err
|
b.txUpdates.Stop()
|
||||||
}
|
b.chainUpdates.Stop()
|
||||||
if err := b.chainConn.NotifyBlocks(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
currentHash, currentHeight, err := b.chainConn.GetBestBlock()
|
currentHash, currentHeight, err := b.chainConn.GetBestBlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.txUpdates.Stop()
|
||||||
|
b.chainUpdates.Stop()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,8 +170,11 @@ func (b *BtcdNotifier) Start() error {
|
|||||||
Hash: currentHash,
|
Hash: currentHash,
|
||||||
}
|
}
|
||||||
|
|
||||||
b.chainUpdates.Start()
|
if err := b.chainConn.NotifyBlocks(); err != nil {
|
||||||
b.txUpdates.Start()
|
b.txUpdates.Stop()
|
||||||
|
b.chainUpdates.Stop()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
b.wg.Add(1)
|
b.wg.Add(1)
|
||||||
go b.notificationDispatcher()
|
go b.notificationDispatcher()
|
||||||
@ -208,10 +218,14 @@ func (b *BtcdNotifier) Stop() error {
|
|||||||
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
|
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
|
||||||
// Append this new chain update to the end of the queue of new chain
|
// Append this new chain update to the end of the queue of new chain
|
||||||
// updates.
|
// updates.
|
||||||
b.chainUpdates.ChanIn() <- &chainUpdate{
|
select {
|
||||||
|
case b.chainUpdates.ChanIn() <- &chainUpdate{
|
||||||
blockHash: hash,
|
blockHash: hash,
|
||||||
blockHeight: height,
|
blockHeight: height,
|
||||||
connect: true,
|
connect: true,
|
||||||
|
}:
|
||||||
|
case <-b.quit:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,10 +250,14 @@ type filteredBlock struct {
|
|||||||
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
|
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
|
||||||
// Append this new chain update to the end of the queue of new chain
|
// Append this new chain update to the end of the queue of new chain
|
||||||
// updates.
|
// updates.
|
||||||
b.chainUpdates.ChanIn() <- &chainUpdate{
|
select {
|
||||||
|
case b.chainUpdates.ChanIn() <- &chainUpdate{
|
||||||
blockHash: hash,
|
blockHash: hash,
|
||||||
blockHeight: height,
|
blockHeight: height,
|
||||||
connect: false,
|
connect: false,
|
||||||
|
}:
|
||||||
|
case <-b.quit:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -247,7 +265,11 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t
|
|||||||
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
|
func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
|
||||||
// Append this new transaction update to the end of the queue of new
|
// Append this new transaction update to the end of the queue of new
|
||||||
// chain updates.
|
// chain updates.
|
||||||
b.txUpdates.ChanIn() <- &txUpdate{tx, details}
|
select {
|
||||||
|
case b.txUpdates.ChanIn() <- &txUpdate{tx, details}:
|
||||||
|
case <-b.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// notificationDispatcher is the primary goroutine which handles client
|
// notificationDispatcher is the primary goroutine which handles client
|
||||||
|
Loading…
Reference in New Issue
Block a user