diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 3fe740a7..6547b398 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -168,6 +168,7 @@ func (b *BtcdNotifier) Start() error { b.txNotifier = chainntnfs.NewTxNotifier( uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache, + b.spendHintCache, ) b.bestBlock = chainntnfs.BlockEpoch{ @@ -329,6 +330,8 @@ out: // included in the active chain. We'll do this // in a goroutine to prevent blocking // potentially long rescans. + // + // TODO(wilmer): add retry logic if rescan fails? b.wg.Add(1) go func() { defer b.wg.Done() @@ -449,60 +452,23 @@ out: // partially completed. b.bestBlock = newBestBlock - // NOTE: we currently only use txUpdates for mempool spends and - // rescan spends. It might get removed entirely in the future. case item := <-b.txUpdates.ChanOut(): newSpend := item.(*txUpdate) // We only care about notifying on confirmed spends, so - // in case this is a mempool spend, we can continue, - // and wait for the spend to appear in chain. + // if this is a mempool spend, we can ignore it and wait + // for the spend to appear in on-chain. if newSpend.details == nil { continue } - spendingTx := newSpend.tx - - // First, check if this transaction spends an output - // that has an existing spend notification for it. - for i, txIn := range spendingTx.MsgTx().TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an - // output which we have a registered - // notification for, then create a spend - // summary, finally sending off the details to - // the notification subscriber. - if clients, ok := b.spendNotifications[prevOut]; ok { - spenderSha := newSpend.tx.Hash() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: spenderSha, - SpendingTx: spendingTx.MsgTx(), - SpenderInputIndex: uint32(i), - } - spendDetails.SpendingHeight = newSpend.details.Height - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching "+ - "confirmed spend "+ - "notification for "+ - "outpoint=%v at height %v", - ntfn.targetOutpoint, - spendDetails.SpendingHeight) - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure - // that any calls to Cancel - // will not block. This is safe - // to do since the channel is - // buffered, and the message - // can still be read by the - // receiver. - close(ntfn.spendChan) - } - delete(b.spendNotifications, prevOut) - } + tx := newSpend.tx.MsgTx() + err := b.txNotifier.ProcessRelevantSpendTx( + tx, newSpend.details.Height, + ) + if err != nil { + chainntnfs.Log.Errorf("Unable to process "+ + "transaction %v: %v", tx.TxHash(), err) } case <-b.quit: @@ -713,94 +679,12 @@ func (b *BtcdNotifier) handleBlockConnected(epoch chainntnfs.BlockEpoch) error { chainntnfs.Log.Infof("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) - // Define a helper struct for coalescing the spend notifications we will - // dispatch after trying to commit the spend hints. - type spendNtfnBatch struct { - details *chainntnfs.SpendDetail - clients map[uint64]*spendNotification - } - - // Scan over the list of relevant transactions and possibly dispatch - // notifications for spends. - spendBatches := make(map[wire.OutPoint]spendNtfnBatch) - for _, tx := range newBlock.txns { - mtx := tx.MsgTx() - txSha := mtx.TxHash() - - for i, txIn := range mtx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an output which - // we have a registered notification for, then create a - // spend summary, finally sending off the details to the - // notification subscriber. - clients, ok := b.spendNotifications[prevOut] - if !ok { - continue - } - delete(b.spendNotifications, prevOut) - - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &txSha, - SpendingTx: mtx, - SpenderInputIndex: uint32(i), - SpendingHeight: int32(newBlock.height), - } - - spendBatches[prevOut] = spendNtfnBatch{ - details: spendDetails, - clients: clients, - } - } - } - - // Finally, we'll update the spend height hint for all of our watched - // outpoints that have not been spent yet. This is safe to do as we do - // not watch already spent outpoints for spend notifications. - ops := make([]wire.OutPoint, 0, len(b.spendNotifications)) - for op := range b.spendNotifications { - ops = append(ops, op) - } - - if len(ops) > 0 { - err := b.spendHintCache.CommitSpendHint( - uint32(epoch.Height), ops..., - ) - if err != nil { - // The error is not fatal since we are connecting a - // block, and advancing the spend hint is an optimistic - // optimization. - chainntnfs.Log.Errorf("Unable to update spend hint to "+ - "%d for %v: %v", epoch.Height, ops, err) - } - } - - // We want to set the best block before dispatching notifications - // so if any subscribers make queries based on their received - // block epoch, our state is fully updated in time. + // We want to set the best block before dispatching notifications so if + // any subscribers make queries based on their received block epoch, our + // state is fully updated in time. b.bestBlock = epoch - - // Next we'll notify any subscribed clients of the block. b.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - // Finally, send off the spend details to the notification subscribers. - for _, batch := range spendBatches { - for _, ntfn := range batch.clients { - chainntnfs.Log.Infof("Dispatching spend "+ - "notification for outpoint=%v", - ntfn.targetOutpoint) - - ntfn.spendChan <- batch.details - - // Close spendChan to ensure that any calls to - // Cancel will not block. This is safe to do - // since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } - } - return nil } @@ -859,145 +743,131 @@ type spendCancel struct { func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { - // Before proceeding to register the notification, we'll query our - // height hint cache to determine whether a better one exists. - if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil { - if hint > heightHint { - chainntnfs.Log.Debugf("Using height hint %d retrieved "+ - "from cache for %v", hint, outpoint) - heightHint = hint - } + // First, we'll construct a spend notification request and hand it off + // to the txNotifier. + spendID := atomic.AddUint64(&b.spendClientCounter, 1) + cancel := func() { + b.txNotifier.CancelSpend(*outpoint, spendID) + } + ntfn := &chainntnfs.SpendNtfn{ + SpendID: spendID, + OutPoint: *outpoint, + PkScript: pkScript, + Event: chainntnfs.NewSpendEvent(cancel), + HeightHint: heightHint, } - // Construct a notification request for the outpoint and send it to the - // main event loop. - ntfn := &spendNotification{ - targetOutpoint: outpoint, - spendChan: make(chan *chainntnfs.SpendDetail, 1), - spendID: atomic.AddUint64(&b.spendClientCounter, 1), - heightHint: heightHint, - } - - select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown - case b.notificationRegistry <- ntfn: - } - - // TODO(roasbeef): update btcd rescan logic to also use both - if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { - return nil, err - } - - // The following conditional checks to ensure that when a spend - // notification is registered, the output hasn't already been spent. If - // the output is no longer in the UTXO set, the chain will be rescanned - // from the point where the output was added. The rescan will dispatch - // the notification. - txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) + historicalDispatch, err := b.txNotifier.RegisterSpend(ntfn) if err != nil { return nil, err } - // If the output is unspent, then we'll write it to the cache with the - // given height hint. This allows us to increase the height hint as the - // chain extends and the output remains unspent. + // If the txNotifier didn't return any details to perform a historical + // scan of the chain, then we can return early as there's nothing left + // for us to do. + if historicalDispatch == nil { + return ntfn.Event, nil + } + + // We'll then request the backend to notify us when it has detected the + // outpoint as spent. + ops := []*wire.OutPoint{outpoint} + if err := b.chainConn.NotifySpent(ops); err != nil { + return nil, err + } + + // In addition to the check above, we'll also check the backend's UTXO + // set to determine whether the outpoint has been spent. If it hasn't, + // we can return to the caller as well. + txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true) + if err != nil { + return nil, err + } if txOut != nil { - err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint) + // We'll let the txNotifier know the outpoint is still unspent + // in order to begin updating its spend hint. + err := b.txNotifier.UpdateSpendDetails(*outpoint, nil) if err != nil { - // The error is not fatal, so we should not return an - // error to the caller. - chainntnfs.Log.Error("Unable to update spend hint to "+ - "%d for %v: %v", heightHint, *outpoint, err) - } - } else { - // Otherwise, we'll determine when the output was spent. - // - // First, we'll attempt to retrieve the transaction's block hash - // using the backend's transaction index. - tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) - if err != nil { - // Avoid returning an error if the transaction was not - // found to proceed with fallback methods. - jsonErr, ok := err.(*btcjson.RPCError) - if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { - return nil, fmt.Errorf("unable to query for "+ - "txid %v: %v", outpoint.Hash, err) - } + return nil, err } - var blockHash *chainhash.Hash - if tx != nil && tx.BlockHash != "" { - // If we're able to retrieve a valid block hash from the - // transaction, then we'll use it as our rescan starting - // point. - blockHash, err = chainhash.NewHashFromStr(tx.BlockHash) - if err != nil { - return nil, err - } - } else { - // Otherwise, we'll attempt to retrieve the hash for the - // block at the heightHint. - blockHash, err = b.chainConn.GetBlockHash( - int64(heightHint), - ) - if err != nil { - return nil, err - } - } + return ntfn.Event, nil + } - // We'll only request a rescan if the transaction has actually - // been included within a block. Otherwise, we'll encounter an - // error when scanning for blocks. This can happen in the case - // of a race condition, wherein the output itself is unspent, - // and only arrives in the mempool after the getxout call. - if blockHash != nil { - ops := []*wire.OutPoint{outpoint} + // Otherwise, we'll determine when the output was spent by scanning the + // chain. We'll begin by determining where to start our historical + // rescan. + startHash, err := b.chainConn.GetBlockHash( + int64(historicalDispatch.StartHeight), + ) + if err != nil { + return nil, fmt.Errorf("unable to get block hash for height "+ + "%d: %v", historicalDispatch.StartHeight, err) + } - // In order to ensure that we don't block the caller on - // what may be a long rescan, we'll launch a new - // goroutine to handle the async result of the rescan. - asyncResult := b.chainConn.RescanAsync( - blockHash, nil, ops, - ) - go func() { - rescanErr := asyncResult.Receive() - if rescanErr != nil { - chainntnfs.Log.Errorf("Rescan for spend "+ - "notification txout(%x) "+ - "failed: %v", outpoint, rescanErr) - } - }() + // As a minimal optimization, we'll query the backend's transaction + // index (if enabled) to determine if we have a better rescan starting + // height. We can do this as the GetRawTransaction call will return the + // hash of the block it was included in within the chain. + tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash) + if err != nil { + // Avoid returning an error if the transaction was not found to + // proceed with fallback methods. + jsonErr, ok := err.(*btcjson.RPCError) + if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo { + return nil, fmt.Errorf("unable to query for "+ + "txid %v: %v", outpoint.Hash, err) } } - return &chainntnfs.SpendEvent{ - Spend: ntfn.spendChan, - Cancel: func() { - cancel := &spendCancel{ - op: *outpoint, - spendID: ntfn.spendID, - } + // If the transaction index was enabled, we'll use the block's hash to + // retrieve its height and check whether it provides a better starting + // point for our rescan. + if tx != nil { + // If the transaction containing the outpoint hasn't confirmed + // on-chain, then there's no need to perform a rescan. + if tx.BlockHash == "" { + return ntfn.Event, nil + } - // Submit spend cancellation to notification dispatcher. - select { - case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the spend chan until it is - // closed before yielding to the caller. - for { - select { - case _, ok := <-ntfn.spendChan: - if !ok { - return - } - case <-b.quit: - return - } - } - case <-b.quit: + blockHash, err := chainhash.NewHashFromStr(tx.BlockHash) + if err != nil { + return nil, err + } + blockHeader, err := b.chainConn.GetBlockHeaderVerbose(blockHash) + if err != nil { + return nil, fmt.Errorf("unable to get header for "+ + "block %v: %v", blockHash, err) + } + + if uint32(blockHeader.Height) > historicalDispatch.StartHeight { + startHash, err = b.chainConn.GetBlockHash( + int64(blockHeader.Height), + ) + if err != nil { + return nil, fmt.Errorf("unable to get block "+ + "hash for height %d: %v", + blockHeader.Height, err) } - }, - }, nil + } + } + + // In order to ensure that we don't block the caller on what may be a + // long rescan, we'll launch a new goroutine to handle the async result + // of the rescan. We purposefully prevent from adding this goroutine to + // the WaitGroup as we cannnot wait for a quit signal due to the + // asyncResult channel not being exposed. + // + // TODO(wilmer): add retry logic if rescan fails? + asyncResult := b.chainConn.RescanAsync(startHash, nil, ops) + go func() { + if rescanErr := asyncResult.Receive(); rescanErr != nil { + chainntnfs.Log.Errorf("Rescan to determine the spend "+ + "details of %v failed: %v", outpoint, rescanErr) + } + }() + + return ntfn.Event, nil } // RegisterConfirmationsNtfn registers a notification with BtcdNotifier diff --git a/chainntnfs/btcdnotify/btcd_dev.go b/chainntnfs/btcdnotify/btcd_dev.go index 4146baa5..7e39d8a5 100644 --- a/chainntnfs/btcdnotify/btcd_dev.go +++ b/chainntnfs/btcdnotify/btcd_dev.go @@ -30,6 +30,7 @@ func (b *BtcdNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Hash, b.txNotifier = chainntnfs.NewTxNotifier( uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache, + b.spendHintCache, ) b.chainUpdates.Start()