chainntnfs: Implement unbounded concurrent-safe FIFO queue.
This can be used in at least one place in the notifiers to improve efficiency and reduce complexity.
This commit is contained in:
parent
555cead5ad
commit
726c8b2301
86
chainntnfs/queue.go
Normal file
86
chainntnfs/queue.go
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
package chainntnfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
|
||||||
|
// Clients interact with the queue by pushing items into the in channel and
|
||||||
|
// popping items from the out channel. There is a goroutine that manages moving
|
||||||
|
// items from the in channel to the out channel in the correct order that must
|
||||||
|
// be started by calling Start().
|
||||||
|
type ConcurrentQueue struct {
|
||||||
|
chanIn chan interface{}
|
||||||
|
chanOut chan interface{}
|
||||||
|
quit chan struct{}
|
||||||
|
overflow *list.List
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
|
||||||
|
// the capacity of the output channel. When the size of the queue is below this
|
||||||
|
// threshold, pushes do not incur the overhead of the less efficient overflow
|
||||||
|
// structure.
|
||||||
|
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
|
||||||
|
return &ConcurrentQueue{
|
||||||
|
chanIn: make(chan interface{}),
|
||||||
|
chanOut: make(chan interface{}, bufferSize),
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
overflow: list.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChanIn returns a channel that can be used to push new items into the queue.
|
||||||
|
func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
|
||||||
|
return cq.chanIn
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChanOut returns a channel that can be used to pop items from the queue.
|
||||||
|
func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
|
||||||
|
return cq.chanOut
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins a goroutine that manages moving items from the in channel to the
|
||||||
|
// out channel. The queue tries to move items directly to the out channel
|
||||||
|
// minimize overhead, but if the out channel is full it pushes items to an
|
||||||
|
// overflow queue. This must be called before using the queue.
|
||||||
|
func (cq *ConcurrentQueue) Start() {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
nextElement := cq.overflow.Front()
|
||||||
|
if nextElement == nil {
|
||||||
|
// Overflow queue is empty so incoming items can be pushed
|
||||||
|
// directly to the output channel. If output channel is full
|
||||||
|
// though, push to overflow.
|
||||||
|
select {
|
||||||
|
case item := <-cq.chanIn:
|
||||||
|
select {
|
||||||
|
case cq.chanOut <- item:
|
||||||
|
// Optimistically push directly to chanOut
|
||||||
|
default:
|
||||||
|
cq.overflow.PushBack(item)
|
||||||
|
}
|
||||||
|
case <-cq.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Overflow queue is not empty, so any new items get pushed to
|
||||||
|
// the back to preserve order.
|
||||||
|
select {
|
||||||
|
case item := <-cq.chanIn:
|
||||||
|
cq.overflow.PushBack(item)
|
||||||
|
case cq.chanOut <- nextElement.Value:
|
||||||
|
cq.overflow.Remove(nextElement)
|
||||||
|
case <-cq.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop ends the goroutine that moves items from the in channel to the out
|
||||||
|
// channel. This does not clear the queue state, so the queue can be restarted
|
||||||
|
// without dropping items.
|
||||||
|
func (cq *ConcurrentQueue) Stop() {
|
||||||
|
cq.quit <- struct{}{}
|
||||||
|
}
|
26
chainntnfs/queue_test.go
Normal file
26
chainntnfs/queue_test.go
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
package chainntnfs_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConcurrentQueue(t *testing.T) {
|
||||||
|
queue := chainntnfs.NewConcurrentQueue(100)
|
||||||
|
queue.Start()
|
||||||
|
defer queue.Stop()
|
||||||
|
|
||||||
|
// Pushes should never block for long.
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
queue.ChanIn() <- i
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pops also should not block for long. Expect elements in FIFO order.
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
item := <-queue.ChanOut()
|
||||||
|
if i != item.(int) {
|
||||||
|
t.Fatalf("Dequeued wrong value: expected %d, got %d", i, item.(int))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user