chainntnfs/neutrino: use HistoricalConfDispatch in ntfn registry

This commit is contained in:
Conner Fromknecht 2018-08-26 21:37:10 -07:00
parent 6cd0f867ad
commit df9bb56068
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -314,19 +314,7 @@ out:
} }
n.spendNotifications[op][msg.spendID] = msg n.spendNotifications[op][msg.spendID] = msg
case *confirmationsNotification: case *chainntnfs.HistoricalConfDispatch:
chainntnfs.Log.Infof("New confirmations subscription: "+
"txid=%v, numconfs=%v, height_hint=%v",
msg.TxID, msg.NumConfirmations,
msg.ConfNtfn.HeightHint)
// If the notification can be partially or
// fully dispatched, then we can skip the first
// phase for ntfns.
n.heightMtx.RLock()
currentHeight := n.bestHeight
n.heightMtx.RUnlock()
// Look up whether the transaction is already // Look up whether the transaction is already
// included in the active chain. We'll do this // included in the active chain. We'll do this
// in a goroutine to prevent blocking // in a goroutine to prevent blocking
@ -336,8 +324,8 @@ out:
defer n.wg.Done() defer n.wg.Done()
confDetails, err := n.historicalConfDetails( confDetails, err := n.historicalConfDetails(
msg.TxID, msg.pkScript, currentHeight, msg.TxID, msg.PkScript,
msg.ConfNtfn.HeightHint, msg.StartHeight, msg.EndHeight,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
@ -349,7 +337,7 @@ out:
// the script is found in a block. // the script is found in a block.
params := n.p2pNode.ChainParams() params := n.p2pNode.ChainParams()
_, addrs, _, err := txscript.ExtractPkScriptAddrs( _, addrs, _, err := txscript.ExtractPkScriptAddrs(
msg.pkScript, &params, msg.PkScript, &params,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
@ -363,8 +351,7 @@ out:
// cache at tip, since any pending // cache at tip, since any pending
// rescans have now completed. // rescans have now completed.
err = n.txConfNotifier.UpdateConfDetails( err = n.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ConfID, *msg.TxID, confDetails,
confDetails,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
@ -380,16 +367,14 @@ out:
// future initial confirmation. // future initial confirmation.
rescanUpdate := []neutrino.UpdateOption{ rescanUpdate := []neutrino.UpdateOption{
neutrino.AddAddrs(addrs...), neutrino.AddAddrs(addrs...),
neutrino.Rewind(currentHeight), neutrino.Rewind(msg.EndHeight),
neutrino.DisableDisconnectedNtfns(true), neutrino.DisableDisconnectedNtfns(true),
} }
err = n.chainView.Update(rescanUpdate...) err = n.chainView.Update(rescanUpdate...)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable "+ chainntnfs.Log.Errorf("Unable to update rescan: %v",
"to update rescan: %v",
err) err)
} }
}() }()
case *blockEpochRegistration: case *blockEpochRegistration:
@ -526,11 +511,11 @@ out:
// confirmation. // confirmation.
func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash,
pkScript []byte, pkScript []byte,
currentHeight, heightHint uint32) (*chainntnfs.TxConfirmation, error) { startHeight, endHeight uint32) (*chainntnfs.TxConfirmation, error) {
// Starting from the height hint, we'll walk forwards in the chain to // Starting from the height hint, we'll walk forwards in the chain to
// see if this transaction has already been confirmed. // see if this transaction has already been confirmed.
for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { for scanHeight := startHeight; scanHeight <= endHeight; scanHeight++ {
// Ensure we haven't been requested to shut down before // Ensure we haven't been requested to shut down before
// processing the next height. // processing the next height.
select { select {
@ -922,13 +907,6 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
return spendEvent, nil return spendEvent, nil
} }
// confirmationNotification represents a client's intent to receive a
// notification once the target txid reaches numConfirmations confirmations.
type confirmationsNotification struct {
chainntnfs.ConfNtfn
pkScript []byte
}
// RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier // RegisterConfirmationsNtfn registers a notification with NeutrinoNotifier
// which will be triggered once the txid reaches numConfs number of // which will be triggered once the txid reaches numConfs number of
// confirmations. // confirmations.
@ -938,23 +916,33 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
// Construct a notification request for the transaction and send it to // Construct a notification request for the transaction and send it to
// the main event loop. // the main event loop.
ntfn := &confirmationsNotification{ ntfn := &chainntnfs.ConfNtfn{
ConfNtfn: chainntnfs.ConfNtfn{
ConfID: atomic.AddUint64(&n.confClientCounter, 1), ConfID: atomic.AddUint64(&n.confClientCounter, 1),
TxID: txid, TxID: txid,
PkScript: pkScript,
NumConfirmations: numConfs, NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs), Event: chainntnfs.NewConfirmationEvent(numConfs),
HeightHint: heightHint, HeightHint: heightHint,
},
pkScript: pkScript,
} }
if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { chainntnfs.Log.Infof("New confirmation subscription: "+
"txid=%v, numconfs=%v", txid, numConfs)
// Register the conf notification with txconfnotifier. A non-nil value
// for `dispatch` will be returned if we are required to perform a
// manual scan for the confirmation. Otherwise the notifier will begin
// watching at tip for the transaction to confirm.
dispatch, err := n.txConfNotifier.Register(ntfn)
if err != nil {
return nil, err return nil, err
} }
if dispatch == nil {
return ntfn.Event, nil
}
select { select {
case n.notificationRegistry <- ntfn: case n.notificationRegistry <- dispatch:
return ntfn.Event, nil return ntfn.Event, nil
case <-n.quit: case <-n.quit:
return nil, ErrChainNotifierShuttingDown return nil, ErrChainNotifierShuttingDown