Merge pull request #2252 from cfromknecht/queue-coverage
queue: Add reliable 100% coverage
This commit is contained in:
commit
92be757223
@ -2,6 +2,8 @@ package queue
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
|
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
|
||||||
@ -10,10 +12,15 @@ import (
|
|||||||
// items from the in channel to the out channel in the correct order that must
|
// items from the in channel to the out channel in the correct order that must
|
||||||
// be started by calling Start().
|
// be started by calling Start().
|
||||||
type ConcurrentQueue struct {
|
type ConcurrentQueue struct {
|
||||||
|
started uint32 // to be used atomically
|
||||||
|
stopped uint32 // to be used atomically
|
||||||
|
|
||||||
chanIn chan interface{}
|
chanIn chan interface{}
|
||||||
chanOut chan interface{}
|
chanOut chan interface{}
|
||||||
quit chan struct{}
|
|
||||||
overflow *list.List
|
overflow *list.List
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
|
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
|
||||||
@ -24,8 +31,8 @@ func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
|
|||||||
return &ConcurrentQueue{
|
return &ConcurrentQueue{
|
||||||
chanIn: make(chan interface{}),
|
chanIn: make(chan interface{}),
|
||||||
chanOut: make(chan interface{}, bufferSize),
|
chanOut: make(chan interface{}, bufferSize),
|
||||||
quit: make(chan struct{}),
|
|
||||||
overflow: list.New(),
|
overflow: list.New(),
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,7 +51,14 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
|
|||||||
// minimize overhead, but if the out channel is full it pushes items to an
|
// minimize overhead, but if the out channel is full it pushes items to an
|
||||||
// overflow queue. This must be called before using the queue.
|
// overflow queue. This must be called before using the queue.
|
||||||
func (cq *ConcurrentQueue) Start() {
|
func (cq *ConcurrentQueue) Start() {
|
||||||
|
if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cq.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer cq.wg.Done()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
nextElement := cq.overflow.Front()
|
nextElement := cq.overflow.Front()
|
||||||
if nextElement == nil {
|
if nextElement == nil {
|
||||||
@ -82,5 +96,10 @@ func (cq *ConcurrentQueue) Start() {
|
|||||||
// channel. This does not clear the queue state, so the queue can be restarted
|
// channel. This does not clear the queue state, so the queue can be restarted
|
||||||
// without dropping items.
|
// without dropping items.
|
||||||
func (cq *ConcurrentQueue) Stop() {
|
func (cq *ConcurrentQueue) Stop() {
|
||||||
cq.quit <- struct{}{}
|
if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
close(cq.quit)
|
||||||
|
cq.wg.Wait()
|
||||||
}
|
}
|
||||||
|
@ -6,21 +6,60 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/queue"
|
"github.com/lightningnetwork/lnd/queue"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestConcurrentQueue(t *testing.T) {
|
func testQueueAddDrain(t *testing.T, size, numStart, numStop, numAdd, numDrain int) {
|
||||||
queue := queue.NewConcurrentQueue(100)
|
t.Helper()
|
||||||
|
|
||||||
|
queue := queue.NewConcurrentQueue(size)
|
||||||
|
for i := 0; i < numStart; i++ {
|
||||||
queue.Start()
|
queue.Start()
|
||||||
|
}
|
||||||
|
for i := 0; i < numStop; i++ {
|
||||||
defer queue.Stop()
|
defer queue.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
// Pushes should never block for long.
|
// Pushes should never block for long.
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < numAdd; i++ {
|
||||||
queue.ChanIn() <- i
|
queue.ChanIn() <- i
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pops also should not block for long. Expect elements in FIFO order.
|
// Pops also should not block for long. Expect elements in FIFO order.
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < numDrain; i++ {
|
||||||
item := <-queue.ChanOut()
|
item := <-queue.ChanOut()
|
||||||
if i != item.(int) {
|
if i != item.(int) {
|
||||||
t.Fatalf("Dequeued wrong value: expected %d, got %d", i, item.(int))
|
t.Fatalf("Dequeued wrong value: expected %d, got %d",
|
||||||
|
i, item.(int))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestConcurrentQueue tests that the queue properly adds 1000 items, drain all
|
||||||
|
// of them, and exit cleanly.
|
||||||
|
func TestConcurrentQueue(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testQueueAddDrain(t, 100, 1, 1, 1000, 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestConcurrentQueueEarlyStop tests that the queue properly adds 1000 items,
|
||||||
|
// drain half of them, and still exit cleanly.
|
||||||
|
func TestConcurrentQueueEarlyStop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testQueueAddDrain(t, 100, 1, 1, 1000, 500)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestConcurrentQueueIdempotentStart asserts that calling Start multiple times
|
||||||
|
// doesn't fail, and that the queue can still exit cleanly.
|
||||||
|
func TestConcurrentQueueIdempotentStart(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testQueueAddDrain(t, 100, 10, 1, 1000, 1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestConcurrentQueueIdempotentStop asserts that calling Stop multiple times
|
||||||
|
// doesn't fail, and that exiting doesn't block on subsequent Stops.
|
||||||
|
func TestConcurrentQueueIdempotentStop(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
testQueueAddDrain(t, 100, 1, 10, 1000, 1000)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user