chainntnfs/neutrinonotify: support registration for script spends

In this commit, we extend the NeutrinoNotifier to support registering
scripts for spends notifications. Once the script has been detected as
spent within the chain, a spend notification will be dispatched through
the Spend channel of the SpendEvent returned upon registration.

For scripts that have been spent in the past, the rescan logic has been
modified to match on the script rather than the outpoint. A concurrent
queue for relevant transactions has been added to proxy notifications
from the underlying rescan to the txNotifier. This is needed for
scripts, as we cannot perform a historical rescan for scripts through
`GetUtxo`, like we do with outpoints.

For scripts that are unspent, a filter update is sent to the underlying
rescan to ensure that we match and dispatch on the script when
processing new blocks.

Along the way, we also address an issue where we'd miss detecting that
an outpoint/script has been spent in the future due to not receiving a
historical dispatch request from the underlying txNotifier. To fix this,
we ensure that we always request the backend to notify us of the spend
once it detects it at tip, regardless of whether a historical rescan was
detected or not.
This commit is contained in:
Wilmer Paulino 2018-12-06 21:14:31 -08:00
parent 482f05a3bc
commit 52db5ed682
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 96 additions and 33 deletions

@ -69,6 +69,7 @@ type NeutrinoNotifier struct {
rescanErr <-chan error
chainUpdates *queue.ConcurrentQueue
txUpdates *queue.ConcurrentQueue
// spendHintCache is a cache used to query and update the latest height
// hints for an outpoint. Each height hint represents the earliest
@ -107,6 +108,7 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
rescanErr: make(chan error),
chainUpdates: queue.NewConcurrentQueue(10),
txUpdates: queue.NewConcurrentQueue(10),
spendHintCache: spendHintCache,
confirmHintCache: confirmHintCache,
@ -149,6 +151,7 @@ func (n *NeutrinoNotifier) Start() error {
rpcclient.NotificationHandlers{
OnFilteredBlockConnected: n.onFilteredBlockConnected,
OnFilteredBlockDisconnected: n.onFilteredBlockDisconnected,
OnRedeemingTx: n.onRelevantTx,
},
),
neutrino.WatchInputs(zeroInput),
@ -160,6 +163,7 @@ func (n *NeutrinoNotifier) Start() error {
n.rescanErr = n.chainView.Start()
n.chainUpdates.Start()
n.txUpdates.Start()
n.wg.Add(1)
go n.notificationDispatcher()
@ -178,6 +182,7 @@ func (n *NeutrinoNotifier) Stop() error {
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
@ -221,11 +226,14 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
// Append this new chain update to the end of the queue of new chain
// updates.
n.chainUpdates.ChanIn() <- &filteredBlock{
select {
case n.chainUpdates.ChanIn() <- &filteredBlock{
hash: header.BlockHash(),
height: uint32(height),
txns: txns,
connect: true,
}:
case <-n.quit:
}
}
@ -236,10 +244,29 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
// Append this new chain update to the end of the queue of new chain
// disconnects.
n.chainUpdates.ChanIn() <- &filteredBlock{
select {
case n.chainUpdates.ChanIn() <- &filteredBlock{
hash: header.BlockHash(),
height: uint32(height),
connect: false,
}:
case <-n.quit:
}
}
// relevantTx represents a relevant transaction to the notifier that fulfills
// any outstanding spend requests.
type relevantTx struct {
tx *btcutil.Tx
details *btcjson.BlockDetails
}
// onRelevantTx is a callback that proxies relevant transaction notifications
// from the backend to the notifier's main event handler.
func (n *NeutrinoNotifier) onRelevantTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
select {
case n.txUpdates.ChanIn() <- &relevantTx{tx, details}:
case <-n.quit:
}
}
@ -437,6 +464,22 @@ out:
n.bestHeight = uint32(newBestBlock.Height)
n.heightMtx.Unlock()
case txUpdate := <-n.txUpdates.ChanOut():
// A new relevant transaction notification has been
// received from the backend. We'll attempt to process
// it to determine if it fulfills any outstanding
// confirmation and/or spend requests and dispatch
// notifications for them.
update := txUpdate.(*relevantTx)
err := n.txNotifier.ProcessRelevantSpendTx(
update.tx, uint32(update.details.Height),
)
if err != nil {
chainntnfs.Log.Errorf("Unable to process "+
"transaction %v: %v", update.tx.Hash(),
err)
}
case err := <-n.rescanErr:
chainntnfs.Log.Errorf("Error during rescan: %v", err)
@ -601,8 +644,12 @@ func (n *NeutrinoNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistr
}
// RegisterSpendNtfn registers an intent to be notified once the target
// outpoint has been spent by a transaction on-chain. Once a spend of the
// target outpoint has been detected, the details of the spending event will be
// outpoint/output script has been spent by a transaction on-chain. When
// intending to be notified of the spend of an output script, a nil outpoint
// must be used. The heightHint should represent the earliest height in the
// chain of the transaction that spent the outpoint/output script.
//
// Once a spend of has been detected, the details of the spending event will be
// sent across the 'Spend' channel.
func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
@ -610,26 +657,22 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// First, we'll construct a spend notification request and hand it off
// to the txNotifier.
spendID := atomic.AddUint64(&n.spendClientCounter, 1)
cancel := func() {
n.txNotifier.CancelSpend(*outpoint, spendID)
}
ntfn := &chainntnfs.SpendNtfn{
SpendID: spendID,
OutPoint: *outpoint,
Event: chainntnfs.NewSpendEvent(cancel),
HeightHint: heightHint,
}
historicalDispatch, err := n.txNotifier.RegisterSpend(ntfn)
spendRequest, err := chainntnfs.NewSpendRequest(outpoint, pkScript)
if err != nil {
return nil, err
}
ntfn := &chainntnfs.SpendNtfn{
SpendID: spendID,
SpendRequest: spendRequest,
Event: chainntnfs.NewSpendEvent(func() {
n.txNotifier.CancelSpend(spendRequest, spendID)
}),
HeightHint: heightHint,
}
// If the txNotifier didn't return any details to perform a historical
// scan of the chain, then we can return early as there's nothing left
// for us to do.
if historicalDispatch == nil {
return ntfn.Event, nil
historicalDispatch, txNotifierTip, err := n.txNotifier.RegisterSpend(ntfn)
if err != nil {
return nil, err
}
// To determine whether this outpoint has been spent on-chain, we'll
@ -638,22 +681,32 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// past.
//
// We'll update our filter first to ensure we can immediately detect the
// spend at tip. To do so, we'll map the script into an address
// type so we can instruct neutrino to match if the transaction
// containing the script is found in a block.
// spend at tip.
inputToWatch := neutrino.InputWithScript{
OutPoint: *outpoint,
PkScript: pkScript,
OutPoint: spendRequest.OutPoint,
PkScript: spendRequest.PkScript.Script(),
}
updateOptions := []neutrino.UpdateOption{
neutrino.AddInputs(inputToWatch),
neutrino.DisableDisconnectedNtfns(true),
}
// We'll use the txNotifier's tip as the starting point of our filter
// update. In the case of an output script spend request, we'll check if
// we should perform a historical rescan and start from there, as we
// cannot do so with GetUtxo since it matches outpoints.
rewindHeight := txNotifierTip
if historicalDispatch != nil &&
spendRequest.OutPoint == chainntnfs.ZeroOutPoint {
rewindHeight = historicalDispatch.StartHeight
}
updateOptions = append(updateOptions, neutrino.Rewind(rewindHeight))
errChan := make(chan error, 1)
select {
case n.notificationRegistry <- &rescanFilterUpdate{
updateOptions: []neutrino.UpdateOption{
neutrino.AddInputs(inputToWatch),
neutrino.Rewind(historicalDispatch.EndHeight),
neutrino.DisableDisconnectedNtfns(true),
},
errChan: errChan,
updateOptions: updateOptions,
errChan: errChan,
}:
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
@ -668,6 +721,15 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, fmt.Errorf("unable to update filter: %v", err)
}
// If the txNotifier didn't return any details to perform a historical
// scan of the chain, or if we already performed one like in the case of
// output script spend requests, then we can return early as there's
// nothing left for us to do.
if historicalDispatch == nil ||
spendRequest.OutPoint == chainntnfs.ZeroOutPoint {
return ntfn.Event, nil
}
// With the filter updated, we'll dispatch our historical rescan to
// ensure we detect the spend if it happened in the past. We'll ensure
// that neutrino is caught up to the starting height before we attempt
@ -704,7 +766,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
if spendReport != nil && spendReport.SpendingTx != nil {
spendingTxHash := spendReport.SpendingTx.TxHash()
spendDetails = &chainntnfs.SpendDetail{
SpentOutPoint: outpoint,
SpentOutPoint: &spendRequest.OutPoint,
SpenderTxHash: &spendingTxHash,
SpendingTx: spendReport.SpendingTx,
SpenderInputIndex: spendReport.SpendingInputIndex,
@ -716,7 +778,7 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// not, we'll mark our historical rescan as complete to ensure the
// outpoint's spend hint gets updated upon connected/disconnected
// blocks.
err = n.txNotifier.UpdateSpendDetails(*outpoint, spendDetails)
err = n.txNotifier.UpdateSpendDetails(spendRequest, spendDetails)
if err != nil {
return nil, err
}

@ -59,6 +59,7 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
n.rescanErr = n.chainView.Start()
n.chainUpdates.Start()
n.txUpdates.Start()
if generateBlocks != nil {
// Ensure no block notifications are pending when we start the