chainntnfs/btcdnotify: handle spend notification registration w/ TxNotifier

In this commit, we modify the logic within RegisterSpendNtfn for the
BtcdNotifier to account for the recent changes made to the TxNotifier.
Since it is now able to handle spend notification registration and
dispatch, we can bypass all the current logic within the
BtcdNotifier and interact directly with the TxNotifier instead.

The most notable change is that now we'll only attempt a historical
rescan if the TxNotifier tells us so.
This commit is contained in:
Wilmer Paulino 2018-10-05 02:07:55 -07:00
parent 0927f35dc1
commit 88edd320d5
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 124 additions and 253 deletions

@ -168,6 +168,7 @@ func (b *BtcdNotifier) Start() error {
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
b.spendHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
@ -329,6 +330,8 @@ out:
// included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
//
// TODO(wilmer): add retry logic if rescan fails?
b.wg.Add(1)
go func() {
defer b.wg.Done()
@ -449,60 +452,23 @@ out:
// partially completed.
b.bestBlock = newBestBlock
// NOTE: we currently only use txUpdates for mempool spends and
// rescan spends. It might get removed entirely in the future.
case item := <-b.txUpdates.ChanOut():
newSpend := item.(*txUpdate)
// We only care about notifying on confirmed spends, so
// in case this is a mempool spend, we can continue,
// and wait for the spend to appear in chain.
// if this is a mempool spend, we can ignore it and wait
// for the spend to appear in on-chain.
if newSpend.details == nil {
continue
}
spendingTx := newSpend.tx
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range spendingTx.MsgTx().TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered
// notification for, then create a spend
// summary, finally sending off the details to
// the notification subscriber.
if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := newSpend.tx.Hash()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: spenderSha,
SpendingTx: spendingTx.MsgTx(),
SpenderInputIndex: uint32(i),
}
spendDetails.SpendingHeight = newSpend.details.Height
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching "+
"confirmed spend "+
"notification for "+
"outpoint=%v at height %v",
ntfn.targetOutpoint,
spendDetails.SpendingHeight)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure
// that any calls to Cancel
// will not block. This is safe
// to do since the channel is
// buffered, and the message
// can still be read by the
// receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
tx := newSpend.tx.MsgTx()
err := b.txNotifier.ProcessRelevantSpendTx(
tx, newSpend.details.Height,
)
if err != nil {
chainntnfs.Log.Errorf("Unable to process "+
"transaction %v: %v", tx.TxHash(), err)
}
case <-b.quit:
@ -713,94 +679,12 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error {
chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height,
epoch.Hash)
// Define a helper struct for coalescing the spend notifications we will
// dispatch after trying to commit the spend hints.
type spendNtfnBatch struct {
details *chainntnfs.SpendDetail
clients map[uint64]*spendNotification
}
// Scan over the list of relevant transactions and possibly dispatch
// notifications for spends.
spendBatches := make(map[wire.OutPoint]spendNtfnBatch)
for _, tx := range newBlock.txns {
mtx := tx.MsgTx()
txSha := mtx.TxHash()
for i, txIn := range mtx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an output which
// we have a registered notification for, then create a
// spend summary, finally sending off the details to the
// notification subscriber.
clients, ok := b.spendNotifications[prevOut]
if !ok {
continue
}
delete(b.spendNotifications, prevOut)
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &txSha,
SpendingTx: mtx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(newBlock.height),
}
spendBatches[prevOut] = spendNtfnBatch{
details: spendDetails,
clients: clients,
}
}
}
// Finally, we'll update the spend height hint for all of our watched
// outpoints that have not been spent yet. This is safe to do as we do
// not watch already spent outpoints for spend notifications.
ops := make([]wire.OutPoint, 0, len(b.spendNotifications))
for op := range b.spendNotifications {
ops = append(ops, op)
}
if len(ops) > 0 {
err := b.spendHintCache.CommitSpendHint(
uint32(epoch.Height), ops...,
)
if err != nil {
// The error is not fatal since we are connecting a
// block, and advancing the spend hint is an optimistic
// optimization.
chainntnfs.Log.Errorf("Unable to update spend hint to "+
"%d for %v: %v", epoch.Height, ops, err)
}
}
// We want to set the best block before dispatching notifications
// so if any subscribers make queries based on their received
// block epoch, our state is fully updated in time.
// We want to set the best block before dispatching notifications so if
// any subscribers make queries based on their received block epoch, our
// state is fully updated in time.
b.bestBlock = epoch
// Next we'll notify any subscribed clients of the block.
b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
// Finally, send off the spend details to the notification subscribers.
for _, batch := range spendBatches {
for _, ntfn := range batch.clients {
chainntnfs.Log.Infof("Dispatching spend "+
"notification for outpoint=%v",
ntfn.targetOutpoint)
ntfn.spendChan <- batch.details
// Close spendChan to ensure that any calls to
// Cancel will not block. This is safe to do
// since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
}
return nil
}
@ -859,145 +743,131 @@ type spendCancel struct {
func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
// First, we'll construct a spend notification request and hand it off
// to the txNotifier.
spendID := atomic.AddUint64(&b.spendClientCounter, 1)
cancel := func() {
b.txNotifier.CancelSpend(*outpoint, spendID)
}
ntfn := &chainntnfs.SpendNtfn{
SpendID: spendID,
OutPoint: *outpoint,
PkScript: pkScript,
Event: chainntnfs.NewSpendEvent(cancel),
HeightHint: heightHint,
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
heightHint: heightHint,
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
}
// TODO(roasbeef): update btcd rescan logic to also use both
if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil {
return nil, err
}
// The following conditional checks to ensure that when a spend
// notification is registered, the output hasn't already been spent. If
// the output is no longer in the UTXO set, the chain will be rescanned
// from the point where the output was added. The rescan will dispatch
// the notification.
txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
historicalDispatch, err := b.txNotifier.RegisterSpend(ntfn)
if err != nil {
return nil, err
}
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
// If the txNotifier didn't return any details to perform a historical
// scan of the chain, then we can return early as there's nothing left
// for us to do.
if historicalDispatch == nil {
return ntfn.Event, nil
}
// We'll then request the backend to notify us when it has detected the
// outpoint as spent.
ops := []*wire.OutPoint{outpoint}
if err := b.chainConn.NotifySpent(ops); err != nil {
return nil, err
}
// In addition to the check above, we'll also check the backend's UTXO
// set to determine whether the outpoint has been spent. If it hasn't,
// we can return to the caller as well.
txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
if err != nil {
return nil, err
}
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
// We'll let the txNotifier know the outpoint is still unspent
// in order to begin updating its spend hint.
err := b.txNotifier.UpdateSpendDetails(*outpoint, nil)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
if err != nil {
// Avoid returning an error if the transaction was not
// found to proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for "+
"txid %v: %v", outpoint.Hash, err)
}
return nil, err
}
var blockHash *chainhash.Hash
if tx != nil && tx.BlockHash != "" {
// If we're able to retrieve a valid block hash from the
// transaction, then we'll use it as our rescan starting
// point.
blockHash, err = chainhash.NewHashFromStr(tx.BlockHash)
if err != nil {
return nil, err
}
} else {
// Otherwise, we'll attempt to retrieve the hash for the
// block at the heightHint.
blockHash, err = b.chainConn.GetBlockHash(
int64(heightHint),
)
if err != nil {
return nil, err
}
}
return ntfn.Event, nil
}
// We'll only request a rescan if the transaction has actually
// been included within a block. Otherwise, we'll encounter an
// error when scanning for blocks. This can happen in the case
// of a race condition, wherein the output itself is unspent,
// and only arrives in the mempool after the getxout call.
if blockHash != nil {
ops := []*wire.OutPoint{outpoint}
// Otherwise, we'll determine when the output was spent by scanning the
// chain. We'll begin by determining where to start our historical
// rescan.
startHash, err := b.chainConn.GetBlockHash(
int64(historicalDispatch.StartHeight),
)
if err != nil {
return nil, fmt.Errorf("unable to get block hash for height "+
"%d: %v", historicalDispatch.StartHeight, err)
}
// In order to ensure that we don't block the caller on
// what may be a long rescan, we'll launch a new
// goroutine to handle the async result of the rescan.
asyncResult := b.chainConn.RescanAsync(
blockHash, nil, ops,
)
go func() {
rescanErr := asyncResult.Receive()
if rescanErr != nil {
chainntnfs.Log.Errorf("Rescan for spend "+
"notification txout(%x) "+
"failed: %v", outpoint, rescanErr)
}
}()
// As a minimal optimization, we'll query the backend's transaction
// index (if enabled) to determine if we have a better rescan starting
// height. We can do this as the GetRawTransaction call will return the
// hash of the block it was included in within the chain.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
if err != nil {
// Avoid returning an error if the transaction was not found to
// proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for "+
"txid %v: %v", outpoint.Hash, err)
}
}
return &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}
// If the transaction index was enabled, we'll use the block's hash to
// retrieve its height and check whether it provides a better starting
// point for our rescan.
if tx != nil {
// If the transaction containing the outpoint hasn't confirmed
// on-chain, then there's no need to perform a rescan.
if tx.BlockHash == "" {
return ntfn.Event, nil
}
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
if !ok {
return
}
case <-b.quit:
return
}
}
case <-b.quit:
blockHash, err := chainhash.NewHashFromStr(tx.BlockHash)
if err != nil {
return nil, err
}
blockHeader, err := b.chainConn.GetBlockHeaderVerbose(blockHash)
if err != nil {
return nil, fmt.Errorf("unable to get header for "+
"block %v: %v", blockHash, err)
}
if uint32(blockHeader.Height) > historicalDispatch.StartHeight {
startHash, err = b.chainConn.GetBlockHash(
int64(blockHeader.Height),
)
if err != nil {
return nil, fmt.Errorf("unable to get block "+
"hash for height %d: %v",
blockHeader.Height, err)
}
},
}, nil
}
}
// In order to ensure that we don't block the caller on what may be a
// long rescan, we'll launch a new goroutine to handle the async result
// of the rescan. We purposefully prevent from adding this goroutine to
// the WaitGroup as we cannnot wait for a quit signal due to the
// asyncResult channel not being exposed.
//
// TODO(wilmer): add retry logic if rescan fails?
asyncResult := b.chainConn.RescanAsync(startHash, nil, ops)
go func() {
if rescanErr := asyncResult.Receive(); rescanErr != nil {
chainntnfs.Log.Errorf("Rescan to determine the spend "+
"details of %v failed: %v", outpoint, rescanErr)
}
}()
return ntfn.Event, nil
}
// RegisterConfirmationsNtfn registers a notification with BtcdNotifier

@ -30,6 +30,7 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash,
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
b.spendHintCache,
)
b.chainUpdates.Start()