From 0cac7e80d435209f9ee3e56584e01439b85227d7 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Thu, 9 Nov 2017 15:30:38 -0800 Subject: [PATCH] chainntnfs/btcd: Handle block disconnects with chainUpdate. This does not implement full handling of block disconnections, but ensures that all chain updates are processed in order. --- chainntnfs/btcdnotify/btcd.go | 117 ++++++++++++++++++++-------------- 1 file changed, 68 insertions(+), 49 deletions(-) diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 3b3f16b4..249e861f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -35,6 +35,10 @@ var ( type chainUpdate struct { blockHash *chainhash.Hash blockHeight int32 + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // txUpdate encapsulates a transaction related notification sent from btcd to @@ -70,8 +74,6 @@ type BtcdNotifier struct { blockEpochClients map[uint64]*blockEpochRegistration - disconnectedBlockHashes chan *blockNtfn - chainUpdates *chainntnfs.ConcurrentQueue txUpdates *chainntnfs.ConcurrentQueue @@ -97,8 +99,6 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { confNotifications: make(map[chainhash.Hash][]*confirmationsNotification), confHeap: newConfirmationHeap(), - disconnectedBlockHashes: make(chan *blockNtfn, 20), - chainUpdates: chainntnfs.NewConcurrentQueue(10), txUpdates: chainntnfs.NewConcurrentQueue(10), @@ -192,24 +192,28 @@ func (b *BtcdNotifier) Stop() error { return nil } -// blockNtfn packages a notification of a connected/disconnected block along -// with its height at the time. -type blockNtfn struct { - sha *chainhash.Hash - height int32 -} - // onBlockConnected implements on OnBlockConnected callback for rpcclient. // Ingesting a block updates the wallet's internal utxo state based on the // outputs created and destroyed within each block. func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdates.ChanIn() <- &chainUpdate{hash, height} + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: true, + } } // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t time.Time) { + // Append this new chain update to the end of the queue of new chain + // updates. + b.chainUpdates.ChanIn() <- &chainUpdate{ + blockHash: hash, + blockHeight: height, + connect: false, + } } // onRedeemingTx implements on OnRedeemingTx callback for rpcclient. @@ -289,48 +293,63 @@ out: b.blockEpochClients[msg.epochID] = msg } - case staleBlockHash := <-b.disconnectedBlockHashes: - // TODO(roasbeef): re-orgs - // * second channel to notify of confirmation decrementing - // re-org? - // * notify of negative confirmations - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlockHash) - case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) - currentHeight = update.blockHeight + if update.connect { + if update.blockHeight != currentHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + currentHeight, update.blockHeight) + } - newBlock, err := b.chainConn.GetBlock(update.blockHash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) - continue + currentHeight = update.blockHeight + + newBlock, err := b.chainConn.GetBlock(update.blockHash) + if err != nil { + chainntnfs.Log.Errorf("Unable to get block: %v", err) + continue + } + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.blockHeight, update.blockHash) + + b.notifyBlockEpochs(update.blockHeight, + update.blockHash) + + newHeight := update.blockHeight + for i, tx := range newBlock.Transactions { + // Check if the inclusion of this transaction + // within a block by itself triggers a block + // confirmation threshold, if so send a + // notification. Otherwise, place the + // notification on a heap to be triggered in + // the future once additional confirmations are + // attained. + txSha := tx.TxHash() + b.checkConfirmationTrigger(&txSha, update, i) + } + + // A new block has been connected to the main + // chain. Send out any N confirmation notifications + // which may have been triggered by this new block. + b.notifyConfs(newHeight) + } else { + if update.blockHeight != currentHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + currentHeight, update.blockHeight) + } + + currentHeight = update.blockHeight - 1 + + // TODO(roasbeef): re-orgs + // * second channel to notify of confirmation decrementing + // re-org? + // * notify of negative confirmations + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.blockHeight, update.blockHash) } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) - - b.notifyBlockEpochs(update.blockHeight, - update.blockHash) - - newHeight := update.blockHeight - for i, tx := range newBlock.Transactions { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - txSha := tx.TxHash() - b.checkConfirmationTrigger(&txSha, update, i) - } - - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(newHeight) - case item := <-b.txUpdates.ChanOut(): newSpend := item.(*txUpdate) spendingTx := newSpend.tx