queue: detect close of incoming channel

This commit is contained in:
Joost Jager 2020-04-06 14:47:49 +02:00
parent 278915e598
commit f907fbcadc
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
2 changed files with 47 additions and 2 deletions

View File

@ -58,6 +58,7 @@ func (cq *ConcurrentQueue) start() {
go func() { go func() {
defer cq.wg.Done() defer cq.wg.Done()
readLoop:
for { for {
nextElement := cq.overflow.Front() nextElement := cq.overflow.Front()
if nextElement == nil { if nextElement == nil {
@ -65,7 +66,10 @@ func (cq *ConcurrentQueue) start() {
// directly to the output channel. If output channel is full // directly to the output channel. If output channel is full
// though, push to overflow. // though, push to overflow.
select { select {
case item := <-cq.chanIn: case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
select { select {
case cq.chanOut <- item: case cq.chanOut <- item:
// Optimistically push directly to chanOut // Optimistically push directly to chanOut
@ -79,7 +83,10 @@ func (cq *ConcurrentQueue) start() {
// Overflow queue is not empty, so any new items get pushed to // Overflow queue is not empty, so any new items get pushed to
// the back to preserve order. // the back to preserve order.
select { select {
case item := <-cq.chanIn: case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
cq.overflow.PushBack(item) cq.overflow.PushBack(item)
case cq.chanOut <- nextElement.Value: case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement) cq.overflow.Remove(nextElement)
@ -88,6 +95,22 @@ func (cq *ConcurrentQueue) start() {
} }
} }
} }
// Incoming channel has been closed. Empty overflow queue into
// the outgoing channel.
nextElement := cq.overflow.Front()
for nextElement != nil {
select {
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
nextElement = cq.overflow.Front()
}
// Close outgoing channel.
close(cq.chanOut)
}() }()
} }

View File

@ -63,3 +63,25 @@ func TestConcurrentQueueIdempotentStop(t *testing.T) {
testQueueAddDrain(t, 100, 1, 10, 1000, 1000) testQueueAddDrain(t, 100, 1, 10, 1000, 1000)
} }
// TestQueueCloseIncoming tests that the queue properly handles an incoming
// channel that is closed.
func TestQueueCloseIncoming(t *testing.T) {
t.Parallel()
queue := queue.NewConcurrentQueue(10)
queue.Start()
queue.ChanIn() <- 1
close(queue.ChanIn())
item := <-queue.ChanOut()
if item.(int) != 1 {
t.Fatalf("unexpected item")
}
_, ok := <-queue.ChanOut()
if ok {
t.Fatalf("expected outgoing channel being closed")
}
}