chainntnfs: Use the new ConcurrentQueue in neutrino notifier.

This commit is contained in:
Jim Posen 2017-09-29 12:10:38 -07:00 committed by Olaoluwa Osuntokun
parent 726c8b2301
commit c9ad9b2269

@ -65,13 +65,8 @@ type NeutrinoNotifier struct {
rescanErr <-chan error rescanErr <-chan error
newBlocksMtx sync.Mutex newBlocks *chainntnfs.ConcurrentQueue
newBlocks []*filteredBlock staleBlocks *chainntnfs.ConcurrentQueue
newBlocksUpdateSignal chan struct{}
staleBlocksMtx sync.Mutex
staleBlocks []*filteredBlock
staleBlocksUpdateSignal chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -101,9 +96,9 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) {
rescanErr: make(chan error), rescanErr: make(chan error),
newBlocksUpdateSignal: make(chan struct{}), newBlocks: chainntnfs.NewConcurrentQueue(10),
staleBlocksUpdateSignal: make(chan struct{}), staleBlocks: chainntnfs.NewConcurrentQueue(10),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -155,6 +150,9 @@ func (n *NeutrinoNotifier) Start() error {
n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.chainView = n.p2pNode.NewRescan(rescanOptions...)
n.rescanErr = n.chainView.Start() n.rescanErr = n.chainView.Start()
n.newBlocks.Start()
n.staleBlocks.Start()
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()
@ -171,6 +169,9 @@ func (n *NeutrinoNotifier) Stop() error {
close(n.quit) close(n.quit)
n.wg.Wait() n.wg.Wait()
n.newBlocks.Stop()
n.staleBlocks.Stop()
// Notify all pending clients of our shutdown by closing the related // Notify all pending clients of our shutdown by closing the related
// notification channels. // notification channels.
for _, spendClients := range n.spendNotifications { for _, spendClients := range n.spendNotifications {
@ -208,20 +209,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32,
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// updates. // updates.
n.newBlocksMtx.Lock() n.newBlocks.ChanIn() <- &filteredBlock{
n.newBlocks = append(n.newBlocks, &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
txns: txns, txns: txns,
}) }
n.newBlocksMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rescan.
go func() {
n.newBlocksUpdateSignal <- struct{}{}
}()
} }
// onFilteredBlockDisconnected is a callback which is executed each time a new // onFilteredBlockDisconnected is a callback which is executed each time a new
@ -231,19 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32,
// Append this new chain update to the end of the queue of new chain // Append this new chain update to the end of the queue of new chain
// disconnects. // disconnects.
n.staleBlocksMtx.Lock() n.staleBlocks.ChanIn() <- &filteredBlock{
n.staleBlocks = append(n.staleBlocks, &filteredBlock{
hash: header.BlockHash(), hash: header.BlockHash(),
height: uint32(height), height: uint32(height),
}) }
n.staleBlocksMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rescan.
go func() {
n.staleBlocksUpdateSignal <- struct{}{}
}()
} }
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
@ -335,14 +318,8 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
n.blockEpochClients[msg.epochID] = msg n.blockEpochClients[msg.epochID] = msg
} }
case <-n.newBlocksUpdateSignal: case item := <-n.newBlocks.ChanOut():
// A new update is available, so pop the new chain newBlock := item.(*filteredBlock)
// update from the front of the update queue.
n.newBlocksMtx.Lock()
newBlock := n.newBlocks[0]
n.newBlocks[0] = nil // Set to nil to prevent GC leak.
n.newBlocks = n.newBlocks[1:]
n.newBlocksMtx.Unlock()
n.heightMtx.Lock() n.heightMtx.Lock()
n.bestHeight = newBlock.height n.bestHeight = newBlock.height
@ -417,15 +394,8 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
// have been triggered by this new block. // have been triggered by this new block.
n.notifyConfs(int32(newBlock.height)) n.notifyConfs(int32(newBlock.height))
case <-n.staleBlocksUpdateSignal: case item := <-n.staleBlocks.ChanOut():
// A new update is available, so pop the new chain staleBlock := item.(*filteredBlock)
// update from the front of the update queue.
n.staleBlocksMtx.Lock()
staleBlock := n.staleBlocks[0]
n.staleBlocks[0] = nil // Set to nil to prevent GC leak.
n.staleBlocks = n.staleBlocks[1:]
n.staleBlocksMtx.Unlock()
chainntnfs.Log.Warnf("Block disconnected from main "+ chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlock.hash) "chain: %v", staleBlock.hash)