From a4442608e6fd20a04e48b9d4a17178d880304612 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 30 Nov 2018 16:33:40 -0800 Subject: [PATCH] queue/queue: wait group inner go routines, add idempotency --- queue/queue.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 497e6f7e..07eca3fa 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -2,6 +2,8 @@ package queue import ( "container/list" + "sync" + "sync/atomic" ) // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. @@ -10,10 +12,15 @@ import ( // items from the in channel to the out channel in the correct order that must // be started by calling Start(). type ConcurrentQueue struct { + started uint32 // to be used atomically + stopped uint32 // to be used atomically + chanIn chan interface{} chanOut chan interface{} - quit chan struct{} overflow *list.List + + wg sync.WaitGroup + quit chan struct{} } // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is @@ -24,8 +31,8 @@ func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { return &ConcurrentQueue{ chanIn: make(chan interface{}), chanOut: make(chan interface{}, bufferSize), - quit: make(chan struct{}), overflow: list.New(), + quit: make(chan struct{}), } } @@ -44,7 +51,14 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { // minimize overhead, but if the out channel is full it pushes items to an // overflow queue. This must be called before using the queue. func (cq *ConcurrentQueue) Start() { + if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) { + return + } + + cq.wg.Add(1) go func() { + defer cq.wg.Done() + for { nextElement := cq.overflow.Front() if nextElement == nil { @@ -82,5 +96,10 @@ func (cq *ConcurrentQueue) Start() { // channel. This does not clear the queue state, so the queue can be restarted // without dropping items. func (cq *ConcurrentQueue) Stop() { - cq.quit <- struct{}{} + if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) { + return + } + + close(cq.quit) + cq.wg.Wait() }