chainntnfs/btcdnotify: support registration for script spends

In this commit, we extend the BtcdNotifier to support registering
scripts for spends notifications. Once the script has been detected as
spent within the chain, a spend notification will be dispatched through
the Spend channel of the SpendEvent returned upon registration.

For scripts that have been spent in the past, the rescan logic has been
modified to match on the script rather than the outpoint. This is done
by encoding the script as an address.

For scripts that are unspent, a request to the backend will be sent to
alert the BtcdNotifier of when the script was spent by a transaction. To
make this request we encode the script as an address, as this is what
the backend uses to detect the spend. The transaction will then be
proxied through the txUpdates concurrent queue, which will hand it off
to the underlying txNotifier and dispatch spend notifications to the
relevant clients.

Along the way, we also address an issue where we'd miss detecting that
an outpoint/script has been spent in the future due to not receiving a
historical dispatch request from the underlying txNotifier. To fix this,
we ensure that we always request the backend to notify us of the spend
once it detects it at tip, regardless of whether a historical rescan was
detected or not.
This commit is contained in:
Wilmer Paulino 2018-12-06 21:14:25 -08:00
parent f02590d8c0
commit 808c6ae660
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F

@ -431,13 +431,13 @@ out:
continue continue
} }
tx := newSpend.tx.MsgTx()
err := b.txNotifier.ProcessRelevantSpendTx( err := b.txNotifier.ProcessRelevantSpendTx(
tx, newSpend.details.Height, newSpend.tx, uint32(newSpend.details.Height),
) )
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to process "+ chainntnfs.Log.Errorf("Unable to process "+
"transaction %v: %v", tx.TxHash(), err) "transaction %v: %v",
newSpend.tx.Hash(), err)
} }
case <-b.quit: case <-b.quit:
@ -703,32 +703,57 @@ func (b *BtcdNotifier) notifyBlockEpochClient(epochClient *blockEpochRegistratio
} }
// RegisterSpendNtfn registers an intent to be notified once the target // RegisterSpendNtfn registers an intent to be notified once the target
// outpoint has been spent by a transaction on-chain. Once a spend of the target // outpoint/output script has been spent by a transaction on-chain. When
// outpoint has been detected, the details of the spending event will be sent // intending to be notified of the spend of an output script, a nil outpoint
// across the 'Spend' channel. The heightHint should represent the earliest // must be used. The heightHint should represent the earliest height in the
// height in the chain where the transaction could have been spent in. // chain of the transaction that spent the outpoint/output script.
//
// Once a spend of has been detected, the details of the spending event will be
// sent across the 'Spend' channel.
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// First, we'll construct a spend notification request and hand it off // First, we'll construct a spend notification request and hand it off
// to the txNotifier. // to the txNotifier.
spendID := atomic.AddUint64(&b.spendClientCounter, 1) spendID := atomic.AddUint64(&b.spendClientCounter, 1)
cancel := func() { spendRequest, err := chainntnfs.NewSpendRequest(outpoint, pkScript)
b.txNotifier.CancelSpend(*outpoint, spendID) if err != nil {
return nil, err
} }
ntfn := &chainntnfs.SpendNtfn{ ntfn := &chainntnfs.SpendNtfn{
SpendID: spendID, SpendID: spendID,
OutPoint: *outpoint, SpendRequest: spendRequest,
PkScript: pkScript, Event: chainntnfs.NewSpendEvent(func() {
Event: chainntnfs.NewSpendEvent(cancel), b.txNotifier.CancelSpend(spendRequest, spendID)
}),
HeightHint: heightHint, HeightHint: heightHint,
} }
historicalDispatch, err := b.txNotifier.RegisterSpend(ntfn) historicalDispatch, _, err := b.txNotifier.RegisterSpend(ntfn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// We'll then request the backend to notify us when it has detected the
// outpoint/output script as spent.
//
// TODO(wilmer): use LoadFilter API instead.
if spendRequest.OutPoint == chainntnfs.ZeroOutPoint {
addr, err := spendRequest.PkScript.Address(b.chainParams)
if err != nil {
return nil, err
}
addrs := []btcutil.Address{addr}
if err := b.chainConn.NotifyReceived(addrs); err != nil {
return nil, err
}
} else {
ops := []*wire.OutPoint{&spendRequest.OutPoint}
if err := b.chainConn.NotifySpent(ops); err != nil {
return nil, err
}
}
// If the txNotifier didn't return any details to perform a historical // If the txNotifier didn't return any details to perform a historical
// scan of the chain, then we can return early as there's nothing left // scan of the chain, then we can return early as there's nothing left
// for us to do. // for us to do.
@ -736,24 +761,55 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil return ntfn.Event, nil
} }
// We'll then request the backend to notify us when it has detected the // Otherwise, we'll need to dispatch a historical rescan to determine if
// outpoint as spent. // the outpoint was already spent at a previous height.
ops := []*wire.OutPoint{outpoint} //
if err := b.chainConn.NotifySpent(ops); err != nil { // We'll short-circuit the path when dispatching the spend of a script,
// rather than an outpoint, as there aren't any additional checks we can
// make for scripts.
if spendRequest.OutPoint == chainntnfs.ZeroOutPoint {
startHash, err := b.chainConn.GetBlockHash(
int64(historicalDispatch.StartHeight),
)
if err != nil {
return nil, err return nil, err
} }
// In addition to the check above, we'll also check the backend's UTXO // TODO(wilmer): add retry logic if rescan fails?
// set to determine whether the outpoint has been spent. If it hasn't, addr, err := spendRequest.PkScript.Address(b.chainParams)
// we can return to the caller as well. if err != nil {
txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) return nil, err
}
addrs := []btcutil.Address{addr}
asyncResult := b.chainConn.RescanAsync(startHash, addrs, nil)
go func() {
if rescanErr := asyncResult.Receive(); rescanErr != nil {
chainntnfs.Log.Errorf("Rescan to determine "+
"the spend details of %v failed: %v",
spendRequest, rescanErr)
}
}()
return ntfn.Event, nil
}
// When dispatching spends of outpoints, there are a number of checks we
// can make to start our rescan from a better height or completely avoid
// it.
//
// We'll start by checking the backend's UTXO set to determine whether
// the outpoint has been spent. If it hasn't, we can return to the
// caller as well.
txOut, err := b.chainConn.GetTxOut(
&spendRequest.OutPoint.Hash, spendRequest.OutPoint.Index, true,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if txOut != nil { if txOut != nil {
// We'll let the txNotifier know the outpoint is still unspent // We'll let the txNotifier know the outpoint is still unspent
// in order to begin updating its spend hint. // in order to begin updating its spend hint.
err := b.txNotifier.UpdateSpendDetails(*outpoint, nil) err := b.txNotifier.UpdateSpendDetails(spendRequest, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -761,9 +817,9 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return ntfn.Event, nil return ntfn.Event, nil
} }
// Otherwise, we'll determine when the output was spent by scanning the // Since the outpoint was spent, as it no longer exists within the UTXO
// chain. We'll begin by determining where to start our historical // set, we'll determine when it happened by scanning the chain. We'll
// rescan. // begin by fetching the block hash of our starting height.
startHash, err := b.chainConn.GetBlockHash( startHash, err := b.chainConn.GetBlockHash(
int64(historicalDispatch.StartHeight), int64(historicalDispatch.StartHeight),
) )
@ -776,14 +832,14 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// index (if enabled) to determine if we have a better rescan starting // index (if enabled) to determine if we have a better rescan starting
// height. We can do this as the GetRawTransaction call will return the // height. We can do this as the GetRawTransaction call will return the
// hash of the block it was included in within the chain. // hash of the block it was included in within the chain.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) tx, err := b.chainConn.GetRawTransactionVerbose(&spendRequest.OutPoint.Hash)
if err != nil { if err != nil {
// Avoid returning an error if the transaction was not found to // Avoid returning an error if the transaction was not found to
// proceed with fallback methods. // proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError) jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for "+ return nil, fmt.Errorf("unable to query for txid %v: %v",
"txid %v: %v", outpoint.Hash, err) spendRequest.OutPoint.Hash, err)
} }
} }
@ -819,6 +875,9 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
} }
} }
// Now that we've determined the best starting point for our rescan,
// we can go ahead and dispatch it.
//
// In order to ensure that we don't block the caller on what may be a // In order to ensure that we don't block the caller on what may be a
// long rescan, we'll launch a new goroutine to handle the async result // long rescan, we'll launch a new goroutine to handle the async result
// of the rescan. We purposefully prevent from adding this goroutine to // of the rescan. We purposefully prevent from adding this goroutine to
@ -826,11 +885,14 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// asyncResult channel not being exposed. // asyncResult channel not being exposed.
// //
// TODO(wilmer): add retry logic if rescan fails? // TODO(wilmer): add retry logic if rescan fails?
asyncResult := b.chainConn.RescanAsync(startHash, nil, ops) asyncResult := b.chainConn.RescanAsync(
startHash, nil, []*wire.OutPoint{&spendRequest.OutPoint},
)
go func() { go func() {
if rescanErr := asyncResult.Receive(); rescanErr != nil { if rescanErr := asyncResult.Receive(); rescanErr != nil {
chainntnfs.Log.Errorf("Rescan to determine the spend "+ chainntnfs.Log.Errorf("Rescan to determine the spend "+
"details of %v failed: %v", outpoint, rescanErr) "details of %v failed: %v", spendRequest,
rescanErr)
} }
}() }()