From 79cbea1c9cd6402a12de5669e6ce5ed68e14c0ed Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 9 Aug 2018 00:05:30 -0700 Subject: [PATCH] chainntnfs: enable notifiers to catch up on missed blocks This resolves the situation where a notifier's chain backend skips a series of blocks, causing the notifier to need to dispatch historical block notifications to clients. Additionally, if the current notifier's best block has been reorged out, this logic enables the notifier to rewind to the common ancestor between the current chain and the outdated best block and dispatches notifications from the ancestor. --- chainntnfs/bitcoindnotify/bitcoind.go | 57 +++++++++++++++++------- chainntnfs/btcdnotify/btcd.go | 57 ++++++++++++++++-------- chainntnfs/interface.go | 59 +++++++++++++++++++++++- chainntnfs/neutrinonotify/neutrino.go | 64 ++++++++++++++++++++++----- 4 files changed, 187 insertions(+), 50 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index adad07dd..21140994 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -299,30 +299,53 @@ out: case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - if item.Height != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - bestHeight, item.Height) + blockHeader, err := + b.chainConn.GetBlockHeader(&item.Hash) + if err != nil { + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - rawBlock, err := b.chainConn.GetBlock(&item.Hash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) - continue + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend. + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, item.Height, + true, + ) + + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } + + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - item.Height, item.Hash) - - b.notifyBlockEpochs(item.Height, &item.Hash) - - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txConfNotifier.ConnectTip(&item.Hash, - uint32(item.Height), txns) - if err != nil { + newBlock := chainntnfs.BlockEpoch{ + Height: item.Height, + Hash: &item.Hash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } + continue case chain.BlockDisconnected: diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 177d9880..c7cb6995 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -366,31 +366,50 @@ out: case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) if update.connect { - if update.blockHeight != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - currentHeight, update.blockHeight) - continue - } - - rawBlock, err := b.chainConn.GetBlock(update.blockHash) + blockHeader, err := + b.chainConn.GetBlockHeader(update.blockHash) if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, + update.blockHeight, + true, + ) + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } - txns := btcutil.NewBlock(rawBlock).Transactions() - - block := &filteredBlock{ - hash: *update.blockHash, - height: uint32(update.blockHeight), - txns: txns, - connect: true, + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - if err := b.handleBlockConnected(block); err != nil { + + newBlock := chainntnfs.BlockEpoch{ + Height: update.blockHeight, + Hash: update.blockHash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } continue diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index c46125c1..7c7a2ec6 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -378,6 +378,61 @@ func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier, return newBestBlock, nil } +// HandleMissedBlocks is called when the chain backend for a notifier misses a +// series of blocks, handling a reorg if necessary. Its backendStoresReorgs +// parameter tells it whether or not the notifier's chainConn stores +// information about blocks that have been reorged out of the chain, which allows +// HandleMissedBlocks to check whether the notifier's best block has been +// reorged out, and rewind the chain accordingly. It returns the best block for +// the notifier and a slice of the missed blocks. The new best block needs to be +// returned in case a chain rewind occurs and partially completes before +// erroring. In the case where there is no rewind, the notifier's +// current best block is returned. +func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, newHeight int32, + backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) { + + startingHeight := currBestBlock.Height + + if backendStoresReorgs { + // If a reorg causes our best hash to be incorrect, rewind the + // chain so our best block is set to the closest common + // ancestor, then dispatch notifications from there. + hashAtBestHeight, err := + chainConn.GetBlockHash(int64(currBestBlock.Height)) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "blockhash for height=%d: %v", + currBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *currBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "common ancestor: %v", err) + } + + currBestBlock, err = RewindChain(chainConn, txConfNotifier, + currBestBlock, startingHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to "+ + "rewind chain: %v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after our best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to get missed "+ + "blocks: %v", err) + } + + return currBestBlock, missedBlocks, nil +} + // getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) // fetched from the chain. func getMissedBlocks(chainConn ChainConn, startingHeight, @@ -393,8 +448,8 @@ func getMissedBlocks(chainConn ChainConn, startingHeight, for height := startingHeight; height < endingHeight; height++ { hash, err := chainConn.GetBlockHash(int64(height)) if err != nil { - return nil, fmt.Errorf("unable to find blockhash for height=%d: %v", - height, err) + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", height, err) } missedBlocks = append(missedBlocks, BlockEpoch{Hash: hash, Height: height}) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 54459bca..07e297a6 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -245,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // notification registrations, as well as notification dispatches. func (n *NeutrinoNotifier) notificationDispatcher() { defer n.wg.Done() - +out: for { select { case cancelMsg := <-n.notificationCancels: @@ -396,24 +396,61 @@ func (n *NeutrinoNotifier) notificationDispatcher() { update := item.(*filteredBlock) if update.connect { n.heightMtx.Lock() + // Since neutrino has no way of knowing what + // height to rewind to in the case of a reorged + // best known height, there is no point in + // checking that the previous hash matches the + // the hash from our best known height the way + // the other notifiers do when they receive + // a new connected block. Therefore, we just + // compare the heights. if update.height != n.bestHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - n.bestHeight, update.height) - n.heightMtx.Unlock() - continue + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + bestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: nil, + } + _, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + n.chainConn, + n.txConfNotifier, + bestBlock, + int32(update.height), + false, + ) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue + } + + for _, block := range missedBlocks { + filteredBlock, err := + n.getFilteredBlock(block) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + err = n.handleBlockConnected(filteredBlock) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + } + } - n.bestHeight = update.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.height, update.hash) - err := n.handleBlockConnected(update) if err != nil { chainntnfs.Log.Error(err) } + n.heightMtx.Unlock() continue } @@ -429,7 +466,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { if err != nil { chainntnfs.Log.Errorf("Unable to fetch header"+ "for height %d: %v", n.bestHeight, err) + n.heightMtx.Unlock() + continue } + hash := header.BlockHash() notifierBestBlock := chainntnfs.BlockEpoch{ Height: int32(n.bestHeight),