1417 lines
47 KiB
Go
1417 lines
47 KiB
Go
package chainntnfs
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/btcsuite/btcutil"
|
|
)
|
|
|
|
const (
|
|
// ReorgSafetyLimit is the chain depth beyond which it is assumed a
|
|
// block will not be reorganized out of the chain. This is used to
|
|
// determine when to prune old confirmation requests so that reorgs are
|
|
// handled correctly. The average number of blocks in a day is a
|
|
// reasonable value to use.
|
|
ReorgSafetyLimit = 144
|
|
|
|
// MaxNumConfs is the maximum number of confirmations that can be
|
|
// requested on a transaction.
|
|
MaxNumConfs = ReorgSafetyLimit
|
|
)
|
|
|
|
var (
|
|
// ErrTxNotifierExiting is an error returned when attempting to interact
|
|
// with the TxNotifier but it been shut down.
|
|
ErrTxNotifierExiting = errors.New("TxNotifier is exiting")
|
|
|
|
// ErrTxMaxConfs signals that the user requested a number of
|
|
// confirmations beyond the reorg safety limit.
|
|
ErrTxMaxConfs = fmt.Errorf("too many confirmations requested, max is %d",
|
|
MaxNumConfs)
|
|
)
|
|
|
|
// rescanState indicates the progression of a registration before the notifier
|
|
// can begin dispatching confirmations at tip.
|
|
type rescanState byte
|
|
|
|
const (
|
|
// rescanNotStarted is the initial state, denoting that a historical
|
|
// dispatch may be required.
|
|
rescanNotStarted rescanState = iota
|
|
|
|
// rescanPending indicates that a dispatch has already been made, and we
|
|
// are waiting for its completion. No other rescans should be dispatched
|
|
// while in this state.
|
|
rescanPending
|
|
|
|
// rescanComplete signals either that a rescan was dispatched and has
|
|
// completed, or that we began watching at tip immediately. In either
|
|
// case, the notifier can only dispatch notifications from tip when in
|
|
// this state.
|
|
rescanComplete
|
|
)
|
|
|
|
// confNtfnSet holds all known, registered confirmation notifications for a
|
|
// single txid. If duplicates notifications are requested, only one historical
|
|
// dispatch will be spawned to ensure redundant scans are not permitted. A
|
|
// single conf detail will be constructed and dispatched to all interested
|
|
// clients.
|
|
type confNtfnSet struct {
|
|
// ntfns keeps tracks of all the active client notification requests for
|
|
// a transaction.
|
|
ntfns map[uint64]*ConfNtfn
|
|
|
|
// rescanStatus represents the current rescan state for the transaction.
|
|
rescanStatus rescanState
|
|
|
|
// details serves as a cache of the confirmation details of a
|
|
// transaction that we'll use to determine if a transaction has already
|
|
// confirmed at the time of registration.
|
|
details *TxConfirmation
|
|
}
|
|
|
|
// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients
|
|
// interested in a notification for a particular txid.
|
|
func newConfNtfnSet() *confNtfnSet {
|
|
return &confNtfnSet{
|
|
ntfns: make(map[uint64]*ConfNtfn),
|
|
rescanStatus: rescanNotStarted,
|
|
}
|
|
}
|
|
|
|
// spendNtfnSet holds all known, registered spend notifications for an outpoint.
|
|
// If duplicate notifications are requested, only one historical dispatch will
|
|
// be spawned to ensure redundant scans are not permitted.
|
|
type spendNtfnSet struct {
|
|
// ntfns keeps tracks of all the active client notification requests for
|
|
// an outpoint.
|
|
ntfns map[uint64]*SpendNtfn
|
|
|
|
// rescanStatus represents the current rescan state for the outpoint.
|
|
rescanStatus rescanState
|
|
|
|
// details serves as a cache of the spend details for an outpoint that
|
|
// we'll use to determine if an outpoint has already been spent at the
|
|
// time of registration.
|
|
details *SpendDetail
|
|
}
|
|
|
|
// newSpendNtfnSet constructs a new spend notification set.
|
|
func newSpendNtfnSet() *spendNtfnSet {
|
|
return &spendNtfnSet{
|
|
ntfns: make(map[uint64]*SpendNtfn),
|
|
rescanStatus: rescanNotStarted,
|
|
}
|
|
}
|
|
|
|
// ConfNtfn represents a notifier client's request to receive a notification
|
|
// once the target transaction gets sufficient confirmations. The client is
|
|
// asynchronously notified via the ConfirmationEvent channels.
|
|
type ConfNtfn struct {
|
|
// ConfID uniquely identifies the confirmation notification request for
|
|
// the specified transaction.
|
|
ConfID uint64
|
|
|
|
// TxID is the hash of the transaction for which confirmation notifications
|
|
// are requested.
|
|
TxID *chainhash.Hash
|
|
|
|
// PkScript is the public key script of an outpoint created in this
|
|
// transaction.
|
|
//
|
|
// NOTE: This value MUST be set when the dispatch is to be performed
|
|
// using compact filters.
|
|
PkScript []byte
|
|
|
|
// NumConfirmations is the number of confirmations after which the
|
|
// notification is to be sent.
|
|
NumConfirmations uint32
|
|
|
|
// Event contains references to the channels that the notifications are to
|
|
// be sent over.
|
|
Event *ConfirmationEvent
|
|
|
|
// HeightHint is the minimum height in the chain that we expect to find
|
|
// this txid.
|
|
HeightHint uint32
|
|
|
|
// dispatched is false if the confirmed notification has not been sent yet.
|
|
dispatched bool
|
|
}
|
|
|
|
// HistoricalConfDispatch parameterizes a manual rescan for a particular
|
|
// transaction identifier. The parameters include the start and end block
|
|
// heights specifying the range of blocks to scan.
|
|
type HistoricalConfDispatch struct {
|
|
// TxID is the transaction ID to search for in the historical dispatch.
|
|
TxID *chainhash.Hash
|
|
|
|
// PkScript is a public key script from an output created by this
|
|
// transaction.
|
|
//
|
|
// NOTE: This value MUST be set when the dispatch is to be performed
|
|
// using compact filters.
|
|
PkScript []byte
|
|
|
|
// StartHeight specifies the block height at which to being the
|
|
// historical rescan.
|
|
StartHeight uint32
|
|
|
|
// EndHeight specifies the last block height (inclusive) that the
|
|
// historical scan should consider.
|
|
EndHeight uint32
|
|
}
|
|
|
|
// SpendNtfn represents a client's request to receive a notification once an
|
|
// outpoint has been spent on-chain. The client is asynchronously notified via
|
|
// the SpendEvent channels.
|
|
type SpendNtfn struct {
|
|
// SpendID uniquely identies the spend notification request for the
|
|
// specified outpoint.
|
|
SpendID uint64
|
|
|
|
// OutPoint is the outpoint for which a client has requested a spend
|
|
// notification for.
|
|
OutPoint wire.OutPoint
|
|
|
|
// PkScript is the script of the outpoint. This is needed in order to
|
|
// match compact filters when attempting a historical rescan to
|
|
// determine if the outpoint has already been spent.
|
|
PkScript []byte
|
|
|
|
// Event contains references to the channels that the notifications are
|
|
// to be sent over.
|
|
Event *SpendEvent
|
|
|
|
// HeightHint is the earliest height in the chain that we expect to find
|
|
// the spending transaction of the specified outpoint. This value will
|
|
// be overridden by the spend hint cache if it contains an entry for it.
|
|
HeightHint uint32
|
|
|
|
// dispatched signals whether a spend notification has been disptached
|
|
// to the client.
|
|
dispatched bool
|
|
}
|
|
|
|
// HistoricalSpendDispatch parameterizes a manual rescan to determine the
|
|
// spending details (if any) of an outpoint. The parameters include the start
|
|
// and end block heights specifying the range of blocks to scan.
|
|
type HistoricalSpendDispatch struct {
|
|
// OutPoint is the outpoint which we should attempt to find the spending
|
|
OutPoint wire.OutPoint
|
|
|
|
// PkScript is the script of the outpoint. This is needed in order to
|
|
// match compact filters when attempting a historical rescan.
|
|
PkScript []byte
|
|
|
|
// StartHeight specified the block height at which to begin the
|
|
// historical rescan.
|
|
StartHeight uint32
|
|
|
|
// EndHeight specifies the last block height (inclusive) that the
|
|
// historical rescan should consider.
|
|
EndHeight uint32
|
|
}
|
|
|
|
// TxNotifier is a struct responsible for delivering transaction notifications
|
|
// to subscribers. These notifications can be of two different types:
|
|
// transaction confirmations and/or outpoint spends. The TxNotifier will watch
|
|
// the blockchain as new blocks come in, in order to satisfy its client
|
|
// requests.
|
|
type TxNotifier struct {
|
|
// currentHeight is the height of the tracked blockchain. It is used to
|
|
// determine the number of confirmations a tx has and ensure blocks are
|
|
// connected and disconnected in order.
|
|
currentHeight uint32
|
|
|
|
// reorgSafetyLimit is the chain depth beyond which it is assumed a
|
|
// block will not be reorganized out of the chain. This is used to
|
|
// determine when to prune old notification requests so that reorgs are
|
|
// handled correctly. The coinbase maturity period is a reasonable value
|
|
// to use.
|
|
reorgSafetyLimit uint32
|
|
|
|
// reorgDepth is the depth of a chain organization that this system is
|
|
// being informed of. This is incremented as long as a sequence of
|
|
// blocks are disconnected without being interrupted by a new block.
|
|
reorgDepth uint32
|
|
|
|
// confNotifications is an index of notification requests by transaction
|
|
// hash.
|
|
confNotifications map[chainhash.Hash]*confNtfnSet
|
|
|
|
// txsByInitialHeight is an index of watched transactions by the height
|
|
// that they are included at in the blockchain. This is tracked so that
|
|
// incorrect notifications are not sent if a transaction is reorged out
|
|
// of the chain and so that negative confirmations can be recognized.
|
|
txsByInitialHeight map[uint32]map[chainhash.Hash]struct{}
|
|
|
|
// ntfnsByConfirmHeight is an index of notification requests by the
|
|
// height at which the transaction will have sufficient confirmations.
|
|
ntfnsByConfirmHeight map[uint32]map[*ConfNtfn]struct{}
|
|
|
|
// spendNotifications is an index of all active notification requests
|
|
// per outpoint.
|
|
spendNotifications map[wire.OutPoint]*spendNtfnSet
|
|
|
|
// opsBySpendHeight is an index that keeps tracks of the spending height
|
|
// of an outpoint we are currently tracking notifications for. This is
|
|
// used in order to recover from the spending transaction of an outpoint
|
|
// being reorged out of the chain.
|
|
opsBySpendHeight map[uint32]map[wire.OutPoint]struct{}
|
|
|
|
// confirmHintCache is a cache used to maintain the latest height hints
|
|
// for transactions. Each height hint represents the earliest height at
|
|
// which the transactions could have been confirmed within the chain.
|
|
confirmHintCache ConfirmHintCache
|
|
|
|
// spendHintCache is a cache used to maintain the latest height hints
|
|
// for outpoints. Each height hint represents the earliest height at
|
|
// which the outpoints could have been spent within the chain.
|
|
spendHintCache SpendHintCache
|
|
|
|
// quit is closed in order to signal that the notifier is gracefully
|
|
// exiting.
|
|
quit chan struct{}
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// NewTxNotifier creates a TxNotifier. The current height of the blockchain is
|
|
// accepted as a parameter. The different hint caches (confirm and spend) are
|
|
// used as an optimization in order to retrieve a better starting point when
|
|
// dispatching a recan for a historical event in the chain.
|
|
func NewTxNotifier(startHeight uint32, reorgSafetyLimit uint32,
|
|
confirmHintCache ConfirmHintCache,
|
|
spendHintCache SpendHintCache) *TxNotifier {
|
|
|
|
return &TxNotifier{
|
|
currentHeight: startHeight,
|
|
reorgSafetyLimit: reorgSafetyLimit,
|
|
confNotifications: make(map[chainhash.Hash]*confNtfnSet),
|
|
txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}),
|
|
ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}),
|
|
spendNotifications: make(map[wire.OutPoint]*spendNtfnSet),
|
|
opsBySpendHeight: make(map[uint32]map[wire.OutPoint]struct{}),
|
|
confirmHintCache: confirmHintCache,
|
|
spendHintCache: spendHintCache,
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// RegisterConf handles a new notification request. The client will be notified
|
|
// when the transaction gets a sufficient number of confirmations on the
|
|
// blockchain. The registration succeeds if no error is returned. If the
|
|
// returned HistoricalConfDispatch is non-nil, the caller is responsible for
|
|
// attempting to manually rescan blocks for the txid between the start and end
|
|
// heights.
|
|
//
|
|
// NOTE: If the transaction has already been included in a block on the chain,
|
|
// the confirmation details must be provided with the UpdateConfDetails method,
|
|
// otherwise we will wait for the transaction to confirm even though it already
|
|
// has.
|
|
func (n *TxNotifier) RegisterConf(ntfn *ConfNtfn) (*HistoricalConfDispatch, error) {
|
|
select {
|
|
case <-n.quit:
|
|
return nil, ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Enforce that we will not dispatch confirmations beyond the reorg
|
|
// safety limit.
|
|
if ntfn.NumConfirmations > n.reorgSafetyLimit {
|
|
return nil, ErrTxMaxConfs
|
|
}
|
|
|
|
// Before proceeding to register the notification, we'll query our
|
|
// height hint cache to determine whether a better one exists.
|
|
//
|
|
// TODO(conner): verify that all submitted height hints are identical.
|
|
startHeight := ntfn.HeightHint
|
|
hint, err := n.confirmHintCache.QueryConfirmHint(*ntfn.TxID)
|
|
if err == nil {
|
|
if hint > startHeight {
|
|
Log.Debugf("Using height hint %d retrieved "+
|
|
"from cache for %v", hint, *ntfn.TxID)
|
|
startHeight = hint
|
|
}
|
|
} else if err != ErrConfirmHintNotFound {
|
|
Log.Errorf("Unable to query confirm hint for %v: %v",
|
|
*ntfn.TxID, err)
|
|
}
|
|
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
confSet, ok := n.confNotifications[*ntfn.TxID]
|
|
if !ok {
|
|
// If this is the first registration for this txid, construct a
|
|
// confSet to coalesce all notifications for the same txid.
|
|
confSet = newConfNtfnSet()
|
|
n.confNotifications[*ntfn.TxID] = confSet
|
|
}
|
|
|
|
confSet.ntfns[ntfn.ConfID] = ntfn
|
|
|
|
switch confSet.rescanStatus {
|
|
|
|
// A prior rescan has already completed and we are actively watching at
|
|
// tip for this txid.
|
|
case rescanComplete:
|
|
// If conf details for this set of notifications has already
|
|
// been found, we'll attempt to deliver them immediately to this
|
|
// client.
|
|
Log.Debugf("Attempting to dispatch conf for txid=%v "+
|
|
"on registration since rescan has finished", ntfn.TxID)
|
|
return nil, n.dispatchConfDetails(ntfn, confSet.details)
|
|
|
|
// A rescan is already in progress, return here to prevent dispatching
|
|
// another. When the scan returns, this notifications details will be
|
|
// updated as well.
|
|
case rescanPending:
|
|
Log.Debugf("Waiting for pending rescan to finish before "+
|
|
"notifying txid=%v at tip", ntfn.TxID)
|
|
return nil, nil
|
|
|
|
// If no rescan has been dispatched, attempt to do so now.
|
|
case rescanNotStarted:
|
|
}
|
|
|
|
// If the provided or cached height hint indicates that the transaction
|
|
// is to be confirmed at a height greater than the conf notifier's
|
|
// current height, we'll refrain from spawning a historical dispatch.
|
|
if startHeight > n.currentHeight {
|
|
Log.Debugf("Height hint is above current height, not dispatching "+
|
|
"historical rescan for txid=%v ", ntfn.TxID)
|
|
// Set the rescan status to complete, which will allow the conf
|
|
// notifier to start delivering messages for this set
|
|
// immediately.
|
|
confSet.rescanStatus = rescanComplete
|
|
return nil, nil
|
|
}
|
|
|
|
Log.Debugf("Dispatching historical rescan for txid=%v ", ntfn.TxID)
|
|
|
|
// Construct the parameters for historical dispatch, scanning the range
|
|
// of blocks between our best known height hint and the notifier's
|
|
// current height. The notifier will begin also watching for
|
|
// confirmations at tip starting with the next block.
|
|
dispatch := &HistoricalConfDispatch{
|
|
TxID: ntfn.TxID,
|
|
PkScript: ntfn.PkScript,
|
|
StartHeight: startHeight,
|
|
EndHeight: n.currentHeight,
|
|
}
|
|
|
|
// Set this confSet's status to pending, ensuring subsequent
|
|
// registrations don't also attempt a dispatch.
|
|
confSet.rescanStatus = rescanPending
|
|
|
|
return dispatch, nil
|
|
}
|
|
|
|
// UpdateConfDetails attempts to update the confirmation details for an active
|
|
// notification within the notifier. This should only be used in the case of a
|
|
// transaction that has confirmed before the notifier's current height.
|
|
//
|
|
// NOTE: The notification should be registered first to ensure notifications are
|
|
// dispatched correctly.
|
|
func (n *TxNotifier) UpdateConfDetails(txid chainhash.Hash,
|
|
details *TxConfirmation) error {
|
|
|
|
select {
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Ensure we hold the lock throughout handling the notification to
|
|
// prevent the notifier from advancing its height underneath us.
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
// First, we'll determine whether we have an active notification for
|
|
// this transaction with the given ID.
|
|
confSet, ok := n.confNotifications[txid]
|
|
if !ok {
|
|
return fmt.Errorf("no notification found with TxID %v", txid)
|
|
}
|
|
|
|
// If the conf details were already found at tip, all existing
|
|
// notifications will have been dispatched or queued for dispatch. We
|
|
// can exit early to avoid sending too many notifications on the
|
|
// buffered channels.
|
|
if confSet.details != nil {
|
|
return nil
|
|
}
|
|
|
|
// The historical dispatch has been completed for this confSet. We'll
|
|
// update the rescan status and cache any details that were found. If
|
|
// the details are nil, that implies we did not find them and will
|
|
// continue to watch for them at tip.
|
|
confSet.rescanStatus = rescanComplete
|
|
|
|
// The notifier has yet to reach the height at which the transaction was
|
|
// included in a block, so we should defer until handling it then within
|
|
// ConnectTip.
|
|
if details == nil {
|
|
Log.Debugf("Conf details for txid=%v not found during "+
|
|
"historical dispatch, waiting to dispatch at tip", txid)
|
|
|
|
// We'll commit the current height as the confirm hint to
|
|
// prevent another potentially long rescan if we restart before
|
|
// a new block comes in.
|
|
err := n.confirmHintCache.CommitConfirmHint(
|
|
n.currentHeight, txid,
|
|
)
|
|
if err != nil {
|
|
// The error is not fatal as this is an optimistic
|
|
// optimization, so we'll avoid returning an error.
|
|
Log.Debugf("Unable to update confirm hint to %d for "+
|
|
"%v: %v", n.currentHeight, txid, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
if details.BlockHeight > n.currentHeight {
|
|
Log.Debugf("Conf details for txid=%v found above current "+
|
|
"height, waiting to dispatch at tip", txid)
|
|
return nil
|
|
}
|
|
|
|
Log.Debugf("Updating conf details for txid=%v details", txid)
|
|
|
|
err := n.confirmHintCache.CommitConfirmHint(details.BlockHeight, txid)
|
|
if err != nil {
|
|
// The error is not fatal, so we should not return an error to
|
|
// the caller.
|
|
Log.Errorf("Unable to update confirm hint to %d for %v: %v",
|
|
details.BlockHeight, txid, err)
|
|
}
|
|
|
|
// Cache the details found in the rescan and attempt to dispatch any
|
|
// notifications that have not yet been delivered.
|
|
confSet.details = details
|
|
for _, ntfn := range confSet.ntfns {
|
|
err = n.dispatchConfDetails(ntfn, details)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dispatchConfDetails attempts to cache and dispatch details to a particular
|
|
// client if the transaction has sufficiently confirmed. If the provided details
|
|
// are nil, this method will be a no-op.
|
|
func (n *TxNotifier) dispatchConfDetails(
|
|
ntfn *ConfNtfn, details *TxConfirmation) error {
|
|
|
|
// If no details are provided, return early as we can't dispatch.
|
|
if details == nil {
|
|
Log.Debugf("Unable to dispatch %v, no details provided",
|
|
ntfn.TxID)
|
|
return nil
|
|
}
|
|
|
|
// Now, we'll examine whether the transaction of this
|
|
// notification request has reached its required number of
|
|
// confirmations. If it has, we'll dispatch a confirmation
|
|
// notification to the caller.
|
|
confHeight := details.BlockHeight + ntfn.NumConfirmations - 1
|
|
if confHeight <= n.currentHeight {
|
|
Log.Infof("Dispatching %v conf notification for %v",
|
|
ntfn.NumConfirmations, ntfn.TxID)
|
|
|
|
// We'll send a 0 value to the Updates channel,
|
|
// indicating that the transaction has already been
|
|
// confirmed.
|
|
select {
|
|
case ntfn.Event.Updates <- 0:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
|
|
select {
|
|
case ntfn.Event.Confirmed <- details:
|
|
ntfn.dispatched = true
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
} else {
|
|
Log.Debugf("Queueing %v conf notification for %v at tip ",
|
|
ntfn.NumConfirmations, ntfn.TxID)
|
|
|
|
// Otherwise, we'll keep track of the notification
|
|
// request by the height at which we should dispatch the
|
|
// confirmation notification.
|
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
|
|
if !exists {
|
|
ntfnSet = make(map[*ConfNtfn]struct{})
|
|
n.ntfnsByConfirmHeight[confHeight] = ntfnSet
|
|
}
|
|
ntfnSet[ntfn] = struct{}{}
|
|
|
|
// We'll also send an update to the client of how many
|
|
// confirmations are left for the transaction to be
|
|
// confirmed.
|
|
numConfsLeft := confHeight - n.currentHeight
|
|
select {
|
|
case ntfn.Event.Updates <- numConfsLeft:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
}
|
|
|
|
// As a final check, we'll also watch the transaction if it's
|
|
// still possible for it to get reorged out of the chain.
|
|
blockHeight := details.BlockHeight
|
|
reorgSafeHeight := blockHeight + n.reorgSafetyLimit
|
|
if reorgSafeHeight > n.currentHeight {
|
|
txSet, exists := n.txsByInitialHeight[blockHeight]
|
|
if !exists {
|
|
txSet = make(map[chainhash.Hash]struct{})
|
|
n.txsByInitialHeight[blockHeight] = txSet
|
|
}
|
|
txSet[*ntfn.TxID] = struct{}{}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RegisterSpend handles a new spend notification request. The client will be
|
|
// notified once the outpoint is detected as spent within the chain.
|
|
//
|
|
// The registration succeeds if no error is returned. If the returned
|
|
// HistoricalSpendDisaptch is non-nil, the caller is responsible for attempting
|
|
// to determine whether the outpoint has been spent between the start and end
|
|
// heights.
|
|
//
|
|
// NOTE: If the outpoint has already been spent within the chain before the
|
|
// notifier's current tip, the spend details must be provided with the
|
|
// UpdateSpendDetails method, otherwise we will wait for the outpoint to
|
|
// be spent at tip, even though it already has.
|
|
func (n *TxNotifier) RegisterSpend(ntfn *SpendNtfn) (*HistoricalSpendDispatch, error) {
|
|
select {
|
|
case <-n.quit:
|
|
return nil, ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Before proceeding to register the notification, we'll query our spend
|
|
// hint cache to determine whether a better one exists.
|
|
startHeight := ntfn.HeightHint
|
|
hint, err := n.spendHintCache.QuerySpendHint(ntfn.OutPoint)
|
|
if err == nil {
|
|
if hint > startHeight {
|
|
Log.Debugf("Using height hint %d retrieved from cache "+
|
|
"for %v", startHeight, ntfn.OutPoint)
|
|
startHeight = hint
|
|
}
|
|
} else if err != ErrSpendHintNotFound {
|
|
Log.Errorf("Unable to query spend hint for %v: %v",
|
|
ntfn.OutPoint, err)
|
|
}
|
|
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
Log.Infof("New spend subscription: spend_id=%d, outpoint=%v, "+
|
|
"height_hint=%d", ntfn.SpendID, ntfn.OutPoint, ntfn.HeightHint)
|
|
|
|
// Keep track of the notification request so that we can properly
|
|
// dispatch a spend notification later on.
|
|
spendSet, ok := n.spendNotifications[ntfn.OutPoint]
|
|
if !ok {
|
|
// If this is the first registration for the outpoint, we'll
|
|
// construct a spendNtfnSet to coalesce all notifications.
|
|
spendSet = newSpendNtfnSet()
|
|
n.spendNotifications[ntfn.OutPoint] = spendSet
|
|
}
|
|
spendSet.ntfns[ntfn.SpendID] = ntfn
|
|
|
|
// We'll now let the caller know whether a historical rescan is needed
|
|
// depending on the current rescan status.
|
|
switch spendSet.rescanStatus {
|
|
|
|
// If the spending details for this outpoint have already been
|
|
// determined and cached, then we can use them to immediately dispatch
|
|
// the spend notification to the client.
|
|
case rescanComplete:
|
|
return nil, n.dispatchSpendDetails(ntfn, spendSet.details)
|
|
|
|
// If there is an active rescan to determine whether the outpoint has
|
|
// been spent, then we won't trigger another one.
|
|
case rescanPending:
|
|
return nil, nil
|
|
|
|
// Otherwise, we'll fall through and let the caller know that a rescan
|
|
// should be dispatched to determine whether the outpoint has already
|
|
// been spent.
|
|
case rescanNotStarted:
|
|
}
|
|
|
|
// However, if the spend hint, either provided by the caller or
|
|
// retrieved from the cache, is found to be at a later height than the
|
|
// TxNotifier is aware of, then we'll refrain from dispatching a
|
|
// historical rescan and wait for the spend to come in at tip.
|
|
if startHeight > n.currentHeight {
|
|
Log.Debugf("Spend hint of %d for %v is above current height %d",
|
|
startHeight, ntfn.OutPoint, n.currentHeight)
|
|
|
|
// We'll also set the rescan status as complete to ensure that
|
|
// spend hints for this outpoint get updated upon
|
|
// connected/disconnected blocks.
|
|
spendSet.rescanStatus = rescanComplete
|
|
return nil, nil
|
|
}
|
|
|
|
// We'll set the rescan status to pending to ensure subsequent
|
|
// notifications don't also attempt a historical dispatch.
|
|
spendSet.rescanStatus = rescanPending
|
|
|
|
return &HistoricalSpendDispatch{
|
|
OutPoint: ntfn.OutPoint,
|
|
PkScript: ntfn.PkScript,
|
|
StartHeight: startHeight,
|
|
EndHeight: n.currentHeight,
|
|
}, nil
|
|
}
|
|
|
|
// CancelSpend cancels an existing request for a spend notification of an
|
|
// outpoint. The request is identified by its spend ID.
|
|
func (n *TxNotifier) CancelSpend(op wire.OutPoint, spendID uint64) {
|
|
select {
|
|
case <-n.quit:
|
|
return
|
|
default:
|
|
}
|
|
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
Log.Infof("Canceling spend notification: spend_id=%d, outpoint=%v",
|
|
spendID, op)
|
|
|
|
spendSet, ok := n.spendNotifications[op]
|
|
if !ok {
|
|
return
|
|
}
|
|
ntfn, ok := spendSet.ntfns[spendID]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// We'll close all the notification channels to let the client know
|
|
// their cancel request has been fulfilled.
|
|
close(ntfn.Event.Spend)
|
|
close(ntfn.Event.Reorg)
|
|
delete(spendSet.ntfns, spendID)
|
|
}
|
|
|
|
// ProcessRelevantSpendTx processes a transaction provided externally. This will
|
|
// check whether the transaction is relevant to the notifier if it spends any
|
|
// outputs for which we currently have registered notifications for. If it is
|
|
// relevant, spend notifications will be dispatched to the caller.
|
|
func (n *TxNotifier) ProcessRelevantSpendTx(tx *wire.MsgTx, txHeight int32) error {
|
|
select {
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Ensure we hold the lock throughout handling the notification to
|
|
// prevent the notifier from advancing its height underneath us.
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
// Grab the set of active registered outpoints to determine if the
|
|
// transaction spends any of them.
|
|
spendNtfns := n.spendNotifications
|
|
|
|
// We'll check if this transaction spends an output that has an existing
|
|
// spend notification for it.
|
|
for i, txIn := range tx.TxIn {
|
|
// If this input doesn't spend an existing registered outpoint,
|
|
// we'll go on to the next.
|
|
prevOut := txIn.PreviousOutPoint
|
|
if _, ok := spendNtfns[prevOut]; !ok {
|
|
continue
|
|
}
|
|
|
|
// Otherwise, we'll create a spend summary and send off the
|
|
// details to the notification subscribers.
|
|
txHash := tx.TxHash()
|
|
details := &SpendDetail{
|
|
SpentOutPoint: &prevOut,
|
|
SpenderTxHash: &txHash,
|
|
SpendingTx: tx,
|
|
SpenderInputIndex: uint32(i),
|
|
SpendingHeight: txHeight,
|
|
}
|
|
if err := n.updateSpendDetails(prevOut, details); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateSpendDetails attempts to update the spend details for all active spend
|
|
// notification requests for an outpoint. This method should be used once a
|
|
// historical scan of the chain has finished. If the historical scan did not
|
|
// find a spending transaction for the outpoint, the spend details may be nil.
|
|
//
|
|
// NOTE: A notification request for the outpoint must be registered first to
|
|
// ensure notifications are delivered.
|
|
func (n *TxNotifier) UpdateSpendDetails(op wire.OutPoint,
|
|
details *SpendDetail) error {
|
|
|
|
select {
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Ensure we hold the lock throughout handling the notification to
|
|
// prevent the notifier from advancing its height underneath us.
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
return n.updateSpendDetails(op, details)
|
|
}
|
|
|
|
// updateSpendDetails attempts to update the spend details for all active spend
|
|
// notification requests for an outpoint. This method should be used once a
|
|
// historical scan of the chain has finished. If the historical scan did not
|
|
// find a spending transaction for the outpoint, the spend details may be nil.
|
|
//
|
|
// NOTE: This method must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) updateSpendDetails(op wire.OutPoint,
|
|
details *SpendDetail) error {
|
|
|
|
// Mark the ongoing historical rescan for this outpoint as finished.
|
|
// This will allow us to update the spend hints for this outpoint at
|
|
// tip.
|
|
spendSet, ok := n.spendNotifications[op]
|
|
if !ok {
|
|
return fmt.Errorf("no notifications found for outpoint %v", op)
|
|
}
|
|
|
|
// If the spend details have already been found either at tip, then the
|
|
// notifications should have already been dispatched, so we can exit
|
|
// early to prevent sending duplicate notifications.
|
|
if spendSet.details != nil {
|
|
return nil
|
|
}
|
|
|
|
// Since the historical rescan has completed for this outpoint, we'll
|
|
// mark its rescan status as complete in order to ensure that the
|
|
// TxNotifier can properly update its spend hints upon
|
|
// connected/disconnected blocks.
|
|
spendSet.rescanStatus = rescanComplete
|
|
|
|
// If the historical rescan was not able to find a spending transaction
|
|
// for this outpoint, then we can track the spend at tip.
|
|
if details == nil {
|
|
// We'll commit the current height as the spend hint to prevent
|
|
// another potentially long rescan if we restart before a new
|
|
// block comes in.
|
|
err := n.spendHintCache.CommitSpendHint(n.currentHeight, op)
|
|
if err != nil {
|
|
// The error is not fatal as this is an optimistic
|
|
// optimization, so we'll avoid returning an error.
|
|
Log.Debugf("Unable to update spend hint to %d for %v: %v",
|
|
n.currentHeight, op, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// If the historical rescan found the spending transaction for this
|
|
// outpoint, but it's at a later height than the notifier (this can
|
|
// happen due to latency with the backend during a reorg), then we'll
|
|
// defer handling the notification until the notifier has caught up to
|
|
// such height.
|
|
if uint32(details.SpendingHeight) > n.currentHeight {
|
|
return nil
|
|
}
|
|
|
|
// Now that we've determined the outpoint has been spent, we'll commit
|
|
// its spending height as its hint in the cache and dispatch
|
|
// notifications to all of its respective clients.
|
|
err := n.spendHintCache.CommitSpendHint(
|
|
uint32(details.SpendingHeight), op,
|
|
)
|
|
if err != nil {
|
|
// The error is not fatal as this is an optimistic optimization,
|
|
// so we'll avoid returning an error.
|
|
Log.Debugf("Unable to update spend hint to %d for %v: %v",
|
|
details.SpendingHeight, op, err)
|
|
}
|
|
|
|
spendSet.details = details
|
|
for _, ntfn := range spendSet.ntfns {
|
|
err := n.dispatchSpendDetails(ntfn, spendSet.details)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dispatchSpendDetails dispatches a spend notification to the client.
|
|
//
|
|
// NOTE: This must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) dispatchSpendDetails(ntfn *SpendNtfn, details *SpendDetail) error {
|
|
// If there are no spend details to dispatch or if the notification has
|
|
// already been dispatched, then we can skip dispatching to this client.
|
|
if details == nil || ntfn.dispatched {
|
|
return nil
|
|
}
|
|
|
|
Log.Infof("Dispatching spend notification for outpoint=%v at height=%d",
|
|
ntfn.OutPoint, n.currentHeight)
|
|
|
|
select {
|
|
case ntfn.Event.Spend <- details:
|
|
ntfn.dispatched = true
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConnectTip handles a new block extending the current chain. It will go
|
|
// through every transaction and determine if it is relevant to any of its
|
|
// clients. A transaction can be relevant in either of the following two ways:
|
|
//
|
|
// 1. One of the inputs in the transaction spends an outpoint for which we
|
|
// currently have an active spend registration for.
|
|
//
|
|
// 2. The transaction is a transaction for which we currently have an active
|
|
// confirmation registration for.
|
|
//
|
|
// In the event that the transaction is relevant, a confirmation/spend
|
|
// notification will be queued for dispatch to the relevant clients.
|
|
// Confirmation notifications will only be dispatched for transactions that have
|
|
// met the required number of confirmations required by the client.
|
|
//
|
|
// NOTE: In order to actually dispatch the relevant transaction notifications to
|
|
// clients, NotifyHeight must be called with the same block height in order to
|
|
// maintain correctness.
|
|
func (n *TxNotifier) ConnectTip(blockHash *chainhash.Hash, blockHeight uint32,
|
|
txns []*btcutil.Tx) error {
|
|
|
|
select {
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
if blockHeight != n.currentHeight+1 {
|
|
return fmt.Errorf("Received blocks out of order: "+
|
|
"current height=%d, new height=%d",
|
|
n.currentHeight, blockHeight)
|
|
}
|
|
n.currentHeight++
|
|
n.reorgDepth = 0
|
|
|
|
// First, we'll iterate over all the transactions found in this block to
|
|
// determine if it includes any relevant transactions to the TxNotifier.
|
|
for _, tx := range txns {
|
|
txHash := tx.Hash()
|
|
|
|
// In order to determine if this transaction is relevant to the
|
|
// notifier, we'll check its inputs for any outstanding spend
|
|
// notifications.
|
|
for i, txIn := range tx.MsgTx().TxIn {
|
|
prevOut := txIn.PreviousOutPoint
|
|
spendSet, ok := n.spendNotifications[prevOut]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// If we have any, we'll record its spend height so that
|
|
// notifications get dispatched to the respective
|
|
// clients.
|
|
spendDetails := &SpendDetail{
|
|
SpentOutPoint: &prevOut,
|
|
SpenderTxHash: txHash,
|
|
SpendingTx: tx.MsgTx(),
|
|
SpenderInputIndex: uint32(i),
|
|
SpendingHeight: int32(blockHeight),
|
|
}
|
|
|
|
// TODO(wilmer): cancel pending historical rescans if any?
|
|
spendSet.rescanStatus = rescanComplete
|
|
spendSet.details = spendDetails
|
|
for _, ntfn := range spendSet.ntfns {
|
|
// In the event that this notification was aware
|
|
// that the spending transaction of its outpoint
|
|
// was reorged out of the chain, we'll consume
|
|
// the reorg notification if it hasn't been
|
|
// done yet already.
|
|
select {
|
|
case <-ntfn.Event.Reorg:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// We'll note the outpoints spending height in order to
|
|
// correctly handle dispatching notifications when the
|
|
// spending transactions gets reorged out of the chain.
|
|
opSet, exists := n.opsBySpendHeight[blockHeight]
|
|
if !exists {
|
|
opSet = make(map[wire.OutPoint]struct{})
|
|
n.opsBySpendHeight[blockHeight] = opSet
|
|
}
|
|
opSet[prevOut] = struct{}{}
|
|
}
|
|
|
|
// Check if we have any pending notifications for this txid. If
|
|
// none are found, we can proceed to the next transaction.
|
|
confSet, ok := n.confNotifications[*txHash]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
Log.Debugf("Block contains txid=%v, constructing details",
|
|
txHash)
|
|
|
|
// If we have any, we'll record its confirmed height so that
|
|
// notifications get dispatched when the transaction reaches the
|
|
// clients' desired number of confirmations.
|
|
details := &TxConfirmation{
|
|
BlockHash: blockHash,
|
|
BlockHeight: blockHeight,
|
|
TxIndex: uint32(tx.Index()),
|
|
}
|
|
|
|
// TODO(wilmer): cancel pending historical rescans if any?
|
|
confSet.rescanStatus = rescanComplete
|
|
confSet.details = details
|
|
for _, ntfn := range confSet.ntfns {
|
|
// In the event that this notification was aware that
|
|
// the transaction was reorged out of the chain, we'll
|
|
// consume the reorg notification if it hasn't been done
|
|
// yet already.
|
|
select {
|
|
case <-ntfn.Event.NegativeConf:
|
|
default:
|
|
}
|
|
|
|
// We'll note this client's required number of
|
|
// confirmations so that we can notify them when
|
|
// expected.
|
|
confHeight := blockHeight + ntfn.NumConfirmations - 1
|
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
|
|
if !exists {
|
|
ntfnSet = make(map[*ConfNtfn]struct{})
|
|
n.ntfnsByConfirmHeight[confHeight] = ntfnSet
|
|
}
|
|
ntfnSet[ntfn] = struct{}{}
|
|
|
|
// We'll also note the initial confirmation height in
|
|
// order to correctly handle dispatching notifications
|
|
// when the transaction gets reorged out of the chain.
|
|
txSet, exists := n.txsByInitialHeight[blockHeight]
|
|
if !exists {
|
|
txSet = make(map[chainhash.Hash]struct{})
|
|
n.txsByInitialHeight[blockHeight] = txSet
|
|
}
|
|
txSet[*txHash] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Finally, now that we've determined which transactions were confirmed
|
|
// and which outpoints were spent within the new block, we can update
|
|
// their entries in their respective caches, along with all of our
|
|
// unconfirmed transactions and unspent outpoints.
|
|
n.updateHints(blockHeight)
|
|
|
|
return nil
|
|
}
|
|
|
|
// NotifyHeight dispatches confirmation and spend notifications to the clients
|
|
// who registered for a notification which has been fulfilled at the passed
|
|
// height.
|
|
func (n *TxNotifier) NotifyHeight(height uint32) error {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
// First, we'll dispatch an update to all of the notification clients
|
|
// for our watched transactions with the number of confirmations left at
|
|
// this new height.
|
|
for _, txHashes := range n.txsByInitialHeight {
|
|
for txHash := range txHashes {
|
|
confSet := n.confNotifications[txHash]
|
|
for _, ntfn := range confSet.ntfns {
|
|
txConfHeight := confSet.details.BlockHeight +
|
|
ntfn.NumConfirmations - 1
|
|
numConfsLeft := txConfHeight - height
|
|
|
|
// Since we don't clear notifications until
|
|
// transactions are no longer under the risk of
|
|
// being reorganized out of the chain, we'll
|
|
// skip sending updates for transactions that
|
|
// have already been confirmed.
|
|
if int32(numConfsLeft) < 0 {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case ntfn.Event.Updates <- numConfsLeft:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Then, we'll dispatch notifications for all the transactions that have
|
|
// become confirmed at this new block height.
|
|
for ntfn := range n.ntfnsByConfirmHeight[height] {
|
|
confSet := n.confNotifications[*ntfn.TxID]
|
|
|
|
Log.Infof("Dispatching %v conf notification for %v",
|
|
ntfn.NumConfirmations, ntfn.TxID)
|
|
|
|
select {
|
|
case ntfn.Event.Confirmed <- confSet.details:
|
|
ntfn.dispatched = true
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
}
|
|
delete(n.ntfnsByConfirmHeight, height)
|
|
|
|
// We'll also dispatch spend notifications for all the outpoints that
|
|
// were spent at this new block height.
|
|
for op := range n.opsBySpendHeight[height] {
|
|
spendSet := n.spendNotifications[op]
|
|
for _, ntfn := range spendSet.ntfns {
|
|
err := n.dispatchSpendDetails(ntfn, spendSet.details)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally, we'll clear the entries from our set of notifications for
|
|
// transactions and outpoints that are no longer under the risk of being
|
|
// reorged out of the chain.
|
|
if height >= n.reorgSafetyLimit {
|
|
matureBlockHeight := height - n.reorgSafetyLimit
|
|
for tx := range n.txsByInitialHeight[matureBlockHeight] {
|
|
delete(n.confNotifications, tx)
|
|
}
|
|
delete(n.txsByInitialHeight, matureBlockHeight)
|
|
for op := range n.opsBySpendHeight[matureBlockHeight] {
|
|
delete(n.spendNotifications, op)
|
|
}
|
|
delete(n.opsBySpendHeight, matureBlockHeight)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DisconnectTip handles the tip of the current chain being disconnected during
|
|
// a chain reorganization. If any watched transactions or spending transactions
|
|
// for registered outpoints were included in this block, internal structures are
|
|
// updated to ensure confirmation/spend notifications are consumed (if not
|
|
// already), and reorg notifications are dispatched instead. Confirmation/spend
|
|
// notifications will be dispatched again upon block inclusion.
|
|
func (n *TxNotifier) DisconnectTip(blockHeight uint32) error {
|
|
select {
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
if blockHeight != n.currentHeight {
|
|
return fmt.Errorf("Received blocks out of order: "+
|
|
"current height=%d, disconnected height=%d",
|
|
n.currentHeight, blockHeight)
|
|
}
|
|
n.currentHeight--
|
|
n.reorgDepth++
|
|
|
|
// With the block disconnected, we'll update the confirm and spend hints
|
|
// for our transactions and outpoints to reflect the new height, except
|
|
// for those that have confirmed/spent at previous heights.
|
|
n.updateHints(blockHeight)
|
|
|
|
// We'll go through all of our watched transactions and attempt to drain
|
|
// their notification channels to ensure sending notifications to the
|
|
// clients is always non-blocking.
|
|
for initialHeight, txHashes := range n.txsByInitialHeight {
|
|
for txHash := range txHashes {
|
|
// If the transaction has been reorged out of the chain,
|
|
// we'll make sure to remove the cached confirmation
|
|
// details to prevent notifying clients with old
|
|
// information.
|
|
confSet := n.confNotifications[txHash]
|
|
if initialHeight == blockHeight {
|
|
confSet.details = nil
|
|
}
|
|
|
|
for _, ntfn := range confSet.ntfns {
|
|
// First, we'll attempt to drain an update
|
|
// from each notification to ensure sends to the
|
|
// Updates channel are always non-blocking.
|
|
select {
|
|
case <-ntfn.Event.Updates:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
// Then, we'll check if the current transaction
|
|
// was included in the block currently being
|
|
// disconnected. If it was, we'll need to
|
|
// dispatch a reorg notification to the client.
|
|
if initialHeight == blockHeight {
|
|
err := n.dispatchConfReorg(
|
|
ntfn, blockHeight,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// We'll also go through our watched outpoints and attempt to drain
|
|
// their dispatched notifications to ensure dispatching notifications to
|
|
// clients later on is always non-blocking. We're only interested in
|
|
// outpoints whose spending transaction was included at the height being
|
|
// disconnected.
|
|
for op := range n.opsBySpendHeight[blockHeight] {
|
|
// Since the spending transaction is being reorged out of the
|
|
// chain, we'll need to clear out the spending details of the
|
|
// outpoint.
|
|
spendSet := n.spendNotifications[op]
|
|
spendSet.details = nil
|
|
|
|
// For all requests which have had a spend notification
|
|
// dispatched, we'll attempt to drain it and send a reorg
|
|
// notification instead.
|
|
for _, ntfn := range spendSet.ntfns {
|
|
if err := n.dispatchSpendReorg(ntfn); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Finally, we can remove the transactions that were confirmed and the
|
|
// outpoints that were spent at the height being disconnected. We'll
|
|
// still continue to track them until they have been confirmed/spent and
|
|
// are no longer under the risk of being reorged out of the chain again.
|
|
delete(n.txsByInitialHeight, blockHeight)
|
|
delete(n.opsBySpendHeight, blockHeight)
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateHints attempts to update the confirm and spend hints for all relevant
|
|
// transactions and outpoints respectively. The height parameter is used to
|
|
// determine which transactions and outpoints we should update based on whether
|
|
// a new block is being connected/disconnected.
|
|
//
|
|
// NOTE: This must be called with the TxNotifier's lock held and after its
|
|
// height has already been reflected by a block being connected/disconnected.
|
|
func (n *TxNotifier) updateHints(height uint32) {
|
|
// TODO(wilmer): update under one database transaction.
|
|
//
|
|
// To update the height hint for all the required transactions under one
|
|
// database transaction, we'll gather the set of unconfirmed
|
|
// transactions along with the ones that confirmed at the height being
|
|
// connected/disconnected.
|
|
txsToUpdateHints := n.unconfirmedTxs()
|
|
for confirmedTx := range n.txsByInitialHeight[height] {
|
|
txsToUpdateHints = append(txsToUpdateHints, confirmedTx)
|
|
}
|
|
err := n.confirmHintCache.CommitConfirmHint(
|
|
n.currentHeight, txsToUpdateHints...,
|
|
)
|
|
if err != nil {
|
|
// The error is not fatal as this is an optimistic optimization,
|
|
// so we'll avoid returning an error.
|
|
Log.Debugf("Unable to update confirm hints to %d for "+
|
|
"%v: %v", n.currentHeight, txsToUpdateHints, err)
|
|
}
|
|
|
|
// Similarly, to update the height hint for all the required outpoints
|
|
// under one database transaction, we'll gather the set of unspent
|
|
// outpoints along with the ones that were spent at the height being
|
|
// connected/disconnected.
|
|
opsToUpdateHints := n.unspentOutPoints()
|
|
for spentOp := range n.opsBySpendHeight[height] {
|
|
opsToUpdateHints = append(opsToUpdateHints, spentOp)
|
|
}
|
|
err = n.spendHintCache.CommitSpendHint(
|
|
n.currentHeight, opsToUpdateHints...,
|
|
)
|
|
if err != nil {
|
|
// The error is not fatal as this is an optimistic optimization,
|
|
// so we'll avoid returning an error.
|
|
Log.Debugf("Unable to update spend hints to %d for "+
|
|
"%v: %v", n.currentHeight, opsToUpdateHints, err)
|
|
}
|
|
}
|
|
|
|
// unconfirmedTxs returns the set of transactions that are still seen as
|
|
// unconfirmed by the TxNotifier.
|
|
//
|
|
// NOTE: This method must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) unconfirmedTxs() []chainhash.Hash {
|
|
var unconfirmedTxs []chainhash.Hash
|
|
for tx, confNtfnSet := range n.confNotifications {
|
|
// If the notification is already aware of its confirmation
|
|
// details, or it's in the process of learning them, we'll skip
|
|
// it as we can't yet determine if it's confirmed or not.
|
|
if confNtfnSet.rescanStatus != rescanComplete ||
|
|
confNtfnSet.details != nil {
|
|
continue
|
|
}
|
|
|
|
unconfirmedTxs = append(unconfirmedTxs, tx)
|
|
}
|
|
|
|
return unconfirmedTxs
|
|
}
|
|
|
|
// unspentOutPoints returns the set of outpoints that are still seen as unspent
|
|
// by the TxNotifier.
|
|
//
|
|
// NOTE: This method must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) unspentOutPoints() []wire.OutPoint {
|
|
var unspentOps []wire.OutPoint
|
|
for op, spendNtfnSet := range n.spendNotifications {
|
|
// If the notification is already aware of its spend details, or
|
|
// it's in the process of learning them, we'll skip it as we
|
|
// can't yet determine if it's unspent or not.
|
|
if spendNtfnSet.rescanStatus != rescanComplete ||
|
|
spendNtfnSet.details != nil {
|
|
continue
|
|
}
|
|
|
|
unspentOps = append(unspentOps, op)
|
|
}
|
|
|
|
return unspentOps
|
|
}
|
|
|
|
// dispatchConfReorg dispatches a reorg notification to the client if the
|
|
// confirmation notification was already delivered.
|
|
//
|
|
// NOTE: This must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) dispatchConfReorg(ntfn *ConfNtfn,
|
|
heightDisconnected uint32) error {
|
|
|
|
// If the transaction's confirmation notification has yet to be
|
|
// dispatched, we'll need to clear its entry within the
|
|
// ntfnsByConfirmHeight index to prevent from notifying the client once
|
|
// the notifier reaches the confirmation height.
|
|
if !ntfn.dispatched {
|
|
confHeight := heightDisconnected + ntfn.NumConfirmations - 1
|
|
ntfnSet, exists := n.ntfnsByConfirmHeight[confHeight]
|
|
if exists {
|
|
delete(ntfnSet, ntfn)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Otherwise, the entry within the ntfnsByConfirmHeight has already been
|
|
// deleted, so we'll attempt to drain the confirmation notification to
|
|
// ensure sends to the Confirmed channel are always non-blocking.
|
|
select {
|
|
case <-ntfn.Event.Confirmed:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
default:
|
|
}
|
|
|
|
ntfn.dispatched = false
|
|
|
|
// Send a negative confirmation notification to the client indicating
|
|
// how many blocks have been disconnected successively.
|
|
select {
|
|
case ntfn.Event.NegativeConf <- int32(n.reorgDepth):
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// dispatchSpendReorg dispatches a reorg notification to the client if a spend
|
|
// notiification was already delivered.
|
|
//
|
|
// NOTE: This must be called with the TxNotifier's lock held.
|
|
func (n *TxNotifier) dispatchSpendReorg(ntfn *SpendNtfn) error {
|
|
if !ntfn.dispatched {
|
|
return nil
|
|
}
|
|
|
|
// Attempt to drain the spend notification to ensure sends to the Spend
|
|
// channel are always non-blocking.
|
|
select {
|
|
case <-ntfn.Event.Spend:
|
|
default:
|
|
}
|
|
|
|
// Send a reorg notification to the client in order for them to
|
|
// correctly handle reorgs.
|
|
select {
|
|
case ntfn.Event.Reorg <- struct{}{}:
|
|
case <-n.quit:
|
|
return ErrTxNotifierExiting
|
|
}
|
|
|
|
ntfn.dispatched = false
|
|
|
|
return nil
|
|
}
|
|
|
|
// TearDown is to be called when the owner of the TxNotifier is exiting. This
|
|
// closes the event channels of all registered notifications that have not been
|
|
// dispatched yet.
|
|
func (n *TxNotifier) TearDown() {
|
|
n.Lock()
|
|
defer n.Unlock()
|
|
|
|
close(n.quit)
|
|
|
|
for _, confSet := range n.confNotifications {
|
|
for _, ntfn := range confSet.ntfns {
|
|
close(ntfn.Event.Confirmed)
|
|
close(ntfn.Event.Updates)
|
|
close(ntfn.Event.NegativeConf)
|
|
}
|
|
}
|
|
|
|
for _, spendSet := range n.spendNotifications {
|
|
for _, ntfn := range spendSet.ntfns {
|
|
close(ntfn.Event.Spend)
|
|
close(ntfn.Event.Reorg)
|
|
}
|
|
}
|
|
}
|