diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 7cd26a74..13348d7e 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -64,8 +64,13 @@ type NeutrinoNotifier struct { rescanErr <-chan error - newBlocks chan filteredBlock - staleBlocks chan filteredBlock + newBlocksMtx sync.Mutex + newBlocks []*filteredBlock + newBlocksUpdateSignal chan struct{} + + staleBlocksMtx sync.Mutex + staleBlocks []*filteredBlock + staleBlocksUpdateSignal chan struct{} wg sync.WaitGroup quit chan struct{} @@ -95,8 +100,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { rescanErr: make(chan error), - newBlocks: make(chan filteredBlock), - staleBlocks: make(chan filteredBlock), + newBlocksUpdateSignal: make(chan struct{}), + + staleBlocksUpdateSignal: make(chan struct{}), quit: make(chan struct{}), } @@ -178,7 +184,6 @@ func (n *NeutrinoNotifier) Stop() error { } } for _, epochClient := range n.blockEpochClients { - close(epochClient.cancelChan) close(epochClient.epochChan) } @@ -200,11 +205,22 @@ type filteredBlock struct { func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, header *wire.BlockHeader, txns []*btcutil.Tx) { - n.newBlocks <- filteredBlock{ + // Append this new chain update to the end of the queue of new chain + // updates. + n.newBlocksMtx.Lock() + n.newBlocks = append(n.newBlocks, &filteredBlock{ hash: header.BlockHash(), height: uint32(height), txns: txns, - } + }) + n.newBlocksMtx.Unlock() + + // Launch a goroutine to signal the notification dispatcher that a new + // transaction update is available. We do this in a new goroutine in + // order to avoid blocking the main loop of the rescan. + go func() { + n.newBlocksUpdateSignal <- struct{}{} + }() } // onFilteredBlockDisconnected is a callback which is executed each time a new @@ -212,10 +228,21 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, header *wire.BlockHeader) { - n.staleBlocks <- filteredBlock{ + // Append this new chain update to the end of the queue of new chain + // disconnects. + n.staleBlocksMtx.Lock() + n.staleBlocks = append(n.staleBlocks, &filteredBlock{ hash: header.BlockHash(), height: uint32(height), - } + }) + n.staleBlocksMtx.Unlock() + + // Launch a goroutine to signal the notification dispatcher that a new + // transaction update is available. We do this in a new goroutine in + // order to avoid blocking the main loop of the rescan. + go func() { + n.staleBlocksUpdateSignal <- struct{}{} + }() } // notificationDispatcher is the primary goroutine which handles client @@ -243,7 +270,17 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) + // First, close the cancel channel for this + // specific client, and wait for the client to + // exit. close(n.blockEpochClients[msg.epochID].cancelChan) + n.blockEpochClients[msg.epochID].wg.Wait() + + // Once the client has exited, we can then + // safely close the channel used to send epoch + // notifications, in order to notify any + // listeners that the intent has been + // cancelled. close(n.blockEpochClients[msg.epochID].epochChan) delete(n.blockEpochClients, msg.epochID) @@ -298,7 +335,15 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.blockEpochClients[msg.epochID] = msg } - case newBlock := <-n.newBlocks: + case <-n.newBlocksUpdateSignal: + // A new update is available, so pop the new chain + // update from the front of the update queue. + n.newBlocksMtx.Lock() + newBlock := n.newBlocks[0] + n.newBlocks[0] = nil // Set to nil to prevent GC leak. + n.newBlocks = n.newBlocks[1:] + n.newBlocksMtx.Unlock() + n.heightMtx.Lock() n.bestHeight = newBlock.height n.heightMtx.Unlock() @@ -324,7 +369,7 @@ func (n *NeutrinoNotifier) notificationDispatcher() { mtx := tx.MsgTx() txIndex := tx.Index() txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, &newBlock, txIndex) + n.checkConfirmationTrigger(&txSha, newBlock, txIndex) for i, txIn := range mtx.TxIn { prevOut := txIn.PreviousOutPoint @@ -367,7 +412,14 @@ func (n *NeutrinoNotifier) notificationDispatcher() { // have been triggered by this new block. n.notifyConfs(int32(newBlock.height)) - case staleBlock := <-n.staleBlocks: + case <-n.staleBlocksUpdateSignal: + // A new update is available, so pop the new chain + // update from the front of the update queue. + n.staleBlocksMtx.Lock() + staleBlock := n.staleBlocks[0] + n.staleBlocks[0] = nil // Set to nil to prevent GC leak. + n.staleBlocks = n.staleBlocks[1:] + n.staleBlocksMtx.Unlock() chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlock.hash) @@ -509,7 +561,11 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. for _, epochClient := range n.blockEpochClients { n.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}) { + epochClient.wg.Add(1) + go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, + clientWg sync.WaitGroup) { + + defer clientWg.Done() defer n.wg.Done() select { @@ -521,7 +577,7 @@ func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. case <-n.quit: return } - }(epochClient.epochChan, epochClient.cancelChan) + }(epochClient.epochChan, epochClient.cancelChan, epochClient.wg) } } @@ -757,11 +813,13 @@ func (n *NeutrinoNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, // blockEpochRegistration represents a client's intent to receive a // notification with each newly connected block. type blockEpochRegistration struct { + epochID uint64 + epochChan chan *chainntnfs.BlockEpoch cancelChan chan struct{} - epochID uint64 + wg sync.WaitGroup } // epochCancel is a message sent to the NeutrinoNotifier when a client wishes