chainntnfs/neutrinonotify: track best block hash

This commit is contained in:
Wilmer Paulino 2018-12-10 18:29:25 -08:00
parent 060f2f7774
commit 568f8271a0
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 34 additions and 43 deletions

@ -44,8 +44,8 @@ type NeutrinoNotifier struct {
started int32 // To be used atomically. started int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
heightMtx sync.RWMutex bestBlockMtx sync.RWMutex
bestHeight uint32 bestBlock chainntnfs.BlockEpoch
p2pNode *neutrino.ChainService p2pNode *neutrino.ChainService
chainView *neutrino.Rescan chainView *neutrino.Rescan
@ -126,10 +126,12 @@ func (n *NeutrinoNotifier) Start() error {
if err != nil { if err != nil {
return err return err
} }
n.bestHeight = uint32(startingPoint.Height) n.bestBlock.Hash = &startingPoint.Hash
n.bestBlock.Height = startingPoint.Height
n.txNotifier = chainntnfs.NewTxNotifier( n.txNotifier = chainntnfs.NewTxNotifier(
n.bestHeight, chainntnfs.ReorgSafetyLimit, n.confirmHintCache, uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit,
n.spendHintCache, n.confirmHintCache, n.spendHintCache,
) )
// Next, we'll create our set of rescan options. Currently it's // Next, we'll create our set of rescan options. Currently it's
@ -334,9 +336,9 @@ out:
chainntnfs.Log.Infof("New block epoch subscription") chainntnfs.Log.Infof("New block epoch subscription")
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
if msg.bestBlock != nil { if msg.bestBlock != nil {
n.heightMtx.Lock() n.bestBlockMtx.Lock()
bestHeight := int32(n.bestHeight) bestHeight := n.bestBlock.Height
n.heightMtx.Unlock() n.bestBlockMtx.Unlock()
missedBlocks, err := missedBlocks, err :=
chainntnfs.GetClientMissedBlocks( chainntnfs.GetClientMissedBlocks(
n.chainConn, msg.bestBlock, n.chainConn, msg.bestBlock,
@ -365,7 +367,7 @@ out:
case item := <-n.chainUpdates.ChanOut(): case item := <-n.chainUpdates.ChanOut():
update := item.(*filteredBlock) update := item.(*filteredBlock)
if update.connect { if update.connect {
n.heightMtx.Lock() n.bestBlockMtx.Lock()
// Since neutrino has no way of knowing what // Since neutrino has no way of knowing what
// height to rewind to in the case of a reorged // height to rewind to in the case of a reorged
// best known height, there is no point in // best known height, there is no point in
@ -374,27 +376,24 @@ out:
// the other notifiers do when they receive // the other notifiers do when they receive
// a new connected block. Therefore, we just // a new connected block. Therefore, we just
// compare the heights. // compare the heights.
if update.height != n.bestHeight+1 { if update.height != uint32(n.bestBlock.Height+1) {
// Handle the case where the notifier // Handle the case where the notifier
// missed some blocks from its chain // missed some blocks from its chain
// backend // backend
chainntnfs.Log.Infof("Missed blocks, " + chainntnfs.Log.Infof("Missed blocks, " +
"attempting to catch up") "attempting to catch up")
bestBlock := chainntnfs.BlockEpoch{
Height: int32(n.bestHeight),
Hash: nil,
}
_, missedBlocks, err := _, missedBlocks, err :=
chainntnfs.HandleMissedBlocks( chainntnfs.HandleMissedBlocks(
n.chainConn, n.chainConn,
n.txNotifier, n.txNotifier,
bestBlock, n.bestBlock,
int32(update.height), int32(update.height),
false, false,
) )
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
n.heightMtx.Unlock() n.bestBlockMtx.Unlock()
continue continue
} }
@ -403,13 +402,13 @@ out:
n.getFilteredBlock(block) n.getFilteredBlock(block)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
n.heightMtx.Unlock() n.bestBlockMtx.Unlock()
continue out continue out
} }
err = n.handleBlockConnected(filteredBlock) err = n.handleBlockConnected(filteredBlock)
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
n.heightMtx.Unlock() n.bestBlockMtx.Unlock()
continue out continue out
} }
} }
@ -420,42 +419,30 @@ out:
if err != nil { if err != nil {
chainntnfs.Log.Error(err) chainntnfs.Log.Error(err)
} }
n.heightMtx.Unlock()
n.bestBlockMtx.Unlock()
continue continue
} }
n.heightMtx.Lock() n.bestBlockMtx.Lock()
if update.height != uint32(n.bestHeight) { if update.height != uint32(n.bestBlock.Height) {
chainntnfs.Log.Infof("Missed disconnected " + chainntnfs.Log.Infof("Missed disconnected " +
"blocks, attempting to catch up") "blocks, attempting to catch up")
} }
hash, err := n.p2pNode.GetBlockHash(int64(n.bestHeight))
if err != nil {
chainntnfs.Log.Errorf("Unable to fetch block hash "+
"for height %d: %v", n.bestHeight, err)
n.heightMtx.Unlock()
continue
}
notifierBestBlock := chainntnfs.BlockEpoch{
Height: int32(n.bestHeight),
Hash: hash,
}
newBestBlock, err := chainntnfs.RewindChain( newBestBlock, err := chainntnfs.RewindChain(
n.chainConn, n.txNotifier, notifierBestBlock, n.chainConn, n.txNotifier, n.bestBlock,
int32(update.height-1), int32(update.height-1),
) )
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to rewind chain "+ chainntnfs.Log.Errorf("Unable to rewind chain "+
"from height %d to height %d: %v", "from height %d to height %d: %v",
n.bestHeight, update.height-1, err) n.bestBlock.Height, update.height-1, err)
} }
// Set the bestHeight here in case a chain rewind // Set the bestHeight here in case a chain rewind
// partially completed. // partially completed.
n.bestHeight = uint32(newBestBlock.Height) n.bestBlock = newBestBlock
n.heightMtx.Unlock() n.bestBlockMtx.Unlock()
case txUpdate := <-n.txUpdates.ChanOut(): case txUpdate := <-n.txUpdates.ChanOut():
// A new relevant transaction notification has been // A new relevant transaction notification has been
@ -570,6 +557,8 @@ func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequ
// handleBlockConnected applies a chain update for a new block. Any watched // handleBlockConnected applies a chain update for a new block. Any watched
// transactions included this block will processed to either send notifications // transactions included this block will processed to either send notifications
// now or after numConfirmations confs. // now or after numConfirmations confs.
//
// NOTE: This method must be called with the bestBlockMtx lock held.
func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// We'll extend the txNotifier's height with the information of this new // We'll extend the txNotifier's height with the information of this new
// block, which will handle all of the notification logic for us. // block, which will handle all of the notification logic for us.
@ -588,7 +577,8 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error {
// registered clients whom have had notifications fulfilled. Before // registered clients whom have had notifications fulfilled. Before
// doing so, we'll make sure update our in memory state in order to // doing so, we'll make sure update our in memory state in order to
// satisfy any client requests based upon the new block. // satisfy any client requests based upon the new block.
n.bestHeight = newBlock.height n.bestBlock.Hash = &newBlock.hash
n.bestBlock.Height = int32(newBlock.height)
n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash)
return n.txNotifier.NotifyHeight(newBlock.height) return n.txNotifier.NotifyHeight(newBlock.height)
@ -730,9 +720,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
// to fetch the UTXO from the chain. If we're behind, then we may miss a // to fetch the UTXO from the chain. If we're behind, then we may miss a
// notification dispatch. // notification dispatch.
for { for {
n.heightMtx.RLock() n.bestBlockMtx.RLock()
currentHeight := n.bestHeight currentHeight := uint32(n.bestBlock.Height)
n.heightMtx.RUnlock() n.bestBlockMtx.RUnlock()
if currentHeight >= historicalDispatch.StartHeight { if currentHeight >= historicalDispatch.StartHeight {
break break

@ -89,7 +89,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32,
// Run notificationDispatcher after setting the notifier's best height // Run notificationDispatcher after setting the notifier's best height
// to avoid a race condition. // to avoid a race condition.
n.bestHeight = uint32(bestHeight) n.bestBlock.Hash = bestHash
n.bestBlock.Height = bestHeight
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()