diff --git a/queue/queue.go b/queue/queue.go index 497e6f7e..07eca3fa 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -2,6 +2,8 @@ package queue import ( "container/list" + "sync" + "sync/atomic" ) // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. @@ -10,10 +12,15 @@ import ( // items from the in channel to the out channel in the correct order that must // be started by calling Start(). type ConcurrentQueue struct { + started uint32 // to be used atomically + stopped uint32 // to be used atomically + chanIn chan interface{} chanOut chan interface{} - quit chan struct{} overflow *list.List + + wg sync.WaitGroup + quit chan struct{} } // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is @@ -24,8 +31,8 @@ func NewConcurrentQueue(bufferSize int) *ConcurrentQueue { return &ConcurrentQueue{ chanIn: make(chan interface{}), chanOut: make(chan interface{}, bufferSize), - quit: make(chan struct{}), overflow: list.New(), + quit: make(chan struct{}), } } @@ -44,7 +51,14 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { // minimize overhead, but if the out channel is full it pushes items to an // overflow queue. This must be called before using the queue. func (cq *ConcurrentQueue) Start() { + if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) { + return + } + + cq.wg.Add(1) go func() { + defer cq.wg.Done() + for { nextElement := cq.overflow.Front() if nextElement == nil { @@ -82,5 +96,10 @@ func (cq *ConcurrentQueue) Start() { // channel. This does not clear the queue state, so the queue can be restarted // without dropping items. func (cq *ConcurrentQueue) Stop() { - cq.quit <- struct{}{} + if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) { + return + } + + close(cq.quit) + cq.wg.Wait() }