Merge pull request #1041 from aakselrod/bitcoind-cn-ooo-fix
chainntnfs/bitcoindnotify: rescan blocks manually instead of rewinding
This commit is contained in:
commit
13945de806
@ -15,6 +15,7 @@ import (
|
||||
"github.com/roasbeef/btcd/wire"
|
||||
"github.com/roasbeef/btcutil"
|
||||
"github.com/roasbeef/btcwallet/chain"
|
||||
"github.com/roasbeef/btcwallet/wtxmgr"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -44,14 +45,6 @@ type chainUpdate struct {
|
||||
blockHeight int32
|
||||
}
|
||||
|
||||
// txUpdate encapsulates a transaction related notification sent from bitcoind
|
||||
// to the registered RPC client. This struct is used as an element within an
|
||||
// unbounded queue in order to avoid blocking the main rpc dispatch rule.
|
||||
type txUpdate struct {
|
||||
tx *btcutil.Tx
|
||||
details *btcjson.BlockDetails
|
||||
}
|
||||
|
||||
// TODO(roasbeef): generalize struct below:
|
||||
// * move chans to config, allow outside callers to handle send conditions
|
||||
|
||||
@ -65,9 +58,6 @@ type BitcoindNotifier struct {
|
||||
started int32 // To be used atomically.
|
||||
stopped int32 // To be used atomically.
|
||||
|
||||
heightMtx sync.RWMutex
|
||||
bestHeight int32
|
||||
|
||||
chainConn *chain.BitcoindClient
|
||||
|
||||
notificationCancels chan interface{}
|
||||
@ -139,15 +129,11 @@ func (b *BitcoindNotifier) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
b.heightMtx.Lock()
|
||||
b.bestHeight = currentHeight
|
||||
b.heightMtx.Unlock()
|
||||
|
||||
b.txConfNotifier = chainntnfs.NewTxConfNotifier(
|
||||
uint32(currentHeight), reorgSafetyLimit)
|
||||
|
||||
b.wg.Add(1)
|
||||
go b.notificationDispatcher()
|
||||
go b.notificationDispatcher(currentHeight)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -193,7 +179,7 @@ type blockNtfn struct {
|
||||
|
||||
// notificationDispatcher is the primary goroutine which handles client
|
||||
// notification registrations, as well as notification dispatches.
|
||||
func (b *BitcoindNotifier) notificationDispatcher() {
|
||||
func (b *BitcoindNotifier) notificationDispatcher(bestHeight int32) {
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
@ -260,34 +246,31 @@ out:
|
||||
if err != nil {
|
||||
chainntnfs.Log.Error(err)
|
||||
}
|
||||
b.heightMtx.RLock()
|
||||
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
|
||||
if err != nil {
|
||||
chainntnfs.Log.Error(err)
|
||||
}
|
||||
b.heightMtx.RUnlock()
|
||||
case *blockEpochRegistration:
|
||||
chainntnfs.Log.Infof("New block epoch subscription")
|
||||
b.blockEpochClients[msg.epochID] = msg
|
||||
case chain.RelevantTx:
|
||||
b.handleRelevantTx(msg, bestHeight)
|
||||
}
|
||||
|
||||
case ntfn := <-b.chainConn.Notifications():
|
||||
switch item := ntfn.(type) {
|
||||
case chain.BlockConnected:
|
||||
b.heightMtx.Lock()
|
||||
if item.Height != b.bestHeight+1 {
|
||||
if item.Height != bestHeight+1 {
|
||||
chainntnfs.Log.Warnf("Received blocks out of order: "+
|
||||
"current height=%d, new height=%d",
|
||||
b.bestHeight, item.Height)
|
||||
b.heightMtx.Unlock()
|
||||
bestHeight, item.Height)
|
||||
continue
|
||||
}
|
||||
b.bestHeight = item.Height
|
||||
bestHeight = item.Height
|
||||
|
||||
rawBlock, err := b.chainConn.GetBlock(&item.Hash)
|
||||
if err != nil {
|
||||
chainntnfs.Log.Errorf("Unable to get block: %v", err)
|
||||
b.heightMtx.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
@ -302,20 +285,17 @@ out:
|
||||
if err != nil {
|
||||
chainntnfs.Log.Error(err)
|
||||
}
|
||||
b.heightMtx.Unlock()
|
||||
continue
|
||||
|
||||
case chain.BlockDisconnected:
|
||||
b.heightMtx.Lock()
|
||||
if item.Height != b.bestHeight {
|
||||
if item.Height != bestHeight {
|
||||
chainntnfs.Log.Warnf("Received blocks "+
|
||||
"out of order: current height="+
|
||||
"%d, disconnected height=%d",
|
||||
b.bestHeight, item.Height)
|
||||
b.heightMtx.Unlock()
|
||||
bestHeight, item.Height)
|
||||
continue
|
||||
}
|
||||
b.bestHeight = item.Height - 1
|
||||
bestHeight = item.Height - 1
|
||||
|
||||
chainntnfs.Log.Infof("Block disconnected from "+
|
||||
"main chain: height=%v, sha=%v",
|
||||
@ -326,53 +306,9 @@ out:
|
||||
if err != nil {
|
||||
chainntnfs.Log.Error(err)
|
||||
}
|
||||
b.heightMtx.Unlock()
|
||||
|
||||
case chain.RelevantTx:
|
||||
tx := item.TxRecord.MsgTx
|
||||
// First, check if this transaction spends an output
|
||||
// that has an existing spend notification for it.
|
||||
for i, txIn := range tx.TxIn {
|
||||
prevOut := txIn.PreviousOutPoint
|
||||
|
||||
// If this transaction indeed does spend an
|
||||
// output which we have a registered
|
||||
// notification for, then create a spend
|
||||
// summary, finally sending off the details to
|
||||
// the notification subscriber.
|
||||
if clients, ok := b.spendNotifications[prevOut]; ok {
|
||||
spenderSha := tx.TxHash()
|
||||
spendDetails := &chainntnfs.SpendDetail{
|
||||
SpentOutPoint: &prevOut,
|
||||
SpenderTxHash: &spenderSha,
|
||||
SpendingTx: &tx,
|
||||
SpenderInputIndex: uint32(i),
|
||||
}
|
||||
// TODO(roasbeef): after change to
|
||||
// loadfilter, only notify on block
|
||||
// inclusion?
|
||||
if item.Block != nil {
|
||||
spendDetails.SpendingHeight = item.Block.Height
|
||||
} else {
|
||||
b.heightMtx.RLock()
|
||||
spendDetails.SpendingHeight = b.bestHeight + 1
|
||||
b.heightMtx.RUnlock()
|
||||
}
|
||||
|
||||
for _, ntfn := range clients {
|
||||
chainntnfs.Log.Infof("Dispatching "+
|
||||
"spend notification for "+
|
||||
"outpoint=%v", ntfn.targetOutpoint)
|
||||
ntfn.spendChan <- spendDetails
|
||||
|
||||
// Close spendChan to ensure that any calls to Cancel will not
|
||||
// block. This is safe to do since the channel is buffered, and the
|
||||
// message can still be read by the receiver.
|
||||
close(ntfn.spendChan)
|
||||
}
|
||||
delete(b.spendNotifications, prevOut)
|
||||
}
|
||||
}
|
||||
b.handleRelevantTx(item, bestHeight)
|
||||
}
|
||||
|
||||
case <-b.quit:
|
||||
@ -382,6 +318,52 @@ out:
|
||||
b.wg.Done()
|
||||
}
|
||||
|
||||
// handleRelevantTx notifies any clients of a relevant transaction.
|
||||
func (b *BitcoindNotifier) handleRelevantTx(tx chain.RelevantTx, bestHeight int32) {
|
||||
msgTx := tx.TxRecord.MsgTx
|
||||
// First, check if this transaction spends an output
|
||||
// that has an existing spend notification for it.
|
||||
for i, txIn := range msgTx.TxIn {
|
||||
prevOut := txIn.PreviousOutPoint
|
||||
|
||||
// If this transaction indeed does spend an
|
||||
// output which we have a registered
|
||||
// notification for, then create a spend
|
||||
// summary, finally sending off the details to
|
||||
// the notification subscriber.
|
||||
if clients, ok := b.spendNotifications[prevOut]; ok {
|
||||
spenderSha := msgTx.TxHash()
|
||||
spendDetails := &chainntnfs.SpendDetail{
|
||||
SpentOutPoint: &prevOut,
|
||||
SpenderTxHash: &spenderSha,
|
||||
SpendingTx: &msgTx,
|
||||
SpenderInputIndex: uint32(i),
|
||||
}
|
||||
// TODO(roasbeef): after change to
|
||||
// loadfilter, only notify on block
|
||||
// inclusion?
|
||||
if tx.Block != nil {
|
||||
spendDetails.SpendingHeight = tx.Block.Height
|
||||
} else {
|
||||
spendDetails.SpendingHeight = bestHeight + 1
|
||||
}
|
||||
|
||||
for _, ntfn := range clients {
|
||||
chainntnfs.Log.Infof("Dispatching "+
|
||||
"spend notification for "+
|
||||
"outpoint=%v", ntfn.targetOutpoint)
|
||||
ntfn.spendChan <- spendDetails
|
||||
|
||||
// Close spendChan to ensure that any calls to Cancel will not
|
||||
// block. This is safe to do since the channel is buffered, and the
|
||||
// message can still be read by the receiver.
|
||||
close(ntfn.spendChan)
|
||||
}
|
||||
delete(b.spendNotifications, prevOut)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// historicalConfDetails looks up whether a transaction is already included in a
|
||||
// block in the active chain and, if so, returns details about the confirmation.
|
||||
func (b *BitcoindNotifier) historicalConfDetails(txid *chainhash.Hash,
|
||||
@ -523,40 +505,66 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
|
||||
}
|
||||
}
|
||||
|
||||
// We'll only request a rescan if the transaction has actually
|
||||
// We'll only scan old blocks if the transaction has actually
|
||||
// been included within a block. Otherwise, we'll encounter an
|
||||
// error when scanning for blocks. This can happens in the case
|
||||
// of a race condition, wherein the output itself is unspent,
|
||||
// and only arrives in the mempool after the getxout call.
|
||||
if transaction != nil && transaction.BlockHash != "" {
|
||||
blockhash, err := chainhash.NewHashFromStr(transaction.BlockHash)
|
||||
startHash, err := chainhash.NewHashFromStr(transaction.BlockHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Rewind the rescan, since the btcwallet bitcoind
|
||||
// back-end doesn't support that.
|
||||
blockHeight, err := b.chainConn.GetBlockHeight(blockhash)
|
||||
// Rescan all the blocks until the current one.
|
||||
startHeight, err := b.chainConn.GetBlockHeight(startHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b.heightMtx.Lock()
|
||||
currentHeight := b.bestHeight
|
||||
b.bestHeight = blockHeight
|
||||
for i := currentHeight; i > blockHeight; i-- {
|
||||
err = b.txConfNotifier.DisconnectTip(uint32(i))
|
||||
|
||||
_, endHeight, err := b.chainConn.GetBestBlock()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out:
|
||||
for i := startHeight; i <= endHeight; i++ {
|
||||
blockHash, err := b.chainConn.GetBlockHash(int64(i))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
block, err := b.chainConn.GetBlock(blockHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, tx := range block.Transactions {
|
||||
for _, in := range tx.TxIn {
|
||||
if in.PreviousOutPoint == *outpoint {
|
||||
relTx := chain.RelevantTx{
|
||||
TxRecord: &wtxmgr.TxRecord{
|
||||
MsgTx: *tx,
|
||||
Hash: tx.TxHash(),
|
||||
Received: block.Header.Timestamp,
|
||||
},
|
||||
Block: &wtxmgr.BlockMeta{
|
||||
Block: wtxmgr.Block{
|
||||
Hash: block.BlockHash(),
|
||||
Height: i,
|
||||
},
|
||||
Time: block.Header.Timestamp,
|
||||
},
|
||||
}
|
||||
select {
|
||||
case <-b.quit:
|
||||
return nil, ErrChainNotifierShuttingDown
|
||||
case b.notificationRegistry <- relTx:
|
||||
}
|
||||
break out
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
b.heightMtx.Unlock()
|
||||
|
||||
ops := []*wire.OutPoint{outpoint}
|
||||
if err := b.chainConn.Rescan(blockhash, nil, ops); err != nil {
|
||||
chainntnfs.Log.Errorf("Rescan for spend "+
|
||||
"notification txout failed: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user