diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 62e838cd..dc7ad2c1 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -69,6 +69,7 @@ type NeutrinoNotifier struct { rescanErr <-chan error chainUpdates *queue.ConcurrentQueue + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -107,6 +108,7 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), chainUpdates: queue.NewConcurrentQueue(10), + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -149,6 +151,7 @@ func (n *NeutrinoNotifier) Start() error { rpcclient.NotificationHandlers{ OnFilteredBlockConnected: n.onFilteredBlockConnected, OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected, + OnRedeemingTx: n.onRelevantTx, }, ), neutrino.WatchInputs(zeroInput), @@ -160,6 +163,7 @@ func (n *NeutrinoNotifier) Start() error { n.rescanErr = n.chainView.Start() n.chainUpdates.Start() + n.txUpdates.Start() n.wg.Add(1) go n.notificationDispatcher() @@ -178,6 +182,7 @@ func (n *NeutrinoNotifier) Stop() error { n.wg.Wait() n.chainUpdates.Stop() + n.txUpdates.Stop() // Notify all pending clients of our shutdown by closing the related // notification channels. @@ -221,11 +226,14 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.chainUpdates.ChanIn() <- &filteredBlock{ + select { + case n.chainUpdates.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, connect: true, + }: + case <-n.quit: } } @@ -236,10 +244,29 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.chainUpdates.ChanIn() <- &filteredBlock{ + select { + case n.chainUpdates.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), connect: false, + }: + case <-n.quit: + } +} + +// relevantTx represents a relevant transaction to the notifier that fulfills +// any outstanding spend requests. +type relevantTx struct { + tx *btcutil.Tx + details *btcjson.BlockDetails +} + +// onRelevantTx is a callback that proxies relevant transaction notifications +// from the backend to the notifier's main event handler. +func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { + select { + case n.txUpdates.ChanIn() <- &relevantTx{tx, details}: + case <-n.quit: } } @@ -437,6 +464,22 @@ out: n.bestHeight = uint32(newBestBlock.Height) n.heightMtx.Unlock() + case txUpdate := <-n.txUpdates.ChanOut(): + // A new relevant transaction notification has been + // received from the backend. We'll attempt to process + // it to determine if it fulfills any outstanding + // confirmation and/or spend requests and dispatch + // notifications for them. + update := txUpdate.(*relevantTx) + err := n.txNotifier.ProcessRelevantSpendTx( + update.tx, uint32(update.details.Height), + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to process "+ + "transaction %v: %v", update.tx.Hash(), + err) + } + case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -601,8 +644,12 @@ func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistr } // RegisterSpendNtfn registers an intent to be notified once the target -// outpoint has been spent by a transaction on-chain. Once a spend of the -// target outpoint has been detected, the details of the spending event will be +// outpoint/output script has been spent by a transaction on-chain. When +// intending to be notified of the spend of an output script, a nil outpoint +// must be used. The heightHint should represent the earliest height in the +// chain of the transaction that spent the outpoint/output script. +// +// Once a spend of has been detected, the details of the spending event will be // sent across the 'Spend' channel. func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { @@ -610,26 +657,22 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // First, we'll construct a spend notification request and hand it off // to the txNotifier. spendID := atomic.AddUint64(&n.spendClientCounter, 1) - cancel := func() { - n.txNotifier.CancelSpend(*outpoint, spendID) - } - ntfn := &chainntnfs.SpendNtfn{ - SpendID: spendID, - OutPoint: *outpoint, - Event: chainntnfs.NewSpendEvent(cancel), - HeightHint: heightHint, - } - - historicalDispatch, err := n.txNotifier.RegisterSpend(ntfn) + spendRequest, err := chainntnfs.NewSpendRequest(outpoint, pkScript) if err != nil { return nil, err } + ntfn := &chainntnfs.SpendNtfn{ + SpendID: spendID, + SpendRequest: spendRequest, + Event: chainntnfs.NewSpendEvent(func() { + n.txNotifier.CancelSpend(spendRequest, spendID) + }), + HeightHint: heightHint, + } - // If the txNotifier didn't return any details to perform a historical - // scan of the chain, then we can return early as there's nothing left - // for us to do. - if historicalDispatch == nil { - return ntfn.Event, nil + historicalDispatch, txNotifierTip, err := n.txNotifier.RegisterSpend(ntfn) + if err != nil { + return nil, err } // To determine whether this outpoint has been spent on-chain, we'll @@ -638,22 +681,32 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // past. // // We'll update our filter first to ensure we can immediately detect the - // spend at tip. To do so, we'll map the script into an address - // type so we can instruct neutrino to match if the transaction - // containing the script is found in a block. + // spend at tip. inputToWatch := neutrino.InputWithScript{ - OutPoint: *outpoint, - PkScript: pkScript, + OutPoint: spendRequest.OutPoint, + PkScript: spendRequest.PkScript.Script(), } + updateOptions := []neutrino.UpdateOption{ + neutrino.AddInputs(inputToWatch), + neutrino.DisableDisconnectedNtfns(true), + } + + // We'll use the txNotifier's tip as the starting point of our filter + // update. In the case of an output script spend request, we'll check if + // we should perform a historical rescan and start from there, as we + // cannot do so with GetUtxo since it matches outpoints. + rewindHeight := txNotifierTip + if historicalDispatch != nil && + spendRequest.OutPoint == chainntnfs.ZeroOutPoint { + rewindHeight = historicalDispatch.StartHeight + } + updateOptions = append(updateOptions, neutrino.Rewind(rewindHeight)) + errChan := make(chan error, 1) select { case n.notificationRegistry <- &rescanFilterUpdate{ - updateOptions: []neutrino.UpdateOption{ - neutrino.AddInputs(inputToWatch), - neutrino.Rewind(historicalDispatch.EndHeight), - neutrino.DisableDisconnectedNtfns(true), - }, - errChan: errChan, + updateOptions: updateOptions, + errChan: errChan, }: case <-n.quit: return nil, ErrChainNotifierShuttingDown @@ -668,6 +721,15 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, fmt.Errorf("unable to update filter: %v", err) } + // If the txNotifier didn't return any details to perform a historical + // scan of the chain, or if we already performed one like in the case of + // output script spend requests, then we can return early as there's + // nothing left for us to do. + if historicalDispatch == nil || + spendRequest.OutPoint == chainntnfs.ZeroOutPoint { + return ntfn.Event, nil + } + // With the filter updated, we'll dispatch our historical rescan to // ensure we detect the spend if it happened in the past. We'll ensure // that neutrino is caught up to the starting height before we attempt @@ -704,7 +766,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, if spendReport != nil && spendReport.SpendingTx != nil { spendingTxHash := spendReport.SpendingTx.TxHash() spendDetails = &chainntnfs.SpendDetail{ - SpentOutPoint: outpoint, + SpentOutPoint: &spendRequest.OutPoint, SpenderTxHash: &spendingTxHash, SpendingTx: spendReport.SpendingTx, SpenderInputIndex: spendReport.SpendingInputIndex, @@ -716,7 +778,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // not, we'll mark our historical rescan as complete to ensure the // outpoint's spend hint gets updated upon connected/disconnected // blocks. - err = n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) + err = n.txNotifier.UpdateSpendDetails(spendRequest, spendDetails) if err != nil { return nil, err } diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index f341bdf3..44c1ba6d 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -59,6 +59,7 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, n.rescanErr = n.chainView.Start() n.chainUpdates.Start() + n.txUpdates.Start() if generateBlocks != nil { // Ensure no block notifications are pending when we start the