Merge pull request #1951 from cfromknecht/reorder-commit-hints-and-notify
chainntnfs: Reorder commit hints and notify
This commit is contained in:
commit
a3edcf9cd5
@ -632,13 +632,6 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
|
|||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height,
|
chainntnfs.Log.Infof("New block: height=%v, sha=%v", block.Height,
|
||||||
block.Hash)
|
block.Hash)
|
||||||
|
|
||||||
// We want to set the best block before dispatching notifications so
|
|
||||||
// if any subscribers make queries based on their received block epoch,
|
|
||||||
// our state is fully updated in time.
|
|
||||||
b.bestBlock = block
|
|
||||||
|
|
||||||
b.notifyBlockEpochs(block.Height, block.Hash)
|
|
||||||
|
|
||||||
// Finally, we'll update the spend height hint for all of our watched
|
// Finally, we'll update the spend height hint for all of our watched
|
||||||
// outpoints that have not been spent yet. This is safe to do as we do
|
// outpoints that have not been spent yet. This is safe to do as we do
|
||||||
// not watch already spent outpoints for spend notifications.
|
// not watch already spent outpoints for spend notifications.
|
||||||
@ -652,13 +645,22 @@ func (b *BitcoindNotifier) handleBlockConnected(block chainntnfs.BlockEpoch) err
|
|||||||
uint32(block.Height), ops...,
|
uint32(block.Height), ops...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The error is not fatal, so we should not return an
|
// The error is not fatal since we are connecting a
|
||||||
// error to the caller.
|
// block, and advancing the spend hint is an optimistic
|
||||||
|
// optimization.
|
||||||
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
||||||
"%d for %v: %v", block.Height, ops, err)
|
"%d for %v: %v", block.Height, ops, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We want to set the best block before dispatching notifications so
|
||||||
|
// if any subscribers make queries based on their received block epoch,
|
||||||
|
// our state is fully updated in time.
|
||||||
|
b.bestBlock = block
|
||||||
|
|
||||||
|
// Lastly we'll notify any subscribed clients of the block.
|
||||||
|
b.notifyBlockEpochs(block.Height, block.Hash)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -715,16 +715,16 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
|
|||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
|
chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
|
||||||
epoch.Hash)
|
epoch.Hash)
|
||||||
|
|
||||||
// We want to set the best block before dispatching notifications
|
// Define a helper struct for coalescing the spend notifications we will
|
||||||
// so if any subscribers make queries based on their received
|
// dispatch after trying to commit the spend hints.
|
||||||
// block epoch, our state is fully updated in time.
|
type spendNtfnBatch struct {
|
||||||
b.bestBlock = epoch
|
details *chainntnfs.SpendDetail
|
||||||
|
clients map[uint64]*spendNotification
|
||||||
// Next we'll notify any subscribed clients of the block.
|
}
|
||||||
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
|
|
||||||
|
|
||||||
// Scan over the list of relevant transactions and possibly dispatch
|
// Scan over the list of relevant transactions and possibly dispatch
|
||||||
// notifications for spends.
|
// notifications for spends.
|
||||||
|
spendBatches := make(map[wire.OutPoint]spendNtfnBatch)
|
||||||
for _, tx := range newBlock.txns {
|
for _, tx := range newBlock.txns {
|
||||||
mtx := tx.MsgTx()
|
mtx := tx.MsgTx()
|
||||||
txSha := mtx.TxHash()
|
txSha := mtx.TxHash()
|
||||||
@ -740,6 +740,7 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
|
|||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
delete(b.spendNotifications, prevOut)
|
||||||
|
|
||||||
spendDetails := &chainntnfs.SpendDetail{
|
spendDetails := &chainntnfs.SpendDetail{
|
||||||
SpentOutPoint: &prevOut,
|
SpentOutPoint: &prevOut,
|
||||||
@ -749,21 +750,10 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
|
|||||||
SpendingHeight: int32(newBlock.height),
|
SpendingHeight: int32(newBlock.height),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ntfn := range clients {
|
spendBatches[prevOut] = spendNtfnBatch{
|
||||||
chainntnfs.Log.Infof("Dispatching spend "+
|
details: spendDetails,
|
||||||
"notification for outpoint=%v",
|
clients: clients,
|
||||||
ntfn.targetOutpoint)
|
|
||||||
|
|
||||||
ntfn.spendChan <- spendDetails
|
|
||||||
|
|
||||||
// Close spendChan to ensure that any calls to
|
|
||||||
// Cancel will not block. This is safe to do
|
|
||||||
// since the channel is buffered, and the
|
|
||||||
// message can still be read by the receiver.
|
|
||||||
close(ntfn.spendChan)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(b.spendNotifications, prevOut)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -780,13 +770,39 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
|
|||||||
uint32(epoch.Height), ops...,
|
uint32(epoch.Height), ops...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The error is not fatal, so we should not return an
|
// The error is not fatal since we are connecting a
|
||||||
// error to the caller.
|
// block, and advancing the spend hint is an optimistic
|
||||||
|
// optimization.
|
||||||
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
||||||
"%d for %v: %v", epoch.Height, ops, err)
|
"%d for %v: %v", epoch.Height, ops, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We want to set the best block before dispatching notifications
|
||||||
|
// so if any subscribers make queries based on their received
|
||||||
|
// block epoch, our state is fully updated in time.
|
||||||
|
b.bestBlock = epoch
|
||||||
|
|
||||||
|
// Next we'll notify any subscribed clients of the block.
|
||||||
|
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
|
||||||
|
|
||||||
|
// Finally, send off the spend details to the notification subscribers.
|
||||||
|
for _, batch := range spendBatches {
|
||||||
|
for _, ntfn := range batch.clients {
|
||||||
|
chainntnfs.Log.Infof("Dispatching spend "+
|
||||||
|
"notification for outpoint=%v",
|
||||||
|
ntfn.targetOutpoint)
|
||||||
|
|
||||||
|
ntfn.spendChan <- batch.details
|
||||||
|
|
||||||
|
// Close spendChan to ensure that any calls to
|
||||||
|
// Cancel will not block. This is safe to do
|
||||||
|
// since the channel is buffered, and the
|
||||||
|
// message can still be read by the receiver.
|
||||||
|
close(ntfn.spendChan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,13 +614,16 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
|
|||||||
chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
|
chainntnfs.Log.Infof("New block: height=%v, sha=%v", newBlock.height,
|
||||||
newBlock.hash)
|
newBlock.hash)
|
||||||
|
|
||||||
n.bestHeight = newBlock.height
|
// Create a helper struct for coalescing spend notifications triggered
|
||||||
|
// by this block.
|
||||||
|
type spendNtfnBatch struct {
|
||||||
|
details *chainntnfs.SpendDetail
|
||||||
|
clients map[uint64]*spendNotification
|
||||||
|
}
|
||||||
|
|
||||||
// Next, notify any subscribed clients of the block.
|
// Scan over the list of relevant transactions and assemble the
|
||||||
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
|
// possible spend notifications we need to dispatch.
|
||||||
|
spendBatches := make(map[wire.OutPoint]spendNtfnBatch)
|
||||||
// Scan over the list of relevant transactions and possibly dispatch
|
|
||||||
// notifications for spends.
|
|
||||||
for _, tx := range newBlock.txns {
|
for _, tx := range newBlock.txns {
|
||||||
mtx := tx.MsgTx()
|
mtx := tx.MsgTx()
|
||||||
txSha := mtx.TxHash()
|
txSha := mtx.TxHash()
|
||||||
@ -630,12 +633,13 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
|
|||||||
|
|
||||||
// If this transaction indeed does spend an output which
|
// If this transaction indeed does spend an output which
|
||||||
// we have a registered notification for, then create a
|
// we have a registered notification for, then create a
|
||||||
// spend summary, finally sending off the details to the
|
// spend summary and add it to our batch of spend
|
||||||
// notification subscriber.
|
// notifications to be delivered.
|
||||||
clients, ok := n.spendNotifications[prevOut]
|
clients, ok := n.spendNotifications[prevOut]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
delete(n.spendNotifications, prevOut)
|
||||||
|
|
||||||
spendDetails := &chainntnfs.SpendDetail{
|
spendDetails := &chainntnfs.SpendDetail{
|
||||||
SpentOutPoint: &prevOut,
|
SpentOutPoint: &prevOut,
|
||||||
@ -645,25 +649,15 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
|
|||||||
SpendingHeight: int32(newBlock.height),
|
SpendingHeight: int32(newBlock.height),
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ntfn := range clients {
|
spendBatches[prevOut] = spendNtfnBatch{
|
||||||
chainntnfs.Log.Infof("Dispatching spend "+
|
details: spendDetails,
|
||||||
"notification for outpoint=%v",
|
clients: clients,
|
||||||
ntfn.targetOutpoint)
|
|
||||||
|
|
||||||
ntfn.spendChan <- spendDetails
|
|
||||||
|
|
||||||
// Close spendChan to ensure that any calls to
|
|
||||||
// Cancel will not block. This is safe to do
|
|
||||||
// since the channel is buffered, and the
|
|
||||||
// message can still be read by the receiver.
|
|
||||||
close(ntfn.spendChan)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(n.spendNotifications, prevOut)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, we'll update the spend height hint for all of our watched
|
// Now, we'll update the spend height hint for all of our watched
|
||||||
// outpoints that have not been spent yet. This is safe to do as we do
|
// outpoints that have not been spent yet. This is safe to do as we do
|
||||||
// not watch already spent outpoints for spend notifications.
|
// not watch already spent outpoints for spend notifications.
|
||||||
ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
|
ops := make([]wire.OutPoint, 0, len(n.spendNotifications))
|
||||||
@ -674,13 +668,40 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
|
|||||||
if len(ops) > 0 {
|
if len(ops) > 0 {
|
||||||
err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
|
err := n.spendHintCache.CommitSpendHint(newBlock.height, ops...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// The error is not fatal, so we should not return an
|
// The error is not fatal since we are connecting a
|
||||||
// error to the caller.
|
// block, and advancing the spend hint is an optimistic
|
||||||
|
// optimization.
|
||||||
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
chainntnfs.Log.Errorf("Unable to update spend hint to "+
|
||||||
"%d for %v: %v", newBlock.height, ops, err)
|
"%d for %v: %v", newBlock.height, ops, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We want to set the best block before dispatching notifications
|
||||||
|
// so if any subscribers make queries based on their received
|
||||||
|
// block epoch, our state is fully updated in time.
|
||||||
|
n.bestHeight = newBlock.height
|
||||||
|
|
||||||
|
// With all persistent changes committed, notify any subscribed clients
|
||||||
|
// of the block.
|
||||||
|
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
|
||||||
|
|
||||||
|
// Finally, send off the spend details to the notification subscribers.
|
||||||
|
for _, batch := range spendBatches {
|
||||||
|
for _, ntfn := range batch.clients {
|
||||||
|
chainntnfs.Log.Infof("Dispatching spend "+
|
||||||
|
"notification for outpoint=%v",
|
||||||
|
ntfn.targetOutpoint)
|
||||||
|
|
||||||
|
ntfn.spendChan <- batch.details
|
||||||
|
|
||||||
|
// Close spendChan to ensure that any calls to
|
||||||
|
// Cancel will not block. This is safe to do
|
||||||
|
// since the channel is buffered, and the
|
||||||
|
// message can still be read by the receiver.
|
||||||
|
close(ntfn.spendChan)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -810,7 +831,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that neutrino is caught up to the height hint before we
|
// Ensure that neutrino is caught up to the height hint before we
|
||||||
// attempt to fetch the utxo fromt the chain. If we're behind, then we
|
// attempt to fetch the utxo from the chain. If we're behind, then we
|
||||||
// may miss a notification dispatch.
|
// may miss a notification dispatch.
|
||||||
for {
|
for {
|
||||||
n.heightMtx.RLock()
|
n.heightMtx.RLock()
|
||||||
|
@ -452,6 +452,19 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
|
|||||||
tcn.currentHeight--
|
tcn.currentHeight--
|
||||||
tcn.reorgDepth++
|
tcn.reorgDepth++
|
||||||
|
|
||||||
|
// Rewind the height hint for all watched transactions.
|
||||||
|
var txs []chainhash.Hash
|
||||||
|
for tx := range tcn.confNotifications {
|
||||||
|
txs = append(txs, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...)
|
||||||
|
if err != nil {
|
||||||
|
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
|
||||||
|
tcn.currentHeight, txs, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// We'll go through all of our watched transactions and attempt to drain
|
// We'll go through all of our watched transactions and attempt to drain
|
||||||
// their notification channels to ensure sending notifications to the
|
// their notification channels to ensure sending notifications to the
|
||||||
// clients is always non-blocking.
|
// clients is always non-blocking.
|
||||||
@ -515,20 +528,6 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rewind the height hint for all watched transactions.
|
|
||||||
var txs []chainhash.Hash
|
|
||||||
for tx := range tcn.confNotifications {
|
|
||||||
txs = append(txs, tx)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := tcn.hintCache.CommitConfirmHint(tcn.currentHeight, txs...)
|
|
||||||
if err != nil {
|
|
||||||
// The error is not fatal, so we should not return an error to
|
|
||||||
// the caller.
|
|
||||||
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
|
|
||||||
tcn.currentHeight, txs, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Finally, we can remove the transactions we're currently watching that
|
// Finally, we can remove the transactions we're currently watching that
|
||||||
// were included in this block height.
|
// were included in this block height.
|
||||||
delete(tcn.txsByInitialHeight, blockHeight)
|
delete(tcn.txsByInitialHeight, blockHeight)
|
||||||
|
Loading…
Reference in New Issue
Block a user