diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 6a3e5738..a2db0763 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -44,8 +44,8 @@ type NeutrinoNotifier struct { started int32 // To be used atomically. stopped int32 // To be used atomically. - heightMtx sync.RWMutex - bestHeight uint32 + bestBlockMtx sync.RWMutex + bestBlock chainntnfs.BlockEpoch p2pNode *neutrino.ChainService chainView *neutrino.Rescan @@ -126,10 +126,12 @@ func (n *NeutrinoNotifier) Start() error { if err != nil { return err } - n.bestHeight = uint32(startingPoint.Height) + n.bestBlock.Hash = &startingPoint.Hash + n.bestBlock.Height = startingPoint.Height + n.txNotifier = chainntnfs.NewTxNotifier( - n.bestHeight, chainntnfs.ReorgSafetyLimit, n.confirmHintCache, - n.spendHintCache, + uint32(n.bestBlock.Height), chainntnfs.ReorgSafetyLimit, + n.confirmHintCache, n.spendHintCache, ) // Next, we'll create our set of rescan options. Currently it's @@ -334,9 +336,9 @@ out: chainntnfs.Log.Infof("New block epoch subscription") n.blockEpochClients[msg.epochID] = msg if msg.bestBlock != nil { - n.heightMtx.Lock() - bestHeight := int32(n.bestHeight) - n.heightMtx.Unlock() + n.bestBlockMtx.Lock() + bestHeight := n.bestBlock.Height + n.bestBlockMtx.Unlock() missedBlocks, err := chainntnfs.GetClientMissedBlocks( n.chainConn, msg.bestBlock, @@ -365,7 +367,7 @@ out: case item := <-n.chainUpdates.ChanOut(): update := item.(*filteredBlock) if update.connect { - n.heightMtx.Lock() + n.bestBlockMtx.Lock() // Since neutrino has no way of knowing what // height to rewind to in the case of a reorged // best known height, there is no point in @@ -374,27 +376,24 @@ out: // the other notifiers do when they receive // a new connected block. Therefore, we just // compare the heights. - if update.height != n.bestHeight+1 { + if update.height != uint32(n.bestBlock.Height+1) { // Handle the case where the notifier // missed some blocks from its chain // backend chainntnfs.Log.Infof("Missed blocks, " + "attempting to catch up") - bestBlock := chainntnfs.BlockEpoch{ - Height: int32(n.bestHeight), - Hash: nil, - } + _, missedBlocks, err := chainntnfs.HandleMissedBlocks( n.chainConn, n.txNotifier, - bestBlock, + n.bestBlock, int32(update.height), false, ) if err != nil { chainntnfs.Log.Error(err) - n.heightMtx.Unlock() + n.bestBlockMtx.Unlock() continue } @@ -403,13 +402,13 @@ out: n.getFilteredBlock(block) if err != nil { chainntnfs.Log.Error(err) - n.heightMtx.Unlock() + n.bestBlockMtx.Unlock() continue out } err = n.handleBlockConnected(filteredBlock) if err != nil { chainntnfs.Log.Error(err) - n.heightMtx.Unlock() + n.bestBlockMtx.Unlock() continue out } } @@ -420,42 +419,30 @@ out: if err != nil { chainntnfs.Log.Error(err) } - n.heightMtx.Unlock() + + n.bestBlockMtx.Unlock() continue } - n.heightMtx.Lock() - if update.height != uint32(n.bestHeight) { + n.bestBlockMtx.Lock() + if update.height != uint32(n.bestBlock.Height) { chainntnfs.Log.Infof("Missed disconnected " + "blocks, attempting to catch up") } - - hash, err := n.p2pNode.GetBlockHash(int64(n.bestHeight)) - if err != nil { - chainntnfs.Log.Errorf("Unable to fetch block hash "+ - "for height %d: %v", n.bestHeight, err) - n.heightMtx.Unlock() - continue - } - - notifierBestBlock := chainntnfs.BlockEpoch{ - Height: int32(n.bestHeight), - Hash: hash, - } newBestBlock, err := chainntnfs.RewindChain( - n.chainConn, n.txNotifier, notifierBestBlock, + n.chainConn, n.txNotifier, n.bestBlock, int32(update.height-1), ) if err != nil { chainntnfs.Log.Errorf("Unable to rewind chain "+ "from height %d to height %d: %v", - n.bestHeight, update.height-1, err) + n.bestBlock.Height, update.height-1, err) } // Set the bestHeight here in case a chain rewind // partially completed. - n.bestHeight = uint32(newBestBlock.Height) - n.heightMtx.Unlock() + n.bestBlock = newBestBlock + n.bestBlockMtx.Unlock() case txUpdate := <-n.txUpdates.ChanOut(): // A new relevant transaction notification has been @@ -570,6 +557,8 @@ func (n *NeutrinoNotifier) historicalConfDetails(confRequest chainntnfs.ConfRequ // handleBlockConnected applies a chain update for a new block. Any watched // transactions included this block will processed to either send notifications // now or after numConfirmations confs. +// +// NOTE: This method must be called with the bestBlockMtx lock held. func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // We'll extend the txNotifier's height with the information of this new // block, which will handle all of the notification logic for us. @@ -588,7 +577,8 @@ func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { // registered clients whom have had notifications fulfilled. Before // doing so, we'll make sure update our in memory state in order to // satisfy any client requests based upon the new block. - n.bestHeight = newBlock.height + n.bestBlock.Hash = &newBlock.hash + n.bestBlock.Height = int32(newBlock.height) n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) return n.txNotifier.NotifyHeight(newBlock.height) @@ -730,9 +720,9 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // to fetch the UTXO from the chain. If we're behind, then we may miss a // notification dispatch. for { - n.heightMtx.RLock() - currentHeight := n.bestHeight - n.heightMtx.RUnlock() + n.bestBlockMtx.RLock() + currentHeight := uint32(n.bestBlock.Height) + n.bestBlockMtx.RUnlock() if currentHeight >= historicalDispatch.StartHeight { break diff --git a/chainntnfs/neutrinonotify/neutrino_dev.go b/chainntnfs/neutrinonotify/neutrino_dev.go index 44c1ba6d..61c24942 100644 --- a/chainntnfs/neutrinonotify/neutrino_dev.go +++ b/chainntnfs/neutrinonotify/neutrino_dev.go @@ -89,7 +89,8 @@ func (n *NeutrinoNotifier) UnsafeStart(bestHeight int32, // Run notificationDispatcher after setting the notifier's best height // to avoid a race condition. - n.bestHeight = uint32(bestHeight) + n.bestBlock.Hash = bestHash + n.bestBlock.Height = bestHeight n.wg.Add(1) go n.notificationDispatcher()