From 52db5ed68247ab09c0bbb0e89f1a7dab849a0040 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 6 Dec 2018 21:14:31 -0800 Subject: [PATCH] chainntnfs/neutrinonotify: support registration for script spends In this commit, we extend the NeutrinoNotifier to support registering scripts for spends notifications. Once the script has been detected as spent within the chain, a spend notification will be dispatched through the Spend channel of the SpendEvent returned upon registration. For scripts that have been spent in the past, the rescan logic has been modified to match on the script rather than the outpoint. A concurrent queue for relevant transactions has been added to proxy notifications from the underlying rescan to the txNotifier. This is needed for scripts, as we cannot perform a historical rescan for scripts through `GetUtxo`, like we do with outpoints. For scripts that are unspent, a filter update is sent to the underlying rescan to ensure that we match and dispatch on the script when processing new blocks. Along the way, we also address an issue where we'd miss detecting that an outpoint/script has been spent in the future due to not receiving a historical dispatch request from the underlying txNotifier. To fix this, we ensure that we always request the backend to notify us of the spend once it detects it at tip, regardless of whether a historical rescan was detected or not. --- chainntnfs/neutrinonotify/neutrino.go | 128 ++++++++++++++++------ chainntnfs/neutrinonotify/neutrino_dev.go | 1 + 2 files changed, 96 insertions(+), 33 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 62e838cd..dc7ad2c1 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -69,6 +69,7 @@ type NeutrinoNotifier struct { rescanErr <-chan error chainUpdates *queue.ConcurrentQueue + txUpdates *queue.ConcurrentQueue // spendHintCache is a cache used to query and update the latest height // hints for an outpoint. Each height hint represents the earliest @@ -107,6 +108,7 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, rescanErr: make(chan error), chainUpdates: queue.NewConcurrentQueue(10), + txUpdates: queue.NewConcurrentQueue(10), spendHintCache: spendHintCache, confirmHintCache: confirmHintCache, @@ -149,6 +151,7 @@ func (n *NeutrinoNotifier) Start() error { rpcclient.NotificationHandlers{ OnFilteredBlockConnected: n.onFilteredBlockConnected, OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected, + OnRedeemingTx: n.onRelevantTx, }, ), neutrino.WatchInputs(zeroInput), @@ -160,6 +163,7 @@ func (n *NeutrinoNotifier) Start() error { n.rescanErr = n.chainView.Start() n.chainUpdates.Start() + n.txUpdates.Start() n.wg.Add(1) go n.notificationDispatcher() @@ -178,6 +182,7 @@ func (n *NeutrinoNotifier) Stop() error { n.wg.Wait() n.chainUpdates.Stop() + n.txUpdates.Stop() // Notify all pending clients of our shutdown by closing the related // notification channels. @@ -221,11 +226,14 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.chainUpdates.ChanIn() <- &filteredBlock{ + select { + case n.chainUpdates.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, connect: true, + }: + case <-n.quit: } } @@ -236,10 +244,29 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.chainUpdates.ChanIn() <- &filteredBlock{ + select { + case n.chainUpdates.ChanIn() <- &filteredBlock{ hash: header.BlockHash(), height: uint32(height), connect: false, + }: + case <-n.quit: + } +} + +// relevantTx represents a relevant transaction to the notifier that fulfills +// any outstanding spend requests. +type relevantTx struct { + tx *btcutil.Tx + details *btcjson.BlockDetails +} + +// onRelevantTx is a callback that proxies relevant transaction notifications +// from the backend to the notifier's main event handler. +func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { + select { + case n.txUpdates.ChanIn() <- &relevantTx{tx, details}: + case <-n.quit: } } @@ -437,6 +464,22 @@ out: n.bestHeight = uint32(newBestBlock.Height) n.heightMtx.Unlock() + case txUpdate := <-n.txUpdates.ChanOut(): + // A new relevant transaction notification has been + // received from the backend. We'll attempt to process + // it to determine if it fulfills any outstanding + // confirmation and/or spend requests and dispatch + // notifications for them. + update := txUpdate.(*relevantTx) + err := n.txNotifier.ProcessRelevantSpendTx( + update.tx, uint32(update.details.Height), + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to process "+ + "transaction %v: %v", update.tx.Hash(), + err) + } + case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -601,8 +644,12 @@ func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistr } // RegisterSpendNtfn registers an intent to be notified once the target -// outpoint has been spent by a transaction on-chain. Once a spend of the -// target outpoint has been detected, the details of the spending event will be +// outpoint/output script has been spent by a transaction on-chain. When +// intending to be notified of the spend of an output script, a nil outpoint +// must be used. The heightHint should represent the earliest height in the +// chain of the transaction that spent the outpoint/output script. +// +// Once a spend of has been detected, the details of the spending event will be // sent across the 'Spend' channel. func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { @@ -610,26 +657,22 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // First, we'll construct a spend notification request and hand it off // to the txNotifier. spendID := atomic.AddUint64(&n.spendClientCounter, 1) - cancel := func() { - n.txNotifier.CancelSpend(*outpoint, spendID) - } - ntfn := &chainntnfs.SpendNtfn{ - SpendID: spendID, - OutPoint: *outpoint, - Event: chainntnfs.NewSpendEvent(cancel), - HeightHint: heightHint, - } - - historicalDispatch, err := n.txNotifier.RegisterSpend(ntfn) + spendRequest, err := chainntnfs.NewSpendRequest(outpoint, pkScript) if err != nil { return nil, err } + ntfn := &chainntnfs.SpendNtfn{ + SpendID: spendID, + SpendRequest: spendRequest, + Event: chainntnfs.NewSpendEvent(func() { + n.txNotifier.CancelSpend(spendRequest, spendID) + }), + HeightHint: heightHint, + } - // If the txNotifier didn't return any details to perform a historical - // scan of the chain, then we can return early as there's nothing left - // for us to do. - if historicalDispatch == nil { - return ntfn.Event, nil + historicalDispatch, txNotifierTip, err := n.txNotifier.RegisterSpend(ntfn) + if err != nil { + return nil, err } // To determine whether this outpoint has been spent on-chain, we'll @@ -638,22 +681,32 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // past. // // We'll update our filter first to ensure we can immediately detect the - // spend at tip. To do so, we'll map the script into an address - // type so we can instruct neutrino to match if the transaction - // containing the script is found in a block. + // spend at tip. inputToWatch := neutrino.InputWithScript{ - OutPoint: *outpoint, - PkScript: pkScript, + OutPoint: spendRequest.OutPoint, + PkScript: spendRequest.PkScript.Script(), } + updateOptions := []neutrino.UpdateOption{ + neutrino.AddInputs(inputToWatch), + neutrino.DisableDisconnectedNtfns(true), + } + + // We'll use the txNotifier's tip as the starting point of our filter + // update. In the case of an output script spend request, we'll check if + // we should perform a historical rescan and start from there, as we + // cannot do so with GetUtxo since it matches outpoints. + rewindHeight := txNotifierTip + if historicalDispatch != nil && + spendRequest.OutPoint == chainntnfs.ZeroOutPoint { + rewindHeight = historicalDispatch.StartHeight + } + updateOptions = append(updateOptions, neutrino.Rewind(rewindHeight)) + errChan := make(chan error, 1) select { case n.notificationRegistry <- &rescanFilterUpdate{ - updateOptions: []neutrino.UpdateOption{ - neutrino.AddInputs(inputToWatch), - neutrino.Rewind(historicalDispatch.EndHeight), - neutrino.DisableDisconnectedNtfns(true), - }, - errChan: errChan, + updateOptions: updateOptions, + errChan: errChan, }: case <-n.quit: return nil, ErrChainNotifierShuttingDown @@ -668,6 +721,15 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, return nil, fmt.Errorf("unable to update filter: %v", err) } + // If the txNotifier didn't return any details to perform a historical + // scan of the chain, or if we already performed one like in the case of + // output script spend requests, then we can return early as there's + // nothing left for us to do. + if historicalDispatch == nil || + spendRequest.OutPoint == chainntnfs.ZeroOutPoint { + return ntfn.Event, nil + } + // With the filter updated, we'll dispatch our historical rescan to // ensure we detect the spend if it happened in the past. We'll ensure // that neutrino is caught up to the starting height before we attempt @@ -704,7 +766,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, if spendReport != nil && spendReport.SpendingTx != nil { spendingTxHash := spendReport.SpendingTx.TxHash() spendDetails = &chainntnfs.SpendDetail{ - SpentOutPoint: outpoint, + SpentOutPoint: &spendRequest.OutPoint, SpenderTxHash: &spendingTxHash, SpendingTx: spendReport.SpendingTx, SpenderInputIndex: spendReport.SpendingInputIndex, @@ -716,7 +778,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // not, we'll mark our historical rescan as complete to ensure the // outpoint's spend hint gets updated upon connected/disconnected // blocks. - err = n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails) + err = n.txNotifier.UpdateSpendDetails(spendRequest, spendDetails) if err != nil { return nil, err } diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index f341bdf3..44c1ba6d 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -59,6 +59,7 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, n.rescanErr = n.chainView.Start() n.chainUpdates.Start() + n.txUpdates.Start() if generateBlocks != nil { // Ensure no block notifications are pending when we start the