From a4442608e6fd20a04e48b9d4a17178d880304612 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Nov 2018 16:33:40 -0800 Subject: [PATCH 1/2] queue/queue: wait group inner go routines, add idempotency --- queue/queue.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 497e6f7e..07eca3fa 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -2,6 +2,8 @@ package queue import ( "container/list" + "sync" + "sync/atomic" ) // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. @@ -10,10 +12,15 @@ import ( // items from the in channel to the out channel in the correct order that must // be started by calling Start(). type ConcurrentQueue struct { + started uint32 // to be used atomically + stopped uint32 // to be used atomically + chanIn chan interface{} chanOut chan interface{} - quit chan struct{} overflow *list.List + + wg sync.WaitGroup + quit chan struct{} } // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is @@ -24,8 +31,8 @@ func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { return &ConcurrentQueue{ chanIn: make(chan interface{}), chanOut: make(chan interface{}, bufferSize), - quit: make(chan struct{}), overflow: list.New(), + quit: make(chan struct{}), } } @@ -44,7 +51,14 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { // minimize overhead, but if the out channel is full it pushes items to an // overflow queue. This must be called before using the queue. func (cq *ConcurrentQueue) Start() { + if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) { + return + } + + cq.wg.Add(1) go func() { + defer cq.wg.Done() + for { nextElement := cq.overflow.Front() if nextElement == nil { @@ -82,5 +96,10 @@ func (cq *ConcurrentQueue) Start() { // channel. This does not clear the queue state, so the queue can be restarted // without dropping items. func (cq *ConcurrentQueue) Stop() { - cq.quit <- struct{}{} + if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) { + return + } + + close(cq.quit) + cq.wg.Wait() } From c27b90f76c6d495ff5b0c52614580b04363417cc Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Nov 2018 16:37:01 -0800 Subject: [PATCH 2/2] queue/queue_test: adds 100% test coverage --- queue/queue_test.go | 53 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 7 deletions(-) diff --git a/queue/queue_test.go b/queue/queue_test.go index 37d8a291..9aee0cfb 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -6,21 +6,60 @@ import ( "github.com/lightningnetwork/lnd/queue" ) -func TestConcurrentQueue(t *testing.T) { - queue := queue.NewConcurrentQueue(100) - queue.Start() - defer queue.Stop() +func testQueueAddDrain(t *testing.T, size, numStart, numStop, numAdd, numDrain int) { + t.Helper() + + queue := queue.NewConcurrentQueue(size) + for i := 0; i < numStart; i++ { + queue.Start() + } + for i := 0; i < numStop; i++ { + defer queue.Stop() + } // Pushes should never block for long. - for i := 0; i < 1000; i++ { + for i := 0; i < numAdd; i++ { queue.ChanIn() <- i } // Pops also should not block for long. Expect elements in FIFO order. - for i := 0; i < 1000; i++ { + for i := 0; i < numDrain; i++ { item := <-queue.ChanOut() if i != item.(int) { - t.Fatalf("Dequeued wrong value: expected %d, got %d", i, item.(int)) + t.Fatalf("Dequeued wrong value: expected %d, got %d", + i, item.(int)) } } } + +// TestConcurrentQueue tests that the queue properly adds 1000 items, drain all +// of them, and exit cleanly. +func TestConcurrentQueue(t *testing.T) { + t.Parallel() + + testQueueAddDrain(t, 100, 1, 1, 1000, 1000) +} + +// TestConcurrentQueueEarlyStop tests that the queue properly adds 1000 items, +// drain half of them, and still exit cleanly. +func TestConcurrentQueueEarlyStop(t *testing.T) { + t.Parallel() + + testQueueAddDrain(t, 100, 1, 1, 1000, 500) +} + +// TestConcurrentQueueIdempotentStart asserts that calling Start multiple times +// doesn't fail, and that the queue can still exit cleanly. +func TestConcurrentQueueIdempotentStart(t *testing.T) { + t.Parallel() + + testQueueAddDrain(t, 100, 10, 1, 1000, 1000) +} + +// TestConcurrentQueueIdempotentStop asserts that calling Stop multiple times +// doesn't fail, and that exiting doesn't block on subsequent Stops. +func TestConcurrentQueueIdempotentStop(t *testing.T) { + t.Parallel() + + testQueueAddDrain(t, 100, 1, 10, 1000, 1000) +}