diff --git a/routing/chainview/queue.go b/routing/chainview/queue.go new file mode 100644 index 00000000..5d86c3c1 --- /dev/null +++ b/routing/chainview/queue.go @@ -0,0 +1,148 @@ +package chainview + +import "sync" + +// blockEventType is the possible types of a blockEvent. +type blockEventType uint8 + +const ( + // connected is the type of a blockEvent representing a block + // that was connected to our current chain. + connected blockEventType = iota + + // disconnected is the type of a blockEvent representing a + // block that is stale/disconnected from our current chain. + disconnected +) + +// blockEvent represent a block that was either connected +// or disconnected from the current chain. +type blockEvent struct { + eventType blockEventType + block *FilteredBlock +} + +// blockEventQueue is an ordered queue for block events sent from a +// FilteredChainView. The two types of possible block events are +// connected/new blocks, and disconnected/stale blocks. The +// blockEventQueue keeps the order of these events intact, while +// still being non-blocking. This is important in order for the +// chainView's call to onBlockConnected/onBlockDisconnected to not +// get blocked, and for the consumer of the block events to always +// get the events in the correct order. +type blockEventQueue struct { + queueCond *sync.Cond + queueMtx sync.Mutex + queue []*blockEvent + + // newBlocks is the channel where the consumer of the queue + // will receive connected/new blocks from the FilteredChainView. + newBlocks chan *FilteredBlock + + // stleBlocks is the channel where the consumer of the queue will + // receive disconnected/stale blocks from the FilteredChainView. + staleBlocks chan *FilteredBlock + + wg sync.WaitGroup + quit chan struct{} +} + +// newBlockEventQueue creates a new blockEventQueue. +func newBlockEventQueue() *blockEventQueue { + b := &blockEventQueue{ + newBlocks: make(chan *FilteredBlock), + staleBlocks: make(chan *FilteredBlock), + quit: make(chan struct{}), + } + b.queueCond = sync.NewCond(&b.queueMtx) + + return b +} + +// Start starts the blockEventQueue coordinator such that it can start handling +// events. +func (b *blockEventQueue) Start() { + b.wg.Add(1) + go b.queueCoordinator() +} + +// Stop signals the queue coordinator to stop, such that the queue can be +// shut down. +func (b *blockEventQueue) Stop() { + close(b.quit) + + b.queueCond.Signal() + b.wg.Wait() +} + +// queueCoordinator is the queue's main loop, handling incoming block events +// and handing them off to the correct output channel. +// +// NB: MUST be run as a goroutine from the Start() method. +func (b *blockEventQueue) queueCoordinator() { + defer b.wg.Done() + + for { + // First, we'll check our condition. If the queue of events is + // empty, then we'll wait until a new item is added. + b.queueCond.L.Lock() + for len(b.queue) == 0 { + b.queueCond.Wait() + + // If we were woke up in order to exit, then we'll do + // so. Otherwise, we'll check the queue for any new + // items. + select { + case <-b.quit: + b.queueCond.L.Unlock() + return + default: + } + } + + // Grab the first element in the queue, and nil the index to + // avoid gc leak. + event := b.queue[0] + b.queue[0] = nil + b.queue = b.queue[1:] + b.queueCond.L.Unlock() + + // In the case this is a connected block, we'll send it on the + // newBlocks channel. In case it is a disconnected block, we'll + // send it on the staleBlocks channel. This send will block + // until it is received by the consumer on the other end, making + // sure we won't try to send any other block event before the + // consumer is aware of this one. + switch event.eventType { + case connected: + select { + case b.newBlocks <- event.block: + case <-b.quit: + return + } + case disconnected: + select { + case b.staleBlocks <- event.block: + case <-b.quit: + return + } + } + } +} + +// Add puts the provided blockEvent at the end of the event queue, making sure +// it will first be received after all previous events. This method is +// non-blocking, in the sense that it will never wait for the consumer of the +// queue to read form the other end, making it safe to call from the +// FilteredChainView's onBlockConnected/onBlockDisconnected. +func (b *blockEventQueue) Add(event *blockEvent) { + + // Lock the condition, and add the event to the end of queue. + b.queueCond.L.Lock() + b.queue = append(b.queue, event) + b.queueCond.L.Unlock() + + // With the event added, we signal to the queueCoordinator that + // there are new events to handle. + b.queueCond.Signal() +}