diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index ca648117..6a6d0e0a 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -15,6 +15,7 @@ import ( "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" "github.com/roasbeef/btcwallet/chain" + "github.com/roasbeef/btcwallet/wtxmgr" ) const ( @@ -44,14 +45,6 @@ type chainUpdate struct { blockHeight int32 } -// txUpdate encapsulates a transaction related notification sent from bitcoind -// to the registered RPC client. This struct is used as an element within an -// unbounded queue in order to avoid blocking the main rpc dispatch rule. -type txUpdate struct { - tx *btcutil.Tx - details *btcjson.BlockDetails -} - // TODO(roasbeef): generalize struct below: // * move chans to config, allow outside callers to handle send conditions @@ -65,9 +58,6 @@ type BitcoindNotifier struct { started int32 // To be used atomically. stopped int32 // To be used atomically. - heightMtx sync.RWMutex - bestHeight int32 - chainConn *chain.BitcoindClient notificationCancels chan interface{} @@ -139,15 +129,11 @@ func (b *BitcoindNotifier) Start() error { return err } - b.heightMtx.Lock() - b.bestHeight = currentHeight - b.heightMtx.Unlock() - b.txConfNotifier = chainntnfs.NewTxConfNotifier( uint32(currentHeight), reorgSafetyLimit) b.wg.Add(1) - go b.notificationDispatcher() + go b.notificationDispatcher(currentHeight) return nil } @@ -193,7 +179,7 @@ type blockNtfn struct { // notificationDispatcher is the primary goroutine which handles client // notification registrations, as well as notification dispatches. -func (b *BitcoindNotifier) notificationDispatcher() { +func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) { out: for { select { @@ -260,34 +246,31 @@ out: if err != nil { chainntnfs.Log.Error(err) } - b.heightMtx.RLock() err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) if err != nil { chainntnfs.Log.Error(err) } - b.heightMtx.RUnlock() case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + case chain.RelevantTx: + b.handleRelevantTx(msg, bestHeight) } case ntfn := <-b.chainConn.Notifications(): switch item := ntfn.(type) { case chain.BlockConnected: - b.heightMtx.Lock() - if item.Height != b.bestHeight+1 { + if item.Height != bestHeight+1 { chainntnfs.Log.Warnf("Received blocks out of order: "+ "current height=%d, new height=%d", - b.bestHeight, item.Height) - b.heightMtx.Unlock() + bestHeight, item.Height) continue } - b.bestHeight = item.Height + bestHeight = item.Height rawBlock, err := b.chainConn.GetBlock(&item.Hash) if err != nil { chainntnfs.Log.Errorf("Unable to get block: %v", err) - b.heightMtx.Unlock() continue } @@ -302,20 +285,17 @@ out: if err != nil { chainntnfs.Log.Error(err) } - b.heightMtx.Unlock() continue case chain.BlockDisconnected: - b.heightMtx.Lock() - if item.Height != b.bestHeight { + if item.Height != bestHeight { chainntnfs.Log.Warnf("Received blocks "+ "out of order: current height="+ "%d, disconnected height=%d", - b.bestHeight, item.Height) - b.heightMtx.Unlock() + bestHeight, item.Height) continue } - b.bestHeight = item.Height - 1 + bestHeight = item.Height - 1 chainntnfs.Log.Infof("Block disconnected from "+ "main chain: height=%v, sha=%v", @@ -326,53 +306,9 @@ out: if err != nil { chainntnfs.Log.Error(err) } - b.heightMtx.Unlock() case chain.RelevantTx: - tx := item.TxRecord.MsgTx - // First, check if this transaction spends an output - // that has an existing spend notification for it. - for i, txIn := range tx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an - // output which we have a registered - // notification for, then create a spend - // summary, finally sending off the details to - // the notification subscriber. - if clients, ok := b.spendNotifications[prevOut]; ok { - spenderSha := tx.TxHash() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &spenderSha, - SpendingTx: &tx, - SpenderInputIndex: uint32(i), - } - // TODO(roasbeef): after change to - // loadfilter, only notify on block - // inclusion? - if item.Block != nil { - spendDetails.SpendingHeight = item.Block.Height - } else { - b.heightMtx.RLock() - spendDetails.SpendingHeight = b.bestHeight + 1 - b.heightMtx.RUnlock() - } - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } - delete(b.spendNotifications, prevOut) - } - } + b.handleRelevantTx(item, bestHeight) } case <-b.quit: @@ -382,6 +318,52 @@ out: b.wg.Done() } +// handleRelevantTx notifies any clients of a relevant transaction. +func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int32) { + msgTx := tx.TxRecord.MsgTx + // First, check if this transaction spends an output + // that has an existing spend notification for it. + for i, txIn := range msgTx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an + // output which we have a registered + // notification for, then create a spend + // summary, finally sending off the details to + // the notification subscriber. + if clients, ok := b.spendNotifications[prevOut]; ok { + spenderSha := msgTx.TxHash() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &spenderSha, + SpendingTx: &msgTx, + SpenderInputIndex: uint32(i), + } + // TODO(roasbeef): after change to + // loadfilter, only notify on block + // inclusion? + if tx.Block != nil { + spendDetails.SpendingHeight = tx.Block.Height + } else { + spendDetails.SpendingHeight = bestHeight + 1 + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching "+ + "spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and the + // message can still be read by the receiver. + close(ntfn.spendChan) + } + delete(b.spendNotifications, prevOut) + } + } +} + // historicalConfDetails looks up whether a transaction is already included in a // block in the active chain and, if so, returns details about the confirmation. func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash, @@ -523,40 +505,66 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, } } - // We'll only request a rescan if the transaction has actually + // We'll only scan old blocks if the transaction has actually // been included within a block. Otherwise, we'll encounter an // error when scanning for blocks. This can happens in the case // of a race condition, wherein the output itself is unspent, // and only arrives in the mempool after the getxout call. if transaction != nil && transaction.BlockHash != "" { - blockhash, err := chainhash.NewHashFromStr(transaction.BlockHash) + startHash, err := chainhash.NewHashFromStr(transaction.BlockHash) if err != nil { return nil, err } - // Rewind the rescan, since the btcwallet bitcoind - // back-end doesn't support that. - blockHeight, err := b.chainConn.GetBlockHeight(blockhash) + // Rescan all the blocks until the current one. + startHeight, err := b.chainConn.GetBlockHeight(startHash) if err != nil { return nil, err } - b.heightMtx.Lock() - currentHeight := b.bestHeight - b.bestHeight = blockHeight - for i := currentHeight; i > blockHeight; i-- { - err = b.txConfNotifier.DisconnectTip(uint32(i)) + + _, endHeight, err := b.chainConn.GetBestBlock() + if err != nil { + return nil, err + } + + out: + for i := startHeight; i <= endHeight; i++ { + blockHash, err := b.chainConn.GetBlockHash(int64(i)) if err != nil { return nil, err } + block, err := b.chainConn.GetBlock(blockHash) + if err != nil { + return nil, err + } + for _, tx := range block.Transactions { + for _, in := range tx.TxIn { + if in.PreviousOutPoint == *outpoint { + relTx := chain.RelevantTx{ + TxRecord: &wtxmgr.TxRecord{ + MsgTx: *tx, + Hash: tx.TxHash(), + Received: block.Header.Timestamp, + }, + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: block.BlockHash(), + Height: i, + }, + Time: block.Header.Timestamp, + }, + } + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + case b.notificationRegistry <- relTx: + } + break out + } + } + } } - b.heightMtx.Unlock() - ops := []*wire.OutPoint{outpoint} - if err := b.chainConn.Rescan(blockhash, nil, ops); err != nil { - chainntnfs.Log.Errorf("Rescan for spend "+ - "notification txout failed: %v", err) - return nil, err - } } }