chainntnfs/bitcoindnotify: make historical spend rescans async

This commit is contained in:
Wilmer Paulino 2018-07-26 21:33:32 -07:00
parent 12816a910d
commit 65b6257e1e

@ -19,7 +19,6 @@ import (
)
const (
// notifierType uniquely identifies this concrete implementation of the
// ChainNotifier interface.
notifierType = "bitcoind"
@ -35,6 +34,11 @@ var (
// measure a spend notification when notifier is already stopped.
ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " +
"while attempting to register for spend notification.")
// ErrTransactionNotFound is an error returned when we attempt to find a
// transaction by manually scanning the chain within a specific range
// but it is not found.
ErrTransactionNotFound = errors.New("transaction not found within range")
)
// chainUpdate encapsulates an update to the current main chain. This struct is
@ -237,7 +241,7 @@ out:
b.spendNotifications[op] = make(map[uint64]*spendNotification)
}
b.spendNotifications[op][msg.spendID] = msg
b.chainConn.NotifySpent([]*wire.OutPoint{&op})
case *confirmationNotification:
chainntnfs.Log.Infof("New confirmation "+
"subscription: txid=%v, numconfs=%v",
@ -654,43 +658,22 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return nil, err
}
out:
for i := startHeight; i <= endHeight; i++ {
blockHash, err := b.chainConn.GetBlockHash(int64(i))
// In order to ensure we don't block the caller on what
// may be a long rescan, we'll launch a goroutine to do
// so in the background.
b.wg.Add(1)
go func() {
defer b.wg.Done()
err := b.dispatchSpendDetailsManually(
*outpoint, startHeight, endHeight,
)
if err != nil {
return nil, err
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return nil, err
}
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint == *outpoint {
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: block.BlockHash(),
Height: i,
},
Time: block.Header.Timestamp,
},
}
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- relTx:
}
break out
}
}
}
chainntnfs.Log.Errorf("Rescan for spend "+
"notification txout(%x) "+
"failed: %v", outpoint, err)
}
}()
}
}
@ -705,8 +688,9 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
// Cancellation is being handled, drain the
// spend chan until it is closed before yielding
// to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
@ -723,6 +707,72 @@ func (b *BitcoindNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
}, nil
}
// disaptchSpendDetailsManually attempts to manually scan the chain within the
// given height range for a transaction that spends the given outpoint. If one
// is found, it's spending details are sent to the notifier dispatcher, which
// will then dispatch the notification to all of its clients.
func (b *BitcoindNotifier) dispatchSpendDetailsManually(op wire.OutPoint,
startHeight, endHeight int32) error {
// Begin scanning blocks at every height to determine if the outpoint
// was spent.
for height := startHeight; height <= endHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil {
return err
}
block, err := b.chainConn.GetBlock(blockHash)
if err != nil {
return err
}
for _, tx := range block.Transactions {
for _, in := range tx.TxIn {
if in.PreviousOutPoint != op {
continue
}
// If this transaction input spends the
// outpoint, we'll gather the details of the
// spending transaction and dispatch a spend
// notification to our clients.
relTx := chain.RelevantTx{
TxRecord: &wtxmgr.TxRecord{
MsgTx: *tx,
Hash: tx.TxHash(),
Received: block.Header.Timestamp,
},
Block: &wtxmgr.BlockMeta{
Block: wtxmgr.Block{
Hash: *blockHash,
Height: height,
},
Time: block.Header.Timestamp,
},
}
select {
case b.notificationRegistry <- relTx:
case <-b.quit:
return ErrChainNotifierShuttingDown
}
return nil
}
}
}
return ErrTransactionNotFound
}
// confirmationNotification represents a client's intent to receive a
// notification once the target txid reaches numConfirmations confirmations.
type confirmationNotification struct {