chainntnfs: make historical confirmation rescans async

This commit is contained in:
Wilmer Paulino 2018-07-26 21:32:55 -07:00
parent 867d8524bf
commit 12816a910d
3 changed files with 137 additions and 53 deletions

@ -243,27 +243,40 @@ out:
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
_, currentHeight, err := b.chainConn.GetBestBlock() currentHeight := uint32(bestHeight)
if err != nil {
chainntnfs.Log.Error(err)
}
// Lookup whether the transaction is already included in the // Look up whether the transaction is already
// active chain. // included in the active chain. We'll do this
txConf, err := b.historicalConfDetails( // in a goroutine to prevent blocking
msg.TxID, msg.heightHint, uint32(currentHeight), // potentially long rescans.
) b.wg.Add(1)
if err != nil { go func() {
chainntnfs.Log.Error(err) defer b.wg.Done()
}
confDetails, err := b.historicalConfDetails(
msg.TxID, msg.heightHint,
currentHeight,
)
if err != nil {
chainntnfs.Log.Error(err)
return
}
if confDetails != nil {
err := b.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ConfID,
confDetails,
)
if err != nil {
chainntnfs.Log.Error(err)
}
}
}()
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
case chain.RelevantTx: case chain.RelevantTx:
b.handleRelevantTx(msg, bestHeight) b.handleRelevantTx(msg, bestHeight)
} }
@ -474,6 +487,14 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash,
// Begin scanning blocks at every height to determine where the // Begin scanning blocks at every height to determine where the
// transaction was included in. // transaction was included in.
for height := heightHint; height <= currentHeight; height++ { for height := heightHint; height <= currentHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height)) blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get hash from block "+ return nil, fmt.Errorf("unable to get hash from block "+
@ -725,11 +746,15 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn: case b.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }

@ -299,24 +299,42 @@ out:
b.spendNotifications[op] = make(map[uint64]*spendNotification) b.spendNotifications[op] = make(map[uint64]*spendNotification)
} }
b.spendNotifications[op][msg.spendID] = msg b.spendNotifications[op][msg.spendID] = msg
case *confirmationNotification: case *confirmationNotification:
chainntnfs.Log.Infof("New confirmation "+ chainntnfs.Log.Infof("New confirmation "+
"subscription: txid=%v, numconfs=%v", "subscription: txid=%v, numconfs=%v",
msg.TxID, msg.NumConfirmations) msg.TxID, msg.NumConfirmations)
// Lookup whether the transaction is already included in the bestHeight := uint32(currentHeight)
// active chain.
txConf, err := b.historicalConfDetails( // Look up whether the transaction is already
msg.TxID, msg.heightHint, uint32(currentHeight), // included in the active chain. We'll do this
) // in a goroutine to prevent blocking
if err != nil { // potentially long rescans.
chainntnfs.Log.Error(err) b.wg.Add(1)
} go func() {
defer b.wg.Done()
confDetails, err := b.historicalConfDetails(
msg.TxID, msg.heightHint,
bestHeight,
)
if err != nil {
chainntnfs.Log.Error(err)
return
}
if confDetails != nil {
err = b.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ConfID,
confDetails,
)
if err != nil {
chainntnfs.Log.Error(err)
}
}
}()
err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
b.blockEpochClients[msg.epochID] = msg b.blockEpochClients[msg.epochID] = msg
@ -533,6 +551,14 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash,
// Begin scanning blocks at every height to determine where the // Begin scanning blocks at every height to determine where the
// transaction was included in. // transaction was included in.
for height := heightHint; height <= currentHeight; height++ { for height := heightHint; height <= currentHeight; height++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
blockHash, err := b.chainConn.GetBlockHash(int64(height)) blockHash, err := b.chainConn.GetBlockHash(int64(height))
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get hash from block "+ return nil, fmt.Errorf("unable to get hash from block "+
@ -809,11 +835,15 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn: case b.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-b.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }

@ -307,31 +307,48 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
currentHeight := n.bestHeight currentHeight := n.bestHeight
n.heightMtx.RUnlock() n.heightMtx.RUnlock()
// Lookup whether the transaction is already included in the // Look up whether the transaction is already
// active chain. // included in the active chain. We'll do this
txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, // in a goroutine to prevent blocking
msg.heightHint) // potentially long rescans.
if err != nil { n.wg.Add(1)
chainntnfs.Log.Error(err) go func() {
} defer n.wg.Done()
if txConf == nil { confDetails, err := n.historicalConfDetails(
// If we can't fully dispatch confirmation, msg.TxID, currentHeight,
// then we'll update our filter so we can be msg.heightHint,
// notified of its future initial confirmation. )
if err != nil {
chainntnfs.Log.Error(err)
}
if confDetails != nil {
err := n.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ConfID,
confDetails,
)
if err != nil {
chainntnfs.Log.Error(err)
}
return
}
// If we can't fully dispatch
// confirmation, then we'll update our
// filter so we can be notified of its
// future initial confirmation.
rescanUpdate := []neutrino.UpdateOption{ rescanUpdate := []neutrino.UpdateOption{
neutrino.AddTxIDs(*msg.TxID), neutrino.AddTxIDs(*msg.TxID),
neutrino.Rewind(currentHeight), neutrino.Rewind(currentHeight),
} }
if err := n.chainView.Update(rescanUpdate...); err != nil { err = n.chainView.Update(rescanUpdate...)
chainntnfs.Log.Errorf("unable to update rescan: %v", err) if err != nil {
chainntnfs.Log.Errorf("Unable "+
"to update rescan: %v",
err)
} }
} }()
err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf)
if err != nil {
chainntnfs.Log.Error(err)
}
case *blockEpochRegistration: case *blockEpochRegistration:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
@ -401,6 +418,14 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
// Starting from the height hint, we'll walk forwards in the chain to // Starting from the height hint, we'll walk forwards in the chain to
// see if this transaction has already been confirmed. // see if this transaction has already been confirmed.
for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ {
// Ensure we haven't been requested to shut down before
// processing the next height.
select {
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
default:
}
// First, we'll fetch the block header for this height so we // First, we'll fetch the block header for this height so we
// can compute the current block hash. // can compute the current block hash.
header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight)
@ -705,11 +730,15 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
heightHint: heightHint, heightHint: heightHint,
} }
if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
return nil, err
}
select { select {
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
case n.notificationRegistry <- ntfn: case n.notificationRegistry <- ntfn:
return ntfn.Event, nil return ntfn.Event, nil
case <-n.quit:
return nil, ErrChainNotifierShuttingDown
} }
} }