chainntnfs/txnotifier: support registration of scripts for confirmations

This commit is contained in:
Wilmer Paulino 2018-12-06 21:13:38 -08:00
parent a0d3b7d9e3
commit 0f4ff2d09c
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -72,21 +72,23 @@ const (
) )
// confNtfnSet holds all known, registered confirmation notifications for a // confNtfnSet holds all known, registered confirmation notifications for a
// single txid. If duplicates notifications are requested, only one historical // txid/output script. If duplicates notifications are requested, only one
// dispatch will be spawned to ensure redundant scans are not permitted. A // historical dispatch will be spawned to ensure redundant scans are not
// single conf detail will be constructed and dispatched to all interested // permitted. A single conf detail will be constructed and dispatched to all
// interested
// clients. // clients.
type confNtfnSet struct { type confNtfnSet struct {
// ntfns keeps tracks of all the active client notification requests for // ntfns keeps tracks of all the active client notification requests for
// a transaction. // a transaction/output script
ntfns map[uint64]*ConfNtfn ntfns map[uint64]*ConfNtfn
// rescanStatus represents the current rescan state for the transaction. // rescanStatus represents the current rescan state for the
// transaction/output script.
rescanStatus rescanState rescanStatus rescanState
// details serves as a cache of the confirmation details of a // details serves as a cache of the confirmation details of a
// transaction that we'll use to determine if a transaction has already // transaction that we'll use to determine if a transaction/output
// confirmed at the time of registration. // script has already confirmed at the time of registration.
details *TxConfirmation details *TxConfirmation
} }
@ -183,23 +185,16 @@ func (r ConfRequest) ConfHintKey() ([]byte, error) {
} }
// ConfNtfn represents a notifier client's request to receive a notification // ConfNtfn represents a notifier client's request to receive a notification
// once the target transaction gets sufficient confirmations. The client is // once the target transaction/ouput script gets sufficient confirmations. The
// asynchronously notified via the ConfirmationEvent channels. // client is asynchronously notified via the ConfirmationEvent channels.
type ConfNtfn struct { type ConfNtfn struct {
// ConfID uniquely identifies the confirmation notification request for // ConfID uniquely identifies the confirmation notification request for
// the specified transaction. // the specified transaction/output script.
ConfID uint64 ConfID uint64
// TxID is the hash of the transaction for which confirmation notifications // ConfRequest represents either the txid or script we should detect
// are requested. // inclusion of within the chain.
TxID *chainhash.Hash ConfRequest
// PkScript is the public key script of an outpoint created in this
// transaction.
//
// NOTE: This value MUST be set when the dispatch is to be performed
// using compact filters.
PkScript []byte
// NumConfirmations is the number of confirmations after which the // NumConfirmations is the number of confirmations after which the
// notification is to be sent. // notification is to be sent.
@ -218,18 +213,12 @@ type ConfNtfn struct {
} }
// HistoricalConfDispatch parameterizes a manual rescan for a particular // HistoricalConfDispatch parameterizes a manual rescan for a particular
// transaction identifier. The parameters include the start and end block // transaction/output script. The parameters include the start and end block
// heights specifying the range of blocks to scan. // heights specifying the range of blocks to scan.
type HistoricalConfDispatch struct { type HistoricalConfDispatch struct {
// TxID is the transaction ID to search for in the historical dispatch. // ConfRequest represents either the txid or script we should detect
TxID *chainhash.Hash // inclusion of within the chain.
ConfRequest
// PkScript is a public key script from an output created by this
// transaction.
//
// NOTE: This value MUST be set when the dispatch is to be performed
// using compact filters.
PkScript []byte
// StartHeight specifies the block height at which to being the // StartHeight specifies the block height at which to being the
// historical rescan. // historical rescan.
@ -351,9 +340,9 @@ type HistoricalSpendDispatch struct {
// TxNotifier is a struct responsible for delivering transaction notifications // TxNotifier is a struct responsible for delivering transaction notifications
// to subscribers. These notifications can be of two different types: // to subscribers. These notifications can be of two different types:
// transaction confirmations and/or outpoint spends. The TxNotifier will watch // transaction/output script confirmations and/or outpoint/output script spends.
// the blockchain as new blocks come in, in order to satisfy its client // The TxNotifier will watch the blockchain as new blocks come in, in order to
// requests. // satisfy its client requests.
type TxNotifier struct { type TxNotifier struct {
// currentHeight is the height of the tracked blockchain. It is used to // currentHeight is the height of the tracked blockchain. It is used to
// determine the number of confirmations a tx has and ensure blocks are // determine the number of confirmations a tx has and ensure blocks are
@ -372,18 +361,20 @@ type TxNotifier struct {
// blocks are disconnected without being interrupted by a new block. // blocks are disconnected without being interrupted by a new block.
reorgDepth uint32 reorgDepth uint32
// confNotifications is an index of notification requests by transaction // confNotifications is an index of confirmation notification requests
// hash. // by transaction hash/output script.
confNotifications map[chainhash.Hash]*confNtfnSet confNotifications map[ConfRequest]*confNtfnSet
// txsByInitialHeight is an index of watched transactions by the height // confsByInitialHeight is an index of watched transactions/output
// that they are included at in the blockchain. This is tracked so that // scripts by the height that they are included at in the chain. This
// incorrect notifications are not sent if a transaction is reorged out // is tracked so that incorrect notifications are not sent if a
// of the chain and so that negative confirmations can be recognized. // transaction/output script is reorged out of the chain and so that
txsByInitialHeight map[uint32]map[chainhash.Hash]struct{} // negative confirmations can be recognized.
confsByInitialHeight map[uint32]map[ConfRequest]struct{}
// ntfnsByConfirmHeight is an index of notification requests by the // ntfnsByConfirmHeight is an index of notification requests by the
// height at which the transaction will have sufficient confirmations. // height at which the transaction/output script will have sufficient
// confirmations.
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
// spendNotifications is an index of all active notification requests // spendNotifications is an index of all active notification requests
@ -397,8 +388,9 @@ type TxNotifier struct {
opsBySpendHeight map[uint32]map[wire.OutPoint]struct{} opsBySpendHeight map[uint32]map[wire.OutPoint]struct{}
// confirmHintCache is a cache used to maintain the latest height hints // confirmHintCache is a cache used to maintain the latest height hints
// for transactions. Each height hint represents the earliest height at // for transactions/output scripts. Each height hint represents the
// which the transactions could have been confirmed within the chain. // earliest height at which they scripts could have been confirmed
// within the chain.
confirmHintCache ConfirmHintCache confirmHintCache ConfirmHintCache
// spendHintCache is a cache used to maintain the latest height hints // spendHintCache is a cache used to maintain the latest height hints
@ -424,8 +416,8 @@ func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32,
return &TxNotifier{ return &TxNotifier{
currentHeight: startHeight, currentHeight: startHeight,
reorgSafetyLimit: reorgSafetyLimit, reorgSafetyLimit: reorgSafetyLimit,
confNotifications: make(map[chainhash.Hash]*confNtfnSet), confNotifications: make(map[ConfRequest]*confNtfnSet),
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), confsByInitialHeight: make(map[uint32]map[ConfRequest]struct{}),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
spendNotifications: make(map[wire.OutPoint]*spendNtfnSet), spendNotifications: make(map[wire.OutPoint]*spendNtfnSet),
opsBySpendHeight: make(map[uint32]map[wire.OutPoint]struct{}), opsBySpendHeight: make(map[uint32]map[wire.OutPoint]struct{}),
@ -435,28 +427,32 @@ func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32,
} }
} }
// RegisterConf handles a new notification request. The client will be notified // RegisterConf handles a new confirmation notification request. The client will
// when the transaction gets a sufficient number of confirmations on the // be notified when the transaction/output script gets a sufficient number of
// blockchain. The registration succeeds if no error is returned. If the // confirmations in the blockchain. The registration succeeds if no error is
// returned HistoricalConfDispatch is non-nil, the caller is responsible for // returned. If the returned HistoricalConfDispatch is non-nil, the caller is
// attempting to manually rescan blocks for the txid between the start and end // responsible for attempting to manually rescan blocks for the txid/output
// heights. // script between the start and end heights. The notifier's current height is
// also returned so that backends can request to be notified of confirmations
// from this point forwards.
// //
// NOTE: If the transaction has already been included in a block on the chain, // NOTE: If the transaction/output script has already been included in a block
// the confirmation details must be provided with the UpdateConfDetails method, // on the chain, the confirmation details must be provided with the
// otherwise we will wait for the transaction to confirm even though it already // UpdateConfDetails method, otherwise we will wait for the transaction/output
// has. // script to confirm even though it already has.
func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch, error) { func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch,
uint32, error) {
select { select {
case <-n.quit: case <-n.quit:
return nil, ErrTxNotifierExiting return nil, 0, ErrTxNotifierExiting
default: default:
} }
// Enforce that we will not dispatch confirmations beyond the reorg // Enforce that we will not dispatch confirmations beyond the reorg
// safety limit. // safety limit.
if ntfn.NumConfirmations > n.reorgSafetyLimit { if ntfn.NumConfirmations > n.reorgSafetyLimit {
return nil, ErrTxMaxConfs return nil, 0, ErrTxMaxConfs
} }
// Before proceeding to register the notification, we'll query our // Before proceeding to register the notification, we'll query our
@ -464,77 +460,84 @@ func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch, erro
// //
// TODO(conner): verify that all submitted height hints are identical. // TODO(conner): verify that all submitted height hints are identical.
startHeight := ntfn.HeightHint startHeight := ntfn.HeightHint
hint, err := n.confirmHintCache.QueryConfirmHint(*ntfn.TxID) hint, err := n.confirmHintCache.QueryConfirmHint(ntfn.ConfRequest)
if err == nil { if err == nil {
if hint > startHeight { if hint > startHeight {
Log.Debugf("Using height hint %d retrieved "+ Log.Debugf("Using height hint %d retrieved from cache "+
"from cache for %v", hint, *ntfn.TxID) "for %v", hint, ntfn.ConfRequest)
startHeight = hint startHeight = hint
} }
} else if err != ErrConfirmHintNotFound { } else if err != ErrConfirmHintNotFound {
Log.Errorf("Unable to query confirm hint for %v: %v", Log.Errorf("Unable to query confirm hint for %v: %v",
*ntfn.TxID, err) ntfn.ConfRequest, err)
} }
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
confSet, ok := n.confNotifications[*ntfn.TxID] confSet, ok := n.confNotifications[ntfn.ConfRequest]
if !ok { if !ok {
// If this is the first registration for this txid, construct a // If this is the first registration for this request, construct
// confSet to coalesce all notifications for the same txid. // a confSet to coalesce all notifications for the same request.
confSet = newConfNtfnSet() confSet = newConfNtfnSet()
n.confNotifications[*ntfn.TxID] = confSet n.confNotifications[ntfn.ConfRequest] = confSet
} }
confSet.ntfns[ntfn.ConfID] = ntfn confSet.ntfns[ntfn.ConfID] = ntfn
switch confSet.rescanStatus { switch confSet.rescanStatus {
// A prior rescan has already completed and we are actively watching at // A prior rescan has already completed and we are actively watching at
// tip for this txid. // tip for this request.
case rescanComplete: case rescanComplete:
// If conf details for this set of notifications has already // If the confirmation details for this set of notifications has
// been found, we'll attempt to deliver them immediately to this // already been found, we'll attempt to deliver them immediately
// client. // to this client.
Log.Debugf("Attempting to dispatch conf for txid=%v "+ Log.Debugf("Attempting to dispatch confirmation for %v on "+
"on registration since rescan has finished", ntfn.TxID) "registration since rescan has finished",
return nil, n.dispatchConfDetails(ntfn, confSet.details) ntfn.ConfRequest)
return nil, n.currentHeight, n.dispatchConfDetails(
ntfn, confSet.details,
)
// A rescan is already in progress, return here to prevent dispatching // A rescan is already in progress, return here to prevent dispatching
// another. When the scan returns, this notifications details will be // another. When the rescan returns, this notification's details will be
// updated as well. // updated as well.
case rescanPending: case rescanPending:
Log.Debugf("Waiting for pending rescan to finish before "+ Log.Debugf("Waiting for pending rescan to finish before "+
"notifying txid=%v at tip", ntfn.TxID) "notifying %v at tip", ntfn.ConfRequest)
return nil, nil
return nil, n.currentHeight, nil
// If no rescan has been dispatched, attempt to do so now. // If no rescan has been dispatched, attempt to do so now.
case rescanNotStarted: case rescanNotStarted:
} }
// If the provided or cached height hint indicates that the transaction // If the provided or cached height hint indicates that the
// is to be confirmed at a height greater than the conf notifier's // transaction with the given txid/output script is to be confirmed at a
// current height, we'll refrain from spawning a historical dispatch. // height greater than the notifier's current height, we'll refrain from
// spawning a historical dispatch.
if startHeight > n.currentHeight { if startHeight > n.currentHeight {
Log.Debugf("Height hint is above current height, not dispatching "+ Log.Debugf("Height hint is above current height, not "+
"historical rescan for txid=%v ", ntfn.TxID) "dispatching historical confirmation rescan for %v",
// Set the rescan status to complete, which will allow the conf ntfn.ConfRequest)
// Set the rescan status to complete, which will allow the
// notifier to start delivering messages for this set // notifier to start delivering messages for this set
// immediately. // immediately.
confSet.rescanStatus = rescanComplete confSet.rescanStatus = rescanComplete
return nil, nil return nil, n.currentHeight, nil
} }
Log.Debugf("Dispatching historical rescan for txid=%v ", ntfn.TxID) Log.Debugf("Dispatching historical confirmation rescan for %v",
ntfn.ConfRequest)
// Construct the parameters for historical dispatch, scanning the range // Construct the parameters for historical dispatch, scanning the range
// of blocks between our best known height hint and the notifier's // of blocks between our best known height hint and the notifier's
// current height. The notifier will begin also watching for // current height. The notifier will begin also watching for
// confirmations at tip starting with the next block. // confirmations at tip starting with the next block.
dispatch := &HistoricalConfDispatch{ dispatch := &HistoricalConfDispatch{
TxID: ntfn.TxID, ConfRequest: ntfn.ConfRequest,
PkScript: ntfn.PkScript,
StartHeight: startHeight, StartHeight: startHeight,
EndHeight: n.currentHeight, EndHeight: n.currentHeight,
} }
@ -543,16 +546,17 @@ func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch, erro
// registrations don't also attempt a dispatch. // registrations don't also attempt a dispatch.
confSet.rescanStatus = rescanPending confSet.rescanStatus = rescanPending
return dispatch, nil return dispatch, n.currentHeight, nil
} }
// UpdateConfDetails attempts to update the confirmation details for an active // UpdateConfDetails attempts to update the confirmation details for an active
// notification within the notifier. This should only be used in the case of a // notification within the notifier. This should only be used in the case of a
// transaction that has confirmed before the notifier's current height. // transaction/output script that has confirmed before the notifier's current
// height.
// //
// NOTE: The notification should be registered first to ensure notifications are // NOTE: The notification should be registered first to ensure notifications are
// dispatched correctly. // dispatched correctly.
func (n *TxNotifier) UpdateConfDetails(txid chainhash.Hash, func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest,
details *TxConfirmation) error { details *TxConfirmation) error {
select { select {
@ -566,14 +570,15 @@ func (n *TxNotifier) UpdateConfDetails(txid chainhash.Hash,
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
// First, we'll determine whether we have an active notification for // First, we'll determine whether we have an active confirmation
// this transaction with the given ID. // notification for the given txid/script.
confSet, ok := n.confNotifications[txid] confSet, ok := n.confNotifications[confRequest]
if !ok { if !ok {
return fmt.Errorf("no notification found with TxID %v", txid) return fmt.Errorf("confirmation notification for %v not found",
confRequest)
} }
// If the conf details were already found at tip, all existing // If the confirmation details were already found at tip, all existing
// notifications will have been dispatched or queued for dispatch. We // notifications will have been dispatched or queued for dispatch. We
// can exit early to avoid sending too many notifications on the // can exit early to avoid sending too many notifications on the
// buffered channels. // buffered channels.
@ -587,43 +592,47 @@ func (n *TxNotifier) UpdateConfDetails(txid chainhash.Hash,
// continue to watch for them at tip. // continue to watch for them at tip.
confSet.rescanStatus = rescanComplete confSet.rescanStatus = rescanComplete
// The notifier has yet to reach the height at which the transaction was // The notifier has yet to reach the height at which the
// included in a block, so we should defer until handling it then within // transaction/output script was included in a block, so we should defer
// ConnectTip. // until handling it then within ConnectTip.
if details == nil { if details == nil {
Log.Debugf("Conf details for txid=%v not found during "+ Log.Debugf("Confirmation details for %v not found during "+
"historical dispatch, waiting to dispatch at tip", txid) "historical dispatch, waiting to dispatch at tip",
confRequest)
// We'll commit the current height as the confirm hint to // We'll commit the current height as the confirm hint to
// prevent another potentially long rescan if we restart before // prevent another potentially long rescan if we restart before
// a new block comes in. // a new block comes in.
err := n.confirmHintCache.CommitConfirmHint( err := n.confirmHintCache.CommitConfirmHint(
n.currentHeight, txid, n.currentHeight, confRequest,
) )
if err != nil { if err != nil {
// The error is not fatal as this is an optimistic // The error is not fatal as this is an optimistic
// optimization, so we'll avoid returning an error. // optimization, so we'll avoid returning an error.
Log.Debugf("Unable to update confirm hint to %d for "+ Log.Debugf("Unable to update confirm hint to %d for "+
"%v: %v", n.currentHeight, txid, err) "%v: %v", n.currentHeight, confRequest, err)
} }
return nil return nil
} }
if details.BlockHeight > n.currentHeight { if details.BlockHeight > n.currentHeight {
Log.Debugf("Conf details for txid=%v found above current "+ Log.Debugf("Confirmation details for %v found above current "+
"height, waiting to dispatch at tip", txid) "height, waiting to dispatch at tip", confRequest)
return nil return nil
} }
Log.Debugf("Updating conf details for txid=%v details", txid) Log.Debugf("Updating confirmation details for %v", confRequest)
err := n.confirmHintCache.CommitConfirmHint(details.BlockHeight, txid) err := n.confirmHintCache.CommitConfirmHint(
details.BlockHeight, confRequest,
)
if err != nil { if err != nil {
// The error is not fatal, so we should not return an error to // The error is not fatal, so we should not return an error to
// the caller. // the caller.
Log.Errorf("Unable to update confirm hint to %d for %v: %v", Log.Errorf("Unable to update confirm hint to %d for %v: %v",
details.BlockHeight, txid, err) details.BlockHeight, confRequest, err)
} }
// Cache the details found in the rescan and attempt to dispatch any // Cache the details found in the rescan and attempt to dispatch any
@ -640,30 +649,30 @@ func (n *TxNotifier) UpdateConfDetails(txid chainhash.Hash,
} }
// dispatchConfDetails attempts to cache and dispatch details to a particular // dispatchConfDetails attempts to cache and dispatch details to a particular
// client if the transaction has sufficiently confirmed. If the provided details // client if the transaction/output script has sufficiently confirmed. If the
// are nil, this method will be a no-op. // provided details are nil, this method will be a no-op.
func (n *TxNotifier) dispatchConfDetails( func (n *TxNotifier) dispatchConfDetails(
ntfn *ConfNtfn, details *TxConfirmation) error { ntfn *ConfNtfn, details *TxConfirmation) error {
// If no details are provided, return early as we can't dispatch. // If no details are provided, return early as we can't dispatch.
if details == nil { if details == nil {
Log.Debugf("Unable to dispatch %v, no details provided", Log.Debugf("Unable to dispatch %v, no details provided",
ntfn.TxID) ntfn.ConfRequest)
return nil return nil
} }
// Now, we'll examine whether the transaction of this // Now, we'll examine whether the transaction/output script of this
// notification request has reached its required number of // request has reached its required number of confirmations. If it has,
// confirmations. If it has, we'll dispatch a confirmation // we'll dispatch a confirmation notification to the caller.
// notification to the caller.
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
if confHeight <= n.currentHeight { if confHeight <= n.currentHeight {
Log.Infof("Dispatching %v conf notification for %v", Log.Infof("Dispatching %v confirmation notification for %v",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.ConfRequest)
// We'll send a 0 value to the Updates channel, // We'll send a 0 value to the Updates channel,
// indicating that the transaction has already been // indicating that the transaction/output script has already
// confirmed. // been confirmed.
select { select {
case ntfn.Event.Updates <- 0: case ntfn.Event.Updates <- 0:
case <-n.quit: case <-n.quit:
@ -677,8 +686,8 @@ func (n *TxNotifier) dispatchConfDetails(
return ErrTxNotifierExiting return ErrTxNotifierExiting
} }
} else { } else {
Log.Debugf("Queueing %v conf notification for %v at tip ", Log.Debugf("Queueing %v confirmation notification for %v at tip ",
ntfn.NumConfirmations, ntfn.TxID) ntfn.NumConfirmations, ntfn.ConfRequest)
// Otherwise, we'll keep track of the notification // Otherwise, we'll keep track of the notification
// request by the height at which we should dispatch the // request by the height at which we should dispatch the
@ -691,8 +700,8 @@ func (n *TxNotifier) dispatchConfDetails(
ntfnSet[ntfn] = struct{}{} ntfnSet[ntfn] = struct{}{}
// We'll also send an update to the client of how many // We'll also send an update to the client of how many
// confirmations are left for the transaction to be // confirmations are left for the transaction/output script to
// confirmed. // be confirmed.
numConfsLeft := confHeight - n.currentHeight numConfsLeft := confHeight - n.currentHeight
select { select {
case ntfn.Event.Updates <- numConfsLeft: case ntfn.Event.Updates <- numConfsLeft:
@ -701,17 +710,16 @@ func (n *TxNotifier) dispatchConfDetails(
} }
} }
// As a final check, we'll also watch the transaction if it's // As a final check, we'll also watch the transaction/output script if
// still possible for it to get reorged out of the chain. // it's still possible for it to get reorged out of the chain.
blockHeight := details.BlockHeight reorgSafeHeight := details.BlockHeight + n.reorgSafetyLimit
reorgSafeHeight := blockHeight + n.reorgSafetyLimit
if reorgSafeHeight > n.currentHeight { if reorgSafeHeight > n.currentHeight {
txSet, exists := n.txsByInitialHeight[blockHeight] txSet, exists := n.confsByInitialHeight[details.BlockHeight]
if !exists { if !exists {
txSet = make(map[chainhash.Hash]struct{}) txSet = make(map[ConfRequest]struct{})
n.txsByInitialHeight[blockHeight] = txSet n.confsByInitialHeight[details.BlockHeight] = txSet
} }
txSet[*ntfn.TxID] = struct{}{} txSet[ntfn.ConfRequest] = struct{}{}
} }
return nil return nil