diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 7dc47170..32756903 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -254,13 +254,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *confirmationNotification: - chainntnfs.Log.Infof("New confirmation "+ - "subscription: txid=%v, numconfs=%v", - msg.TxID, msg.NumConfirmations) - - currentHeight := uint32(b.bestBlock.Height) - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -270,22 +264,25 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, - currentHeight, + msg.TxID, msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) return } - if confDetails != nil { - err := b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) } }() @@ -448,7 +445,7 @@ func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int3 // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, - heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation, + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { // We'll first attempt to retrieve the transaction using the node's @@ -464,7 +461,7 @@ func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, case err != nil: chainntnfs.Log.Debugf("Failed getting conf details from "+ "index (%v), scanning manually", err) - return b.confDetailsManually(txid, heightHint, currentHeight) + return b.confDetailsManually(txid, startHeight, endHeight) // The transaction was found within the node's mempool. case txStatus == chainntnfs.TxFoundMempool: @@ -943,47 +940,40 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint, return ErrTransactionNotFound } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationNotification struct { - chainntnfs.ConfNtfn - heightHint uint32 -} - // RegisterConfirmationsNtfn registers a notification with BitcoindNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - }, - heightHint: heightHint, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := b.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case b.notificationRegistry <- ntfn: + case b.notificationRegistry <- dispatch: return ntfn.Event, nil case <-b.quit: return nil, ErrChainNotifierShuttingDown diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 209b9744..5a60a83f 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -324,13 +324,7 @@ out: } b.spendNotifications[op][msg.spendID] = msg - case *confirmationNotification: - chainntnfs.Log.Infof("New confirmation "+ - "subscription: txid=%v, numconfs=%v", - msg.TxID, msg.NumConfirmations) - - bestHeight := uint32(b.bestBlock.Height) - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -340,22 +334,25 @@ out: defer b.wg.Done() confDetails, _, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, - bestHeight, + msg.TxID, msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) return } - if confDetails != nil { - err = b.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) } }() @@ -518,7 +515,7 @@ out: // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, - heightHint, currentHeight uint32) (*chainntnfs.TxConfirmation, + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { // We'll first attempt to retrieve the transaction using the node's @@ -534,7 +531,7 @@ func (b *BtcdNotifier) historicalConfDetails(txid *chainhash.Hash, case err != nil: chainntnfs.Log.Debugf("Failed getting conf details from "+ "index (%v), scanning manually", err) - return b.confDetailsManually(txid, heightHint, currentHeight) + return b.confDetailsManually(txid, startHeight, endHeight) // The transaction was found within the node's mempool. case txStatus == chainntnfs.TxFoundMempool: @@ -633,15 +630,15 @@ func (b *BtcdNotifier) confDetailsFromTxIndex(txid *chainhash.Hash, // earliest height the transaction could have been included in, to the current // height in the chain. If the transaction is found, its confirmation details // are returned. Otherwise, nil is returned. -func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, heightHint, - currentHeight uint32) (*chainntnfs.TxConfirmation, +func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, startHeight, + endHeight uint32) (*chainntnfs.TxConfirmation, chainntnfs.TxConfStatus, error) { targetTxidStr := txid.String() // Begin scanning blocks at every height to determine where the // transaction was included in. - for height := heightHint; height <= currentHeight; height++ { + for height := startHeight; height <= endHeight; height++ { // Ensure we haven't been requested to shut down before // processing the next height. select { @@ -1003,47 +1000,40 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, }, nil } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationNotification struct { - chainntnfs.ConfNtfn - heightHint uint32 -} - // RegisterConfirmationsNtfn registers a notification with BtcdNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&b.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - }, - heightHint: heightHint, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&b.confClientCounter, 1), + TxID: txid, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := b.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case b.notificationRegistry <- ntfn: + case b.notificationRegistry <- dispatch: return ntfn.Event, nil case <-b.quit: return nil, ErrChainNotifierShuttingDown diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index f410b7b8..b4455442 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -314,18 +314,7 @@ out: } n.spendNotifications[op][msg.spendID] = msg - case *confirmationsNotification: - chainntnfs.Log.Infof("New confirmations subscription: "+ - "txid=%v, numconfs=%v, height_hint=%v", - msg.TxID, msg.NumConfirmations, msg.heightHint) - - // If the notification can be partially or - // fully dispatched, then we can skip the first - // phase for ntfns. - n.heightMtx.RLock() - currentHeight := n.bestHeight - n.heightMtx.RUnlock() - + case *chainntnfs.HistoricalConfDispatch: // Look up whether the transaction is already // included in the active chain. We'll do this // in a goroutine to prevent blocking @@ -335,7 +324,8 @@ out: defer n.wg.Done() confDetails, err := n.historicalConfDetails( - msg.TxID, msg.pkScript, currentHeight, msg.heightHint, + msg.TxID, msg.PkScript, + msg.StartHeight, msg.EndHeight, ) if err != nil { chainntnfs.Log.Error(err) @@ -347,20 +337,27 @@ out: // the script is found in a block. params := n.p2pNode.ChainParams() _, addrs, _, err := txscript.ExtractPkScriptAddrs( - msg.pkScript, ¶ms, + msg.PkScript, ¶ms, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + + // If the historical dispatch finished + // without error, we will invoke + // UpdateConfDetails even if none were + // found. This allows the notifier to + // begin safely updating the height hint + // cache at tip, since any pending + // rescans have now completed. + err = n.txConfNotifier.UpdateConfDetails( + *msg.TxID, confDetails, ) if err != nil { chainntnfs.Log.Error(err) } if confDetails != nil { - err := n.txConfNotifier.UpdateConfDetails( - *msg.TxID, msg.ConfID, - confDetails, - ) - if err != nil { - chainntnfs.Log.Error(err) - } return } @@ -370,16 +367,14 @@ out: // future initial confirmation. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddAddrs(addrs...), - neutrino.Rewind(currentHeight), + neutrino.Rewind(msg.EndHeight), neutrino.DisableDisconnectedNtfns(true), } err = n.chainView.Update(rescanUpdate...) if err != nil { - chainntnfs.Log.Errorf("Unable "+ - "to update rescan: %v", + chainntnfs.Log.Errorf("Unable to update rescan: %v", err) } - }() case *blockEpochRegistration: @@ -470,13 +465,13 @@ out: n.heightMtx.Lock() if update.height != uint32(n.bestHeight) { - chainntnfs.Log.Infof("Missed disconnected" + + chainntnfs.Log.Infof("Missed disconnected " + "blocks, attempting to catch up") } hash, err := n.p2pNode.GetBlockHash(int64(n.bestHeight)) if err != nil { - chainntnfs.Log.Errorf("Unable to fetch block hash"+ + chainntnfs.Log.Errorf("Unable to fetch block hash "+ "for height %d: %v", n.bestHeight, err) n.heightMtx.Unlock() continue @@ -516,11 +511,11 @@ out: // confirmation. func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, pkScript []byte, - currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { + startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, error) { // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. - for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + for scanHeight := startHeight; scanHeight <= endHeight; scanHeight++ { // Ensure we haven't been requested to shut down before // processing the next height. select { @@ -912,14 +907,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return spendEvent, nil } -// confirmationNotification represents a client's intent to receive a -// notification once the target txid reaches numConfirmations confirmations. -type confirmationsNotification struct { - chainntnfs.ConfNtfn - heightHint uint32 - pkScript []byte -} - // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier // which will be triggered once the txid reaches numConfs number of // confirmations. @@ -927,35 +914,35 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := n.confirmHintCache.QueryConfirmHint(*txid); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, txid) - heightHint = hint - } - } - // Construct a notification request for the transaction and send it to // the main event loop. - ntfn := &confirmationsNotification{ - ConfNtfn: chainntnfs.ConfNtfn{ - ConfID: atomic.AddUint64(&n.confClientCounter, 1), - TxID: txid, - NumConfirmations: numConfs, - Event: chainntnfs.NewConfirmationEvent(numConfs), - }, - heightHint: heightHint, - pkScript: pkScript, + ntfn := &chainntnfs.ConfNtfn{ + ConfID: atomic.AddUint64(&n.confClientCounter, 1), + TxID: txid, + PkScript: pkScript, + NumConfirmations: numConfs, + Event: chainntnfs.NewConfirmationEvent(numConfs), + HeightHint: heightHint, } - if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + chainntnfs.Log.Infof("New confirmation subscription: "+ + "txid=%v, numconfs=%v", txid, numConfs) + + // Register the conf notification with txconfnotifier. A non-nil value + // for `dispatch` will be returned if we are required to perform a + // manual scan for the confirmation. Otherwise the notifier will begin + // watching at tip for the transaction to confirm. + dispatch, err := n.txConfNotifier.Register(ntfn) + if err != nil { return nil, err } + if dispatch == nil { + return ntfn.Event, nil + } + select { - case n.notificationRegistry <- ntfn: + case n.notificationRegistry <- dispatch: return ntfn.Event, nil case <-n.quit: return nil, ErrChainNotifierShuttingDown diff --git a/chainntnfs/txconfnotifier.go b/chainntnfs/txconfnotifier.go index 0a951851..fb3998c4 100644 --- a/chainntnfs/txconfnotifier.go +++ b/chainntnfs/txconfnotifier.go @@ -13,6 +13,10 @@ var ( // ErrTxConfNotifierExiting is an error returned when attempting to // interact with the TxConfNotifier but it been shut down. ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting") + + // ErrTxMaxConfs signals that the user requested a number of + // confirmations beyond the reorg safety limit. + ErrTxMaxConfs = errors.New("too many confirmations requested") ) // ConfNtfn represents a notifier client's request to receive a notification @@ -27,6 +31,13 @@ type ConfNtfn struct { // are requested. TxID *chainhash.Hash + // PkScript is the public key script of an outpoint created in this + // transaction. + // + // NOTE: This value MUST be set when the dispatch is to be performed + // using compact filters. + PkScript []byte + // NumConfirmations is the number of confirmations after which the // notification is to be sent. NumConfirmations uint32 @@ -35,14 +46,37 @@ type ConfNtfn struct { // be sent over. Event *ConfirmationEvent - // details describes the transaction's position is the blockchain. May be - // nil for unconfirmed transactions. - details *TxConfirmation + // HeightHint is the minimum height in the chain that we expect to find + // this txid. + HeightHint uint32 // dispatched is false if the confirmed notification has not been sent yet. dispatched bool } +// HistoricalConfDispatch parameterizes a manual rescan for a particular +// transaction identifier. The parameters include the start and end block +// heights specifying the range of blocks to scan. +type HistoricalConfDispatch struct { + // TxID is the transaction ID to search for in the historical dispatch. + TxID *chainhash.Hash + + // PkScript is a public key script from an output created by this + // transaction. + // + // NOTE: This value MUST be set when the dispatch is to be performed + // using compact filters. + PkScript []byte + + // StartHeight specifies the block height at which to being the + // historical rescan. + StartHeight uint32 + + // EndHeight specifies the last block height (inclusive) that the + // historical scan should consider. + EndHeight uint32 +} + // NewConfirmationEvent constructs a new ConfirmationEvent with newly opened // channels. func NewConfirmationEvent(numConfs uint32) *ConfirmationEvent { @@ -77,7 +111,7 @@ type TxConfNotifier struct { // confNotifications is an index of notification requests by transaction // hash. - confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn + confNotifications map[chainhash.Hash]*confNtfnSet // txsByInitialHeight is an index of watched transactions by the height // that they are included at in the blockchain. This is tracked so that @@ -101,6 +135,47 @@ type TxConfNotifier struct { sync.Mutex } +// rescanState indicates the progression of a registration before the notifier +// can begin dispatching confirmations at tip. +type rescanState uint8 + +const ( + // rescanNotStarted is the initial state, denoting that a historical + // dispatch may be required. + rescanNotStarted rescanState = iota + + // rescanPending indicates that a dispatch has already been made, and we + // are waiting for its completion. No other rescans should be dispatched + // while in this state. + rescanPending + + // rescanComplete signals either that a rescan was dispatched and has + // completed, or that we began watching at tip immediately. In either + // case, the notifier can only dispatch notifications from tip when in + // this state. + rescanComplete +) + +// confNtfnSet holds all known, registered confirmation notifications for a +// single txid. If duplicates notifications are requested, only one historical +// dispatch will be spawned to ensure redundant scans are not permitted. A +// single conf detail will be constructed and dispatched to all interested +// clients. +type confNtfnSet struct { + ntfns map[uint64]*ConfNtfn + rescanStatus rescanState + details *TxConfirmation +} + +// newConfNtfnSet constructs a fresh confNtfnSet for a group of clients +// interested in a notification for a particular txid. +func newConfNtfnSet() *confNtfnSet { + return &confNtfnSet{ + ntfns: make(map[uint64]*ConfNtfn), + rescanStatus: rescanNotStarted, + } +} + // NewTxConfNotifier creates a TxConfNotifier. The current height of the // blockchain is accepted as a parameter. func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, @@ -109,7 +184,7 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, return &TxConfNotifier{ currentHeight: startHeight, reorgSafetyLimit: reorgSafetyLimit, - confNotifications: make(map[chainhash.Hash]map[uint64]*ConfNtfn), + confNotifications: make(map[chainhash.Hash]*confNtfnSet), txsByInitialHeight: make(map[uint32]map[chainhash.Hash]struct{}), ntfnsByConfirmHeight: make(map[uint32]map[*ConfNtfn]struct{}), hintCache: hintCache, @@ -119,40 +194,114 @@ func NewTxConfNotifier(startHeight uint32, reorgSafetyLimit uint32, // Register handles a new notification request. The client will be notified when // the transaction gets a sufficient number of confirmations on the blockchain. +// The registration succeeds if no error is returned. If the returned +// HistoricalConfDispatch is non-nil, the caller is responsible for attempting +// to manually rescan blocks for the txid between the start and end heights. // // NOTE: If the transaction has already been included in a block on the chain, // the confirmation details must be provided with the UpdateConfDetails method, // otherwise we will wait for the transaction to confirm even though it already // has. -func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { +func (tcn *TxConfNotifier) Register( + ntfn *ConfNtfn) (*HistoricalConfDispatch, error) { + select { case <-tcn.quit: - return ErrTxConfNotifierExiting + return nil, ErrTxConfNotifierExiting default: } + // Enforce that we will not dispatch confirmations beyond the reorg + // safety limit. + if ntfn.NumConfirmations > tcn.reorgSafetyLimit { + return nil, ErrTxMaxConfs + } + + // Before proceeding to register the notification, we'll query our + // height hint cache to determine whether a better one exists. + // + // TODO(conner): verify that all submitted height hints are identical. + startHeight := ntfn.HeightHint + hint, err := tcn.hintCache.QueryConfirmHint(*ntfn.TxID) + if err == nil { + if hint > startHeight { + Log.Debugf("Using height hint %d retrieved "+ + "from cache for %v", hint, *ntfn.TxID) + startHeight = hint + } + } else if err != ErrConfirmHintNotFound { + Log.Errorf("Unable to query confirm hint for %v: %v", + *ntfn.TxID, err) + } + tcn.Lock() defer tcn.Unlock() - ntfns, ok := tcn.confNotifications[*ntfn.TxID] + confSet, ok := tcn.confNotifications[*ntfn.TxID] if !ok { - ntfns = make(map[uint64]*ConfNtfn) - tcn.confNotifications[*ntfn.TxID] = ntfns - - err := tcn.hintCache.CommitConfirmHint( - tcn.currentHeight, *ntfn.TxID, - ) - if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. - Log.Errorf("Unable to update confirm hint to %d for "+ - "%v: %v", tcn.currentHeight, *ntfn.TxID, err) - } + // If this is the first registration for this txid, construct a + // confSet to coalesce all notifications for the same txid. + confSet = newConfNtfnSet() + tcn.confNotifications[*ntfn.TxID] = confSet } - ntfns[ntfn.ConfID] = ntfn + confSet.ntfns[ntfn.ConfID] = ntfn - return nil + switch confSet.rescanStatus { + + // A prior rescan has already completed and we are actively watching at + // tip for this txid. + case rescanComplete: + // If conf details for this set of notifications has already + // been found, we'll attempt to deliver them immediately to this + // client. + Log.Debugf("Attempting to dispatch conf for txid=%v "+ + "on registration since rescan has finished", ntfn.TxID) + return nil, tcn.dispatchConfDetails(ntfn, confSet.details) + + // A rescan is already in progress, return here to prevent dispatching + // another. When the scan returns, this notifications details will be + // updated as well. + case rescanPending: + Log.Debugf("Waiting for pending rescan to finish before "+ + "notifying txid=%v at tip", ntfn.TxID) + return nil, nil + + // If no rescan has been dispatched, attempt to do so now. + case rescanNotStarted: + } + + // If the provided or cached height hint indicates that the transaction + // is to be confirmed at a height greater than the conf notifier's + // current height, we'll refrain from spawning a historical dispatch. + if startHeight > tcn.currentHeight { + Log.Debugf("Height hint is above current height, not dispatching "+ + "historical rescan for txid=%v ", ntfn.TxID) + // Set the rescan status to complete, which will allow the conf + // notifier to start delivering messages for this set + // immediately. + confSet.rescanStatus = rescanComplete + return nil, nil + } + + Log.Debugf("Dispatching historical rescan for txid=%v ", ntfn.TxID) + + // Construct the parameters for historical dispatch, scanning the range + // of blocks between our best known height hint and the notifier's + // current height. The notifier will begin also watching for + // confirmations at tip starting with the next block. + dispatch := &HistoricalConfDispatch{ + TxID: ntfn.TxID, + PkScript: ntfn.PkScript, + StartHeight: startHeight, + EndHeight: tcn.currentHeight, + } + + // Set this confSet's status to pending, ensuring subsequent + // registrations don't also attempt a dispatch. + confSet.rescanStatus = rescanPending + + return dispatch, nil } // UpdateConfDetails attempts to update the confirmation details for an active @@ -162,7 +311,7 @@ func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error { // NOTE: The notification should be registered first to ensure notifications are // dispatched correctly. func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, - clientID uint64, details *TxConfirmation) error { + details *TxConfirmation) error { select { case <-tcn.quit: @@ -177,22 +326,42 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, // First, we'll determine whether we have an active notification for // this transaction with the given ID. - ntfns, ok := tcn.confNotifications[txid] + confSet, ok := tcn.confNotifications[txid] if !ok { - return fmt.Errorf("no notifications found for txid %v", txid) + return fmt.Errorf("no notification found with TxID %v", txid) } - ntfn, ok := ntfns[clientID] - if !ok { - return fmt.Errorf("no notification found with ID %v", clientID) - } - - // If the notification has already recognized that the transaction - // confirmed, there's nothing left for us to do. - if ntfn.details != nil { + // If the conf details were already found at tip, all existing + // notifications will have been dispatched or queued for dispatch. We + // can exit early to avoid sending too many notifications on the + // buffered channels. + if confSet.details != nil { return nil } + // The historical dispatch has been completed for this confSet. We'll + // update the rescan status and cache any details that were found. If + // the details are nil, that implies we did not find them and will + // continue to watch for them at tip. + confSet.rescanStatus = rescanComplete + + // The notifier has yet to reach the height at which the transaction was + // included in a block, so we should defer until handling it then within + // ConnectTip. + if details == nil { + Log.Debugf("Conf details for txid=%v not found during "+ + "historical dispatch, waiting to dispatch at tip", txid) + return nil + } + + if details.BlockHeight > tcn.currentHeight { + Log.Debugf("Conf details for txid=%v found above current "+ + "height, waiting to dispatch at tip", txid) + return nil + } + + Log.Debugf("Updating conf details for txid=%v details", txid) + err := tcn.hintCache.CommitConfirmHint(details.BlockHeight, txid) if err != nil { // The error is not fatal, so we should not return an error to @@ -201,25 +370,44 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, details.BlockHeight, txid, err) } - // The notifier has yet to reach the height at which the transaction was - // included in a block, so we should defer until handling it then within - // ConnectTip. - if details == nil || details.BlockHeight > tcn.currentHeight { + // Cache the details found in the rescan and attempt to dispatch any + // notifications that have not yet been delivered. + confSet.details = details + for _, ntfn := range confSet.ntfns { + err = tcn.dispatchConfDetails(ntfn, details) + if err != nil { + return err + } + } + + return nil +} + +// dispatchConfDetails attempts to cache and dispatch details to a particular +// client if the transaction has sufficiently confirmed. If the provided details +// are nil, this method will be a no-op. +func (tcn *TxConfNotifier) dispatchConfDetails( + ntfn *ConfNtfn, details *TxConfirmation) error { + + // If no details are provided, return early as we can't dispatch. + if details == nil { + Log.Debugf("Unable to dispatch %v, no details provided", + ntfn.TxID) return nil } - ntfn.details = details - - // Now, we'll examine whether the transaction of this notification - // request has reached its required number of confirmations. If it has, - // we'll disaptch a confirmation notification to the caller. + // Now, we'll examine whether the transaction of this + // notification request has reached its required number of + // confirmations. If it has, we'll dispatch a confirmation + // notification to the caller. confHeight := details.BlockHeight + ntfn.NumConfirmations - 1 if confHeight <= tcn.currentHeight { Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) - // We'll send a 0 value to the Updates channel, indicating that - // the transaction has already been confirmed. + // We'll send a 0 value to the Updates channel, + // indicating that the transaction has already been + // confirmed. select { case ntfn.Event.Updates <- 0: case <-tcn.quit: @@ -233,9 +421,12 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, return ErrTxConfNotifierExiting } } else { - // Otherwise, we'll keep track of the notification request by - // the height at which we should dispatch the confirmation - // notification. + Log.Debugf("Queueing %v conf notification for %v at tip ", + ntfn.NumConfirmations, ntfn.TxID) + + // Otherwise, we'll keep track of the notification + // request by the height at which we should dispatch the + // confirmation notification. ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { ntfnSet = make(map[*ConfNtfn]struct{}) @@ -244,7 +435,8 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, ntfnSet[ntfn] = struct{}{} // We'll also send an update to the client of how many - // confirmations are left for the transaction to be confirmed. + // confirmations are left for the transaction to be + // confirmed. numConfsLeft := confHeight - tcn.currentHeight select { case ntfn.Event.Updates <- numConfsLeft: @@ -253,15 +445,17 @@ func (tcn *TxConfNotifier) UpdateConfDetails(txid chainhash.Hash, } } - // As a final check, we'll also watch the transaction if it's still - // possible for it to get reorged out of the chain. - if details.BlockHeight+tcn.reorgSafetyLimit > tcn.currentHeight { - txSet, exists := tcn.txsByInitialHeight[details.BlockHeight] + // As a final check, we'll also watch the transaction if it's + // still possible for it to get reorged out of the chain. + blockHeight := details.BlockHeight + reorgSafeHeight := blockHeight + tcn.reorgSafetyLimit + if reorgSafeHeight > tcn.currentHeight { + txSet, exists := tcn.txsByInitialHeight[blockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) - tcn.txsByInitialHeight[details.BlockHeight] = txSet + tcn.txsByInitialHeight[blockHeight] = txSet } - txSet[txid] = struct{}{} + txSet[*ntfn.TxID] = struct{}{} } return nil @@ -299,13 +493,41 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // handled correctly. for _, tx := range txns { txHash := tx.Hash() - for _, ntfn := range tcn.confNotifications[*txHash] { - ntfn.details = &TxConfirmation{ - BlockHash: blockHash, - BlockHeight: blockHeight, - TxIndex: uint32(tx.Index()), + + // Check if we have any pending notifications for this txid. If + // none are found, we can proceed to the next transaction. + confSet, ok := tcn.confNotifications[*txHash] + if !ok { + continue + } + + Log.Debugf("Block contains txid=%v, constructing details", + txHash) + + // If we have any, we'll record its confirmed height so that + // notifications get dispatched when the transaction reaches the + // clients' desired number of confirmations. + details := &TxConfirmation{ + BlockHash: blockHash, + BlockHeight: blockHeight, + TxIndex: uint32(tx.Index()), + } + + confSet.rescanStatus = rescanComplete + confSet.details = details + for _, ntfn := range confSet.ntfns { + // In the event that this notification was aware that + // the transaction was reorged out of the chain, we'll + // consume the reorg notification if it hasn't been done + // yet already. + select { + case <-ntfn.Event.NegativeConf: + default: } + // We'll note this client's required number of + // confirmations so that we can notify them when + // expected. confHeight := blockHeight + ntfn.NumConfirmations - 1 ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] if !exists { @@ -314,6 +536,9 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, } ntfnSet[ntfn] = struct{}{} + // We'll also note the initial confirmation height in + // order to correctly handle dispatching notifications + // when the transaction gets reorged out of the chain. txSet, exists := tcn.txsByInitialHeight[blockHeight] if !exists { txSet = make(map[chainhash.Hash]struct{}) @@ -328,14 +553,21 @@ func (tcn *TxConfNotifier) ConnectTip(blockHash *chainhash.Hash, // transactions along with the ones that confirmed at the current // height. To do so, we'll iterate over the confNotifications map, which // contains the transactions we currently have notifications for. Since - // this map doesn't tell us whether the transaction hsa confirmed or + // this map doesn't tell us whether the transaction has confirmed or // not, we'll need to look at txsByInitialHeight to determine so. var txsToUpdateHints []chainhash.Hash for confirmedTx := range tcn.txsByInitialHeight[tcn.currentHeight] { txsToUpdateHints = append(txsToUpdateHints, confirmedTx) } out: - for maybeUnconfirmedTx := range tcn.confNotifications { + for maybeUnconfirmedTx, confSet := range tcn.confNotifications { + // We shouldn't update the confirm hints if we still have a + // pending rescan in progress. We'll skip writing any for + // notification sets that haven't reached rescanComplete. + if confSet.rescanStatus != rescanComplete { + continue + } + for height, confirmedTxs := range tcn.txsByInitialHeight { // Skip the transactions that confirmed at the new block // height as those have already been added. @@ -370,16 +602,9 @@ out: // this new height. for _, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { - // If the notification hasn't learned about the - // confirmation of its transaction yet (in the - // case of historical confirmations), we'll skip - // it. - if ntfn.details == nil { - continue - } - - txConfHeight := ntfn.details.BlockHeight + + confSet := tcn.confNotifications[txHash] + for _, ntfn := range confSet.ntfns { + txConfHeight := confSet.details.BlockHeight + ntfn.NumConfirmations - 1 numConfsLeft := txConfHeight - blockHeight @@ -403,12 +628,14 @@ out: // Then, we'll dispatch notifications for all the transactions that have // become confirmed at this new block height. - for ntfn := range tcn.ntfnsByConfirmHeight[tcn.currentHeight] { + for ntfn := range tcn.ntfnsByConfirmHeight[blockHeight] { + confSet := tcn.confNotifications[*ntfn.TxID] + Log.Infof("Dispatching %v conf notification for %v", ntfn.NumConfirmations, ntfn.TxID) select { - case ntfn.Event.Confirmed <- ntfn.details: + case ntfn.Event.Confirmed <- confSet.details: ntfn.dispatched = true case <-tcn.quit: return ErrTxConfNotifierExiting @@ -470,7 +697,16 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // clients is always non-blocking. for initialHeight, txHashes := range tcn.txsByInitialHeight { for txHash := range txHashes { - for _, ntfn := range tcn.confNotifications[txHash] { + // If the transaction has been reorged out of the chain, + // we'll make sure to remove the cached confirmation + // details to prevent notifying clients with old + // information. + confSet := tcn.confNotifications[txHash] + if initialHeight == blockHeight { + confSet.details = nil + } + + for _, ntfn := range confSet.ntfns { // First, we'll attempt to drain an update // from each notification to ensure sends to the // Updates channel are always non-blocking. @@ -483,46 +719,15 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { // Then, we'll check if the current transaction // was included in the block currently being - // disconnected. If it was, we'll need to take - // some necessary precautions. + // disconnected. If it was, we'll need to + // dispatch a reorg notification to the client. if initialHeight == blockHeight { - // If the transaction's confirmation notification - // has already been dispatched, we'll attempt to - // notify the client it was reorged out of the chain. - if ntfn.dispatched { - // Attempt to drain the confirmation notification - // to ensure sends to the Confirmed channel are - // always non-blocking. - select { - case <-ntfn.Event.Confirmed: - case <-tcn.quit: - return ErrTxConfNotifierExiting - default: - } - - ntfn.dispatched = false - - // Send a negative confirmation notification to the - // client indicating how many blocks have been - // disconnected successively. - select { - case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): - case <-tcn.quit: - return ErrTxConfNotifierExiting - } - - continue + err := tcn.dispatchConfReorg( + ntfn, blockHeight, + ) + if err != nil { + return err } - - // Otherwise, since the transactions was reorged out - // of the chain, we can safely remove its accompanying - // confirmation notification. - confHeight := blockHeight + ntfn.NumConfirmations - 1 - ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] - if !exists { - continue - } - delete(ntfnSet, ntfn) } } } @@ -535,6 +740,49 @@ func (tcn *TxConfNotifier) DisconnectTip(blockHeight uint32) error { return nil } +// dispatchConfReorg dispatches a reorg notification to the client if the +// confirmation notification was already delivered. +// +// NOTE: This must be called with the TxNotifier's lock held. +func (tcn *TxConfNotifier) dispatchConfReorg( + ntfn *ConfNtfn, heightDisconnected uint32) error { + + // If the transaction's confirmation notification has yet to be + // dispatched, we'll need to clear its entry within the + // ntfnsByConfirmHeight index to prevent from notifiying the client once + // the notifier reaches the confirmation height. + if !ntfn.dispatched { + confHeight := heightDisconnected + ntfn.NumConfirmations - 1 + ntfnSet, exists := tcn.ntfnsByConfirmHeight[confHeight] + if exists { + delete(ntfnSet, ntfn) + } + return nil + } + + // Otherwise, the entry within the ntfnsByConfirmHeight has already been + // deleted, so we'll attempt to drain the confirmation notification to + // ensure sends to the Confirmed channel are always non-blocking. + select { + case <-ntfn.Event.Confirmed: + case <-tcn.quit: + return ErrTxConfNotifierExiting + default: + } + + ntfn.dispatched = false + + // Send a negative confirmation notification to the client indicating + // how many blocks have been disconnected successively. + select { + case ntfn.Event.NegativeConf <- int32(tcn.reorgDepth): + case <-tcn.quit: + return ErrTxConfNotifierExiting + } + + return nil +} + // TearDown is to be called when the owner of the TxConfNotifier is exiting. // This closes the event channels of all registered notifications that have // not been dispatched yet. @@ -544,8 +792,8 @@ func (tcn *TxConfNotifier) TearDown() { close(tcn.quit) - for _, ntfns := range tcn.confNotifications { - for _, ntfn := range ntfns { + for _, confSet := range tcn.confNotifications { + for _, ntfn := range confSet.ntfns { if ntfn.dispatched { continue } diff --git a/chainntnfs/txconfnotifier_test.go b/chainntnfs/txconfnotifier_test.go index fb996d0a..04f13ef5 100644 --- a/chainntnfs/txconfnotifier_test.go +++ b/chainntnfs/txconfnotifier_test.go @@ -113,7 +113,7 @@ func TestTxConfFutureDispatch(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier before including them in a block to receive future @@ -124,7 +124,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -134,7 +134,7 @@ func TestTxConfFutureDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -162,7 +162,7 @@ func TestTxConfFutureDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2, &tx3}, }) - err := txConfNotifier.ConnectTip( + err := tcn.ConnectTip( block1.Hash(), 11, block1.Transactions(), ) if err != nil { @@ -225,7 +225,7 @@ func TestTxConfFutureDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err = txConfNotifier.ConnectTip(block2.Hash(), 12, block2.Transactions()) + err = tcn.ConnectTip(block2.Hash(), 12, block2.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -287,7 +287,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions at a height before the TxConfNotifier's // starting height so that they are confirmed once registering them. @@ -298,7 +298,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -309,7 +309,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } @@ -320,7 +320,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 1, } - err := txConfNotifier.UpdateConfDetails(tx1Hash, ntfn1.ConfID, &txConf1) + err := tcn.UpdateConfDetails(tx1Hash, &txConf1) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -353,7 +353,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { BlockHeight: 9, TxIndex: 2, } - err = txConfNotifier.UpdateConfDetails(tx2Hash, ntfn2.ConfID, &txConf2) + err = tcn.UpdateConfDetails(tx2Hash, &txConf2) if err != nil { t.Fatalf("unable to update conf details: %v", err) } @@ -381,7 +381,7 @@ func TestTxConfHistoricalDispatch(t *testing.T) { Transactions: []*wire.MsgTx{&tx3}, }) - err = txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err = tcn.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -439,7 +439,7 @@ func TestTxConfChainReorg(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(7, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(7, 100, hintCache) // Tx 1 will be confirmed in block 9 and requires 2 confs. tx1Hash := tx1.TxHash() @@ -448,10 +448,14 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx1NumConfs, Event: chainntnfs.NewConfirmationEvent(tx1NumConfs), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn1.TxID, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Tx 2 will be confirmed in block 10 and requires 1 conf. tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -459,10 +463,14 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx2NumConfs, Event: chainntnfs.NewConfirmationEvent(tx2NumConfs), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn2.TxID, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Tx 3 will be confirmed in block 10 and requires 2 confs. tx3Hash := tx3.TxHash() ntfn3 := chainntnfs.ConfNtfn{ @@ -470,19 +478,23 @@ func TestTxConfChainReorg(t *testing.T) { NumConfirmations: tx3NumConfs, Event: chainntnfs.NewConfirmationEvent(tx3NumConfs), } - if err := txConfNotifier.Register(&ntfn3); err != nil { + if _, err := tcn.Register(&ntfn3); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn3.TxID, nil); err != nil { + t.Fatalf("unable to deliver conf details: %v", err) + } + // Sync chain to block 10. Txs 1 & 2 should be confirmed. block1 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx1}, }) - err := txConfNotifier.ConnectTip(nil, 8, block1.Transactions()) + err := tcn.ConnectTip(nil, 8, block1.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 9, nil) + err = tcn.ConnectTip(nil, 9, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -490,7 +502,7 @@ func TestTxConfChainReorg(t *testing.T) { block2 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx2, &tx3}, }) - err = txConfNotifier.ConnectTip(nil, 10, block2.Transactions()) + err = tcn.ConnectTip(nil, 10, block2.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -547,17 +559,17 @@ func TestTxConfChainReorg(t *testing.T) { // The block that included tx2 and tx3 is disconnected and two next // blocks without them are connected. - err = txConfNotifier.DisconnectTip(10) + err = tcn.DisconnectTip(10) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 10, nil) + err = tcn.ConnectTip(nil, 10, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(nil, 11, nil) + err = tcn.ConnectTip(nil, 11, nil) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -605,12 +617,12 @@ func TestTxConfChainReorg(t *testing.T) { }) block4 := btcutil.NewBlock(&wire.MsgBlock{}) - err = txConfNotifier.ConnectTip(block3.Hash(), 12, block3.Transactions()) + err = tcn.ConnectTip(block3.Hash(), 12, block3.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - err = txConfNotifier.ConnectTip(block4.Hash(), 13, block4.Transactions()) + err = tcn.ConnectTip(block4.Hash(), 13, block4.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -675,19 +687,23 @@ func TestTxConfChainReorg(t *testing.T) { } // TestTxConfHeightHintCache ensures that the height hints for transactions are -// kept track of correctly with each new block connected/disconnected. +// kept track of correctly with each new block connected/disconnected. This test +// also asserts that the height hints are not updated until the simulated +// historical dispatches have returned, and we know the transactions aren't +// already in the chain. func TestTxConfHeightHintCache(t *testing.T) { t.Parallel() const ( - startingHeight = 10 - tx1Height = 11 - tx2Height = 12 + startingHeight = 200 + txDummyHeight = 201 + tx1Height = 202 + tx2Height = 203 ) // Initialize our TxConfNotifier instance backed by a height hint cache. hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier( + tcn := chainntnfs.NewTxConfNotifier( startingHeight, 100, hintCache, ) @@ -708,72 +724,112 @@ func TestTxConfHeightHintCache(t *testing.T) { Event: chainntnfs.NewConfirmationEvent(2), } - if err := txConfNotifier.Register(ntfn1); err != nil { + if _, err := tcn.Register(ntfn1); err != nil { t.Fatalf("unable to register tx1: %v", err) } - if err := txConfNotifier.Register(ntfn2); err != nil { + if _, err := tcn.Register(ntfn2); err != nil { t.Fatalf("unable to register tx2: %v", err) } - // Both transactions should have a height hint of the starting height - // due to registering notifications for them. - hint, err := hintCache.QueryConfirmHint(tx1Hash) - if err != nil { - t.Fatalf("unable to query for hint: %v", err) - } - if hint != startingHeight { - t.Fatalf("expected hint %d, got %d", startingHeight, hint) + // Both transactions should not have a height hint set, as Register + // should not alter the cache state. + _, err := hintCache.QueryConfirmHint(tx1Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) } - hint, err = hintCache.QueryConfirmHint(tx2Hash) - if err != nil { - t.Fatalf("unable to query for hint: %v", err) - } - if hint != startingHeight { - t.Fatalf("expected hint %d, got %d", startingHeight, hint) + _, err = hintCache.QueryConfirmHint(tx2Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) } - // Create a new block that will include the first transaction and extend + // Create a new block that will include the dummy transaction and extend // the chain. + txDummy := wire.MsgTx{Version: 3} block1 := btcutil.NewBlock(&wire.MsgBlock{ - Transactions: []*wire.MsgTx{&tx1}, + Transactions: []*wire.MsgTx{&txDummy}, }) - err = txConfNotifier.ConnectTip( - block1.Hash(), tx1Height, block1.Transactions(), + err = tcn.ConnectTip( + block1.Hash(), txDummyHeight, block1.Transactions(), ) if err != nil { t.Fatalf("Failed to connect block: %v", err) } - // The height hint for the first transaction should now be updated to - // reflect its confirmation. + // Since UpdateConfDetails has not been called for either transaction, + // the height hints should remain unchanged. This simulates blocks + // confirming while the historical dispatch is processing the + // registration. + hint, err := hintCache.QueryConfirmHint(tx1Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) + } + + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != chainntnfs.ErrConfirmHintNotFound { + t.Fatalf("unexpected error when querying for height hint "+ + "want: %v, got %v", + chainntnfs.ErrConfirmHintNotFound, err) + } + + // Now, update the conf details reporting that the neither txn was found + // in the historical dispatch. + if err := tcn.UpdateConfDetails(tx1Hash, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } + if err := tcn.UpdateConfDetails(tx2Hash, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } + + // We'll create another block that will include the first transaction + // and extend the chain. + block2 := btcutil.NewBlock(&wire.MsgBlock{ + Transactions: []*wire.MsgTx{&tx1}, + }) + + err = tcn.ConnectTip( + block2.Hash(), tx1Height, block2.Transactions(), + ) + if err != nil { + t.Fatalf("Failed to connect block: %v", err) + } + + // Now that both notifications are waiting at tip for confirmations, + // they should have their height hints updated to the latest block + // height. hint, err = hintCache.QueryConfirmHint(tx1Hash) if err != nil { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } - // The height hint for the second transaction should also be updated due - // to it still being unconfirmed. hint, err = hintCache.QueryConfirmHint(tx2Hash) if err != nil { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx2Height, hint) } - // Now, we'll create another block that will include the second + // Next, we'll create another block that will include the second // transaction and extend the chain. - block2 := btcutil.NewBlock(&wire.MsgBlock{ + block3 := btcutil.NewBlock(&wire.MsgBlock{ Transactions: []*wire.MsgTx{&tx2}, }) - err = txConfNotifier.ConnectTip( - block2.Hash(), tx2Height, block2.Transactions(), + err = tcn.ConnectTip( + block3.Hash(), tx2Height, block3.Transactions(), ) if err != nil { t.Fatalf("Failed to connect block: %v", err) @@ -785,7 +841,8 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } // The height hint for the second transaction should now be updated to @@ -795,12 +852,13 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx2Height { - t.Fatalf("expected hint %d, got %d", tx2Height, hint) + t.Fatalf("expected hint %d, got %d", + tx2Height, hint) } - // Now, we'll attempt do disconnect the last block in order to simulate - // a chain reorg. - if err := txConfNotifier.DisconnectTip(tx2Height); err != nil { + // Finally, we'll attempt do disconnect the last block in order to + // simulate a chain reorg. + if err := tcn.DisconnectTip(tx2Height); err != nil { t.Fatalf("Failed to disconnect block: %v", err) } @@ -811,7 +869,19 @@ func TestTxConfHeightHintCache(t *testing.T) { t.Fatalf("unable to query for hint: %v", err) } if hint != tx1Height { - t.Fatalf("expected hint %d, got %d", tx1Height, hint) + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) + } + + // The first transaction's height hint should remain at the original + // confirmation height. + hint, err = hintCache.QueryConfirmHint(tx2Hash) + if err != nil { + t.Fatalf("unable to query for hint: %v", err) + } + if hint != tx1Height { + t.Fatalf("expected hint %d, got %d", + tx1Height, hint) } } @@ -824,7 +894,7 @@ func TestTxConfTearDown(t *testing.T) { ) hintCache := newMockHintCache() - txConfNotifier := chainntnfs.NewTxConfNotifier(10, 100, hintCache) + tcn := chainntnfs.NewTxConfNotifier(10, 100, hintCache) // Create the test transactions and register them with the // TxConfNotifier to receive notifications. @@ -834,9 +904,12 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 1, Event: chainntnfs.NewConfirmationEvent(1), } - if err := txConfNotifier.Register(&ntfn1); err != nil { + if _, err := tcn.Register(&ntfn1); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn1.TxID, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } tx2Hash := tx2.TxHash() ntfn2 := chainntnfs.ConfNtfn{ @@ -844,9 +917,12 @@ func TestTxConfTearDown(t *testing.T) { NumConfirmations: 2, Event: chainntnfs.NewConfirmationEvent(2), } - if err := txConfNotifier.Register(&ntfn2); err != nil { + if _, err := tcn.Register(&ntfn2); err != nil { t.Fatalf("unable to register ntfn: %v", err) } + if err := tcn.UpdateConfDetails(*ntfn2.TxID, nil); err != nil { + t.Fatalf("unable to update conf details: %v", err) + } // Include the transactions in a block and add it to the TxConfNotifier. // This should confirm tx1, but not tx2. @@ -854,7 +930,7 @@ func TestTxConfTearDown(t *testing.T) { Transactions: []*wire.MsgTx{&tx1, &tx2}, }) - err := txConfNotifier.ConnectTip(block.Hash(), 11, block.Transactions()) + err := tcn.ConnectTip(block.Hash(), 11, block.Transactions()) if err != nil { t.Fatalf("Failed to connect block: %v", err) } @@ -890,7 +966,7 @@ func TestTxConfTearDown(t *testing.T) { // The notification channels should be closed for notifications that // have not been dispatched yet, so we should not expect to receive any // more updates. - txConfNotifier.TearDown() + tcn.TearDown() // tx1 should not receive any more updates because it has already been // confirmed and the TxConfNotifier has been shut down.