chainntnfs: dispatch historical block ntfns to clients

If a client passes in their best known block when registering for block notifications, check to see if it's behind our best block. If so, dispatch the missed block notifications to the client.

This is necessary because clients that persist their best known block can miss new blocks while registering for notifications.
This commit is contained in:
Valentine Wallace 2018-08-09 00:05:29 -07:00
parent 02ee5650c8
commit a5e1cf9c97
No known key found for this signature in database
GPG Key ID: B0E55E8D1776A58D
4 changed files with 200 additions and 33 deletions

@ -275,6 +275,22 @@ out:
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
b.chainConn, msg.bestBlock,
b.bestBlock.Height, true,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
b.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(msg, b.bestBlock.Height) b.handleRelevantTx(msg, b.bestBlock.Height)
@ -525,21 +541,26 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash,
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range b.blockEpochClients {
select { select {
case epochClient.epochQueue.ChanIn() <- epoch: case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan: case <-epochClient.cancelChan:
case <-b.quit: case <-b.quit:
} }
}
} }
// spendNotification couples a target outpoint along with the channel used for // spendNotification couples a target outpoint along with the channel used for

@ -344,6 +344,23 @@ out:
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
b.chainConn, msg.bestBlock,
b.bestBlock.Height, true,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
b.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
} }
case item := <-b.chainUpdates.ChanOut(): case item := <-b.chainUpdates.ChanOut():
@ -652,21 +669,26 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range b.blockEpochClients {
b.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range b.blockEpochClients {
select { select {
case epochClient.epochQueue.ChanIn() <- epoch: case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan: case <-epochClient.cancelChan:
case <-b.quit: case <-b.quit:
} }
}
} }
// spendNotification couples a target outpoint along with the channel used for // spendNotification couples a target outpoint along with the channel used for

@ -268,3 +268,103 @@ type ChainConn interface {
// GetBlockHash returns the hash from a block height. // GetBlockHash returns the hash from a block height.
GetBlockHash(blockHeight int64) (*chainhash.Hash, error) GetBlockHash(blockHeight int64) (*chainhash.Hash, error)
} }
// GetCommonBlockAncestorHeight takes in:
// (1) the hash of a block that has been reorged out of the main chain
// (2) the hash of the block of the same height from the main chain
// It returns the height of the nearest common ancestor between the two hashes,
// or an error
func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash,
chainHash chainhash.Hash) (int32, error) {
for reorgHash != chainHash {
reorgHeader, err := chainConn.GetBlockHeader(&reorgHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
reorgHash, err)
}
chainHeader, err := chainConn.GetBlockHeader(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get header for hash=%v: %v",
chainHash, err)
}
reorgHash = reorgHeader.PrevBlock
chainHash = chainHeader.PrevBlock
}
verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash)
if err != nil {
return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v",
chainHash, err)
}
return verboseHeader.Height, nil
}
// GetClientMissedBlocks uses a client's best block to determine what blocks
// it missed being notified about, and returns them in a slice. Its
// backendStoresReorgs parameter tells it whether or not the notifier's
// chainConn stores information about blocks that have been reorged out of the
// chain, which allows GetClientMissedBlocks to find out whether the client's
// best block has been reorged out of the chain, rewind to the common ancestor
// and return blocks starting right after the common ancestor.
func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch,
notifierBestHeight int32, backendStoresReorgs bool) ([]BlockEpoch, error) {
startingHeight := clientBestBlock.Height
if backendStoresReorgs {
// If a reorg causes the client's best hash to be incorrect,
// retrieve the closest common ancestor and dispatch
// notifications from there.
hashAtBestHeight, err := chainConn.GetBlockHash(
int64(clientBestBlock.Height))
if err != nil {
return nil, fmt.Errorf("unable to find blockhash for "+
"height=%d: %v", clientBestBlock.Height, err)
}
startingHeight, err = GetCommonBlockAncestorHeight(
chainConn, *clientBestBlock.Hash, *hashAtBestHeight,
)
if err != nil {
return nil, fmt.Errorf("unable to find common ancestor: "+
"%v", err)
}
}
// We want to start dispatching historical notifications from the block
// right after the client's best block, to avoid a redundant notification.
missedBlocks, err := getMissedBlocks(
chainConn, startingHeight+1, notifierBestHeight+1,
)
if err != nil {
return nil, fmt.Errorf("unable to get missed blocks: %v", err)
}
return missedBlocks, nil
}
// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight)
// fetched from the chain.
func getMissedBlocks(chainConn ChainConn, startingHeight,
endingHeight int32) ([]BlockEpoch, error) {
numMissedBlocks := endingHeight - startingHeight
if numMissedBlocks < 0 {
return nil, fmt.Errorf("starting height %d is greater than "+
"ending height %d", startingHeight, endingHeight)
}
missedBlocks := make([]BlockEpoch, 0, numMissedBlocks)
for height := startingHeight; height < endingHeight; height++ {
hash, err := chainConn.GetBlockHash(int64(height))
if err != nil {
return nil, fmt.Errorf("unable to find blockhash for height=%d: %v",
height, err)
}
missedBlocks = append(missedBlocks,
BlockEpoch{Hash: hash, Height: height})
}
return missedBlocks, nil
}

@ -370,6 +370,25 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil {
n.heightMtx.Lock()
bestHeight := int32(n.bestHeight)
n.heightMtx.Unlock()
missedBlocks, err :=
chainntnfs.GetClientMissedBlocks(
n.chainConn, msg.bestBlock,
bestHeight, false,
)
if err != nil {
msg.errorChan <- err
continue
}
for _, block := range missedBlocks {
n.notifyBlockEpochClient(msg,
block.Height, block.Hash)
}
}
msg.errorChan <- nil
} }
case item := <-n.chainUpdates.ChanOut(): case item := <-n.chainUpdates.ChanOut():
@ -572,21 +591,26 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {
for _, client := range n.blockEpochClients {
n.notifyBlockEpochClient(client, newHeight, newSha)
}
}
// notifyBlockEpochClient sends a registered block epoch client a notification
// about a specific block.
func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration,
height int32, sha *chainhash.Hash) {
epoch := &chainntnfs.BlockEpoch{ epoch := &chainntnfs.BlockEpoch{
Height: newHeight, Height: height,
Hash: newSha, Hash: sha,
} }
for _, epochClient := range n.blockEpochClients {
select { select {
case epochClient.epochQueue.ChanIn() <- epoch: case epochClient.epochQueue.ChanIn() <- epoch:
case <-epochClient.cancelChan: case <-epochClient.cancelChan:
case <-n.quit: case <-n.quit:
} }
}
} }
// spendNotification couples a target outpoint along with the channel used for // spendNotification couples a target outpoint along with the channel used for