chainntnfs: notify clients after block connect has succeeded

This prevents the situation where we notify clients about a newly connected block, and then the block connection itself fails. We also want to set our best block in between connecting the block and notifying clients, in case a client makes queries about the new block they have received.
This commit is contained in:
Valentine Wallace 2018-08-09 00:05:29 -07:00
parent cbf1799c40
commit 3df5b26699
No known key found for this signature in database
GPG Key ID: B0E55E8D1776A58D
3 changed files with 112 additions and 31 deletions

@ -539,6 +539,35 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications
// now or after numConfirmations confs.
func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) error {
rawBlock, err := b.chainConn.GetBlock(block.Hash)
if err != nil {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
block.Height, block.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
err = b.txConfNotifier.ConnectTip(
block.Hash, uint32(block.Height), txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
// We want to set the best block before dispatching notifications so
// if any subscribers make queries based on their received block epoch,
// our state is fully updated in time.
b.bestBlock = block
b.notifyBlockEpochs(block.Height, block.Hash)
return nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {

@ -613,17 +613,47 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlocksConnected applies a chain update for a new block. Any watched // handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications // transactions included this block will processed to either send notifications
// now or after numConfirmations confs. // now or after numConfirmations confs.
// TODO(halseth): this is reusing the neutrino notifier implementation, unify // TODO(halseth): this is reusing the neutrino notifier implementation, unify
// them. // them.
func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error { func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
// First we'll notify any subscribed clients of the block. // First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
rawBlock, err := b.chainConn.GetBlock(epoch.Hash)
if err != nil {
return fmt.Errorf("unable to get block: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
txns := btcutil.NewBlock(rawBlock).Transactions()
newBlock := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
connect: true,
}
err = b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
b.bestBlock = epoch
// Next we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly // Finally, we'll scan over the list of relevant transactions and
// dispatch notifications for confirmations and spends. // possibly dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txSha := mtx.TxHash() txSha := mtx.TxHash()
@ -631,9 +661,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which we have a // If this transaction indeed does spend an output which
// registered notification for, then create a spend summary, finally // we have a registered notification for, then create a
// sending off the details to the notification subscriber. // spend summary, finally sending off the details to the
// notification subscriber.
clients, ok := b.spendNotifications[prevOut] clients, ok := b.spendNotifications[prevOut]
if !ok { if !ok {
continue continue
@ -652,9 +683,10 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
"outpoint=%v", ntfn.targetOutpoint) "outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not // Close spendChan to ensure that any calls to
// block. This is safe to do since the channel is buffered, and // Cancel will not block. This is safe to do
// the message can still be read by the receiver. // since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan) close(ntfn.spendChan)
} }
@ -662,11 +694,6 @@ func (b *BtcdNotifier) handleBlockConnected(newBlock *filteredBlock) error {
} }
} }
// A new block has been connected to the main chain.
// Send out any N confirmation notifications which may
// have been triggered by this new block.
b.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height, newBlock.txns)
return nil return nil
} }

@ -541,15 +541,29 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
return nil, nil return nil, nil
} }
// handleBlocksConnected applies a chain update for a new block. Any watched // handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications // transactions included this block will processed to either send notifications
// now or after numConfirmations confs. // now or after numConfirmations confs.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// First we'll notify any subscribed clients of the block. // First process the block for our internal state. A new block has
// been connected to the main chain. Send out any N confirmation
// notifications which may have been triggered by this new block.
err := n.txConfNotifier.ConnectTip(&newBlock.hash, newBlock.height,
newBlock.txns)
if err != nil {
return fmt.Errorf("unable to connect tip: %v", err)
}
chainntnfs.Log.Infof("New block: height=%v, sha=%v",
newBlock.height, newBlock.hash)
n.bestHeight = newBlock.height
// Next, notify any subscribed clients of the block.
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Next, we'll scan over the list of relevant transactions and possibly // Finally, we'll scan over the list of relevant transactions and
// dispatch notifications for confirmations and spends. // possibly dispatch notifications for confirmations and spends.
for _, tx := range newBlock.txns { for _, tx := range newBlock.txns {
mtx := tx.MsgTx() mtx := tx.MsgTx()
txSha := mtx.TxHash() txSha := mtx.TxHash()
@ -557,10 +571,10 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
for i, txIn := range mtx.TxIn { for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output // If this transaction indeed does spend an output which
// which we have a registered notification for, then // we have a registered notification for, then create a
// create a spend summary, finally sending off the // spend summary, finally sending off the details to the
// details to the notification subscriber. // notification subscriber.
clients, ok := n.spendNotifications[prevOut] clients, ok := n.spendNotifications[prevOut]
if !ok { if !ok {
continue continue
@ -592,16 +606,27 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
} }
} }
// A new block has been connected to the main chain. Send out any N
// confirmation notifications which may have been triggered by this new
// block.
n.txConfNotifier.ConnectTip(
&newBlock.hash, newBlock.height, newBlock.txns,
)
return nil return nil
} }
// getFilteredBlock is a utility to retrieve the full filtered block from a block epoch.
func (n *NeutrinoNotifier) getFilteredBlock(epoch chainntnfs.BlockEpoch) (*filteredBlock, error) {
rawBlock, err := n.p2pNode.GetBlockFromNetwork(*epoch.Hash)
if err != nil {
return nil, fmt.Errorf("unable to get block: %v", err)
}
txns := rawBlock.Transactions()
block := &filteredBlock{
hash: *epoch.Hash,
height: uint32(epoch.Height),
txns: txns,
connect: true,
}
return block, nil
}
// notifyBlockEpochs notifies all registered block epoch clients of the newly // notifyBlockEpochs notifies all registered block epoch clients of the newly
// connected block to the main chain. // connected block to the main chain.
func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) { func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {