You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1977 lines
64 KiB
1977 lines
64 KiB
package chainntnfs |
|
|
|
import ( |
|
"bytes" |
|
"errors" |
|
"fmt" |
|
"sync" |
|
"sync/atomic" |
|
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash" |
|
"github.com/btcsuite/btcd/txscript" |
|
"github.com/btcsuite/btcd/wire" |
|
"github.com/btcsuite/btcutil" |
|
"github.com/lightningnetwork/lnd/channeldb" |
|
) |
|
|
|
const (
	// ReorgSafetyLimit is the depth, in blocks, past which a block is
	// assumed to be a permanent part of the chain and will no longer be
	// reorganized out. Old confirmation requests are pruned based on this
	// depth so that reorgs are handled correctly. The value corresponds to
	// the average number of blocks in a day.
	ReorgSafetyLimit = 144

	// MaxNumConfs is the largest number of confirmations that may be
	// requested for a transaction. It is tied to ReorgSafetyLimit.
	MaxNumConfs = ReorgSafetyLimit
)
|
|
|
var ( |
|
// ZeroHash is the value that should be used as the txid when |
|
// registering for the confirmation of a script on-chain. This allows |
|
// the notifier to match _and_ dispatch upon the inclusion of the script |
|
// on-chain, rather than the txid. |
|
ZeroHash chainhash.Hash |
|
|
|
// ZeroOutPoint is the value that should be used as the outpoint when |
|
// registering for the spend of a script on-chain. This allows the |
|
// notifier to match _and_ dispatch upon detecting the spend of the |
|
// script on-chain, rather than the outpoint. |
|
ZeroOutPoint wire.OutPoint |
|
) |
|
|
|
var ( |
|
// ErrTxNotifierExiting is an error returned when attempting to interact |
|
// with the TxNotifier but it been shut down. |
|
ErrTxNotifierExiting = errors.New("TxNotifier is exiting") |
|
|
|
// ErrNoScript is an error returned when a confirmation/spend |
|
// registration is attempted without providing an accompanying output |
|
// script. |
|
ErrNoScript = errors.New("an output script must be provided") |
|
|
|
// ErrNoHeightHint is an error returned when a confirmation/spend |
|
// registration is attempted without providing an accompanying height |
|
// hint. |
|
ErrNoHeightHint = errors.New("a height hint greater than 0 must be " + |
|
"provided") |
|
|
|
// ErrNumConfsOutOfRange is an error returned when a confirmation/spend |
|
// registration is attempted and the number of confirmations provided is |
|
// out of range. |
|
ErrNumConfsOutOfRange = fmt.Errorf("number of confirmations must be "+ |
|
"between %d and %d", 1, MaxNumConfs) |
|
) |
|
|
|
// rescanState tracks how far a registration has progressed towards the point
// where the notifier can begin dispatching confirmations at tip.
type rescanState byte

