chainntnfs/bitcoindnotify: handle spend notification registration w/ TxNotifier

In this commit, we modify the logic within RegisterSpendNtfn for the
BitcoindNotifier to account for the recent changes made to the
TxNotifier. Since it is now able to handle spend notification
registration and dispatch, we can bypass all the current logic within
the BitcoindNotifier and interact directly with the TxNotifier instead.

The most notable changes include the following:

  1. We'll only attempt a historical rescan if the TxNotifier tells us
  so.

  2. We'll dispatch the historical rescan within the main goroutine to
  prevent WaitGroup panics, due to the asynchronous nature of the
  notifier.
This commit is contained in:
Wilmer Paulino 2018-10-05 02:07:55 -07:00
parent 1fe3d59836
commit 180dffd154
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 150 additions and 215 deletions

@ -12,7 +12,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/queue"
)
@ -144,6 +143,7 @@ func (b *BitcoindNotifier) Start() error {
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(currentHeight), reorgSafetyLimit, b.confirmHintCache,
b.spendHintCache,
)
b.bestBlock = chainntnfs.BlockEpoch{
@ -259,6 +259,8 @@ out:
// included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
//
// TODO(wilmer): add retry logic if rescan fails?
b.wg.Add(1)
go func() {
defer b.wg.Done()
@ -286,6 +288,25 @@ out:
}
}()
case *chainntnfs.HistoricalSpendDispatch:
// In order to ensure we don't block the caller
// on what may be a long rescan, we'll launch a
// goroutine to do so in the background.
//
// TODO(wilmer): add retry logic if rescan fails?
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := b.dispatchSpendDetailsManually(msg)
if err != nil {
chainntnfs.Log.Errorf("Rescan to "+
"determine the spend "+
"details of %v failed: %v",
msg.OutPoint, err)
}
}()
case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg
@ -383,7 +404,23 @@ out:
b.bestBlock = newBestBlock
case chain.RelevantTx:
b.handleRelevantTx(item, b.bestBlock.Height)
// We only care about notifying on confirmed
// spends, so if this is a mempool spend, we can
// ignore it and wait for the spend to appear in
// on-chain.
if item.Block == nil {
continue
}
tx := &item.TxRecord.MsgTx
err := b.txNotifier.ProcessRelevantSpendTx(
tx, item.Block.Height,
)
if err != nil {
chainntnfs.Log.Errorf("Unable to "+
"process transaction %v: %v",
tx.TxHash(), err)
}
}
case <-b.quit:
@ -393,55 +430,6 @@ out:
b.wg.Done()
}
// handleRelevantTx notifies any clients of a relevant transaction.
func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int32) {
msgTx := tx.TxRecord.MsgTx
// We only care about notifying on confirmed spends, so in case this is
// a mempool spend, we can continue, and wait for the spend to appear
// in chain.
if tx.Block == nil {
return
}
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range msgTx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered
// notification for, then create a spend
// summary, finally sending off the details to
// the notification subscriber.
if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := msgTx.TxHash()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &spenderSha,
SpendingTx: &msgTx,
SpenderInputIndex: uint32(i),
}
spendDetails.SpendingHeight = tx.Block.Height
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching confirmed "+
"spend notification for outpoint=%v "+
"at height %v", ntfn.targetOutpoint,
spendDetails.SpendingHeight)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to
// Cancel will not block. This is safe to do
// since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
}
// historicalConfDetails looks up whether a transaction is already included in a
// block in the active chain and, if so, returns details about the confirmation.
func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash,
@ -717,167 +705,120 @@ type spendCancel struct {
func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
// Before proceeding to register the notification, we'll query our
// height hint cache to determine whether a better one exists.
if hint, err := b.spendHintCache.QuerySpendHint(*outpoint); err == nil {
if hint > heightHint {
chainntnfs.Log.Debugf("Using height hint %d retrieved "+
"from cache for %v", hint, outpoint)
heightHint = hint
}
// First, we'll construct a spend notification request and hand it off
// to the txNotifier.
spendID := atomic.AddUint64(&b.spendClientCounter, 1)
cancel := func() {
b.txNotifier.CancelSpend(*outpoint, spendID)
}
// Construct a notification request for the outpoint and send it to the
// main event loop.
ntfn := &spendNotification{
targetOutpoint: outpoint,
spendChan: make(chan *chainntnfs.SpendDetail, 1),
spendID: atomic.AddUint64(&b.spendClientCounter, 1),
ntfn := &chainntnfs.SpendNtfn{
SpendID: spendID,
OutPoint: *outpoint,
PkScript: pkScript,
Event: chainntnfs.NewSpendEvent(cancel),
HeightHint: heightHint,
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
}
if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil {
return nil, err
}
// The following conditional checks to ensure that when a spend
// notification is registered, the output hasn't already been spent. If
// the output is no longer in the UTXO set, the chain will be rescanned
// from the point where the output was added. The rescan will dispatch
// the notification.
txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
historicalDispatch, err := b.txNotifier.RegisterSpend(ntfn)
if err != nil {
return nil, err
}
// If the output is unspent, then we'll write it to the cache with the
// given height hint. This allows us to increase the height hint as the
// chain extends and the output remains unspent.
// If the txNotifier didn't return any details to perform a historical
// scan of the chain, then we can return early as there's nothing left
// for us to do.
if historicalDispatch == nil {
return ntfn.Event, nil
}
// We'll then request the backend to notify us when it has detected the
// outpoint as spent.
if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil {
return nil, err
}
// In addition to the check above, we'll also check the backend's UTXO
// set to determine whether the outpoint has been spent. If it hasn't,
// we can return to the caller as well.
txOut, err := b.chainConn.GetTxOut(&outpoint.Hash, outpoint.Index, true)
if err != nil {
return nil, err
}
if txOut != nil {
err := b.spendHintCache.CommitSpendHint(heightHint, *outpoint)
// We'll let the txNotifier know the outpoint is still unspent
// in order to begin updating its spend hint.
err := b.txNotifier.UpdateSpendDetails(*outpoint, nil)
if err != nil {
// The error is not fatal, so we should not return an
// error to the caller.
chainntnfs.Log.Error("Unable to update spend hint to "+
"%d for %v: %v", heightHint, *outpoint, err)
}
} else {
// Otherwise, we'll determine when the output was spent.
//
// First, we'll attempt to retrieve the transaction's block hash
// using the backend's transaction index.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
if err != nil {
// Avoid returning an error if the transaction was not
// found to proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for "+
"txid %v: %v", outpoint.Hash, err)
}
return nil, err
}
var blockHash *chainhash.Hash
if tx != nil && tx.BlockHash != "" {
// If we're able to retrieve a valid block hash from the
// transaction, then we'll use it as our rescan starting
// point.
blockHash, err = chainhash.NewHashFromStr(tx.BlockHash)
if err != nil {
return nil, err
}
} else {
// Otherwise, we'll attempt to retrieve the hash for the
// block at the heightHint.
blockHash, err = b.chainConn.GetBlockHash(
int64(heightHint),
)
if err != nil {
return nil, fmt.Errorf("unable to retrieve "+
"hash for block with height %d: %v",
heightHint, err)
}
}
return ntfn.Event, nil
}
// We'll only scan old blocks if the transaction has actually
// been included within a block. Otherwise, we'll encounter an
// error when scanning for blocks. This can happens in the case
// of a race condition, wherein the output itself is unspent,
// and only arrives in the mempool after the getxout call.
if blockHash != nil {
// Rescan all the blocks until the current one.
startHeight, err := b.chainConn.GetBlockHeight(
blockHash,
)
if err != nil {
return nil, err
}
_, endHeight, err := b.chainConn.GetBestBlock()
if err != nil {
return nil, err
}
// In order to ensure we don't block the caller on what
// may be a long rescan, we'll launch a goroutine to do
// so in the background.
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := b.dispatchSpendDetailsManually(
*outpoint, startHeight, endHeight,
)
if err != nil {
chainntnfs.Log.Errorf("Rescan for spend "+
"notification txout(%x) "+
"failed: %v", outpoint, err)
}
}()
// Otherwise, we'll determine when the output was spent by scanning the
// chain. We'll begin by determining where to start our historical
// rescan.
//
// As a minimal optimization, we'll query the backend's transaction
// index (if enabled) to determine if we have a better rescan starting
// height. We can do this as the GetRawTransaction call will return the
// hash of the block it was included in within the chain.
tx, err := b.chainConn.GetRawTransactionVerbose(&outpoint.Hash)
if err != nil {
// Avoid returning an error if the transaction was not found to
// proceed with fallback methods.
jsonErr, ok := err.(*btcjson.RPCError)
if !ok || jsonErr.Code != btcjson.ErrRPCNoTxInfo {
return nil, fmt.Errorf("unable to query for "+
"txid %v: %v", outpoint.Hash, err)
}
}
return &chainntnfs.SpendEvent{
Spend: ntfn.spendChan,
Cancel: func() {
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
}
// If the transaction index was enabled, we'll use the block's hash to
// retrieve its height and check whether it provides a better starting
// point for our rescan.
if tx != nil {
// If the transaction containing the outpoint hasn't confirmed
// on-chain, then there's no need to perform a rescan.
if tx.BlockHash == "" {
return ntfn.Event, nil
}
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the
// spend chan until it is closed before yielding
// to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
if !ok {
return
}
case <-b.quit:
return
}
}
case <-b.quit:
}
},
}, nil
blockHash, err := chainhash.NewHashFromStr(tx.BlockHash)
if err != nil {
return nil, err
}
blockHeight, err := b.chainConn.GetBlockHeight(blockHash)
if err != nil {
return nil, err
}
if uint32(blockHeight) > historicalDispatch.StartHeight {
historicalDispatch.StartHeight = uint32(blockHeight)
}
}
// Now that we've determined the starting point of our rescan, we can
// dispatch it.
select {
case b.notificationRegistry <- historicalDispatch:
return ntfn.Event, nil
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
}
}
// disaptchSpendDetailsManually attempts to manually scan the chain within the
// given height range for a transaction that spends the given outpoint. If one
// is found, it's spending details are sent to the notifier dispatcher, which
// will then dispatch the notification to all of its clients.
func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
startHeight, endHeight int32) error {
func (b *BitcoindNotifier) dispatchSpendDetailsManually(
historicalDispatchDetails *chainntnfs.HistoricalSpendDispatch) error {
op := historicalDispatchDetails.OutPoint
startHeight := historicalDispatchDetails.StartHeight
endHeight := historicalDispatchDetails.EndHeight
// Begin scanning blocks at every height to determine if the outpoint
// was spent.
@ -890,6 +831,7 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
default:
}
// First, we'll fetch the block for the current height.
blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return fmt.Errorf("unable to retrieve hash for block "+
@ -901,38 +843,30 @@ func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
"%v: %v", blockHash, err)
}
// Then, we'll manually go over every transaction in it and
// determine whether it spends the outpoint in question.
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint != op {
for i, txIn := range tx.TxIn {
if txIn.PreviousOutPoint != op {
continue
}
// If this transaction input spends the
// outpoint, we'll gather the details of the
// spending transaction and dispatch a spend
// notification to our clients.
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: *blockHash,
Height: height,
},
Time: block.Header.Timestamp,
},
// If it does, we'll construct its spend details
// and hand them over to the TxNotifier so that
// it can properly notify its registered
// clients.
txHash := tx.TxHash()
details := &chainntnfs.SpendDetail{
SpentOutPoint: &op,
SpenderTxHash: &txHash,
SpendingTx: tx,
SpenderInputIndex: uint32(i),
SpendingHeight: int32(height),
}
select {
case b.notificationRegistry <- relTx:
case <-b.quit:
return ErrChainNotifierShuttingDown
}
return nil
return b.txNotifier.UpdateSpendDetails(
op, details,
)
}
}
}

@ -31,6 +31,7 @@ func (b *BitcoindNotifier) UnsafeStart(bestHeight int32, bestHash *chainhash.Has
b.txNotifier = chainntnfs.NewTxNotifier(
uint32(bestHeight), reorgSafetyLimit, b.confirmHintCache,
b.spendHintCache,
)
if generateBlocks != nil {