chainntnfs/btcd: Handle block disconnects with chainUpdate.

This does not implement full handling of block disconnections, but
ensures that all chain updates are processed in order.
This commit is contained in:
Jim Posen 2017-11-09 15:30:38 -08:00 committed by Olaoluwa Osuntokun
parent 28c6a988ca
commit 0cac7e80d4

@ -35,6 +35,10 @@ var (
type chainUpdate struct { type chainUpdate struct {
blockHash *chainhash.Hash blockHash *chainhash.Hash
blockHeight int32 blockHeight int32
// connected is true if this update is a new block and false if it is a
// disconnected block.
connect bool
} }
// txUpdate encapsulates a transaction related notification sent from btcd to // txUpdate encapsulates a transaction related notification sent from btcd to
@ -70,8 +74,6 @@ type BtcdNotifier struct {
blockEpochClients map[uint64]*blockEpochRegistration blockEpochClients map[uint64]*blockEpochRegistration
disconnectedBlockHashes chan *blockNtfn
chainUpdates *chainntnfs.ConcurrentQueue chainUpdates *chainntnfs.ConcurrentQueue
txUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue
@ -97,8 +99,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) {
confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), confNotifications: make(map[chainhash.Hash][]*confirmationsNotification),
confHeap: newConfirmationHeap(), confHeap: newConfirmationHeap(),
disconnectedBlockHashes: make(chan *blockNtfn, 20),
chainUpdates: chainntnfs.NewConcurrentQueue(10), chainUpdates: chainntnfs.NewConcurrentQueue(10),
txUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10),
@ -192,24 +192,28 @@ func (b *BtcdNotifier) Stop() error {
return nil return nil
} }
// blockNtfn packages a notification of a connected/disconnected block along
// with its height at the time.
type blockNtfn struct {
sha *chainhash.Hash
height int32
}
// onBlockConnected implements on OnBlockConnected callback for rpcclient. // onBlockConnected implements on OnBlockConnected callback for rpcclient.
// Ingesting a block updates the wallet's internal utxo state based on the // Ingesting a block updates the wallet's internal utxo state based on the
// outputs created and destroyed within each block. // outputs created and destroyed within each block.
func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
b.chainUpdates.ChanIn() <- &chainUpdate{hash, height} b.chainUpdates.ChanIn() <- &chainUpdate{
blockHash: hash,
blockHeight: height,
connect: true,
}
} }
// onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient.
func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) {
// Append this new chain update to the end of the queue of new chain
// updates.
b.chainUpdates.ChanIn() <- &chainUpdate{
blockHash: hash,
blockHeight: height,
connect: false,
}
} }
// onRedeemingTx implements on OnRedeemingTx callback for rpcclient. // onRedeemingTx implements on OnRedeemingTx callback for rpcclient.
@ -289,48 +293,63 @@ out:
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
} }
case staleBlockHash := <-b.disconnectedBlockHashes:
// TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing
// re-org?
// * notify of negative confirmations
chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlockHash)
case item := <-b.chainUpdates.ChanOut(): case item := <-b.chainUpdates.ChanOut():
update := item.(*chainUpdate) update := item.(*chainUpdate)
currentHeight = update.blockHeight if update.connect {
if update.blockHeight != currentHeight+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d",
currentHeight, update.blockHeight)
}
newBlock, err := b.chainConn.GetBlock(update.blockHash) currentHeight = update.blockHeight
if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err) newBlock, err := b.chainConn.GetBlock(update.blockHash)
continue if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err)
continue
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
update.blockHeight, update.blockHash)
b.notifyBlockEpochs(update.blockHeight,
update.blockHash)
newHeight := update.blockHeight
for i, tx := range newBlock.Transactions {
// Check if the inclusion of this transaction
// within a block by itself triggers a block
// confirmation threshold, if so send a
// notification. Otherwise, place the
// notification on a heap to be triggered in
// the future once additional confirmations are
// attained.
txSha := tx.TxHash()
b.checkConfirmationTrigger(&txSha, update, i)
}
// A new block has been connected to the main
// chain. Send out any N confirmation notifications
// which may have been triggered by this new block.
b.notifyConfs(newHeight)
} else {
if update.blockHeight != currentHeight {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, disconnected height=%d",
currentHeight, update.blockHeight)
}
currentHeight = update.blockHeight - 1
// TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing
// re-org?
// * notify of negative confirmations
chainntnfs.Log.Infof("Block disconnected from main chain: "+
"height=%v, sha=%v", update.blockHeight, update.blockHash)
} }
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
update.blockHeight, update.blockHash)
b.notifyBlockEpochs(update.blockHeight,
update.blockHash)
newHeight := update.blockHeight
for i, tx := range newBlock.Transactions {
// Check if the inclusion of this transaction
// within a block by itself triggers a block
// confirmation threshold, if so send a
// notification. Otherwise, place the
// notification on a heap to be triggered in
// the future once additional confirmations are
// attained.
txSha := tx.TxHash()
b.checkConfirmationTrigger(&txSha, update, i)
}
// A new block has been connected to the main
// chain. Send out any N confirmation notifications
// which may have been triggered by this new block.
b.notifyConfs(newHeight)
case item := <-b.txUpdates.ChanOut(): case item := <-b.txUpdates.ChanOut():
newSpend := item.(*txUpdate) newSpend := item.(*txUpdate)
spendingTx := newSpend.tx spendingTx := newSpend.tx