chainntnfs: enable notifiers to catch up on missed blocks
This resolves the situation where a notifier's chain backend skips a series of blocks, causing the notifier to need to dispatch historical block notifications to clients. Additionally, if the current notifier's best block has been reorged out, this logic enables the notifier to rewind to the common ancestor between the current chain and the outdated best block and dispatches notifications from the ancestor.
This commit is contained in:
parent
3df5b26699
commit
79cbea1c9c
@ -299,30 +299,53 @@ out:
|
|||||||
case ntfn := <-b.chainConn.Notifications():
|
case ntfn := <-b.chainConn.Notifications():
|
||||||
switch item := ntfn.(type) {
|
switch item := ntfn.(type) {
|
||||||
case chain.BlockConnected:
|
case chain.BlockConnected:
|
||||||
if item.Height != b.bestBlock.Height+1 {
|
blockHeader, err :=
|
||||||
chainntnfs.Log.Warnf("Received blocks out of order: "+
|
b.chainConn.GetBlockHeader(&item.Hash)
|
||||||
"current height=%d, new height=%d",
|
|
||||||
bestHeight, item.Height)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
rawBlock, err := b.chainConn.GetBlock(&item.Hash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
chainntnfs.Log.Errorf("Unable to get block: %v", err)
|
chainntnfs.Log.Errorf("Unable to fetch "+
|
||||||
|
"block header: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
|
if blockHeader.PrevBlock != *b.bestBlock.Hash {
|
||||||
item.Height, item.Hash)
|
// Handle the case where the notifier
|
||||||
|
// missed some blocks from its chain
|
||||||
|
// backend.
|
||||||
|
chainntnfs.Log.Infof("Missed blocks, " +
|
||||||
|
"attempting to catch up")
|
||||||
|
newBestBlock, missedBlocks, err :=
|
||||||
|
chainntnfs.HandleMissedBlocks(
|
||||||
|
b.chainConn,
|
||||||
|
b.txConfNotifier,
|
||||||
|
b.bestBlock, item.Height,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
|
||||||
b.notifyBlockEpochs(item.Height, &item.Hash)
|
if err != nil {
|
||||||
|
// Set the bestBlock here in case
|
||||||
|
// a catch up partially completed.
|
||||||
|
b.bestBlock = newBestBlock
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
txns := btcutil.NewBlock(rawBlock).Transactions()
|
for _, block := range missedBlocks {
|
||||||
err = b.txConfNotifier.ConnectTip(&item.Hash,
|
err := b.handleBlockConnected(block)
|
||||||
uint32(item.Height), txns)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
chainntnfs.Log.Error(err)
|
chainntnfs.Log.Error(err)
|
||||||
|
continue out
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newBlock := chainntnfs.BlockEpoch{
|
||||||
|
Height: item.Height,
|
||||||
|
Hash: &item.Hash,
|
||||||
|
}
|
||||||
|
if err := b.handleBlockConnected(newBlock); err != nil {
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case chain.BlockDisconnected:
|
case chain.BlockDisconnected:
|
||||||
|
@ -366,31 +366,50 @@ out:
|
|||||||
case item := <-b.chainUpdates.ChanOut():
|
case item := <-b.chainUpdates.ChanOut():
|
||||||
update := item.(*chainUpdate)
|
update := item.(*chainUpdate)
|
||||||
if update.connect {
|
if update.connect {
|
||||||
if update.blockHeight != b.bestBlock.Height+1 {
|
blockHeader, err :=
|
||||||
chainntnfs.Log.Warnf("Received blocks out of order: "+
|
b.chainConn.GetBlockHeader(update.blockHash)
|
||||||
"current height=%d, new height=%d",
|
|
||||||
currentHeight, update.blockHeight)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
rawBlock, err := b.chainConn.GetBlock(update.blockHash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
chainntnfs.Log.Errorf("Unable to get block: %v", err)
|
chainntnfs.Log.Errorf("Unable to fetch "+
|
||||||
|
"block header: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
|
if blockHeader.PrevBlock != *b.bestBlock.Hash {
|
||||||
update.blockHeight, update.blockHash)
|
// Handle the case where the notifier
|
||||||
|
// missed some blocks from its chain
|
||||||
txns := btcutil.NewBlock(rawBlock).Transactions()
|
// backend
|
||||||
|
chainntnfs.Log.Infof("Missed blocks, " +
|
||||||
block := &filteredBlock{
|
"attempting to catch up")
|
||||||
hash: *update.blockHash,
|
newBestBlock, missedBlocks, err :=
|
||||||
height: uint32(update.blockHeight),
|
chainntnfs.HandleMissedBlocks(
|
||||||
txns: txns,
|
b.chainConn,
|
||||||
connect: true,
|
b.txConfNotifier,
|
||||||
|
b.bestBlock,
|
||||||
|
update.blockHeight,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
// Set the bestBlock here in case
|
||||||
|
// a catch up partially completed.
|
||||||
|
b.bestBlock = newBestBlock
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if err := b.handleBlockConnected(block); err != nil {
|
|
||||||
|
for _, block := range missedBlocks {
|
||||||
|
err := b.handleBlockConnected(block)
|
||||||
|
if err != nil {
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
|
continue out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
newBlock := chainntnfs.BlockEpoch{
|
||||||
|
Height: update.blockHeight,
|
||||||
|
Hash: update.blockHash,
|
||||||
|
}
|
||||||
|
if err := b.handleBlockConnected(newBlock); err != nil {
|
||||||
chainntnfs.Log.Error(err)
|
chainntnfs.Log.Error(err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
@ -378,6 +378,61 @@ func RewindChain(chainConn ChainConn, txConfNotifier *TxConfNotifier,
|
|||||||
return newBestBlock, nil
|
return newBestBlock, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HandleMissedBlocks is called when the chain backend for a notifier misses a
|
||||||
|
// series of blocks, handling a reorg if necessary. Its backendStoresReorgs
|
||||||
|
// parameter tells it whether or not the notifier's chainConn stores
|
||||||
|
// information about blocks that have been reorged out of the chain, which allows
|
||||||
|
// HandleMissedBlocks to check whether the notifier's best block has been
|
||||||
|
// reorged out, and rewind the chain accordingly. It returns the best block for
|
||||||
|
// the notifier and a slice of the missed blocks. The new best block needs to be
|
||||||
|
// returned in case a chain rewind occurs and partially completes before
|
||||||
|
// erroring. In the case where there is no rewind, the notifier's
|
||||||
|
// current best block is returned.
|
||||||
|
func HandleMissedBlocks(chainConn ChainConn, txConfNotifier *TxConfNotifier,
|
||||||
|
currBestBlock BlockEpoch, newHeight int32,
|
||||||
|
backendStoresReorgs bool) (BlockEpoch, []BlockEpoch, error) {
|
||||||
|
|
||||||
|
startingHeight := currBestBlock.Height
|
||||||
|
|
||||||
|
if backendStoresReorgs {
|
||||||
|
// If a reorg causes our best hash to be incorrect, rewind the
|
||||||
|
// chain so our best block is set to the closest common
|
||||||
|
// ancestor, then dispatch notifications from there.
|
||||||
|
hashAtBestHeight, err :=
|
||||||
|
chainConn.GetBlockHash(int64(currBestBlock.Height))
|
||||||
|
if err != nil {
|
||||||
|
return currBestBlock, nil, fmt.Errorf("unable to find "+
|
||||||
|
"blockhash for height=%d: %v",
|
||||||
|
currBestBlock.Height, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
startingHeight, err = GetCommonBlockAncestorHeight(
|
||||||
|
chainConn, *currBestBlock.Hash, *hashAtBestHeight,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return currBestBlock, nil, fmt.Errorf("unable to find "+
|
||||||
|
"common ancestor: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currBestBlock, err = RewindChain(chainConn, txConfNotifier,
|
||||||
|
currBestBlock, startingHeight)
|
||||||
|
if err != nil {
|
||||||
|
return currBestBlock, nil, fmt.Errorf("unable to "+
|
||||||
|
"rewind chain: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We want to start dispatching historical notifications from the block
|
||||||
|
// right after our best block, to avoid a redundant notification.
|
||||||
|
missedBlocks, err := getMissedBlocks(chainConn, startingHeight+1, newHeight)
|
||||||
|
if err != nil {
|
||||||
|
return currBestBlock, nil, fmt.Errorf("unable to get missed "+
|
||||||
|
"blocks: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return currBestBlock, missedBlocks, nil
|
||||||
|
}
|
||||||
|
|
||||||
// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight)
|
// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight)
|
||||||
// fetched from the chain.
|
// fetched from the chain.
|
||||||
func getMissedBlocks(chainConn ChainConn, startingHeight,
|
func getMissedBlocks(chainConn ChainConn, startingHeight,
|
||||||
@ -393,8 +448,8 @@ func getMissedBlocks(chainConn ChainConn, startingHeight,
|
|||||||
for height := startingHeight; height < endingHeight; height++ {
|
for height := startingHeight; height < endingHeight; height++ {
|
||||||
hash, err := chainConn.GetBlockHash(int64(height))
|
hash, err := chainConn.GetBlockHash(int64(height))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("unable to find blockhash for height=%d: %v",
|
return nil, fmt.Errorf("unable to find blockhash for "+
|
||||||
height, err)
|
"height=%d: %v", height, err)
|
||||||
}
|
}
|
||||||
missedBlocks = append(missedBlocks,
|
missedBlocks = append(missedBlocks,
|
||||||
BlockEpoch{Hash: hash, Height: height})
|
BlockEpoch{Hash: hash, Height: height})
|
||||||
|
@ -245,7 +245,7 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
|
|||||||
// notification registrations, as well as notification dispatches.
|
// notification registrations, as well as notification dispatches.
|
||||||
func (n *NeutrinoNotifier) notificationDispatcher() {
|
func (n *NeutrinoNotifier) notificationDispatcher() {
|
||||||
defer n.wg.Done()
|
defer n.wg.Done()
|
||||||
|
out:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case cancelMsg := <-n.notificationCancels:
|
case cancelMsg := <-n.notificationCancels:
|
||||||
@ -396,24 +396,61 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
|
|||||||
update := item.(*filteredBlock)
|
update := item.(*filteredBlock)
|
||||||
if update.connect {
|
if update.connect {
|
||||||
n.heightMtx.Lock()
|
n.heightMtx.Lock()
|
||||||
|
// Since neutrino has no way of knowing what
|
||||||
|
// height to rewind to in the case of a reorged
|
||||||
|
// best known height, there is no point in
|
||||||
|
// checking that the previous hash matches the
|
||||||
|
// the hash from our best known height the way
|
||||||
|
// the other notifiers do when they receive
|
||||||
|
// a new connected block. Therefore, we just
|
||||||
|
// compare the heights.
|
||||||
if update.height != n.bestHeight+1 {
|
if update.height != n.bestHeight+1 {
|
||||||
chainntnfs.Log.Warnf("Received blocks out of order: "+
|
// Handle the case where the notifier
|
||||||
"current height=%d, new height=%d",
|
// missed some blocks from its chain
|
||||||
n.bestHeight, update.height)
|
// backend
|
||||||
|
chainntnfs.Log.Infof("Missed blocks, " +
|
||||||
|
"attempting to catch up")
|
||||||
|
bestBlock := chainntnfs.BlockEpoch{
|
||||||
|
Height: int32(n.bestHeight),
|
||||||
|
Hash: nil,
|
||||||
|
}
|
||||||
|
_, missedBlocks, err :=
|
||||||
|
chainntnfs.HandleMissedBlocks(
|
||||||
|
n.chainConn,
|
||||||
|
n.txConfNotifier,
|
||||||
|
bestBlock,
|
||||||
|
int32(update.height),
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
n.heightMtx.Unlock()
|
n.heightMtx.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
n.bestHeight = update.height
|
for _, block := range missedBlocks {
|
||||||
|
filteredBlock, err :=
|
||||||
|
n.getFilteredBlock(block)
|
||||||
|
if err != nil {
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
n.heightMtx.Unlock()
|
n.heightMtx.Unlock()
|
||||||
|
continue out
|
||||||
|
}
|
||||||
|
err = n.handleBlockConnected(filteredBlock)
|
||||||
|
if err != nil {
|
||||||
|
chainntnfs.Log.Error(err)
|
||||||
|
n.heightMtx.Unlock()
|
||||||
|
continue out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
|
}
|
||||||
update.height, update.hash)
|
|
||||||
|
|
||||||
err := n.handleBlockConnected(update)
|
err := n.handleBlockConnected(update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
chainntnfs.Log.Error(err)
|
chainntnfs.Log.Error(err)
|
||||||
}
|
}
|
||||||
|
n.heightMtx.Unlock()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -429,7 +466,10 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
chainntnfs.Log.Errorf("Unable to fetch header"+
|
chainntnfs.Log.Errorf("Unable to fetch header"+
|
||||||
"for height %d: %v", n.bestHeight, err)
|
"for height %d: %v", n.bestHeight, err)
|
||||||
|
n.heightMtx.Unlock()
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := header.BlockHash()
|
hash := header.BlockHash()
|
||||||
notifierBestBlock := chainntnfs.BlockEpoch{
|
notifierBestBlock := chainntnfs.BlockEpoch{
|
||||||
Height: int32(n.bestHeight),
|
Height: int32(n.bestHeight),
|
||||||
|
Loading…
Reference in New Issue
Block a user