diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 4ebd4650..347b12c1 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -275,6 +275,22 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil case chain.RelevantTx: b.handleRelevantTx(msg, b.bestBlock.Height) @@ -525,20 +541,25 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (b *BitcoindNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 85155e61..0103b421 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -344,6 +344,23 @@ out: case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + b.chainConn, msg.bestBlock, + b.bestBlock.Height, true, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + b.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + + } + msg.errorChan <- nil } case item := <-b.chainUpdates.ChanOut(): @@ -652,20 +669,25 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range b.blockEpochClients { + b.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range b.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-b.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-b.quit: } } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index 356edc14..8ec223b8 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -268,3 +268,103 @@ type ChainConn interface { // GetBlockHash returns the hash from a block height. GetBlockHash(blockHeight int64) (*chainhash.Hash, error) } + +// GetCommonBlockAncestorHeight takes in: +// (1) the hash of a block that has been reorged out of the main chain +// (2) the hash of the block of the same height from the main chain +// It returns the height of the nearest common ancestor between the two hashes, +// or an error +func GetCommonBlockAncestorHeight(chainConn ChainConn, reorgHash, + chainHash chainhash.Hash) (int32, error) { + + for reorgHash != chainHash { + reorgHeader, err := chainConn.GetBlockHeader(&reorgHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + reorgHash, err) + } + chainHeader, err := chainConn.GetBlockHeader(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get header for hash=%v: %v", + chainHash, err) + } + reorgHash = reorgHeader.PrevBlock + chainHash = chainHeader.PrevBlock + } + + verboseHeader, err := chainConn.GetBlockHeaderVerbose(&chainHash) + if err != nil { + return 0, fmt.Errorf("unable to get verbose header for hash=%v: %v", + chainHash, err) + } + + return verboseHeader.Height, nil +} + +// GetClientMissedBlocks uses a client's best block to determine what blocks +// it missed being notified about, and returns them in a slice. Its +// backendStoresReorgs parameter tells it whether or not the notifier's +// chainConn stores information about blocks that have been reorged out of the +// chain, which allows GetClientMissedBlocks to find out whether the client's +// best block has been reorged out of the chain, rewind to the common ancestor +// and return blocks starting right after the common ancestor. +func GetClientMissedBlocks(chainConn ChainConn, clientBestBlock *BlockEpoch, + notifierBestHeight int32, backendStoresReorgs bool) ([]BlockEpoch, error) { + + startingHeight := clientBestBlock.Height + if backendStoresReorgs { + // If a reorg causes the client's best hash to be incorrect, + // retrieve the closest common ancestor and dispatch + // notifications from there. + hashAtBestHeight, err := chainConn.GetBlockHash( + int64(clientBestBlock.Height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for "+ + "height=%d: %v", clientBestBlock.Height, err) + } + + startingHeight, err = GetCommonBlockAncestorHeight( + chainConn, *clientBestBlock.Hash, *hashAtBestHeight, + ) + if err != nil { + return nil, fmt.Errorf("unable to find common ancestor: "+ + "%v", err) + } + } + + // We want to start dispatching historical notifications from the block + // right after the client's best block, to avoid a redundant notification. + missedBlocks, err := getMissedBlocks( + chainConn, startingHeight+1, notifierBestHeight+1, + ) + if err != nil { + return nil, fmt.Errorf("unable to get missed blocks: %v", err) + } + + return missedBlocks, nil +} + +// getMissedBlocks returns a slice of blocks: [startingHeight, endingHeight) +// fetched from the chain. +func getMissedBlocks(chainConn ChainConn, startingHeight, + endingHeight int32) ([]BlockEpoch, error) { + + numMissedBlocks := endingHeight - startingHeight + if numMissedBlocks < 0 { + return nil, fmt.Errorf("starting height %d is greater than "+ + "ending height %d", startingHeight, endingHeight) + } + + missedBlocks := make([]BlockEpoch, 0, numMissedBlocks) + for height := startingHeight; height < endingHeight; height++ { + hash, err := chainConn.GetBlockHash(int64(height)) + if err != nil { + return nil, fmt.Errorf("unable to find blockhash for height=%d: %v", + height, err) + } + missedBlocks = append(missedBlocks, + BlockEpoch{Hash: hash, Height: height}) + } + + return missedBlocks, nil +} diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index da330dce..1da5b404 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -370,6 +370,25 @@ func (n *NeutrinoNotifier) notificationDispatcher() { case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") n.blockEpochClients[msg.epochID] = msg + if msg.bestBlock != nil { + n.heightMtx.Lock() + bestHeight := int32(n.bestHeight) + n.heightMtx.Unlock() + missedBlocks, err := + chainntnfs.GetClientMissedBlocks( + n.chainConn, msg.bestBlock, + bestHeight, false, + ) + if err != nil { + msg.errorChan <- err + continue + } + for _, block := range missedBlocks { + n.notifyBlockEpochClient(msg, + block.Height, block.Hash) + } + } + msg.errorChan <- nil } case item := <-n.chainUpdates.ChanOut(): @@ -572,20 +591,25 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { + for _, client := range n.blockEpochClients { + n.notifyBlockEpochClient(client, newHeight, newSha) + } +} + +// notifyBlockEpochClient sends a registered block epoch client a notification +// about a specific block. +func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistration, + height int32, sha *chainhash.Hash) { + epoch := &chainntnfs.BlockEpoch{ - Height: newHeight, - Hash: newSha, + Height: height, + Hash: sha, } - for _, epochClient := range n.blockEpochClients { - select { - - case epochClient.epochQueue.ChanIn() <- epoch: - - case <-epochClient.cancelChan: - - case <-n.quit: - } + select { + case epochClient.epochQueue.ChanIn() <- epoch: + case <-epochClient.cancelChan: + case <-n.quit: } }