chainntnfs/txnotifier: support registration of scripts for spends

This commit is contained in:
Wilmer Paulino 2018-12-06 21:13:41 -08:00
parent 0f4ff2d09c
commit b579c23310
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -101,20 +101,22 @@ func newConfNtfnSet() *confNtfnSet {
} }
} }
// spendNtfnSet holds all known, registered spend notifications for an outpoint. // spendNtfnSet holds all known, registered spend notifications for a spend
// If duplicate notifications are requested, only one historical dispatch will // request (outpoint/output script). If duplicate notifications are requested,
// be spawned to ensure redundant scans are not permitted. // only one historical dispatch will be spawned to ensure redundant scans are
// not permitted.
type spendNtfnSet struct { type spendNtfnSet struct {
// ntfns keeps tracks of all the active client notification requests for // ntfns keeps tracks of all the active client notification requests for
// an outpoint. // an outpoint/output script.
ntfns map[uint64]*SpendNtfn ntfns map[uint64]*SpendNtfn
// rescanStatus represents the current rescan state for the outpoint. // rescanStatus represents the current rescan state for the spend
// request (outpoint/output script).
rescanStatus rescanState rescanStatus rescanState
// details serves as a cache of the spend details for an outpoint that // details serves as a cache of the spend details for an outpoint/output
// we'll use to determine if an outpoint has already been spent at the // script that we'll use to determine if it has already been spent at
// time of registration. // the time of registration.
details *SpendDetail details *SpendDetail
} }
@ -238,7 +240,8 @@ type SpendRequest struct {
// _script_, rather than the outpoint. // _script_, rather than the outpoint.
OutPoint wire.OutPoint OutPoint wire.OutPoint
// PkScript is the script of the outpoint. // PkScript is the script of the outpoint. If a zero outpoint is set,
// then this can be an arbitrary script.
PkScript txscript.PkScript PkScript txscript.PkScript
} }
@ -288,29 +291,25 @@ func (r SpendRequest) SpendHintKey() ([]byte, error) {
} }
// SpendNtfn represents a client's request to receive a notification once an // SpendNtfn represents a client's request to receive a notification once an
// outpoint has been spent on-chain. The client is asynchronously notified via // outpoint/output script has been spent on-chain. The client is asynchronously
// the SpendEvent channels. // notified via the SpendEvent channels.
type SpendNtfn struct { type SpendNtfn struct {
// SpendID uniquely identies the spend notification request for the // SpendID uniquely identies the spend notification request for the
// specified outpoint. // specified outpoint/output script.
SpendID uint64 SpendID uint64
// OutPoint is the outpoint for which a client has requested a spend // SpendRequest represents either the outpoint or script we should
// notification for. // detect the spend of.
OutPoint wire.OutPoint SpendRequest
// PkScript is the script of the outpoint. This is needed in order to
// match compact filters when attempting a historical rescan to
// determine if the outpoint has already been spent.
PkScript []byte
// Event contains references to the channels that the notifications are // Event contains references to the channels that the notifications are
// to be sent over. // to be sent over.
Event *SpendEvent Event *SpendEvent
// HeightHint is the earliest height in the chain that we expect to find // HeightHint is the earliest height in the chain that we expect to find
// the spending transaction of the specified outpoint. This value will // the spending transaction of the specified outpoint/output script.
// be overridden by the spend hint cache if it contains an entry for it. // This value will be overridden by the spend hint cache if it contains
// an entry for it.
HeightHint uint32 HeightHint uint32
// dispatched signals whether a spend notification has been disptached // dispatched signals whether a spend notification has been disptached
@ -319,15 +318,13 @@ type SpendNtfn struct {
} }
// HistoricalSpendDispatch parameterizes a manual rescan to determine the // HistoricalSpendDispatch parameterizes a manual rescan to determine the
// spending details (if any) of an outpoint. The parameters include the start // spending details (if any) of an outpoint/output script. The parameters
// and end block heights specifying the range of blocks to scan. // include the start and end block heights specifying the range of blocks to
// scan.
type HistoricalSpendDispatch struct { type HistoricalSpendDispatch struct {
// OutPoint is the outpoint which we should attempt to find the spending // SpendRequest represents either the outpoint or script we should
OutPoint wire.OutPoint // detect the spend of.
SpendRequest
// PkScript is the script of the outpoint. This is needed in order to
// match compact filters when attempting a historical rescan.
PkScript []byte
// StartHeight specified the block height at which to begin the // StartHeight specified the block height at which to begin the
// historical rescan. // historical rescan.
@ -378,14 +375,14 @@ type TxNotifier struct {
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
// spendNotifications is an index of all active notification requests // spendNotifications is an index of all active notification requests
// per outpoint. // per outpoint/output script.
spendNotifications map[wire.OutPoint]*spendNtfnSet spendNotifications map[SpendRequest]*spendNtfnSet
// opsBySpendHeight is an index that keeps tracks of the spending height // spendsByHeight is an index that keeps tracks of the spending height
// of an outpoint we are currently tracking notifications for. This is // of outpoints/output scripts we are currently tracking notifications
// used in order to recover from the spending transaction of an outpoint // for. This is used in order to recover from spending transactions
// being reorged out of the chain. // being reorged out of the chain.
opsBySpendHeight map[uint32]map[wire.OutPoint]struct{} spendsByHeight map[uint32]map[SpendRequest]struct{}
// confirmHintCache is a cache used to maintain the latest height hints // confirmHintCache is a cache used to maintain the latest height hints
// for transactions/output scripts. Each height hint represents the // for transactions/output scripts. Each height hint represents the
@ -394,8 +391,8 @@ type TxNotifier struct {
confirmHintCache ConfirmHintCache confirmHintCache ConfirmHintCache
// spendHintCache is a cache used to maintain the latest height hints // spendHintCache is a cache used to maintain the latest height hints
// for outpoints. Each height hint represents the earliest height at // for outpoints/output scripts. Each height hint represents the
// which the outpoints could have been spent within the chain. // earliest height at which they could have been spent within the chain.
spendHintCache SpendHintCache spendHintCache SpendHintCache
// quit is closed in order to signal that the notifier is gracefully // quit is closed in order to signal that the notifier is gracefully
@ -419,8 +416,8 @@ func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32,
confNotifications: make(map[ConfRequest]*confNtfnSet), confNotifications: make(map[ConfRequest]*confNtfnSet),
confsByInitialHeight: make(map[uint32]map[ConfRequest]struct{}), confsByInitialHeight: make(map[uint32]map[ConfRequest]struct{}),
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
spendNotifications: make(map[wire.OutPoint]*spendNtfnSet), spendNotifications: make(map[SpendRequest]*spendNtfnSet),
opsBySpendHeight: make(map[uint32]map[wire.OutPoint]struct{}), spendsByHeight: make(map[uint32]map[SpendRequest]struct{}),
confirmHintCache: confirmHintCache, confirmHintCache: confirmHintCache,
spendHintCache: spendHintCache, spendHintCache: spendHintCache,
quit: make(chan struct{}), quit: make(chan struct{}),
@ -726,53 +723,57 @@ func (n *TxNotifier) dispatchConfDetails(
} }
// RegisterSpend handles a new spend notification request. The client will be // RegisterSpend handles a new spend notification request. The client will be
// notified once the outpoint is detected as spent within the chain. // notified once the outpoint/output script is detected as spent within the
// chain.
// //
// The registration succeeds if no error is returned. If the returned // The registration succeeds if no error is returned. If the returned
// HistoricalSpendDisaptch is non-nil, the caller is responsible for attempting // HistoricalSpendDisaptch is non-nil, the caller is responsible for attempting
// to determine whether the outpoint has been spent between the start and end // to determine whether the outpoint/output script has been spent between the
// heights. // start and end heights. The notifier's current height is also returned so that
// backends can request to be notified of spends from this point forwards.
// //
// NOTE: If the outpoint has already been spent within the chain before the // NOTE: If the outpoint/output script has already been spent within the chain
// notifier's current tip, the spend details must be provided with the // before the notifier's current tip, the spend details must be provided with
// UpdateSpendDetails method, otherwise we will wait for the outpoint to // the UpdateSpendDetails method, otherwise we will wait for the outpoint/output
// be spent at tip, even though it already has. // script to be spent at tip, even though it already has.
func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch, error) { func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch,
uint32, error) {
select { select {
case <-n.quit: case <-n.quit:
return nil, ErrTxNotifierExiting return nil, 0, ErrTxNotifierExiting
default: default:
} }
// Before proceeding to register the notification, we'll query our spend // Before proceeding to register the notification, we'll query our spend
// hint cache to determine whether a better one exists. // hint cache to determine whether a better one exists.
startHeight := ntfn.HeightHint startHeight := ntfn.HeightHint
hint, err := n.spendHintCache.QuerySpendHint(ntfn.OutPoint) hint, err := n.spendHintCache.QuerySpendHint(ntfn.SpendRequest)
if err == nil { if err == nil {
if hint > startHeight { if hint > startHeight {
Log.Debugf("Using height hint %d retrieved from cache "+ Log.Debugf("Using height hint %d retrieved from cache "+
"for %v", startHeight, ntfn.OutPoint) "for %v", startHeight, ntfn.SpendRequest)
startHeight = hint startHeight = hint
} }
} else if err != ErrSpendHintNotFound { } else if err != ErrSpendHintNotFound {
Log.Errorf("Unable to query spend hint for %v: %v", Log.Errorf("Unable to query spend hint for %v: %v",
ntfn.OutPoint, err) ntfn.SpendRequest, err)
} }
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
Log.Infof("New spend subscription: spend_id=%d, outpoint=%v, "+ Log.Infof("New spend subscription: spend_id=%d, %v, height_hint=%d",
"height_hint=%d", ntfn.SpendID, ntfn.OutPoint, ntfn.HeightHint) ntfn.SpendID, ntfn.SpendRequest, ntfn.HeightHint)
// Keep track of the notification request so that we can properly // Keep track of the notification request so that we can properly
// dispatch a spend notification later on. // dispatch a spend notification later on.
spendSet, ok := n.spendNotifications[ntfn.OutPoint] spendSet, ok := n.spendNotifications[ntfn.SpendRequest]
if !ok { if !ok {
// If this is the first registration for the outpoint, we'll // If this is the first registration for the request, we'll
// construct a spendNtfnSet to coalesce all notifications. // construct a spendNtfnSet to coalesce all notifications.
spendSet = newSpendNtfnSet() spendSet = newSpendNtfnSet()
n.spendNotifications[ntfn.OutPoint] = spendSet n.spendNotifications[ntfn.SpendRequest] = spendSet
} }
spendSet.ntfns[ntfn.SpendID] = ntfn spendSet.ntfns[ntfn.SpendID] = ntfn
@ -780,19 +781,28 @@ func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch, e
// depending on the current rescan status. // depending on the current rescan status.
switch spendSet.rescanStatus { switch spendSet.rescanStatus {
// If the spending details for this outpoint have already been // If the spending details for this request have already been determined
// determined and cached, then we can use them to immediately dispatch // and cached, then we can use them to immediately dispatch the spend
// the spend notification to the client. // notification to the client.
case rescanComplete: case rescanComplete:
return nil, n.dispatchSpendDetails(ntfn, spendSet.details) Log.Debugf("Attempting to dispatch spend for %v on "+
"registration since rescan has finished",
ntfn.SpendRequest)
// If there is an active rescan to determine whether the outpoint has return nil, n.currentHeight, n.dispatchSpendDetails(
ntfn, spendSet.details,
)
// If there is an active rescan to determine whether the request has
// been spent, then we won't trigger another one. // been spent, then we won't trigger another one.
case rescanPending: case rescanPending:
return nil, nil Log.Debugf("Waiting for pending rescan to finish before "+
"notifying %v at tip", ntfn.SpendRequest)
return nil, n.currentHeight, nil
// Otherwise, we'll fall through and let the caller know that a rescan // Otherwise, we'll fall through and let the caller know that a rescan
// should be dispatched to determine whether the outpoint has already // should be dispatched to determine whether the request has already
// been spent. // been spent.
case rescanNotStarted: case rescanNotStarted:
} }
@ -803,30 +813,32 @@ func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch, e
// historical rescan and wait for the spend to come in at tip. // historical rescan and wait for the spend to come in at tip.
if startHeight > n.currentHeight { if startHeight > n.currentHeight {
Log.Debugf("Spend hint of %d for %v is above current height %d", Log.Debugf("Spend hint of %d for %v is above current height %d",
startHeight, ntfn.OutPoint, n.currentHeight) startHeight, ntfn.SpendRequest, n.currentHeight)
// We'll also set the rescan status as complete to ensure that // We'll also set the rescan status as complete to ensure that
// spend hints for this outpoint get updated upon // spend hints for this request get updated upon
// connected/disconnected blocks. // connected/disconnected blocks.
spendSet.rescanStatus = rescanComplete spendSet.rescanStatus = rescanComplete
return nil, nil return nil, n.currentHeight, nil
} }
// We'll set the rescan status to pending to ensure subsequent // We'll set the rescan status to pending to ensure subsequent
// notifications don't also attempt a historical dispatch. // notifications don't also attempt a historical dispatch.
spendSet.rescanStatus = rescanPending spendSet.rescanStatus = rescanPending
Log.Debugf("Dispatching historical spend rescan for %v",
ntfn.SpendRequest)
return &HistoricalSpendDispatch{ return &HistoricalSpendDispatch{
OutPoint: ntfn.OutPoint, SpendRequest: ntfn.SpendRequest,
PkScript: ntfn.PkScript, StartHeight: startHeight,
StartHeight: startHeight, EndHeight: n.currentHeight,
EndHeight: n.currentHeight, }, n.currentHeight, nil
}, nil
} }
// CancelSpend cancels an existing request for a spend notification of an // CancelSpend cancels an existing request for a spend notification of an
// outpoint. The request is identified by its spend ID. // outpoint/output script. The request is identified by its spend ID.
func (n *TxNotifier) CancelSpend(op wire.OutPoint, spendID uint64) { func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) {
select { select {
case <-n.quit: case <-n.quit:
return return
@ -836,10 +848,10 @@ func (n *TxNotifier) CancelSpend(op wire.OutPoint, spendID uint64) {
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
Log.Infof("Canceling spend notification: spend_id=%d, outpoint=%v", Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID,
spendID, op) spendRequest)
spendSet, ok := n.spendNotifications[op] spendSet, ok := n.spendNotifications[spendRequest]
if !ok { if !ok {
return return
} }
@ -904,13 +916,13 @@ func (n *TxNotifier) ProcessRelevantSpendTx(tx *wire.MsgTx, txHeight int32) erro
} }
// UpdateSpendDetails attempts to update the spend details for all active spend // UpdateSpendDetails attempts to update the spend details for all active spend
// notification requests for an outpoint. This method should be used once a // notification requests for an outpoint/output script. This method should be
// historical scan of the chain has finished. If the historical scan did not // used once a historical scan of the chain has finished. If the historical scan
// find a spending transaction for the outpoint, the spend details may be nil. // did not find a spending transaction for it, the spend details may be nil.
// //
// NOTE: A notification request for the outpoint must be registered first to // NOTE: A notification request for the outpoint/output script must be
// ensure notifications are delivered. // registered first to ensure notifications are delivered.
func (n *TxNotifier) UpdateSpendDetails(op wire.OutPoint, func (n *TxNotifier) UpdateSpendDetails(spendRequest SpendRequest,
details *SpendDetail) error { details *SpendDetail) error {
select { select {
@ -924,24 +936,24 @@ func (n *TxNotifier) UpdateSpendDetails(op wire.OutPoint,
n.Lock() n.Lock()
defer n.Unlock() defer n.Unlock()
return n.updateSpendDetails(op, details) return n.updateSpendDetails(spendRequest, details)
} }
// updateSpendDetails attempts to update the spend details for all active spend // updateSpendDetails attempts to update the spend details for all active spend
// notification requests for an outpoint. This method should be used once a // notification requests for an outpoint/output script. This method should be
// historical scan of the chain has finished. If the historical scan did not // used once a historical scan of the chain has finished. If the historical scan
// find a spending transaction for the outpoint, the spend details may be nil. // did not find a spending transaction for it, the spend details may be nil.
// //
// NOTE: This method must be called with the TxNotifier's lock held. // NOTE: This method must be called with the TxNotifier's lock held.
func (n *TxNotifier) updateSpendDetails(op wire.OutPoint, func (n *TxNotifier) updateSpendDetails(spendRequest SpendRequest,
details *SpendDetail) error { details *SpendDetail) error {
// Mark the ongoing historical rescan for this outpoint as finished. // Mark the ongoing historical rescan for this request as finished. This
// This will allow us to update the spend hints for this outpoint at // will allow us to update the spend hints for it at tip.
// tip. spendSet, ok := n.spendNotifications[spendRequest]
spendSet, ok := n.spendNotifications[op]
if !ok { if !ok {
return fmt.Errorf("no notifications found for outpoint %v", op) return fmt.Errorf("spend notification for %v not found",
spendRequest)
} }
// If the spend details have already been found either at tip, then the // If the spend details have already been found either at tip, then the
@ -951,31 +963,33 @@ func (n *TxNotifier) updateSpendDetails(op wire.OutPoint,
return nil return nil
} }
// Since the historical rescan has completed for this outpoint, we'll // Since the historical rescan has completed for this request, we'll
// mark its rescan status as complete in order to ensure that the // mark its rescan status as complete in order to ensure that the
// TxNotifier can properly update its spend hints upon // TxNotifier can properly update its spend hints upon
// connected/disconnected blocks. // connected/disconnected blocks.
spendSet.rescanStatus = rescanComplete spendSet.rescanStatus = rescanComplete
// If the historical rescan was not able to find a spending transaction // If the historical rescan was not able to find a spending transaction
// for this outpoint, then we can track the spend at tip. // for this request, then we can track the spend at tip.
if details == nil { if details == nil {
// We'll commit the current height as the spend hint to prevent // We'll commit the current height as the spend hint to prevent
// another potentially long rescan if we restart before a new // another potentially long rescan if we restart before a new
// block comes in. // block comes in.
err := n.spendHintCache.CommitSpendHint(n.currentHeight, op) err := n.spendHintCache.CommitSpendHint(
n.currentHeight, spendRequest,
)
if err != nil { if err != nil {
// The error is not fatal as this is an optimistic // The error is not fatal as this is an optimistic
// optimization, so we'll avoid returning an error. // optimization, so we'll avoid returning an error.
Log.Debugf("Unable to update spend hint to %d for %v: %v", Log.Debugf("Unable to update spend hint to %d for %v: %v",
n.currentHeight, op, err) n.currentHeight, spendRequest, err)
} }
return nil return nil
} }
// If the historical rescan found the spending transaction for this // If the historical rescan found the spending transaction for this
// outpoint, but it's at a later height than the notifier (this can // request, but it's at a later height than the notifier (this can
// happen due to latency with the backend during a reorg), then we'll // happen due to latency with the backend during a reorg), then we'll
// defer handling the notification until the notifier has caught up to // defer handling the notification until the notifier has caught up to
// such height. // such height.
@ -983,17 +997,17 @@ func (n *TxNotifier) updateSpendDetails(op wire.OutPoint,
return nil return nil
} }
// Now that we've determined the outpoint has been spent, we'll commit // Now that we've determined the request has been spent, we'll commit
// its spending height as its hint in the cache and dispatch // its spending height as its hint in the cache and dispatch
// notifications to all of its respective clients. // notifications to all of its respective clients.
err := n.spendHintCache.CommitSpendHint( err := n.spendHintCache.CommitSpendHint(
uint32(details.SpendingHeight), op, uint32(details.SpendingHeight), spendRequest,
) )
if err != nil { if err != nil {
// The error is not fatal as this is an optimistic optimization, // The error is not fatal as this is an optimistic optimization,
// so we'll avoid returning an error. // so we'll avoid returning an error.
Log.Debugf("Unable to update spend hint to %d for %v: %v", Log.Debugf("Unable to update spend hint to %d for %v: %v",
details.SpendingHeight, op, err) details.SpendingHeight, spendRequest, err)
} }
spendSet.details = details spendSet.details = details
@ -1017,8 +1031,8 @@ func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail)
return nil return nil
} }
Log.Infof("Dispatching spend notification for outpoint=%v at height=%d", Log.Infof("Dispatching confirmed spend notification for %v at height=%d",
ntfn.OutPoint, n.currentHeight) ntfn.SpendRequest, n.currentHeight)
select { select {
case ntfn.Event.Spend <- details: case ntfn.Event.Spend <- details: