lnd.xprv/htlcswitch/queue.go
Olaoluwa Osuntokun be5b2d46a5
htlcswitch: ensure the packetQueue can handle total+partial commitment overflows
In this commit, we’ve moved away from the internal queryHandler within
the packetQueue entirely. We now use an internal queueLen variable
internally to allow callers to sample the queue’s size, and also for
synchronization purposes internally.

This commit also introduces a chan struct{} (freeSlots) that is used
internally as a semaphore. The current value of freeSlots reflects the
number of available slots within the commitment transaction. Within the
link, after an HTLC has been removed/modified, then a “slot” is freed
up. The main packetConsumer then interprets these messages as a signal
to attempt to free up a new slot within the queue itself by dumping off
to the commitment transaction.
2017-09-25 12:47:34 -07:00

184 lines
5.9 KiB
Go

package htlcswitch
import (
"sync"
"sync/atomic"
)
// packetQueue is an goroutine-safe queue of htlc packets which over flow the
// current commitment transaction. An HTLC will overflow the current commitment
// transaction if one attempts to add a new HTLC to the state machine which
// already has the max number of pending HTLC's present on the commitment
// transaction. Packets are removed from the queue by the channelLink itself
// as additional slots become available on the commitment transaction itself.
// In order to synchronize properly we use a semaphore to allow the channelLink
// to signal the number of slots available, and a condition variable to allow
// the packetQueue to know when new items have been added to the queue.
type packetQueue struct {
// queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be use atomically
// as this value is used by internal methods to obtain the length of
// the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send
// with the lock held.
queueLen int32
queueCond *sync.Cond
queueMtx sync.Mutex
queue []*htlcPacket
// outgoingPkts is a channel that the channelLink will receive on in
// order to drain the packetQueue as new slots become available on the
// commitment transaction.
outgoingPkts chan *htlcPacket
// freeSlots serves as a semaphore who's current value signals the
// number of available slots on the commitment transaction.
freeSlots chan struct{}
wg sync.WaitGroup
quit chan struct{}
}
// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLC's that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
p := &packetQueue{
outgoingPkts: make(chan *htlcPacket),
freeSlots: make(chan struct{}, maxFreeSlots),
quit: make(chan struct{}),
}
p.queueCond = sync.NewCond(&p.queueMtx)
return p
}
// Start starts all goroutines that packetQueue needs to perform its normal
// duties.
func (p *packetQueue) Start() {
p.wg.Add(1)
go p.packetCoordinator()
}
// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
close(p.quit)
p.queueCond.Signal()
p.wg.Wait()
}
// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back pressure handling heuristics
// like reg congestion avoidance:
// * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
defer p.wg.Done()
for {
// First, we'll check our condition. If the queue of packets is
// empty, then we'll wait until a new item is added.
p.queueCond.L.Lock()
for len(p.queue) == 0 {
p.queueCond.Wait()
// If we were woke up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
}
nextPkt := p.queue[0]
// If there aren't any further messages to sent (or the link
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case <-p.freeSlots:
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and
// slide down the reference one to re-position
// the head pointer. This will set us up for
// the next iteration. If the queue is empty
// at this point, then we'll block at the top.
p.queue[0] = nil
p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1)
case <-p.quit:
p.queueCond.L.Unlock()
return
}
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
p.queueCond.L.Unlock()
}
}
// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue, and increment the internal atomic for tracking
// the queue's length.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1)
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
p.queueCond.Signal()
}
// SignalFreeSlot signals to the queue that a new slot has opened up within the
// commitment transaction. The max amount of free slots has been defined when
// initially creating the packetQueue itself. This method, combined with AddPkt
// creates the following abstraction: a synchronized queue of infinite length
// which can be added to at will, which flows onto a commitment of fixed
// capacity.
func (p *packetQueue) SignalFreeSlot() {
// We'll only send over a free slot signal if the queue *is not* empty.
// Otherwise, it's possible that we attempt to overfill the free slots
// semaphore and block indefinitely below.
if atomic.LoadInt32(&p.queueLen) == 0 {
return
}
select {
case p.freeSlots <- struct{}{}:
case <-p.quit:
return
}
}
// Length returns the number of pending htlc packets present within the over
// flow queue.
func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen)
}