chainntnfs/txnotifier: watch for spends at tip
In this commit, we add support to allow the TxNotifier to properly determine whether a new block extending the chain contains a transaction that spends a registered outpoint. In the event that it does, spend notifications will be dispatched to all active registered clients for such outpoint.
This commit is contained in:
parent
4d7fa9ecc4
commit
e6b2755a57
@ -851,13 +851,22 @@ func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectTip handles a new block extending the current chain. This checks each
|
// ConnectTip handles a new block extending the current chain. It will go
|
||||||
// transaction in the block to see if any watched transactions are included.
|
// through every transaction and determine if it is relevant to any of its
|
||||||
// Also, if any watched transactions now have the required number of
|
// clients. A transaction can be relevant in either of the following two ways:
|
||||||
// confirmations as a result of this block being connected, this dispatches
|
//
|
||||||
// notifications.
|
// 1. One of the inputs in the transaction spends an outpoint for which we
|
||||||
func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash,
|
// currently have an active spend registration for.
|
||||||
blockHeight uint32, txns []*btcutil.Tx) error {
|
//
|
||||||
|
// 2. The transaction is a transaction for which we currently have an active
|
||||||
|
// confirmation registration for.
|
||||||
|
//
|
||||||
|
// In the event that the transaction is relevant, a confirmation/spend
|
||||||
|
// notification will be dispatched to the relevant clients. Confirmation
|
||||||
|
// notifications will only be dispatched for transactions that have met the
|
||||||
|
// required number of confirmations required by the client.
|
||||||
|
func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32,
|
||||||
|
txns []*btcutil.Tx) error {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-n.quit:
|
case <-n.quit:
|
||||||
@ -876,14 +885,58 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash,
|
|||||||
n.currentHeight++
|
n.currentHeight++
|
||||||
n.reorgDepth = 0
|
n.reorgDepth = 0
|
||||||
|
|
||||||
// Record any newly confirmed transactions by their confirmed height so
|
// First, we'll iterate over all the transactions found in this block to
|
||||||
// that notifications get dispatched when the transactions reach their
|
// determine if it includes any relevant transactions to the TxNotifier.
|
||||||
// required number of confirmations. We'll also watch these transactions
|
|
||||||
// at the height they were included in the chain so reorgs can be
|
|
||||||
// handled correctly.
|
|
||||||
for _, tx := range txns {
|
for _, tx := range txns {
|
||||||
txHash := tx.Hash()
|
txHash := tx.Hash()
|
||||||
|
|
||||||
|
// In order to determine if this transaction is relevant to the
|
||||||
|
// notifier, we'll check its inputs for any outstanding spend
|
||||||
|
// notifications.
|
||||||
|
for i, txIn := range tx.MsgTx().TxIn {
|
||||||
|
prevOut := txIn.PreviousOutPoint
|
||||||
|
spendSet, ok := n.spendNotifications[prevOut]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we have any, we'll record its spend height so that
|
||||||
|
// notifications get dispatched to the respective
|
||||||
|
// clients.
|
||||||
|
spendDetails := &SpendDetail{
|
||||||
|
SpentOutPoint: &prevOut,
|
||||||
|
SpenderTxHash: txHash,
|
||||||
|
SpendingTx: tx.MsgTx(),
|
||||||
|
SpenderInputIndex: uint32(i),
|
||||||
|
SpendingHeight: int32(blockHeight),
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(wilmer): cancel pending historical rescans if any?
|
||||||
|
spendSet.rescanStatus = rescanComplete
|
||||||
|
spendSet.details = spendDetails
|
||||||
|
for _, ntfn := range spendSet.ntfns {
|
||||||
|
// In the event that this notification was aware
|
||||||
|
// that the spending transaction of its outpoint
|
||||||
|
// was reorged out of the chain, we'll consume
|
||||||
|
// the reorg notification if it hasn't been
|
||||||
|
// done yet already.
|
||||||
|
select {
|
||||||
|
case <-ntfn.Event.Reorg:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll note the outpoints spending height in order to
|
||||||
|
// correctly handle dispatching notifications when the
|
||||||
|
// spending transactions gets reorged out of the chain.
|
||||||
|
opSet, exists := n.opsBySpendHeight[blockHeight]
|
||||||
|
if !exists {
|
||||||
|
opSet = make(map[wire.OutPoint]struct{})
|
||||||
|
n.opsBySpendHeight[blockHeight] = opSet
|
||||||
|
}
|
||||||
|
opSet[prevOut] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
// Check if we have any pending notifications for this txid. If
|
// Check if we have any pending notifications for this txid. If
|
||||||
// none are found, we can proceed to the next transaction.
|
// none are found, we can proceed to the next transaction.
|
||||||
confSet, ok := n.confNotifications[*txHash]
|
confSet, ok := n.confNotifications[*txHash]
|
||||||
@ -903,6 +956,7 @@ func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash,
|
|||||||
TxIndex: uint32(tx.Index()),
|
TxIndex: uint32(tx.Index()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(wilmer): cancel pending historical rescans if any?
|
||||||
confSet.rescanStatus = rescanComplete
|
confSet.rescanStatus = rescanComplete
|
||||||
confSet.details = details
|
confSet.details = details
|
||||||
for _, ntfn := range confSet.ntfns {
|
for _, ntfn := range confSet.ntfns {
|
||||||
@ -1031,17 +1085,33 @@ out:
|
|||||||
return ErrTxNotifierExiting
|
return ErrTxNotifierExiting
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(n.ntfnsByConfirmHeight, n.currentHeight)
|
delete(n.ntfnsByConfirmHeight, blockHeight)
|
||||||
|
|
||||||
// Clear entries from confNotifications and confTxsByInitialHeight. We
|
// We'll also dispatch spend notifications for all the outpoints that
|
||||||
// assume that reorgs deeper than the reorg safety limit do not happen,
|
// were spent at this new block height.
|
||||||
// so we can clear out entries for the block that is now mature.
|
for op := range n.opsBySpendHeight[blockHeight] {
|
||||||
if n.currentHeight >= n.reorgSafetyLimit {
|
spendSet := n.spendNotifications[op]
|
||||||
matureBlockHeight := n.currentHeight - n.reorgSafetyLimit
|
for _, ntfn := range spendSet.ntfns {
|
||||||
for txHash := range n.txsByInitialHeight[matureBlockHeight] {
|
err := n.dispatchSpendDetails(ntfn, spendSet.details)
|
||||||
delete(n.confNotifications, txHash)
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, we'll clear the entries from our set of notifications for
|
||||||
|
// transactions and outpoints that are no longer under the risk of being
|
||||||
|
// reorged out of the chain.
|
||||||
|
if blockHeight >= n.reorgSafetyLimit {
|
||||||
|
matureBlockHeight := blockHeight - n.reorgSafetyLimit
|
||||||
|
for tx := range n.txsByInitialHeight[matureBlockHeight] {
|
||||||
|
delete(n.confNotifications, tx)
|
||||||
}
|
}
|
||||||
delete(n.txsByInitialHeight, matureBlockHeight)
|
delete(n.txsByInitialHeight, matureBlockHeight)
|
||||||
|
for op := range n.opsBySpendHeight[matureBlockHeight] {
|
||||||
|
delete(n.spendNotifications, op)
|
||||||
|
}
|
||||||
|
delete(n.opsBySpendHeight, matureBlockHeight)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
Reference in New Issue
Block a user