const (
	// rescanNotStarted is the zero value and initial state. It denotes
	// that a historical dispatch may still be required.
	rescanNotStarted rescanState = iota

	// rescanPending means a historical dispatch has already been handed
	// out and its completion is awaited. No further rescans should be
	// dispatched while in this state.
	rescanPending

	// rescanComplete means that either a dispatched rescan has finished or
	// watching began at tip right away. The notifier may only dispatch
	// notifications from tip once this state has been reached.
	rescanComplete
)
|
|
|
// confNtfnSet holds all known, registered confirmation notifications for a |
|
// txid/output script. If duplicates notifications are requested, only one |
|
// historical dispatch will be spawned to ensure redundant scans are not |
|
// permitted. A single conf detail will be constructed and dispatched to all |
|
// interested |
|
// clients. |
|
type confNtfnSet struct { |
|
// ntfns keeps tracks of all the active client notification requests for |
|
// a transaction/output script |
|
ntfns map[uint64]*ConfNtfn |
|
|
|
// rescanStatus represents the current rescan state for the |
|
// transaction/output script. |
|
rescanStatus rescanState |
|
|
|
// details serves as a cache of the confirmation details of a |
|
// transaction that we'll use to determine if a transaction/output |
|
// script has already confirmed at the time of registration. |
|
// details is also used to make sure that in case of an address reuse |
|
// (funds sent to a previously confirmed script) no additional |
|
// notification is registered which would lead to an inconsistent state. |
|
details *TxConfirmation |
|
} |
|
|
|
// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients |
|
// interested in a notification for a particular txid. |
|
func newConfNtfnSet() *confNtfnSet { |
|
return &confNtfnSet{ |
|
ntfns: make(map[uint64]*ConfNtfn), |
|
rescanStatus: rescanNotStarted, |
|
} |
|
} |
|
|
|
// spendNtfnSet holds all known, registered spend notifications for a spend |
|
// request (outpoint/output script). If duplicate notifications are requested, |
|
// only one historical dispatch will be spawned to ensure redundant scans are |
|
// not permitted. |
|
type spendNtfnSet struct { |
|
// ntfns keeps tracks of all the active client notification requests for |
|
// an outpoint/output script. |
|
ntfns map[uint64]*SpendNtfn |
|
|
|
// rescanStatus represents the current rescan state for the spend |
|
// request (outpoint/output script). |
|
rescanStatus rescanState |
|
|
|
// details serves as a cache of the spend details for an outpoint/output |
|
// script that we'll use to determine if it has already been spent at |
|
// the time of registration. |
|
details *SpendDetail |
|
} |
|
|
|
// newSpendNtfnSet constructs a new spend notification set. |
|
func newSpendNtfnSet() *spendNtfnSet { |
|
return &spendNtfnSet{ |
|
ntfns: make(map[uint64]*SpendNtfn), |
|
rescanStatus: rescanNotStarted, |
|
} |
|
} |
|
|
|
// ConfRequest encapsulates a request for a confirmation notification of either |
|
// a txid or output script. |
|
type ConfRequest struct { |
|
// TxID is the hash of the transaction for which confirmation |
|
// notifications are requested. If set to a zero hash, then a |
|
// confirmation notification will be dispatched upon inclusion of the |
|
// _script_, rather than the txid. |
|
TxID chainhash.Hash |
|
|
|
// PkScript is the public key script of an outpoint created in this |
|
// transaction. |
|
PkScript txscript.PkScript |
|
} |
|
|
|
// NewConfRequest creates a request for a confirmation notification of either a |
|
// txid or output script. A nil txid or an allocated ZeroHash can be used to |
|
// dispatch the confirmation notification on the script. |
|
func NewConfRequest(txid *chainhash.Hash, pkScript []byte) (ConfRequest, error) { |
|
var r ConfRequest |
|
outputScript, err := txscript.ParsePkScript(pkScript) |
|
if err != nil { |
|
return r, err |
|
} |
|
|
|
// We'll only set a txid for which we'll dispatch a confirmation |
|
// notification on this request if one was provided. Otherwise, we'll |
|
// default to dispatching on the confirmation of the script instead. |
|
if txid != nil { |
|
r.TxID = *txid |
|
} |
|
r.PkScript = outputScript |
|
|
|
return r, nil |
|
} |
|
|
|
// String returns the string representation of the ConfRequest. |
|
func (r ConfRequest) String() string { |
|
if r.TxID != ZeroHash { |
|
return fmt.Sprintf("txid=%v", r.TxID) |
|
} |
|
return fmt.Sprintf("script=%v", r.PkScript) |
|
} |
|
|
|
// ConfHintKey returns the key that will be used to index the confirmation |
|
// request's hint within the height hint cache. |
|
func (r ConfRequest) ConfHintKey() ([]byte, error) { |
|
if r.TxID == ZeroHash { |
|
return r.PkScript.Script(), nil |
|
} |
|
|
|
var txid bytes.Buffer |
|
if err := channeldb.WriteElement(&txid, r.TxID); err != nil { |
|
return nil, err |
|
} |
|
|
|
return txid.Bytes(), nil |
|
} |
|
|
|
// MatchesTx determines whether the given transaction satisfies the confirmation |
|
// request. If the confirmation request is for a script, then we'll check all of |
|
// the outputs of the transaction to determine if it matches. Otherwise, we'll |
|
// match on the txid. |
|
func (r ConfRequest) MatchesTx(tx *wire.MsgTx) bool { |
|
scriptMatches := func() bool { |
|
pkScript := r.PkScript.Script() |
|
for _, txOut := range tx.TxOut { |
|
if bytes.Equal(txOut.PkScript, pkScript) { |
|
return true |
|
} |
|
} |
|
|
|
return false |
|
} |
|
|
|
if r.TxID != ZeroHash { |
|
return r.TxID == tx.TxHash() && scriptMatches() |
|
} |
|
|
|
return scriptMatches() |
|
} |
|
|
|
// ConfNtfn represents a notifier client's request to receive a notification |
|
// once the target transaction/output script gets sufficient confirmations. The |
|
// client is asynchronously notified via the ConfirmationEvent channels. |
|
type ConfNtfn struct { |
|
// ConfID uniquely identifies the confirmation notification request for |
|
// the specified transaction/output script. |
|
ConfID uint64 |
|
|
|
// ConfRequest represents either the txid or script we should detect |
|
// inclusion of within the chain. |
|
ConfRequest |
|
|
|
// NumConfirmations is the number of confirmations after which the |
|
// notification is to be sent. |
|
NumConfirmations uint32 |
|
|
|
// Event contains references to the channels that the notifications are to |
|
// be sent over. |
|
Event *ConfirmationEvent |
|
|
|
// HeightHint is the minimum height in the chain that we expect to find |
|
// this txid. |
|
HeightHint uint32 |
|
|
|
// dispatched is false if the confirmed notification has not been sent yet. |
|
dispatched bool |
|
} |
|
|
|
// HistoricalConfDispatch parameterizes a manual rescan for a particular |
|
// transaction/output script. The parameters include the start and end block |
|
// heights specifying the range of blocks to scan. |
|
type HistoricalConfDispatch struct { |
|
// ConfRequest represents either the txid or script we should detect |
|
// inclusion of within the chain. |
|
ConfRequest |
|
|
|
// StartHeight specifies the block height at which to begin the |
|
// historical rescan. |
|
StartHeight uint32 |
|
|
|
// EndHeight specifies the last block height (inclusive) that the |
|
// historical scan should consider. |
|
EndHeight uint32 |
|
} |
|
|
|
// ConfRegistration encompasses all of the information required for callers to |
|
// retrieve details about a confirmation event. |
|
type ConfRegistration struct { |
|
// Event contains references to the channels that the notifications are |
|
// to be sent over. |
|
Event *ConfirmationEvent |
|
|
|
// HistoricalDispatch, if non-nil, signals to the client who registered |
|
// the notification that they are responsible for attempting to manually |
|
// rescan blocks for the txid/output script between the start and end |
|
// heights. |
|
HistoricalDispatch *HistoricalConfDispatch |
|
|
|
// Height is the height of the TxNotifier at the time the confirmation |
|
// notification was registered. This can be used so that backends can |
|
// request to be notified of confirmations from this point forwards. |
|
Height uint32 |
|
} |
|
|
|
// SpendRequest encapsulates a request for a spend notification of either an |
|
// outpoint or output script. |
|
type SpendRequest struct { |
|
// OutPoint is the outpoint for which a client has requested a spend |
|
// notification for. If set to a zero outpoint, then a spend |
|
// notification will be dispatched upon detecting the spend of the |
|
// _script_, rather than the outpoint. |
|
OutPoint wire.OutPoint |
|
|
|
// PkScript is the script of the outpoint. If a zero outpoint is set, |
|
// then this can be an arbitrary script. |
|
PkScript txscript.PkScript |
|
} |
|
|
|
// NewSpendRequest creates a request for a spend notification of either an |
|
// outpoint or output script. A nil outpoint or an allocated ZeroOutPoint can be |
|
// used to dispatch the confirmation notification on the script. |
|
func NewSpendRequest(op *wire.OutPoint, pkScript []byte) (SpendRequest, error) { |
|
var r SpendRequest |
|
outputScript, err := txscript.ParsePkScript(pkScript) |
|
if err != nil { |
|
return r, err |
|
} |
|
|
|
// We'll only set an outpoint for which we'll dispatch a spend |
|
// notification on this request if one was provided. Otherwise, we'll |
|
// default to dispatching on the spend of the script instead. |
|
if op != nil { |
|
r.OutPoint = *op |
|
} |
|
r.PkScript = outputScript |
|
|
|
return r, nil |
|
} |
|
|
|
// String returns the string representation of the SpendRequest. |
|
func (r SpendRequest) String() string { |
|
if r.OutPoint != ZeroOutPoint { |
|
return fmt.Sprintf("outpoint=%v, script=%v", r.OutPoint, |
|
r.PkScript) |
|
} |
|
return fmt.Sprintf("outpoint=<zero>, script=%v", r.PkScript) |
|
} |
|
|
|
// SpendHintKey returns the key that will be used to index the spend request's |
|
// hint within the height hint cache. |
|
func (r SpendRequest) SpendHintKey() ([]byte, error) { |
|
if r.OutPoint == ZeroOutPoint { |
|
return r.PkScript.Script(), nil |
|
} |
|
|
|
var outpoint bytes.Buffer |
|
err := channeldb.WriteElement(&outpoint, r.OutPoint) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return outpoint.Bytes(), nil |
|
} |
|
|
|
// MatchesTx determines whether the given transaction satisfies the spend |
|
// request. If the spend request is for an outpoint, then we'll check all of |
|
// the outputs being spent by the inputs of the transaction to determine if it |
|
// matches. Otherwise, we'll need to match on the output script being spent, so |
|
// we'll recompute it for each input of the transaction to determine if it |
|
// matches. |
|
func (r SpendRequest) MatchesTx(tx *wire.MsgTx) (bool, uint32, error) { |
|
if r.OutPoint != ZeroOutPoint { |
|
for i, txIn := range tx.TxIn { |
|
if txIn.PreviousOutPoint == r.OutPoint { |
|
return true, uint32(i), nil |
|
} |
|
} |
|
|
|
return false, 0, nil |
|
} |
|
|
|
for i, txIn := range tx.TxIn { |
|
pkScript, err := txscript.ComputePkScript( |
|
txIn.SignatureScript, txIn.Witness, |
|
) |
|
if err == txscript.ErrUnsupportedScriptType { |
|
continue |
|
} |
|
if err != nil { |
|
return false, 0, err |
|
} |
|
|
|
if bytes.Equal(pkScript.Script(), r.PkScript.Script()) { |
|
return true, uint32(i), nil |
|
} |
|
} |
|
|
|
return false, 0, nil |
|
} |
|
|
|
// SpendNtfn represents a client's request to receive a notification once an |
|
// outpoint/output script has been spent on-chain. The client is asynchronously |
|
// notified via the SpendEvent channels. |
|
type SpendNtfn struct { |
|
// SpendID uniquely identies the spend notification request for the |
|
// specified outpoint/output script. |
|
SpendID uint64 |
|
|
|
// SpendRequest represents either the outpoint or script we should |
|
// detect the spend of. |
|
SpendRequest |
|
|
|
// Event contains references to the channels that the notifications are |
|
// to be sent over. |
|
Event *SpendEvent |
|
|
|
// HeightHint is the earliest height in the chain that we expect to find |
|
// the spending transaction of the specified outpoint/output script. |
|
// This value will be overridden by the spend hint cache if it contains |
|
// an entry for it. |
|
HeightHint uint32 |
|
|
|
// dispatched signals whether a spend notification has been disptached |
|
// to the client. |
|
dispatched bool |
|
} |
|
|
|
// HistoricalSpendDispatch parameterizes a manual rescan to determine the |
|
// spending details (if any) of an outpoint/output script. The parameters |
|
// include the start and end block heights specifying the range of blocks to |
|
// scan. |
|
type HistoricalSpendDispatch struct { |
|
// SpendRequest represents either the outpoint or script we should |
|
// detect the spend of. |
|
SpendRequest |
|
|
|
// StartHeight specified the block height at which to begin the |
|
// historical rescan. |
|
StartHeight uint32 |
|
|
|
// EndHeight specifies the last block height (inclusive) that the |
|
// historical rescan should consider. |
|
EndHeight uint32 |
|
} |
|
|
|
// SpendRegistration encompasses all of the information required for callers to |
|
// retrieve details about a spend event. |
|
type SpendRegistration struct { |
|
// Event contains references to the channels that the notifications are |
|
// to be sent over. |
|
Event *SpendEvent |
|
|
|
// HistoricalDispatch, if non-nil, signals to the client who registered |
|
// the notification that they are responsible for attempting to manually |
|
// rescan blocks for the txid/output script between the start and end |
|
// heights. |
|
HistoricalDispatch *HistoricalSpendDispatch |
|
|
|
// Height is the height of the TxNotifier at the time the spend |
|
// notification was registered. This can be used so that backends can |
|
// request to be notified of spends from this point forwards. |
|
Height uint32 |
|
} |
|
|
|
// TxNotifier is a struct responsible for delivering transaction notifications |
|
// to subscribers. These notifications can be of two different types: |
|
// transaction/output script confirmations and/or outpoint/output script spends. |
|
// The TxNotifier will watch the blockchain as new blocks come in, in order to |
|
// satisfy its client requests. |
|
type TxNotifier struct { |
|
confClientCounter uint64 // To be used atomically. |
|
spendClientCounter uint64 // To be used atomically. |
|
|
|
// currentHeight is the height of the tracked blockchain. It is used to |
|
// determine the number of confirmations a tx has and ensure blocks are |
|
// connected and disconnected in order. |
|
currentHeight uint32 |
|
|
|
// reorgSafetyLimit is the chain depth beyond which it is assumed a |
|
// block will not be reorganized out of the chain. This is used to |
|
// determine when to prune old notification requests so that reorgs are |
|
// handled correctly. The coinbase maturity period is a reasonable value |
|
// to use. |
|
reorgSafetyLimit uint32 |
|
|
|
// reorgDepth is the depth of a chain organization that this system is |
|
// being informed of. This is incremented as long as a sequence of |
|
// blocks are disconnected without being interrupted by a new block. |
|
reorgDepth uint32 |
|
|
|
// confNotifications is an index of confirmation notification requests |
|
// by transaction hash/output script. |
|
confNotifications map[ConfRequest]*confNtfnSet |
|
|
|
// confsByInitialHeight is an index of watched transactions/output |
|
// scripts by the height that they are included at in the chain. This |
|
// is tracked so that incorrect notifications are not sent if a |
|
// transaction/output script is reorged out of the chain and so that |
|
// negative confirmations can be recognized. |
|
confsByInitialHeight map[uint32]map[ConfRequest]struct{} |
|
|
|
// ntfnsByConfirmHeight is an index of notification requests by the |
|
// height at which the transaction/output script will have sufficient |
|
// confirmations. |
|
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{} |
|
|
|
// spendNotifications is an index of all active notification requests |
|
// per outpoint/output script. |
|
spendNotifications map[SpendRequest]*spendNtfnSet |
|
|
|
// spendsByHeight is an index that keeps tracks of the spending height |
|
// of outpoints/output scripts we are currently tracking notifications |
|
// for. This is used in order to recover from spending transactions |
|
// being reorged out of the chain. |
|
spendsByHeight map[uint32]map[SpendRequest]struct{} |
|
|
|
// confirmHintCache is a cache used to maintain the latest height hints |
|
// for transactions/output scripts. Each height hint represents the |
|
// earliest height at which they scripts could have been confirmed |
|
// within the chain. |
|
confirmHintCache ConfirmHintCache |
|
|
|
// spendHintCache is a cache used to maintain the latest height hints |
|
// for outpoints/output scripts. Each height hint represents the |
|
// earliest height at which they could have been spent within the chain. |
|
spendHintCache SpendHintCache |
|
|
|
// quit is closed in order to signal that the notifier is gracefully |
|
// exiting. |
|
quit chan struct{} |
|
|
|
sync.Mutex |
|
} |
|
|
|
// NewTxNotifier creates a TxNotifier. The current height of the blockchain is |
|
// accepted as a parameter. The different hint caches (confirm and spend) are |
|
// used as an optimization in order to retrieve a better starting point when |
|
// dispatching a recan for a historical event in the chain. |
|
func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32, |
|
confirmHintCache ConfirmHintCache, |
|
spendHintCache SpendHintCache) *TxNotifier { |
|
|
|
return &TxNotifier{ |
|
currentHeight: startHeight, |
|
reorgSafetyLimit: reorgSafetyLimit, |
|
confNotifications: make(map[ConfRequest]*confNtfnSet), |
|
confsByInitialHeight: make(map[uint32]map[ConfRequest]struct{}), |
|
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), |
|
spendNotifications: make(map[SpendRequest]*spendNtfnSet), |
|
spendsByHeight: make(map[uint32]map[SpendRequest]struct{}), |
|
confirmHintCache: confirmHintCache, |
|
spendHintCache: spendHintCache, |
|
quit: make(chan struct{}), |
|
} |
|
} |
|
|
|
// newConfNtfn validates all of the parameters required to successfully create |
|
// and register a confirmation notification. |
|
func (n *TxNotifier) newConfNtfn(txid *chainhash.Hash, |
|
pkScript []byte, numConfs, heightHint uint32) (*ConfNtfn, error) { |
|
|
|
// An accompanying output script must always be provided. |
|
if len(pkScript) == 0 { |
|
return nil, ErrNoScript |
|
} |
|
|
|
// Enforce that we will not dispatch confirmations beyond the reorg |
|
// safety limit. |
|
if numConfs == 0 || numConfs > n.reorgSafetyLimit { |
|
return nil, ErrNumConfsOutOfRange |
|
} |
|
|
|
// A height hint must be provided to prevent scanning from the genesis |
|
// block. |
|
if heightHint == 0 { |
|
return nil, ErrNoHeightHint |
|
} |
|
|
|
// Ensure the output script is of a supported type. |
|
confRequest, err := NewConfRequest(txid, pkScript) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
confID := atomic.AddUint64(&n.confClientCounter, 1) |
|
return &ConfNtfn{ |
|
ConfID: confID, |
|
ConfRequest: confRequest, |
|
NumConfirmations: numConfs, |
|
Event: NewConfirmationEvent(numConfs, func() { |
|
n.CancelConf(confRequest, confID) |
|
}), |
|
HeightHint: heightHint, |
|
}, nil |
|
} |
|
|
|
// RegisterConf handles a new confirmation notification request. The client will |
|
// be notified when the transaction/output script gets a sufficient number of |
|
// confirmations in the blockchain. |
|
// |
|
// NOTE: If the transaction/output script has already been included in a block |
|
// on the chain, the confirmation details must be provided with the |
|
// UpdateConfDetails method, otherwise we will wait for the transaction/output |
|
// script to confirm even though it already has. |
|
func (n *TxNotifier) RegisterConf(txid *chainhash.Hash, pkScript []byte, |
|
numConfs, heightHint uint32) (*ConfRegistration, error) { |
|
|
|
select { |
|
case <-n.quit: |
|
return nil, ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// We'll start by performing a series of validation checks. |
|
ntfn, err := n.newConfNtfn(txid, pkScript, numConfs, heightHint) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Before proceeding to register the notification, we'll query our |
|
// height hint cache to determine whether a better one exists. |
|
// |
|
// TODO(conner): verify that all submitted height hints are identical. |
|
startHeight := ntfn.HeightHint |
|
hint, err := n.confirmHintCache.QueryConfirmHint(ntfn.ConfRequest) |
|
if err == nil { |
|
if hint > startHeight { |
|
Log.Debugf("Using height hint %d retrieved from cache "+ |
|
"for %v instead of %d", hint, ntfn.ConfRequest, |
|
startHeight) |
|
startHeight = hint |
|
} |
|
} else if err != ErrConfirmHintNotFound { |
|
Log.Errorf("Unable to query confirm hint for %v: %v", |
|
ntfn.ConfRequest, err) |
|
} |
|
|
|
Log.Infof("New confirmation subscription: conf_id=%d, %v, "+ |
|
"num_confs=%v height_hint=%d", ntfn.ConfID, ntfn.ConfRequest, |
|
numConfs, startHeight) |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
confSet, ok := n.confNotifications[ntfn.ConfRequest] |
|
if !ok { |
|
// If this is the first registration for this request, construct |
|
// a confSet to coalesce all notifications for the same request. |
|
confSet = newConfNtfnSet() |
|
n.confNotifications[ntfn.ConfRequest] = confSet |
|
} |
|
confSet.ntfns[ntfn.ConfID] = ntfn |
|
|
|
switch confSet.rescanStatus { |
|
|
|
// A prior rescan has already completed and we are actively watching at |
|
// tip for this request. |
|
case rescanComplete: |
|
// If the confirmation details for this set of notifications has |
|
// already been found, we'll attempt to deliver them immediately |
|
// to this client. |
|
Log.Debugf("Attempting to dispatch confirmation for %v on "+ |
|
"registration since rescan has finished", |
|
ntfn.ConfRequest) |
|
|
|
err := n.dispatchConfDetails(ntfn, confSet.details) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return &ConfRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
|
|
// A rescan is already in progress, return here to prevent dispatching |
|
// another. When the rescan returns, this notification's details will be |
|
// updated as well. |
|
case rescanPending: |
|
Log.Debugf("Waiting for pending rescan to finish before "+ |
|
"notifying %v at tip", ntfn.ConfRequest) |
|
|
|
return &ConfRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
|
|
// If no rescan has been dispatched, attempt to do so now. |
|
case rescanNotStarted: |
|
} |
|
|
|
// If the provided or cached height hint indicates that the |
|
// transaction with the given txid/output script is to be confirmed at a |
|
// height greater than the notifier's current height, we'll refrain from |
|
// spawning a historical dispatch. |
|
if startHeight > n.currentHeight { |
|
Log.Debugf("Height hint is above current height, not "+ |
|
"dispatching historical confirmation rescan for %v", |
|
ntfn.ConfRequest) |
|
|
|
// Set the rescan status to complete, which will allow the |
|
// notifier to start delivering messages for this set |
|
// immediately. |
|
confSet.rescanStatus = rescanComplete |
|
return &ConfRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
} |
|
|
|
Log.Debugf("Dispatching historical confirmation rescan for %v", |
|
ntfn.ConfRequest) |
|
|
|
// Construct the parameters for historical dispatch, scanning the range |
|
// of blocks between our best known height hint and the notifier's |
|
// current height. The notifier will begin also watching for |
|
// confirmations at tip starting with the next block. |
|
dispatch := &HistoricalConfDispatch{ |
|
ConfRequest: ntfn.ConfRequest, |
|
StartHeight: startHeight, |
|
EndHeight: n.currentHeight, |
|
} |
|
|
|
// Set this confSet's status to pending, ensuring subsequent |
|
// registrations don't also attempt a dispatch. |
|
confSet.rescanStatus = rescanPending |
|
|
|
return &ConfRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: dispatch, |
|
Height: n.currentHeight, |
|
}, nil |
|
} |
|
|
|
// CancelConf cancels an existing request for a spend notification of an |
|
// outpoint/output script. The request is identified by its spend ID. |
|
func (n *TxNotifier) CancelConf(confRequest ConfRequest, confID uint64) { |
|
select { |
|
case <-n.quit: |
|
return |
|
default: |
|
} |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
confSet, ok := n.confNotifications[confRequest] |
|
if !ok { |
|
return |
|
} |
|
ntfn, ok := confSet.ntfns[confID] |
|
if !ok { |
|
return |
|
} |
|
|
|
Log.Infof("Canceling confirmation notification: conf_id=%d, %v", confID, |
|
confRequest) |
|
|
|
// We'll close all the notification channels to let the client know |
|
// their cancel request has been fulfilled. |
|
close(ntfn.Event.Confirmed) |
|
close(ntfn.Event.Updates) |
|
close(ntfn.Event.NegativeConf) |
|
|
|
// Finally, we'll clean up any lingering references to this |
|
// notification. |
|
delete(confSet.ntfns, confID) |
|
|
|
// Remove the queued confirmation notification if the transaction has |
|
// already confirmed, but hasn't met its required number of |
|
// confirmations. |
|
if confSet.details != nil { |
|
confHeight := confSet.details.BlockHeight + |
|
ntfn.NumConfirmations - 1 |
|
delete(n.ntfnsByConfirmHeight[confHeight], ntfn) |
|
} |
|
} |
|
|
|
// UpdateConfDetails attempts to update the confirmation details for an active |
|
// notification within the notifier. This should only be used in the case of a |
|
// transaction/output script that has confirmed before the notifier's current |
|
// height. |
|
// |
|
// NOTE: The notification should be registered first to ensure notifications are |
|
// dispatched correctly. |
|
func (n *TxNotifier) UpdateConfDetails(confRequest ConfRequest, |
|
details *TxConfirmation) error { |
|
|
|
select { |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// Ensure we hold the lock throughout handling the notification to |
|
// prevent the notifier from advancing its height underneath us. |
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
// First, we'll determine whether we have an active confirmation |
|
// notification for the given txid/script. |
|
confSet, ok := n.confNotifications[confRequest] |
|
if !ok { |
|
return fmt.Errorf("confirmation notification for %v not found", |
|
confRequest) |
|
} |
|
|
|
// If the confirmation details were already found at tip, all existing |
|
// notifications will have been dispatched or queued for dispatch. We |
|
// can exit early to avoid sending too many notifications on the |
|
// buffered channels. |
|
if confSet.details != nil { |
|
return nil |
|
} |
|
|
|
// The historical dispatch has been completed for this confSet. We'll |
|
// update the rescan status and cache any details that were found. If |
|
// the details are nil, that implies we did not find them and will |
|
// continue to watch for them at tip. |
|
confSet.rescanStatus = rescanComplete |
|
|
|
// The notifier has yet to reach the height at which the |
|
// transaction/output script was included in a block, so we should defer |
|
// until handling it then within ConnectTip. |
|
if details == nil { |
|
Log.Debugf("Confirmation details for %v not found during "+ |
|
"historical dispatch, waiting to dispatch at tip", |
|
confRequest) |
|
|
|
// We'll commit the current height as the confirm hint to |
|
// prevent another potentially long rescan if we restart before |
|
// a new block comes in. |
|
err := n.confirmHintCache.CommitConfirmHint( |
|
n.currentHeight, confRequest, |
|
) |
|
if err != nil { |
|
// The error is not fatal as this is an optimistic |
|
// optimization, so we'll avoid returning an error. |
|
Log.Debugf("Unable to update confirm hint to %d for "+ |
|
"%v: %v", n.currentHeight, confRequest, err) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
if details.BlockHeight > n.currentHeight { |
|
Log.Debugf("Confirmation details for %v found above current "+ |
|
"height, waiting to dispatch at tip", confRequest) |
|
|
|
return nil |
|
} |
|
|
|
Log.Debugf("Updating confirmation details for %v", confRequest) |
|
|
|
err := n.confirmHintCache.CommitConfirmHint( |
|
details.BlockHeight, confRequest, |
|
) |
|
if err != nil { |
|
// The error is not fatal, so we should not return an error to |
|
// the caller. |
|
Log.Errorf("Unable to update confirm hint to %d for %v: %v", |
|
details.BlockHeight, confRequest, err) |
|
} |
|
|
|
// Cache the details found in the rescan and attempt to dispatch any |
|
// notifications that have not yet been delivered. |
|
confSet.details = details |
|
for _, ntfn := range confSet.ntfns { |
|
err = n.dispatchConfDetails(ntfn, details) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// dispatchConfDetails attempts to cache and dispatch details to a particular |
|
// client if the transaction/output script has sufficiently confirmed. If the |
|
// provided details are nil, this method will be a no-op. |
|
func (n *TxNotifier) dispatchConfDetails( |
|
ntfn *ConfNtfn, details *TxConfirmation) error { |
|
|
|
// If no details are provided, return early as we can't dispatch. |
|
if details == nil { |
|
Log.Debugf("Unable to dispatch %v, no details provided", |
|
ntfn.ConfRequest) |
|
|
|
return nil |
|
} |
|
|
|
// Now, we'll examine whether the transaction/output script of this |
|
// request has reached its required number of confirmations. If it has, |
|
// we'll dispatch a confirmation notification to the caller. |
|
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 |
|
if confHeight <= n.currentHeight { |
|
Log.Infof("Dispatching %v confirmation notification for %v", |
|
ntfn.NumConfirmations, ntfn.ConfRequest) |
|
|
|
// We'll send a 0 value to the Updates channel, |
|
// indicating that the transaction/output script has already |
|
// been confirmed. |
|
select { |
|
case ntfn.Event.Updates <- 0: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
|
|
select { |
|
case ntfn.Event.Confirmed <- details: |
|
ntfn.dispatched = true |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} else { |
|
Log.Debugf("Queueing %v confirmation notification for %v at tip ", |
|
ntfn.NumConfirmations, ntfn.ConfRequest) |
|
|
|
// Otherwise, we'll keep track of the notification |
|
// request by the height at which we should dispatch the |
|
// confirmation notification. |
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight] |
|
if !exists { |
|
ntfnSet = make(map[*ConfNtfn]struct{}) |
|
n.ntfnsByConfirmHeight[confHeight] = ntfnSet |
|
} |
|
ntfnSet[ntfn] = struct{}{} |
|
|
|
// We'll also send an update to the client of how many |
|
// confirmations are left for the transaction/output script to |
|
// be confirmed. |
|
numConfsLeft := confHeight - n.currentHeight |
|
select { |
|
case ntfn.Event.Updates <- numConfsLeft: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} |
|
|
|
// As a final check, we'll also watch the transaction/output script if |
|
// it's still possible for it to get reorged out of the chain. |
|
reorgSafeHeight := details.BlockHeight + n.reorgSafetyLimit |
|
if reorgSafeHeight > n.currentHeight { |
|
txSet, exists := n.confsByInitialHeight[details.BlockHeight] |
|
if !exists { |
|
txSet = make(map[ConfRequest]struct{}) |
|
n.confsByInitialHeight[details.BlockHeight] = txSet |
|
} |
|
txSet[ntfn.ConfRequest] = struct{}{} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// newSpendNtfn validates all of the parameters required to successfully create |
|
// and register a spend notification. |
|
func (n *TxNotifier) newSpendNtfn(outpoint *wire.OutPoint, |
|
pkScript []byte, heightHint uint32) (*SpendNtfn, error) { |
|
|
|
// An accompanying output script must always be provided. |
|
if len(pkScript) == 0 { |
|
return nil, ErrNoScript |
|
} |
|
|
|
// A height hint must be provided to prevent scanning from the genesis |
|
// block. |
|
if heightHint == 0 { |
|
return nil, ErrNoHeightHint |
|
} |
|
|
|
// Ensure the output script is of a supported type. |
|
spendRequest, err := NewSpendRequest(outpoint, pkScript) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
spendID := atomic.AddUint64(&n.spendClientCounter, 1) |
|
return &SpendNtfn{ |
|
SpendID: spendID, |
|
SpendRequest: spendRequest, |
|
Event: NewSpendEvent(func() { |
|
n.CancelSpend(spendRequest, spendID) |
|
}), |
|
HeightHint: heightHint, |
|
}, nil |
|
} |
|
|
|
// RegisterSpend handles a new spend notification request. The client will be |
|
// notified once the outpoint/output script is detected as spent within the |
|
// chain. |
|
// |
|
// NOTE: If the outpoint/output script has already been spent within the chain |
|
// before the notifier's current tip, the spend details must be provided with |
|
// the UpdateSpendDetails method, otherwise we will wait for the outpoint/output |
|
// script to be spent at tip, even though it already has. |
|
func (n *TxNotifier) RegisterSpend(outpoint *wire.OutPoint, pkScript []byte, |
|
heightHint uint32) (*SpendRegistration, error) { |
|
|
|
select { |
|
case <-n.quit: |
|
return nil, ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// We'll start by performing a series of validation checks. |
|
ntfn, err := n.newSpendNtfn(outpoint, pkScript, heightHint) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
// Before proceeding to register the notification, we'll query our spend |
|
// hint cache to determine whether a better one exists. |
|
startHeight := ntfn.HeightHint |
|
hint, err := n.spendHintCache.QuerySpendHint(ntfn.SpendRequest) |
|
if err == nil { |
|
if hint > startHeight { |
|
Log.Debugf("Using height hint %d retrieved from cache "+ |
|
"for %v instead of %d", hint, ntfn.SpendRequest, |
|
startHeight) |
|
startHeight = hint |
|
} |
|
} else if err != ErrSpendHintNotFound { |
|
Log.Errorf("Unable to query spend hint for %v: %v", |
|
ntfn.SpendRequest, err) |
|
} |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
Log.Infof("New spend subscription: spend_id=%d, %v, height_hint=%d", |
|
ntfn.SpendID, ntfn.SpendRequest, startHeight) |
|
|
|
// Keep track of the notification request so that we can properly |
|
// dispatch a spend notification later on. |
|
spendSet, ok := n.spendNotifications[ntfn.SpendRequest] |
|
if !ok { |
|
// If this is the first registration for the request, we'll |
|
// construct a spendNtfnSet to coalesce all notifications. |
|
spendSet = newSpendNtfnSet() |
|
n.spendNotifications[ntfn.SpendRequest] = spendSet |
|
} |
|
spendSet.ntfns[ntfn.SpendID] = ntfn |
|
|
|
// We'll now let the caller know whether a historical rescan is needed |
|
// depending on the current rescan status. |
|
switch spendSet.rescanStatus { |
|
|
|
// If the spending details for this request have already been determined |
|
// and cached, then we can use them to immediately dispatch the spend |
|
// notification to the client. |
|
case rescanComplete: |
|
Log.Debugf("Attempting to dispatch spend for %v on "+ |
|
"registration since rescan has finished", |
|
ntfn.SpendRequest) |
|
|
|
err := n.dispatchSpendDetails(ntfn, spendSet.details) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return &SpendRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
|
|
// If there is an active rescan to determine whether the request has |
|
// been spent, then we won't trigger another one. |
|
case rescanPending: |
|
Log.Debugf("Waiting for pending rescan to finish before "+ |
|
"notifying %v at tip", ntfn.SpendRequest) |
|
|
|
return &SpendRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
|
|
// Otherwise, we'll fall through and let the caller know that a rescan |
|
// should be dispatched to determine whether the request has already |
|
// been spent. |
|
case rescanNotStarted: |
|
} |
|
|
|
// However, if the spend hint, either provided by the caller or |
|
// retrieved from the cache, is found to be at a later height than the |
|
// TxNotifier is aware of, then we'll refrain from dispatching a |
|
// historical rescan and wait for the spend to come in at tip. |
|
if startHeight > n.currentHeight { |
|
Log.Debugf("Spend hint of %d for %v is above current height %d", |
|
startHeight, ntfn.SpendRequest, n.currentHeight) |
|
|
|
// We'll also set the rescan status as complete to ensure that |
|
// spend hints for this request get updated upon |
|
// connected/disconnected blocks. |
|
spendSet.rescanStatus = rescanComplete |
|
return &SpendRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: nil, |
|
Height: n.currentHeight, |
|
}, nil |
|
} |
|
|
|
// We'll set the rescan status to pending to ensure subsequent |
|
// notifications don't also attempt a historical dispatch. |
|
spendSet.rescanStatus = rescanPending |
|
|
|
Log.Infof("Dispatching historical spend rescan for %v, start=%d, "+ |
|
"end=%d", ntfn.SpendRequest, startHeight, n.currentHeight) |
|
|
|
return &SpendRegistration{ |
|
Event: ntfn.Event, |
|
HistoricalDispatch: &HistoricalSpendDispatch{ |
|
SpendRequest: ntfn.SpendRequest, |
|
StartHeight: startHeight, |
|
EndHeight: n.currentHeight, |
|
}, |
|
Height: n.currentHeight, |
|
}, nil |
|
} |
|
|
|
// CancelSpend cancels an existing request for a spend notification of an |
|
// outpoint/output script. The request is identified by its spend ID. |
|
func (n *TxNotifier) CancelSpend(spendRequest SpendRequest, spendID uint64) { |
|
select { |
|
case <-n.quit: |
|
return |
|
default: |
|
} |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
spendSet, ok := n.spendNotifications[spendRequest] |
|
if !ok { |
|
return |
|
} |
|
ntfn, ok := spendSet.ntfns[spendID] |
|
if !ok { |
|
return |
|
} |
|
|
|
Log.Infof("Canceling spend notification: spend_id=%d, %v", spendID, |
|
spendRequest) |
|
|
|
// We'll close all the notification channels to let the client know |
|
// their cancel request has been fulfilled. |
|
close(ntfn.Event.Spend) |
|
close(ntfn.Event.Reorg) |
|
close(ntfn.Event.Done) |
|
delete(spendSet.ntfns, spendID) |
|
} |
|
|
|
// ProcessRelevantSpendTx processes a transaction provided externally. This will |
|
// check whether the transaction is relevant to the notifier if it spends any |
|
// outpoints/output scripts for which we currently have registered notifications |
|
// for. If it is relevant, spend notifications will be dispatched to the caller. |
|
func (n *TxNotifier) ProcessRelevantSpendTx(tx *btcutil.Tx, |
|
blockHeight uint32) error { |
|
|
|
select { |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// Ensure we hold the lock throughout handling the notification to |
|
// prevent the notifier from advancing its height underneath us. |
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
// We'll use a channel to coalesce all the spend requests that this |
|
// transaction fulfills. |
|
type spend struct { |
|
request *SpendRequest |
|
details *SpendDetail |
|
} |
|
|
|
// We'll set up the onSpend filter callback to gather all the fulfilled |
|
// spends requests within this transaction. |
|
var spends []spend |
|
onSpend := func(request SpendRequest, details *SpendDetail) { |
|
spends = append(spends, spend{&request, details}) |
|
} |
|
n.filterTx(tx, nil, blockHeight, nil, onSpend) |
|
|
|
// After the transaction has been filtered, we can finally dispatch |
|
// notifications for each request. |
|
for _, spend := range spends { |
|
err := n.updateSpendDetails(*spend.request, spend.details) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// UpdateSpendDetails attempts to update the spend details for all active spend |
|
// notification requests for an outpoint/output script. This method should be |
|
// used once a historical scan of the chain has finished. If the historical scan |
|
// did not find a spending transaction for it, the spend details may be nil. |
|
// |
|
// NOTE: A notification request for the outpoint/output script must be |
|
// registered first to ensure notifications are delivered. |
|
func (n *TxNotifier) UpdateSpendDetails(spendRequest SpendRequest, |
|
details *SpendDetail) error { |
|
|
|
select { |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// Ensure we hold the lock throughout handling the notification to |
|
// prevent the notifier from advancing its height underneath us. |
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
return n.updateSpendDetails(spendRequest, details) |
|
} |
|
|
|
// updateSpendDetails attempts to update the spend details for all active spend |
|
// notification requests for an outpoint/output script. This method should be |
|
// used once a historical scan of the chain has finished. If the historical scan |
|
// did not find a spending transaction for it, the spend details may be nil. |
|
// |
|
// NOTE: This method must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) updateSpendDetails(spendRequest SpendRequest, |
|
details *SpendDetail) error { |
|
|
|
// Mark the ongoing historical rescan for this request as finished. This |
|
// will allow us to update the spend hints for it at tip. |
|
spendSet, ok := n.spendNotifications[spendRequest] |
|
if !ok { |
|
return fmt.Errorf("spend notification for %v not found", |
|
spendRequest) |
|
} |
|
|
|
// If the spend details have already been found either at tip, then the |
|
// notifications should have already been dispatched, so we can exit |
|
// early to prevent sending duplicate notifications. |
|
if spendSet.details != nil { |
|
return nil |
|
} |
|
|
|
// Since the historical rescan has completed for this request, we'll |
|
// mark its rescan status as complete in order to ensure that the |
|
// TxNotifier can properly update its spend hints upon |
|
// connected/disconnected blocks. |
|
spendSet.rescanStatus = rescanComplete |
|
|
|
// If the historical rescan was not able to find a spending transaction |
|
// for this request, then we can track the spend at tip. |
|
if details == nil { |
|
// We'll commit the current height as the spend hint to prevent |
|
// another potentially long rescan if we restart before a new |
|
// block comes in. |
|
err := n.spendHintCache.CommitSpendHint( |
|
n.currentHeight, spendRequest, |
|
) |
|
if err != nil { |
|
// The error is not fatal as this is an optimistic |
|
// optimization, so we'll avoid returning an error. |
|
Log.Debugf("Unable to update spend hint to %d for %v: %v", |
|
n.currentHeight, spendRequest, err) |
|
} |
|
|
|
Log.Debugf("Updated spend hint to height=%v for unconfirmed "+ |
|
"spend request %v", n.currentHeight, spendRequest) |
|
return nil |
|
} |
|
|
|
// If the historical rescan found the spending transaction for this |
|
// request, but it's at a later height than the notifier (this can |
|
// happen due to latency with the backend during a reorg), then we'll |
|
// defer handling the notification until the notifier has caught up to |
|
// such height. |
|
if uint32(details.SpendingHeight) > n.currentHeight { |
|
return nil |
|
} |
|
|
|
// Now that we've determined the request has been spent, we'll commit |
|
// its spending height as its hint in the cache and dispatch |
|
// notifications to all of its respective clients. |
|
err := n.spendHintCache.CommitSpendHint( |
|
uint32(details.SpendingHeight), spendRequest, |
|
) |
|
if err != nil { |
|
// The error is not fatal as this is an optimistic optimization, |
|
// so we'll avoid returning an error. |
|
Log.Debugf("Unable to update spend hint to %d for %v: %v", |
|
details.SpendingHeight, spendRequest, err) |
|
} |
|
|
|
Log.Debugf("Updated spend hint to height=%v for confirmed spend "+ |
|
"request %v", details.SpendingHeight, spendRequest) |
|
|
|
spendSet.details = details |
|
for _, ntfn := range spendSet.ntfns { |
|
err := n.dispatchSpendDetails(ntfn, spendSet.details) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// dispatchSpendDetails dispatches a spend notification to the client. |
|
// |
|
// NOTE: This must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail) error { |
|
// If there are no spend details to dispatch or if the notification has |
|
// already been dispatched, then we can skip dispatching to this client. |
|
if details == nil || ntfn.dispatched { |
|
Log.Debugf("Skipping dispatch of spend details(%v) for "+ |
|
"request %v, dispatched=%v", details, ntfn.SpendRequest, |
|
ntfn.dispatched) |
|
return nil |
|
} |
|
|
|
Log.Infof("Dispatching confirmed spend notification for %v at "+ |
|
"current height=%d: %v", ntfn.SpendRequest, n.currentHeight, |
|
details) |
|
|
|
select { |
|
case ntfn.Event.Spend <- details: |
|
ntfn.dispatched = true |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// ConnectTip handles a new block extending the current chain. It will go |
|
// through every transaction and determine if it is relevant to any of its |
|
// clients. A transaction can be relevant in either of the following two ways: |
|
// |
|
// 1. One of the inputs in the transaction spends an outpoint/output script |
|
// for which we currently have an active spend registration for. |
|
// |
|
// 2. The transaction has a txid or output script for which we currently have |
|
// an active confirmation registration for. |
|
// |
|
// In the event that the transaction is relevant, a confirmation/spend |
|
// notification will be queued for dispatch to the relevant clients. |
|
// Confirmation notifications will only be dispatched for transactions/output |
|
// scripts that have met the required number of confirmations required by the |
|
// client. |
|
// |
|
// NOTE: In order to actually dispatch the relevant transaction notifications to |
|
// clients, NotifyHeight must be called with the same block height in order to |
|
// maintain correctness. |
|
func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32, |
|
txns []*btcutil.Tx) error { |
|
|
|
select { |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
if blockHeight != n.currentHeight+1 { |
|
return fmt.Errorf("received blocks out of order: "+ |
|
"current height=%d, new height=%d", |
|
n.currentHeight, blockHeight) |
|
} |
|
n.currentHeight++ |
|
n.reorgDepth = 0 |
|
|
|
// First, we'll iterate over all the transactions found in this block to |
|
// determine if it includes any relevant transactions to the TxNotifier. |
|
Log.Debugf("Filtering %d txns for %d spend requests at height %d", |
|
len(txns), len(n.spendNotifications), blockHeight) |
|
for _, tx := range txns { |
|
n.filterTx( |
|
tx, blockHash, blockHeight, n.handleConfDetailsAtTip, |
|
n.handleSpendDetailsAtTip, |
|
) |
|
} |
|
|
|
// Now that we've determined which requests were confirmed and spent |
|
// within the new block, we can update their entries in their respective |
|
// caches, along with all of our unconfirmed and unspent requests. |
|
n.updateHints(blockHeight) |
|
|
|
// Finally, we'll clear the entries from our set of notifications for |
|
// requests that are no longer under the risk of being reorged out of |
|
// the chain. |
|
if blockHeight >= n.reorgSafetyLimit { |
|
matureBlockHeight := blockHeight - n.reorgSafetyLimit |
|
for confRequest := range n.confsByInitialHeight[matureBlockHeight] { |
|
confSet := n.confNotifications[confRequest] |
|
for _, ntfn := range confSet.ntfns { |
|
select { |
|
case ntfn.Event.Done <- struct{}{}: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} |
|
|
|
delete(n.confNotifications, confRequest) |
|
} |
|
delete(n.confsByInitialHeight, matureBlockHeight) |
|
|
|
for spendRequest := range n.spendsByHeight[matureBlockHeight] { |
|
spendSet := n.spendNotifications[spendRequest] |
|
for _, ntfn := range spendSet.ntfns { |
|
select { |
|
case ntfn.Event.Done <- struct{}{}: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} |
|
|
|
Log.Debugf("Deleting mature spend request %v at "+ |
|
"height=%d", spendRequest, blockHeight) |
|
delete(n.spendNotifications, spendRequest) |
|
} |
|
delete(n.spendsByHeight, matureBlockHeight) |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// filterTx determines whether the transaction spends or confirms any |
|
// outstanding pending requests. The onConf and onSpend callbacks can be used to |
|
// retrieve all the requests fulfilled by this transaction as they occur. |
|
func (n *TxNotifier) filterTx(tx *btcutil.Tx, blockHash *chainhash.Hash, |
|
blockHeight uint32, onConf func(ConfRequest, *TxConfirmation), |
|
onSpend func(SpendRequest, *SpendDetail)) { |
|
|
|
// In order to determine if this transaction is relevant to the |
|
// notifier, we'll check its inputs for any outstanding spend |
|
// requests. |
|
txHash := tx.Hash() |
|
if onSpend != nil { |
|
// notifyDetails is a helper closure that will construct the |
|
// spend details of a request and hand them off to the onSpend |
|
// callback. |
|
notifyDetails := func(spendRequest SpendRequest, |
|
prevOut wire.OutPoint, inputIdx uint32) { |
|
|
|
Log.Debugf("Found spend of %v: spend_tx=%v, "+ |
|
"block_height=%d", spendRequest, txHash, |
|
blockHeight) |
|
|
|
onSpend(spendRequest, &SpendDetail{ |
|
SpentOutPoint: &prevOut, |
|
SpenderTxHash: txHash, |
|
SpendingTx: tx.MsgTx(), |
|
SpenderInputIndex: inputIdx, |
|
SpendingHeight: int32(blockHeight), |
|
}) |
|
} |
|
|
|
for i, txIn := range tx.MsgTx().TxIn { |
|
// We'll re-derive the script of the output being spent |
|
// to determine if the inputs spends any registered |
|
// requests. |
|
prevOut := txIn.PreviousOutPoint |
|
pkScript, err := txscript.ComputePkScript( |
|
txIn.SignatureScript, txIn.Witness, |
|
) |
|
if err != nil { |
|
continue |
|
} |
|
spendRequest := SpendRequest{ |
|
OutPoint: prevOut, |
|
PkScript: pkScript, |
|
} |
|
|
|
// If we have any, we'll record their spend height so |
|
// that notifications get dispatched to the respective |
|
// clients. |
|
if _, ok := n.spendNotifications[spendRequest]; ok { |
|
notifyDetails(spendRequest, prevOut, uint32(i)) |
|
} |
|
spendRequest.OutPoint = ZeroOutPoint |
|
if _, ok := n.spendNotifications[spendRequest]; ok { |
|
notifyDetails(spendRequest, prevOut, uint32(i)) |
|
} |
|
} |
|
} |
|
|
|
// We'll also check its outputs to determine if there are any |
|
// outstanding confirmation requests. |
|
if onConf != nil { |
|
// notifyDetails is a helper closure that will construct the |
|
// confirmation details of a request and hand them off to the |
|
// onConf callback. |
|
notifyDetails := func(confRequest ConfRequest) { |
|
Log.Debugf("Found initial confirmation of %v: "+ |
|
"height=%d, hash=%v", confRequest, |
|
blockHeight, blockHash) |
|
|
|
details := &TxConfirmation{ |
|
Tx: tx.MsgTx(), |
|
BlockHash: blockHash, |
|
BlockHeight: blockHeight, |
|
TxIndex: uint32(tx.Index()), |
|
} |
|
|
|
onConf(confRequest, details) |
|
} |
|
|
|
for _, txOut := range tx.MsgTx().TxOut { |
|
// We'll parse the script of the output to determine if |
|
// we have any registered requests for it or the |
|
// transaction itself. |
|
pkScript, err := txscript.ParsePkScript(txOut.PkScript) |
|
if err != nil { |
|
continue |
|
} |
|
confRequest := ConfRequest{ |
|
TxID: *txHash, |
|
PkScript: pkScript, |
|
} |
|
|
|
// If we have any, we'll record their confirmed height |
|
// so that notifications get dispatched when they |
|
// reaches the clients' desired number of confirmations. |
|
if _, ok := n.confNotifications[confRequest]; ok { |
|
notifyDetails(confRequest) |
|
} |
|
confRequest.TxID = ZeroHash |
|
if _, ok := n.confNotifications[confRequest]; ok { |
|
notifyDetails(confRequest) |
|
} |
|
} |
|
} |
|
} |
|
|
|
// handleConfDetailsAtTip tracks the confirmation height of the txid/output |
|
// script in order to properly dispatch a confirmation notification after |
|
// meeting each request's desired number of confirmations for all current and |
|
// future registered clients. |
|
func (n *TxNotifier) handleConfDetailsAtTip(confRequest ConfRequest, |
|
details *TxConfirmation) { |
|
|
|
// TODO(wilmer): cancel pending historical rescans if any? |
|
confSet := n.confNotifications[confRequest] |
|
|
|
// If we already have details for this request, we don't want to add it |
|
// again since we have already dispatched notifications for it. |
|
if confSet.details != nil { |
|
Log.Warnf("Ignoring address reuse for %s at height %d.", |
|
confRequest, details.BlockHeight) |
|
return |
|
} |
|
|
|
confSet.rescanStatus = rescanComplete |
|
confSet.details = details |
|
|
|
for _, ntfn := range confSet.ntfns { |
|
// In the event that this notification was aware that the |
|
// transaction/output script was reorged out of the chain, we'll |
|
// consume the reorg notification if it hasn't been done yet |
|
// already. |
|
select { |
|
case <-ntfn.Event.NegativeConf: |
|
default: |
|
} |
|
|
|
// We'll note this client's required number of confirmations so |
|
// that we can notify them when expected. |
|
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 |
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight] |
|
if !exists { |
|
ntfnSet = make(map[*ConfNtfn]struct{}) |
|
n.ntfnsByConfirmHeight[confHeight] = ntfnSet |
|
} |
|
ntfnSet[ntfn] = struct{}{} |
|
} |
|
|
|
// We'll also note the initial confirmation height in order to correctly |
|
// handle dispatching notifications when the transaction/output script |
|
// gets reorged out of the chain. |
|
txSet, exists := n.confsByInitialHeight[details.BlockHeight] |
|
if !exists { |
|
txSet = make(map[ConfRequest]struct{}) |
|
n.confsByInitialHeight[details.BlockHeight] = txSet |
|
} |
|
txSet[confRequest] = struct{}{} |
|
} |
|
|
|
// handleSpendDetailsAtTip tracks the spend height of the outpoint/output script |
|
// in order to properly dispatch a spend notification for all current and future |
|
// registered clients. |
|
func (n *TxNotifier) handleSpendDetailsAtTip(spendRequest SpendRequest, |
|
details *SpendDetail) { |
|
|
|
// TODO(wilmer): cancel pending historical rescans if any? |
|
spendSet := n.spendNotifications[spendRequest] |
|
spendSet.rescanStatus = rescanComplete |
|
spendSet.details = details |
|
|
|
for _, ntfn := range spendSet.ntfns { |
|
// In the event that this notification was aware that the |
|
// spending transaction of its outpoint/output script was |
|
// reorged out of the chain, we'll consume the reorg |
|
// notification if it hasn't been done yet already. |
|
select { |
|
case <-ntfn.Event.Reorg: |
|
default: |
|
} |
|
} |
|
|
|
// We'll note the spending height of the request in order to correctly |
|
// handle dispatching notifications when the spending transactions gets |
|
// reorged out of the chain. |
|
spendHeight := uint32(details.SpendingHeight) |
|
opSet, exists := n.spendsByHeight[spendHeight] |
|
if !exists { |
|
opSet = make(map[SpendRequest]struct{}) |
|
n.spendsByHeight[spendHeight] = opSet |
|
} |
|
opSet[spendRequest] = struct{}{} |
|
|
|
Log.Debugf("Spend request %v spent at tip=%d", spendRequest, |
|
spendHeight) |
|
} |
|
|
|
// NotifyHeight dispatches confirmation and spend notifications to the clients |
|
// who registered for a notification which has been fulfilled at the passed |
|
// height. |
|
func (n *TxNotifier) NotifyHeight(height uint32) error { |
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
// First, we'll dispatch an update to all of the notification clients |
|
// for our watched requests with the number of confirmations left at |
|
// this new height. |
|
for _, confRequests := range n.confsByInitialHeight { |
|
for confRequest := range confRequests { |
|
confSet := n.confNotifications[confRequest] |
|
for _, ntfn := range confSet.ntfns { |
|
txConfHeight := confSet.details.BlockHeight + |
|
ntfn.NumConfirmations - 1 |
|
numConfsLeft := txConfHeight - height |
|
|
|
// Since we don't clear notifications until |
|
// transactions/output scripts are no longer |
|
// under the risk of being reorganized out of |
|
// the chain, we'll skip sending updates for |
|
// those that have already been confirmed. |
|
if int32(numConfsLeft) < 0 { |
|
continue |
|
} |
|
|
|
select { |
|
case ntfn.Event.Updates <- numConfsLeft: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Then, we'll dispatch notifications for all the requests that have |
|
// become confirmed at this new block height. |
|
for ntfn := range n.ntfnsByConfirmHeight[height] { |
|
confSet := n.confNotifications[ntfn.ConfRequest] |
|
|
|
Log.Infof("Dispatching %v confirmation notification for %v", |
|
ntfn.NumConfirmations, ntfn.ConfRequest) |
|
|
|
select { |
|
case ntfn.Event.Confirmed <- confSet.details: |
|
ntfn.dispatched = true |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
} |
|
delete(n.ntfnsByConfirmHeight, height) |
|
|
|
// Finally, we'll dispatch spend notifications for all the requests that |
|
// were spent at this new block height. |
|
for spendRequest := range n.spendsByHeight[height] { |
|
spendSet := n.spendNotifications[spendRequest] |
|
for _, ntfn := range spendSet.ntfns { |
|
err := n.dispatchSpendDetails(ntfn, spendSet.details) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// DisconnectTip handles the tip of the current chain being disconnected during |
|
// a chain reorganization. If any watched requests were included in this block, |
|
// internal structures are updated to ensure confirmation/spend notifications |
|
// are consumed (if not already), and reorg notifications are dispatched |
|
// instead. Confirmation/spend notifications will be dispatched again upon block |
|
// inclusion. |
|
func (n *TxNotifier) DisconnectTip(blockHeight uint32) error { |
|
select { |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
if blockHeight != n.currentHeight { |
|
return fmt.Errorf("received blocks out of order: "+ |
|
"current height=%d, disconnected height=%d", |
|
n.currentHeight, blockHeight) |
|
} |
|
n.currentHeight-- |
|
n.reorgDepth++ |
|
|
|
// With the block disconnected, we'll update the confirm and spend hints |
|
// for our notification requests to reflect the new height, except for |
|
// those that have confirmed/spent at previous heights. |
|
n.updateHints(blockHeight) |
|
|
|
// We'll go through all of our watched confirmation requests and attempt |
|
// to drain their notification channels to ensure sending notifications |
|
// to the clients is always non-blocking. |
|
for initialHeight, txHashes := range n.confsByInitialHeight { |
|
for txHash := range txHashes { |
|
// If the transaction/output script has been reorged out |
|
// of the chain, we'll make sure to remove the cached |
|
// confirmation details to prevent notifying clients |
|
// with old information. |
|
confSet := n.confNotifications[txHash] |
|
if initialHeight == blockHeight { |
|
confSet.details = nil |
|
} |
|
|
|
for _, ntfn := range confSet.ntfns { |
|
// First, we'll attempt to drain an update |
|
// from each notification to ensure sends to the |
|
// Updates channel are always non-blocking. |
|
select { |
|
case <-ntfn.Event.Updates: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
// Then, we'll check if the current |
|
// transaction/output script was included in the |
|
// block currently being disconnected. If it |
|
// was, we'll need to dispatch a reorg |
|
// notification to the client. |
|
if initialHeight == blockHeight { |
|
err := n.dispatchConfReorg( |
|
ntfn, blockHeight, |
|
) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
// We'll also go through our watched spend requests and attempt to drain |
|
// their dispatched notifications to ensure dispatching notifications to |
|
// clients later on is always non-blocking. We're only interested in |
|
// requests whose spending transaction was included at the height being |
|
// disconnected. |
|
for op := range n.spendsByHeight[blockHeight] { |
|
// Since the spending transaction is being reorged out of the |
|
// chain, we'll need to clear out the spending details of the |
|
// request. |
|
spendSet := n.spendNotifications[op] |
|
spendSet.details = nil |
|
|
|
// For all requests which have had a spend notification |
|
// dispatched, we'll attempt to drain it and send a reorg |
|
// notification instead. |
|
for _, ntfn := range spendSet.ntfns { |
|
if err := n.dispatchSpendReorg(ntfn); err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
|
|
// Finally, we can remove the requests that were confirmed and/or spent |
|
// at the height being disconnected. We'll still continue to track them |
|
// until they have been confirmed/spent and are no longer under the risk |
|
// of being reorged out of the chain again. |
|
delete(n.confsByInitialHeight, blockHeight) |
|
delete(n.spendsByHeight, blockHeight) |
|
|
|
return nil |
|
} |
|
|
|
// updateHints attempts to update the confirm and spend hints for all relevant |
|
// requests respectively. The height parameter is used to determine which |
|
// requests we should update based on whether a new block is being |
|
// connected/disconnected. |
|
// |
|
// NOTE: This must be called with the TxNotifier's lock held and after its |
|
// height has already been reflected by a block being connected/disconnected. |
|
func (n *TxNotifier) updateHints(height uint32) { |
|
// TODO(wilmer): update under one database transaction. |
|
// |
|
// To update the height hint for all the required confirmation requests |
|
// under one database transaction, we'll gather the set of unconfirmed |
|
// requests along with the ones that confirmed at the height being |
|
// connected/disconnected. |
|
confRequests := n.unconfirmedRequests() |
|
for confRequest := range n.confsByInitialHeight[height] { |
|
confRequests = append(confRequests, confRequest) |
|
} |
|
err := n.confirmHintCache.CommitConfirmHint( |
|
n.currentHeight, confRequests..., |
|
) |
|
if err != nil { |
|
// The error is not fatal as this is an optimistic optimization, |
|
// so we'll avoid returning an error. |
|
Log.Debugf("Unable to update confirm hints to %d for "+ |
|
"%v: %v", n.currentHeight, confRequests, err) |
|
} |
|
|
|
// Similarly, to update the height hint for all the required spend |
|
// requests under one database transaction, we'll gather the set of |
|
// unspent requests along with the ones that were spent at the height |
|
// being connected/disconnected. |
|
spendRequests := n.unspentRequests() |
|
for spendRequest := range n.spendsByHeight[height] { |
|
spendRequests = append(spendRequests, spendRequest) |
|
} |
|
err = n.spendHintCache.CommitSpendHint(n.currentHeight, spendRequests...) |
|
if err != nil { |
|
// The error is not fatal as this is an optimistic optimization, |
|
// so we'll avoid returning an error. |
|
Log.Debugf("Unable to update spend hints to %d for "+ |
|
"%v: %v", n.currentHeight, spendRequests, err) |
|
} |
|
} |
|
|
|
// unconfirmedRequests returns the set of confirmation requests that are |
|
// still seen as unconfirmed by the TxNotifier. |
|
// |
|
// NOTE: This method must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) unconfirmedRequests() []ConfRequest { |
|
var unconfirmed []ConfRequest |
|
for confRequest, confNtfnSet := range n.confNotifications { |
|
// If the notification is already aware of its confirmation |
|
// details, or it's in the process of learning them, we'll skip |
|
// it as we can't yet determine if it's confirmed or not. |
|
if confNtfnSet.rescanStatus != rescanComplete || |
|
confNtfnSet.details != nil { |
|
continue |
|
} |
|
|
|
unconfirmed = append(unconfirmed, confRequest) |
|
} |
|
|
|
return unconfirmed |
|
} |
|
|
|
// unspentRequests returns the set of spend requests that are still seen as |
|
// unspent by the TxNotifier. |
|
// |
|
// NOTE: This method must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) unspentRequests() []SpendRequest { |
|
var unspent []SpendRequest |
|
for spendRequest, spendNtfnSet := range n.spendNotifications { |
|
// If the notification is already aware of its spend details, or |
|
// it's in the process of learning them, we'll skip it as we |
|
// can't yet determine if it's unspent or not. |
|
if spendNtfnSet.rescanStatus != rescanComplete || |
|
spendNtfnSet.details != nil { |
|
continue |
|
} |
|
|
|
unspent = append(unspent, spendRequest) |
|
} |
|
|
|
return unspent |
|
} |
|
|
|
// dispatchConfReorg dispatches a reorg notification to the client if the |
|
// confirmation notification was already delivered. |
|
// |
|
// NOTE: This must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) dispatchConfReorg(ntfn *ConfNtfn, |
|
heightDisconnected uint32) error { |
|
|
|
// If the request's confirmation notification has yet to be dispatched, |
|
// we'll need to clear its entry within the ntfnsByConfirmHeight index |
|
// to prevent from notifying the client once the notifier reaches the |
|
// confirmation height. |
|
if !ntfn.dispatched { |
|
confHeight := heightDisconnected + ntfn.NumConfirmations - 1 |
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight] |
|
if exists { |
|
delete(ntfnSet, ntfn) |
|
} |
|
return nil |
|
} |
|
|
|
// Otherwise, the entry within the ntfnsByConfirmHeight has already been |
|
// deleted, so we'll attempt to drain the confirmation notification to |
|
// ensure sends to the Confirmed channel are always non-blocking. |
|
select { |
|
case <-ntfn.Event.Confirmed: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
default: |
|
} |
|
|
|
ntfn.dispatched = false |
|
|
|
// Send a negative confirmation notification to the client indicating |
|
// how many blocks have been disconnected successively. |
|
select { |
|
case ntfn.Event.NegativeConf <- int32(n.reorgDepth): |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
|
|
return nil |
|
} |
|
|
|
// dispatchSpendReorg dispatches a reorg notification to the client if a spend |
|
// notiification was already delivered. |
|
// |
|
// NOTE: This must be called with the TxNotifier's lock held. |
|
func (n *TxNotifier) dispatchSpendReorg(ntfn *SpendNtfn) error { |
|
if !ntfn.dispatched { |
|
return nil |
|
} |
|
|
|
// Attempt to drain the spend notification to ensure sends to the Spend |
|
// channel are always non-blocking. |
|
select { |
|
case <-ntfn.Event.Spend: |
|
default: |
|
} |
|
|
|
// Send a reorg notification to the client in order for them to |
|
// correctly handle reorgs. |
|
select { |
|
case ntfn.Event.Reorg <- struct{}{}: |
|
case <-n.quit: |
|
return ErrTxNotifierExiting |
|
} |
|
|
|
ntfn.dispatched = false |
|
|
|
return nil |
|
} |
|
|
|
// TearDown is to be called when the owner of the TxNotifier is exiting. This |
|
// closes the event channels of all registered notifications that have not been |
|
// dispatched yet. |
|
func (n *TxNotifier) TearDown() { |
|
close(n.quit) |
|
|
|
n.Lock() |
|
defer n.Unlock() |
|
|
|
for _, confSet := range n.confNotifications { |
|
for confID, ntfn := range confSet.ntfns { |
|
close(ntfn.Event.Confirmed) |
|
close(ntfn.Event.Updates) |
|
close(ntfn.Event.NegativeConf) |
|
close(ntfn.Event.Done) |
|
delete(confSet.ntfns, confID) |
|
} |
|
} |
|
|
|
for _, spendSet := range n.spendNotifications { |
|
for spendID, ntfn := range spendSet.ntfns { |
|
close(ntfn.Event.Spend) |
|
close(ntfn.Event.Reorg) |
|
close(ntfn.Event.Done) |
|
delete(spendSet.ntfns, spendID) |
|
} |
|
} |
|
}
|
|
|