package chainntnfs

import (
	"container/list"
)

// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
// Clients interact with the queue by pushing items into the in channel and
// popping items from the out channel. There is a goroutine that manages moving
// items from the in channel to the out channel in the correct order that must
// be started by calling Start().
type ConcurrentQueue struct {
	chanIn   chan interface{}
	chanOut  chan interface{}
	quit     chan struct{}
	overflow *list.List
}

// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
// the capacity of the output channel. When the size of the queue is below this
// threshold, pushes do not incur the overhead of the less efficient overflow
// structure.
func NewConcurrentQueue(bufferSize int) *ConcurrentQueue {
	return &ConcurrentQueue{
		chanIn:   make(chan interface{}),
		chanOut:  make(chan interface{}, bufferSize),
		quit:     make(chan struct{}),
		overflow: list.New(),
	}
}

// ChanIn returns a channel that can be used to push new items into the queue.
func (cq *ConcurrentQueue) ChanIn() chan<- interface{} {
	return cq.chanIn
}

// ChanOut returns a channel that can be used to pop items from the queue.
func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
	return cq.chanOut
}

// Start begins a goroutine that manages moving items from the in channel to the
// out channel. The queue tries to move items directly to the out channel
// minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue) Start() {
	go func() {
		for {
			nextElement := cq.overflow.Front()
			if nextElement == nil {
				// Overflow queue is empty so incoming items can be pushed
				// directly to the output channel. If output channel is full
				// though, push to overflow.
				select {
				case item := <-cq.chanIn:
					select {
					case cq.chanOut <- item:
						// Optimistically push directly to chanOut
					default:
						cq.overflow.PushBack(item)
					}
				case <-cq.quit:
					return
				}
			} else {
				// Overflow queue is not empty, so any new items get pushed to
				// the back to preserve order.
				select {
				case item := <-cq.chanIn:
					cq.overflow.PushBack(item)
				case cq.chanOut <- nextElement.Value:
					cq.overflow.Remove(nextElement)
				case <-cq.quit:
					return
				}
			}
		}
	}()
}

// Stop ends the goroutine that moves items from the in channel to the out
// channel. This does not clear the queue state, so the queue can be restarted
// without dropping items.
func (cq *ConcurrentQueue) Stop() {
	cq.quit <- struct{}{}
}