package htlcswitch

import (
	"sync"
	"sync/atomic"
)

// packetQueue is a goroutine-safe queue of htlc packets which overflow the
// current commitment transaction. An HTLC will overflow the current
// commitment transaction if one attempts to add a new HTLC to the state
// machine which already has the max number of pending HTLCs present on the
// commitment transaction. Packets are removed from the queue by the
// channelLink itself as additional slots become available on the commitment
// transaction. In order to synchronize properly, we use a semaphore that
// allows the channelLink to signal the number of slots available, and a
// condition variable that allows the packetQueue to know when new items have
// been added to the queue.
type packetQueue struct {
	// queueLen is an internal counter that reflects the size of the queue
	// at any given instant. This value is intended to be used atomically,
	// as it allows internal methods to obtain the length of the queue
	// without grabbing the main lock. This allows callers to avoid a
	// deadlock situation where the main goroutine is attempting a send
	// with the lock held.
	queueLen int32

	queueCond *sync.Cond
	queueMtx  sync.Mutex
	queue     []*htlcPacket

	// outgoingPkts is a channel that the channelLink will receive on in
	// order to drain the packetQueue as new slots become available on the
	// commitment transaction.
	outgoingPkts chan *htlcPacket

	// freeSlots serves as a semaphore whose current value signals the
	// number of available slots on the commitment transaction.
	freeSlots chan struct{}

	wg   sync.WaitGroup
	quit chan struct{}
}

// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLCs that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
	p := &packetQueue{
		outgoingPkts: make(chan *htlcPacket),
		freeSlots:    make(chan struct{}, maxFreeSlots),
		quit:         make(chan struct{}),
	}
	p.queueCond = sync.NewCond(&p.queueMtx)

	return p
}
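
// exampleQueueLifecycle is an illustrative sketch rather than part of the
// switch's operation: it walks through the lifecycle the queue expects from
// a hypothetical caller. The capacity of 2 is arbitrary; a real channelLink
// would instead pass the commitment transaction's HTLC limit.
func exampleQueueLifecycle() {
	queue := newPacketQueue(2)
	queue.Start()
	defer queue.Stop()

	// Producers may append overflow packets at will, as the queue itself
	// is unbounded.
	queue.AddPkt(&htlcPacket{})

	// Once a slot frees up on the commitment transaction, the consumer
	// signals the queue, then drains the packet that the coordinator
	// forwards in response.
	queue.SignalFreeSlot()
	<-queue.outgoingPkts
}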

// Start starts all goroutines that the packetQueue needs to perform its
// normal duties.
func (p *packetQueue) Start() {
	p.wg.Add(1)
	go p.packetCoordinator()
}

// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
	close(p.quit)

	// We acquire the condition variable's lock before signalling so the
	// wake-up can't be lost in the window between the packetCoordinator
	// checking the queue's length and going to sleep on the condition
	// variable. With the coordinator guaranteed to observe the quit
	// signal, we then wait for it to exit.
	p.queueCond.L.Lock()
	p.queueCond.Signal()
	p.queueCond.L.Unlock()

	p.wg.Wait()
}

// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back-pressure handling heuristics
// like regular congestion avoidance:
//   * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
	defer p.wg.Done()

	for {
		// First, we'll check our condition. If the queue of packets is
		// empty, then we'll wait until a new item is added.
		p.queueCond.L.Lock()
		for len(p.queue) == 0 {
			p.queueCond.Wait()

			// If we were woken up in order to exit, then we'll
			// do so. Otherwise, we'll check the message queue
			// for any new items.
			select {
			case <-p.quit:
				p.queueCond.L.Unlock()
				return
			default:
			}
		}

		nextPkt := p.queue[0]

		p.queueCond.L.Unlock()

		// We'll then block until the link signals that a new slot has
		// opened up within the commitment transaction, at which point
		// we'll attempt to hand the packet at the head of the queue
		// over to the link. Blocking here (rather than polling)
		// ensures we don't spin while no slots are available.
		select {
		case <-p.freeSlots:

			select {
			case p.outgoingPkts <- nextPkt:
				// Pop the item off the front of the queue,
				// nil-ing out the popped slot so the packet
				// can be garbage collected, and slide the
				// head pointer forward. This sets us up for
				// the next iteration. If the queue is empty
				// at this point, then we'll block at the top.
				p.queueCond.L.Lock()
				p.queue[0] = nil
				p.queue = p.queue[1:]
				atomic.AddInt32(&p.queueLen, -1)
				p.queueCond.L.Unlock()
			case <-p.quit:
				return
			}

		case <-p.quit:
			return
		}
	}
}

// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
	// First, we'll lock the condition's mutex, add the packet to the end
	// of the queue, and increment the internal atomic tracking the
	// queue's length.
	p.queueCond.L.Lock()
	p.queue = append(p.queue, pkt)
	atomic.AddInt32(&p.queueLen, 1)
	p.queueCond.L.Unlock()

	// With the packet added, we signal to the packetCoordinator that
	// there are additional packets to consume.
	p.queueCond.Signal()
}

// SignalFreeSlot signals to the queue that a new slot has opened up within
// the commitment transaction. The maximum number of free slots is defined
// when the packetQueue is first created. This method, combined with AddPkt,
// creates the following abstraction: a synchronized queue of infinite
// length which can be added to at will, and which flows onto a commitment
// transaction of fixed capacity.
func (p *packetQueue) SignalFreeSlot() {
	// We'll only send over a free slot signal if the queue *is not* empty.
	// Otherwise, it's possible that we attempt to overfill the free slots
	// semaphore and block indefinitely below.
	if atomic.LoadInt32(&p.queueLen) == 0 {
		return
	}

	select {
	case p.freeSlots <- struct{}{}:
	case <-p.quit:
		return
	}
}
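
// exampleLinkLoop is an illustrative sketch of the consumer-side handshake
// the queue expects, not the channelLink's actual forwarder. The
// settledHtlcs channel and processPkt helper are hypothetical stand-ins for
// the link's real events and forwarding logic: each settled HTLC frees a
// commitment slot, so we notify the queue, and any packet the coordinator
// releases in response arrives on outgoingPkts as just another event.
func exampleLinkLoop(queue *packetQueue, settledHtlcs chan struct{},
	processPkt func(*htlcPacket), done chan struct{}) {

	for {
		select {
		// An HTLC was removed from the commitment transaction, so a
		// slot has opened up for any queued overflow packet.
		case <-settledHtlcs:
			queue.SignalFreeSlot()

		// The coordinator has matched a queued packet with a free
		// slot, so it can now be processed as usual.
		case pkt := <-queue.outgoingPkts:
			processPkt(pkt)

		case <-done:
			return
		}
	}
}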

// Length returns the number of pending htlc packets present within the
// overflow queue.
func (p *packetQueue) Length() int32 {
	return atomic.LoadInt32(&p.queueLen)
}