diff --git a/chainntnfs/txnotifier.go b/chainntnfs/txnotifier.go index 356a739a..a4a0ad19 100644 --- a/chainntnfs/txnotifier.go +++ b/chainntnfs/txnotifier.go @@ -557,6 +557,300 @@ func (n *TxNotifier) dispatchConfDetails( return nil } +// RegisterSpend handles a new spend notification request. The client will be +// notified once the outpoint is detected as spent within the chain. +// +// The registration succeeds if no error is returned. If the returned +// HistoricalSpendDisaptch is non-nil, the caller is responsible for attempting +// to determine whether the outpoint has been spent between the start and end +// heights. +// +// NOTE: If the outpoint has already been spent within the chain before the +// notifier's current tip, the spend details must be provided with the +// UpdateSpendDetails method, otherwise we will wait for the outpoint to +// be spent at tip, even though it already has. +func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch, error) { + select { + case <-n.quit: + return nil, ErrTxNotifierExiting + default: + } + + // Before proceeding to register the notification, we'll query our spend + // hint cache to determine whether a better one exists. + startHeight := ntfn.HeightHint + hint, err := n.spendHintCache.QuerySpendHint(ntfn.OutPoint) + if err == nil { + if hint > startHeight { + Log.Debugf("Using height hint %d retrieved from cache "+ + "for %v", startHeight, ntfn.OutPoint) + startHeight = hint + } + } else if err != ErrSpendHintNotFound { + Log.Errorf("Unable to query spend hint for %v: %v", + ntfn.OutPoint, err) + } + + n.Lock() + defer n.Unlock() + + Log.Infof("New spend subscription: spend_id=%d, outpoint=%v, "+ + "height_hint=%d", ntfn.SpendID, ntfn.OutPoint, ntfn.HeightHint) + + // Keep track of the notification request so that we can properly + // dispatch a spend notification later on. + spendSet, ok := n.spendNotifications[ntfn.OutPoint] + if !ok { + // If this is the first registration for the outpoint, we'll + // construct a spendNtfnSet to coalesce all notifications. + spendSet = newSpendNtfnSet() + n.spendNotifications[ntfn.OutPoint] = spendSet + } + spendSet.ntfns[ntfn.SpendID] = ntfn + + // We'll now let the caller know whether a historical rescan is needed + // depending on the current rescan status. + switch spendSet.rescanStatus { + + // If the spending details for this outpoint have already been + // determined and cached, then we can use them to immediately dispatch + // the spend notification to the client. + case rescanComplete: + return nil, n.dispatchSpendDetails(ntfn, spendSet.details) + + // If there is an active rescan to determine whether the outpoint has + // been spent, then we won't trigger another one. + case rescanPending: + return nil, nil + + // Otherwise, we'll fall through and let the caller know that a rescan + // should be dispatched to determine whether the outpoint has already + // been spent. + case rescanNotStarted: + } + + // However, if the spend hint, either provided by the caller or + // retrieved from the cache, is found to be at a later height than the + // TxNotifier is aware of, then we'll refrain from dispatching a + // historical rescan and wait for the spend to come in at tip. + if startHeight > n.currentHeight { + Log.Debugf("Spend hint of %d for %v is above current height %d", + startHeight, ntfn.OutPoint, n.currentHeight) + + // We'll also set the rescan status as complete to ensure that + // spend hints for this outpoint get updated upon + // connected/disconnected blocks. + spendSet.rescanStatus = rescanComplete + return nil, nil + } + + // We'll set the rescan status to pending to ensure subsequent + // notifications don't also attempt a historical dispatch. + spendSet.rescanStatus = rescanPending + + return &HistoricalSpendDispatch{ + OutPoint: ntfn.OutPoint, + PkScript: ntfn.PkScript, + StartHeight: startHeight, + EndHeight: n.currentHeight, + }, nil +} + +// CancelSpend cancels an existing request for a spend notification of an +// outpoint. The request is identified by its spend ID. +func (n *TxNotifier) CancelSpend(op wire.OutPoint, spendID uint64) { + select { + case <-n.quit: + return + default: + } + + n.Lock() + defer n.Unlock() + + Log.Infof("Canceling spend notification: spend_id=%d, outpoint=%v", + spendID, op) + + spendSet, ok := n.spendNotifications[op] + if !ok { + return + } + ntfn, ok := spendSet.ntfns[spendID] + if !ok { + return + } + + // We'll close all the notification channels to let the client know + // their cancel request has been fulfilled. + close(ntfn.Event.Spend) + close(ntfn.Event.Reorg) + delete(spendSet.ntfns, spendID) +} + +// ProcessRelevantSpendTx processes a transaction provided externally. This will +// check whether the transaction is relevant to the notifier if it spends any +// outputs for which we currently have registered notifications for. If it is +// relevant, spend notifications will be dispatched to the caller. +func (n *TxNotifier) ProcessRelevantSpendTx(tx *wire.MsgTx, txHeight int32) error { + select { + case <-n.quit: + return ErrTxNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + n.Lock() + defer n.Unlock() + + // Grab the set of active registered outpoints to determine if the + // transaction spends any of them. + spendNtfns := n.spendNotifications + + // We'll check if this transaction spends an output that has an existing + // spend notification for it. + for i, txIn := range tx.TxIn { + // If this input doesn't spend an existing registered outpoint, + // we'll go on to the next. + prevOut := txIn.PreviousOutPoint + if _, ok := spendNtfns[prevOut]; !ok { + continue + } + + // Otherwise, we'll create a spend summary and send off the + // details to the notification subscribers. + txHash := tx.TxHash() + details := &SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txHash, + SpendingTx: tx, + SpenderInputIndex: uint32(i), + SpendingHeight: txHeight, + } + if err := n.updateSpendDetails(prevOut, details); err != nil { + return err + } + } + + return nil +} + +// UpdateSpendDetails attempts to update the spend details for all active spend +// notification requests for an outpoint. This method should be used once a +// historical scan of the chain has finished. If the historical scan did not +// find a spending transaction for the outpoint, the spend details may be nil. +// +// NOTE: A notification request for the outpoint must be registered first to +// ensure notifications are delivered. +func (n *TxNotifier) UpdateSpendDetails(op wire.OutPoint, + details *SpendDetail) error { + + select { + case <-n.quit: + return ErrTxNotifierExiting + default: + } + + // Ensure we hold the lock throughout handling the notification to + // prevent the notifier from advancing its height underneath us. + n.Lock() + defer n.Unlock() + + return n.updateSpendDetails(op, details) +} + +// updateSpendDetails attempts to update the spend details for all active spend +// notification requests for an outpoint. This method should be used once a +// historical scan of the chain has finished. If the historical scan did not +// find a spending transaction for the outpoint, the spend details may be nil. +// +// NOTE: This method must be called with the TxNotifier's lock held. +func (n *TxNotifier) updateSpendDetails(op wire.OutPoint, + details *SpendDetail) error { + + // Mark the ongoing historical rescan for this outpoint as finished. + // This will allow us to update the spend hints for this outpoint at + // tip. + spendSet, ok := n.spendNotifications[op] + if !ok { + return fmt.Errorf("no notifications found for outpoint %v", op) + } + + // If the spend details have already been found either at tip, then the + // notifications should have already been dispatched, so we can exit + // early to prevent sending duplicate notifications. + if spendSet.details != nil { + return nil + } + + // Since the historical rescan has completed for this outpoint, we'll + // mark its rescan status as complete in order to ensure that the + // TxNotifier can properly update its spend hints upon + // connected/disconnected blocks. + spendSet.rescanStatus = rescanComplete + + // If the historical rescan was not able to find a spending transaction + // for this outpoint, then we can track the spend at tip. + if details == nil { + return nil + } + + // If the historical rescan found the spending transaction for this + // outpoint, but it's at a later height than the notifier (this can + // happen due to latency with the backend during a reorg), then we'll + // defer handling the notification until the notifier has caught up to + // such height. + if uint32(details.SpendingHeight) > n.currentHeight { + return nil + } + + // Now that we've determined the outpoint has been spent, we'll commit + // its spending height as its hint in the cache and dispatch + // notifications to all of its respective clients. + err := n.spendHintCache.CommitSpendHint( + uint32(details.SpendingHeight), op, + ) + if err != nil { + // The error is not fatal as this is an optimistic optimization, + // so we'll avoid returning an error. + Log.Debugf("Unable to update spend hint to %d for %v: %v", + details.SpendingHeight, op, err) + } + + spendSet.details = details + for _, ntfn := range spendSet.ntfns { + err := n.dispatchSpendDetails(ntfn, spendSet.details) + if err != nil { + return err + } + } + + return nil +} + +// dispatchSpendDetails dispatches a spend notification to the client. +// +// NOTE: This must be called with the TxNotifier's lock held. +func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail) error { + // If there are no spend details to dispatch or if the notification has + // already been dispatched, then we can skip dispatching to this client. + if details == nil || ntfn.dispatched { + return nil + } + + Log.Infof("Dispatching spend notification for outpoint=%v at height=%d", + ntfn.OutPoint, n.currentHeight) + + select { + case ntfn.Event.Spend <- details: + ntfn.dispatched = true + case <-n.quit: + return ErrTxNotifierExiting + } + + return nil +} + // ConnectTip handles a new block extending the current chain. This checks each // transaction in the block to see if any watched transactions are included. // Also, if any watched transactions now have the required number of @@ -890,13 +1184,16 @@ func (n *TxNotifier) TearDown() { for _, confSet := range n.confNotifications { for _, ntfn := range confSet.ntfns { - if ntfn.dispatched { - continue - } - close(ntfn.Event.Confirmed) close(ntfn.Event.Updates) close(ntfn.Event.NegativeConf) } } + + for _, spendSet := range n.spendNotifications { + for _, ntfn := range spendSet.ntfns { + close(ntfn.Event.Spend) + close(ntfn.Event.Reorg) + } + } }