queue/queue: wait group inner go routines, add idempotency

This commit is contained in:
Conner Fromknecht 2018-11-30 16:33:40 -08:00
parent 5107feb1ce
commit a4442608e6
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -2,6 +2,8 @@ package queue
import ( import (
"container/list" "container/list"
"sync"
"sync/atomic"
) )
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
@ -10,10 +12,15 @@ import (
// items from the in channel to the out channel in the correct order that must // items from the in channel to the out channel in the correct order that must
// be started by calling Start(). // be started by calling Start().
type ConcurrentQueue struct { type ConcurrentQueue struct {
started uint32 // to be used atomically
stopped uint32 // to be used atomically
chanIn chan interface{} chanIn chan interface{}
chanOut chan interface{} chanOut chan interface{}
quit chan struct{}
overflow *list.List overflow *list.List
wg sync.WaitGroup
quit chan struct{}
} }
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is // NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
@ -24,8 +31,8 @@ func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
return &ConcurrentQueue{ return &ConcurrentQueue{
chanIn: make(chan interface{}), chanIn: make(chan interface{}),
chanOut: make(chan interface{}, bufferSize), chanOut: make(chan interface{}, bufferSize),
quit: make(chan struct{}),
overflow: list.New(), overflow: list.New(),
quit: make(chan struct{}),
} }
} }
@ -44,7 +51,14 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
// minimize overhead, but if the out channel is full it pushes items to an // minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue. // overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue) Start() { func (cq *ConcurrentQueue) Start() {
if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) {
return
}
cq.wg.Add(1)
go func() { go func() {
defer cq.wg.Done()
for { for {
nextElement := cq.overflow.Front() nextElement := cq.overflow.Front()
if nextElement == nil { if nextElement == nil {
@ -82,5 +96,10 @@ func (cq *ConcurrentQueue) Start() {
// channel. This does not clear the queue state, so the queue can be restarted // channel. This does not clear the queue state, so the queue can be restarted
// without dropping items. // without dropping items.
func (cq *ConcurrentQueue) Stop() { func (cq *ConcurrentQueue) Stop() {
cq.quit <- struct{}{} if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) {
return
}
close(cq.quit)
cq.wg.Wait()
} }