diff --git a/chainntnfs/queue.go b/chainntnfs/queue.go new file mode 100644 index 00000000..90198a4b --- /dev/null +++ b/chainntnfs/queue.go @@ -0,0 +1,86 @@ +package chainntnfs + +import ( + "container/list" +) + +// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. +// Clients interact with the queue by pushing items into the in channel and +// popping items from the out channel. There is a goroutine that manages moving +// items from the in channel to the out channel in the correct order that must +// be started by calling Start(). +type ConcurrentQueue struct { + chanIn chan interface{} + chanOut chan interface{} + quit chan struct{} + overflow *list.List +} + +// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is +// the capacity of the output channel. When the size of the queue is below this +// threshold, pushes do not incur the overhead of the less efficient overflow +// structure. +func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { + return &ConcurrentQueue{ + chanIn: make(chan interface{}), + chanOut: make(chan interface{}, bufferSize), + quit: make(chan struct{}), + overflow: list.New(), + } +} + +// ChanIn returns a channel that can be used to push new items into the queue. +func (cq *ConcurrentQueue) ChanIn() chan<- interface{} { + return cq.chanIn +} + +// ChanOut returns a channel that can be used to pop items from the queue. +func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { + return cq.chanOut +} + +// Start begins a goroutine that manages moving items from the in channel to the +// out channel. The queue tries to move items directly to the out channel +// minimize overhead, but if the out channel is full it pushes items to an +// overflow queue. This must be called before using the queue. +func (cq *ConcurrentQueue) Start() { + go func() { + for { + nextElement := cq.overflow.Front() + if nextElement == nil { + // Overflow queue is empty so incoming items can be pushed + // directly to the output channel. If output channel is full + // though, push to overflow. + select { + case item := <-cq.chanIn: + select { + case cq.chanOut <- item: + // Optimistically push directly to chanOut + default: + cq.overflow.PushBack(item) + } + case <-cq.quit: + return + } + } else { + // Overflow queue is not empty, so any new items get pushed to + // the back to preserve order. + select { + case item := <-cq.chanIn: + cq.overflow.PushBack(item) + case cq.chanOut <- nextElement.Value: + cq.overflow.Remove(nextElement) + case <-cq.quit: + return + } + } + } + }() +} + +// Stop ends the goroutine that moves items from the in channel to the out +// channel. This does not clear the queue state, so the queue can be restarted +// without dropping items. +func (cq *ConcurrentQueue) Stop() { + cq.quit <- struct{}{} +} diff --git a/chainntnfs/queue_test.go b/chainntnfs/queue_test.go new file mode 100644 index 00000000..44d62735 --- /dev/null +++ b/chainntnfs/queue_test.go @@ -0,0 +1,26 @@ +package chainntnfs_test + +import ( + "testing" + + "github.com/lightningnetwork/lnd/chainntnfs" +) + +func TestConcurrentQueue(t *testing.T) { + queue := chainntnfs.NewConcurrentQueue(100) + queue.Start() + defer queue.Stop() + + // Pushes should never block for long. + for i := 0; i < 1000; i++ { + queue.ChanIn() <- i + } + + // Pops also should not block for long. Expect elements in FIFO order. + for i := 0; i < 1000; i++ { + item := <-queue.ChanOut() + if i != item.(int) { + t.Fatalf("Dequeued wrong value: expected %d, got %d", i, item.(int)) + } + } +}