diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index adad07dd..21140994 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -299,30 +299,53 @@ out: case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - if item.Height != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - bestHeight, item.Height) + blockHeader, err := + b.chainConn.GetBlockHeader(&item.Hash) + if err != nil { + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - rawBlock, err := b.chainConn.GetBlock(&item.Hash) - if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) - continue + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend. + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, item.Height, + true, + ) + + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } + + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - item.Height, item.Hash) - - b.notifyBlockEpochs(item.Height, &item.Hash) - - txns := btcutil.NewBlock(rawBlock).Transactions() - err = b.txConfNotifier.ConnectTip(&item.Hash, - uint32(item.Height), txns) - if err != nil { + newBlock := chainntnfs.BlockEpoch{ + Height: item.Height, + Hash: &item.Hash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } + continue case chain.BlockDisconnected: diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 177d9880..c7cb6995 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -366,31 +366,50 @@ out: case item := <-b.chainUpdates.ChanOut(): update := item.(*chainUpdate) if update.connect { - if update.blockHeight != b.bestBlock.Height+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - currentHeight, update.blockHeight) - continue - } - - rawBlock, err := b.chainConn.GetBlock(update.blockHash) + blockHeader, err := + b.chainConn.GetBlockHeader(update.blockHash) if err != nil { - chainntnfs.Log.Errorf("Unable to get block: %v", err) + chainntnfs.Log.Errorf("Unable to fetch "+ + "block header: %v", err) continue } - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.blockHeight, update.blockHash) + if blockHeader.PrevBlock != *b.bestBlock.Hash { + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + newBestBlock, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + b.chainConn, + b.txConfNotifier, + b.bestBlock, + update.blockHeight, + true, + ) + if err != nil { + // Set the bestBlock here in case + // a catch up partially completed. + b.bestBlock = newBestBlock + chainntnfs.Log.Error(err) + continue + } - txns := btcutil.NewBlock(rawBlock).Transactions() - - block := &filteredBlock{ - hash: *update.blockHash, - height: uint32(update.blockHeight), - txns: txns, - connect: true, + for _, block := range missedBlocks { + err := b.handleBlockConnected(block) + if err != nil { + chainntnfs.Log.Error(err) + continue out + } + } } - if err := b.handleBlockConnected(block); err != nil { + + newBlock := chainntnfs.BlockEpoch{ + Height: update.blockHeight, + Hash: update.blockHash, + } + if err := b.handleBlockConnected(newBlock); err != nil { chainntnfs.Log.Error(err) } continue diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index c46125c1..7c7a2ec6 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -378,6 +378,61 @@ func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier, return newBestBlock, nil } +// HandleMissedBlocks is called when the chain backend for a notifier misses a +// series of blocks, handling a reorg if necessary. Its backendStoresReorgs +// parameter tells it whether or not the notifier's chainConn stores +// information about blocks that have been reorged out of the chain, which allows +// HandleMissedBlocks to check whether the notifier's best block has been +// reorged out, and rewind the chain accordingly. It returns the best block for +// the notifier and a slice of the missed blocks. The new best block needs to be +// returned in case a chain rewind occurs and partially completes before +// erroring. In the case where there is no rewind, the notifier's +// current best block is returned. +func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier, + currBestBlock BlockEpoch, newHeight int32, + backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) { + + startingHeight := currBestBlock.Height + + if backendStoresReorgs { + // If a reorg causes our best hash to be incorrect, rewind the + // chain so our best block is set to the closest common + // ancestor, then dispatch notifications from there. + hashAtBestHeight, err := + chainConn.GetBlockHash(int64(currBestBlock.Height)) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "blockhash for height=%d: %v", + currBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *currBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to find "+ + "common ancestor: %v", err) + } + + currBestBlock, err = RewindChain(chainConn, txConfNotifier, + currBestBlock, startingHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to "+ + "rewind chain: %v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after our best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight) + if err != nil { + return currBestBlock, nil, fmt.Errorf("unable to get missed "+ + "blocks: %v", err) + } + + return currBestBlock, missedBlocks, nil +} + // getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) // fetched from the chain. func getMissedBlocks(chainConn ChainConn, startingHeight, @@ -393,8 +448,8 @@ func getMissedBlocks(chainConn ChainConn, startingHeight, for height := startingHeight; height < endingHeight; height++ { hash, err := chainConn.GetBlockHash(int64(height)) if err != nil { - return nil, fmt.Errorf("unable to find blockhash for height=%d: %v", - height, err) + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", height, err) } missedBlocks = append(missedBlocks, BlockEpoch{Hash: hash, Height: height}) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 54459bca..07e297a6 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -245,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // notification registrations, as well as notification dispatches. func (n *NeutrinoNotifier) notificationDispatcher() { defer n.wg.Done() - +out: for { select { case cancelMsg := <-n.notificationCancels: @@ -396,24 +396,61 @@ func (n *NeutrinoNotifier) notificationDispatcher() { update := item.(*filteredBlock) if update.connect { n.heightMtx.Lock() + // Since neutrino has no way of knowing what + // height to rewind to in the case of a reorged + // best known height, there is no point in + // checking that the previous hash matches the + // the hash from our best known height the way + // the other notifiers do when they receive + // a new connected block. Therefore, we just + // compare the heights. if update.height != n.bestHeight+1 { - chainntnfs.Log.Warnf("Received blocks out of order: "+ - "current height=%d, new height=%d", - n.bestHeight, update.height) - n.heightMtx.Unlock() - continue + // Handle the case where the notifier + // missed some blocks from its chain + // backend + chainntnfs.Log.Infof("Missed blocks, " + + "attempting to catch up") + bestBlock := chainntnfs.BlockEpoch{ + Height: int32(n.bestHeight), + Hash: nil, + } + _, missedBlocks, err := + chainntnfs.HandleMissedBlocks( + n.chainConn, + n.txConfNotifier, + bestBlock, + int32(update.height), + false, + ) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue + } + + for _, block := range missedBlocks { + filteredBlock, err := + n.getFilteredBlock(block) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + err = n.handleBlockConnected(filteredBlock) + if err != nil { + chainntnfs.Log.Error(err) + n.heightMtx.Unlock() + continue out + } + } + } - n.bestHeight = update.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - update.height, update.hash) - err := n.handleBlockConnected(update) if err != nil { chainntnfs.Log.Error(err) } + n.heightMtx.Unlock() continue } @@ -429,7 +466,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() { if err != nil { chainntnfs.Log.Errorf("Unable to fetch header"+ "for height %d: %v", n.bestHeight, err) + n.heightMtx.Unlock() + continue } + hash := header.BlockHash() notifierBestBlock := chainntnfs.BlockEpoch{ Height: int32(n.bestHeight),