chainntnfs/bitcoindnotify: rescan blocks manually instead of rewinding

This commit is contained in:
Alex 2018-04-06 19:25:41 -06:00
parent 5b5491338b
commit 89e2ba41c9

@ -15,6 +15,7 @@ import (
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
"github.com/roasbeef/btcwallet/chain"
"github.com/roasbeef/btcwallet/wtxmgr"
)
const (
@ -44,14 +45,6 @@ type chainUpdate struct {
blockHeight int32
}
// txUpdate encapsulates a transaction related notification sent from bitcoind
// to the registered RPC client. This struct is used as an element within an
// unbounded queue in order to avoid blocking the main rpc dispatch rule.
type txUpdate struct {
tx *btcutil.Tx
details *btcjson.BlockDetails
}
// TODO(roasbeef): generalize struct below:
// * move chans to config, allow outside callers to handle send conditions
@ -65,9 +58,6 @@ type BitcoindNotifier struct {
started int32 // To be used atomically.
stopped int32 // To be used atomically.
heightMtx sync.RWMutex
bestHeight int32
chainConn *chain.BitcoindClient
notificationCancels chan interface{}
@ -139,15 +129,11 @@ func (b *BitcoindNotifier) Start() error {
return err
}
b.heightMtx.Lock()
b.bestHeight = currentHeight
b.heightMtx.Unlock()
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
uint32(currentHeight), reorgSafetyLimit)
b.wg.Add(1)
go b.notificationDispatcher()
go b.notificationDispatcher(currentHeight)
return nil
}
@ -193,7 +179,7 @@ type blockNtfn struct {
// notificationDispatcher is the primary goroutine which handles client
// notification registrations, as well as notification dispatches.
func (b *BitcoindNotifier) notificationDispatcher() {
func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) {
out:
for {
select {
@ -260,34 +246,31 @@ out:
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.RLock()
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.RUnlock()
case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg
case chain.RelevantTx:
b.handleRelevantTx(msg, bestHeight)
}
case ntfn := <-b.chainConn.Notifications():
switch item := ntfn.(type) {
case chain.BlockConnected:
b.heightMtx.Lock()
if item.Height != b.bestHeight+1 {
if item.Height != bestHeight+1 {
chainntnfs.Log.Warnf("Received blocks out of order: "+
"current height=%d, new height=%d",
b.bestHeight, item.Height)
b.heightMtx.Unlock()
bestHeight, item.Height)
continue
}
b.bestHeight = item.Height
bestHeight = item.Height
rawBlock, err := b.chainConn.GetBlock(&item.Hash)
if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err)
b.heightMtx.Unlock()
continue
}
@ -302,20 +285,17 @@ out:
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.Unlock()
continue
case chain.BlockDisconnected:
b.heightMtx.Lock()
if item.Height != b.bestHeight {
if item.Height != bestHeight {
chainntnfs.Log.Warnf("Received blocks "+
"out of order: current height="+
"%d, disconnected height=%d",
b.bestHeight, item.Height)
b.heightMtx.Unlock()
bestHeight, item.Height)
continue
}
b.bestHeight = item.Height - 1
bestHeight = item.Height - 1
chainntnfs.Log.Infof("Block disconnected from "+
"main chain: height=%v, sha=%v",
@ -326,53 +306,9 @@ out:
if err != nil {
chainntnfs.Log.Error(err)
}
b.heightMtx.Unlock()
case chain.RelevantTx:
tx := item.TxRecord.MsgTx
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered
// notification for, then create a spend
// summary, finally sending off the details to
// the notification subscriber.
if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := tx.TxHash()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &spenderSha,
SpendingTx: &tx,
SpenderInputIndex: uint32(i),
}
// TODO(roasbeef): after change to
// loadfilter, only notify on block
// inclusion?
if item.Block != nil {
spendDetails.SpendingHeight = item.Block.Height
} else {
b.heightMtx.RLock()
spendDetails.SpendingHeight = b.bestHeight + 1
b.heightMtx.RUnlock()
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
b.handleRelevantTx(item, bestHeight)
}
case <-b.quit:
@ -382,6 +318,52 @@ out:
b.wg.Done()
}
// handleRelevantTx notifies any clients of a relevant transaction.
func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int32) {
msgTx := tx.TxRecord.MsgTx
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range msgTx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered
// notification for, then create a spend
// summary, finally sending off the details to
// the notification subscriber.
if clients, ok := b.spendNotifications[prevOut]; ok {
spenderSha := msgTx.TxHash()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: &prevOut,
SpenderTxHash: &spenderSha,
SpendingTx: &msgTx,
SpenderInputIndex: uint32(i),
}
// TODO(roasbeef): after change to
// loadfilter, only notify on block
// inclusion?
if tx.Block != nil {
spendDetails.SpendingHeight = tx.Block.Height
} else {
spendDetails.SpendingHeight = bestHeight + 1
}
for _, ntfn := range clients {
chainntnfs.Log.Infof("Dispatching "+
"spend notification for "+
"outpoint=%v", ntfn.targetOutpoint)
ntfn.spendChan <- spendDetails
// Close spendChan to ensure that any calls to Cancel will not
// block. This is safe to do since the channel is buffered, and the
// message can still be read by the receiver.
close(ntfn.spendChan)
}
delete(b.spendNotifications, prevOut)
}
}
}
// historicalConfDetails looks up whether a transaction is already included in a
// block in the active chain and, if so, returns details about the confirmation.
func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash,
@ -523,40 +505,66 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
}
}
// We'll only request a rescan if the transaction has actually
// We'll only scan old blocks if the transaction has actually
// been included within a block. Otherwise, we'll encounter an
// error when scanning for blocks. This can happens in the case
// of a race condition, wherein the output itself is unspent,
// and only arrives in the mempool after the getxout call.
if transaction != nil && transaction.BlockHash != "" {
blockhash, err := chainhash.NewHashFromStr(transaction.BlockHash)
startHash, err := chainhash.NewHashFromStr(transaction.BlockHash)
if err != nil {
return nil, err
}
// Rewind the rescan, since the btcwallet bitcoind
// back-end doesn't support that.
blockHeight, err := b.chainConn.GetBlockHeight(blockhash)
// Rescan all the blocks until the current one.
startHeight, err := b.chainConn.GetBlockHeight(startHash)
if err != nil {
return nil, err
}
b.heightMtx.Lock()
currentHeight := b.bestHeight
b.bestHeight = blockHeight
for i := currentHeight; i > blockHeight; i-- {
err = b.txConfNotifier.DisconnectTip(uint32(i))
_, endHeight, err := b.chainConn.GetBestBlock()
if err != nil {
return nil, err
}
out:
for i := startHeight; i <= endHeight; i++ {
blockHash, err := b.chainConn.GetBlockHash(int64(i))
if err != nil {
return nil, err
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return nil, err
}
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint == *outpoint {
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: block.BlockHash(),
Height: i,
},
Time: block.Header.Timestamp,
},
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- relTx:
}
break out
}
}
}
}
b.heightMtx.Unlock()
ops := []*wire.OutPoint{outpoint}
if err := b.chainConn.Rescan(blockhash, nil, ops); err != nil {
chainntnfs.Log.Errorf("Rescan for spend "+
"notification txout failed: %v", err)
return nil, err
}
}
}