From 12816a910dc95ca8d18e8c7802dbf6eaa425b7dc Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Thu, 26 Jul 2018 21:32:55 -0700 Subject: [PATCH] chainntnfs: make historical confirmation rescans async --- chainntnfs/bitcoindnotify/bitcoind.go | 61 ++++++++++++++++------- chainntnfs/btcdnotify/btcd.go | 58 ++++++++++++++++------ chainntnfs/neutrinonotify/neutrino.go | 71 +++++++++++++++++++-------- 3 files changed, 137 insertions(+), 53 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index dc898a04..6e3981cf 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -243,27 +243,40 @@ out: "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - _, currentHeight, err := b.chainConn.GetBestBlock() - if err != nil { - chainntnfs.Log.Error(err) - } + currentHeight := uint32(bestHeight) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + currentHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err := b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg + case chain.RelevantTx: b.handleRelevantTx(msg, bestHeight) } @@ -474,6 +487,14 @@ func (b *BitcoindNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -725,11 +746,15 @@ func (b *BitcoindNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 5e6ee526..fa4f1826 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -299,24 +299,42 @@ out: b.spendNotifications[op] = make(map[uint64]*spendNotification) } b.spendNotifications[op][msg.spendID] = msg + case *confirmationNotification: chainntnfs.Log.Infof("New confirmation "+ "subscription: txid=%v, numconfs=%v", msg.TxID, msg.NumConfirmations) - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := b.historicalConfDetails( - msg.TxID, msg.heightHint, uint32(currentHeight), - ) - if err != nil { - chainntnfs.Log.Error(err) - } + bestHeight := uint32(currentHeight) + + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + b.wg.Add(1) + go func() { + defer b.wg.Done() + + confDetails, err := b.historicalConfDetails( + msg.TxID, msg.heightHint, + bestHeight, + ) + if err != nil { + chainntnfs.Log.Error(err) + return + } + + if confDetails != nil { + err = b.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + } + }() - err = b.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") b.blockEpochClients[msg.epochID] = msg @@ -533,6 +551,14 @@ func (b *BtcdNotifier) confDetailsManually(txid *chainhash.Hash, // Begin scanning blocks at every height to determine where the // transaction was included in. for height := heightHint; height <= currentHeight; height++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-b.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + blockHash, err := b.chainConn.GetBlockHash(int64(height)) if err != nil { return nil, fmt.Errorf("unable to get hash from block "+ @@ -809,11 +835,15 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := b.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-b.quit: - return nil, ErrChainNotifierShuttingDown case b.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-b.quit: + return nil, ErrChainNotifierShuttingDown } } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index d811f0e5..7d4dc5d0 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -307,31 +307,48 @@ func (n *NeutrinoNotifier) notificationDispatcher() { currentHeight := n.bestHeight n.heightMtx.RUnlock() - // Lookup whether the transaction is already included in the - // active chain. - txConf, err := n.historicalConfDetails(msg.TxID, currentHeight, - msg.heightHint) - if err != nil { - chainntnfs.Log.Error(err) - } + // Look up whether the transaction is already + // included in the active chain. We'll do this + // in a goroutine to prevent blocking + // potentially long rescans. + n.wg.Add(1) + go func() { + defer n.wg.Done() - if txConf == nil { - // If we can't fully dispatch confirmation, - // then we'll update our filter so we can be - // notified of its future initial confirmation. + confDetails, err := n.historicalConfDetails( + msg.TxID, currentHeight, + msg.heightHint, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + + if confDetails != nil { + err := n.txConfNotifier.UpdateConfDetails( + *msg.TxID, msg.ConfID, + confDetails, + ) + if err != nil { + chainntnfs.Log.Error(err) + } + return + } + + // If we can't fully dispatch + // confirmation, then we'll update our + // filter so we can be notified of its + // future initial confirmation. rescanUpdate := []neutrino.UpdateOption{ neutrino.AddTxIDs(*msg.TxID), neutrino.Rewind(currentHeight), } - if err := n.chainView.Update(rescanUpdate...); err != nil { - chainntnfs.Log.Errorf("unable to update rescan: %v", err) + err = n.chainView.Update(rescanUpdate...) + if err != nil { + chainntnfs.Log.Errorf("Unable "+ + "to update rescan: %v", + err) } - } - - err = n.txConfNotifier.Register(&msg.ConfNtfn, txConf) - if err != nil { - chainntnfs.Log.Error(err) - } + }() case *blockEpochRegistration: chainntnfs.Log.Infof("New block epoch subscription") @@ -401,6 +418,14 @@ func (n *NeutrinoNotifier) historicalConfDetails(targetHash *chainhash.Hash, // Starting from the height hint, we'll walk forwards in the chain to // see if this transaction has already been confirmed. for scanHeight := heightHint; scanHeight <= currentHeight; scanHeight++ { + // Ensure we haven't been requested to shut down before + // processing the next height. + select { + case <-n.quit: + return nil, ErrChainNotifierShuttingDown + default: + } + // First, we'll fetch the block header for this height so we // can compute the current block hash. header, err := n.p2pNode.BlockHeaders.FetchHeaderByHeight(scanHeight) @@ -705,11 +730,15 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, heightHint: heightHint, } + if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil { + return nil, err + } + select { - case <-n.quit: - return nil, ErrChainNotifierShuttingDown case n.notificationRegistry <- ntfn: return ntfn.Event, nil + case <-n.quit: + return nil, ErrChainNotifierShuttingDown } }