lnd.xprv/htlcswitch/queue.go
Olaoluwa Osuntokun b2e24f876b
htlcswitch: ensure the packet queue exits on stop
In this commit, we ensure that the packet queue will always exit, by
continually signalling the main goroutine until it atomically sets a
bool that indicates its has been fully shutdown. It has been observed
that at times the main goroutine will wake up (due to the signal), but
then bypass the select and actually miss the quit signal, as a result
another signal is required. We'll continue to signals in a lazy loop
until the goroutine has fully exited.
2018-06-08 13:51:37 -07:00

209 lines
6.8 KiB
Go

package htlcswitch
import (
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// packetQueue is a goroutine-safe queue of htlc packets which over flow the
// current commitment transaction. An HTLC will overflow the current commitment
// transaction if one attempts to add a new HTLC to the state machine which
// already has the max number of pending HTLC's present on the commitment
// transaction. Packets are removed from the queue by the channelLink itself
// as additional slots become available on the commitment transaction itself.
// In order to synchronize properly we use a semaphore to allow the channelLink
// to signal the number of slots available, and a condition variable to allow
// the packetQueue to know when new items have been added to the queue.
type packetQueue struct {
// totalHtlcAmt is the sum of the value of all pending HTLC's currently
// residing within the overflow queue. This value should only read or
// modified *atomically*.
totalHtlcAmt int64 // To be used atomically.
// queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be use atomically
// as this value is used by internal methods to obtain the length of
// the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send
// with the lock held.
queueLen int32 // To be used atomically.
streamShutdown int32 // To be used atomically.
queue []*htlcPacket
wg sync.WaitGroup
// freeSlots serves as a semaphore who's current value signals the
// number of available slots on the commitment transaction.
freeSlots chan struct{}
queueCond *sync.Cond
queueMtx sync.Mutex
// outgoingPkts is a channel that the channelLink will receive on in
// order to drain the packetQueue as new slots become available on the
// commitment transaction.
outgoingPkts chan *htlcPacket
quit chan struct{}
}
// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLC's that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
p := &packetQueue{
outgoingPkts: make(chan *htlcPacket),
freeSlots: make(chan struct{}, maxFreeSlots),
quit: make(chan struct{}),
}
p.queueCond = sync.NewCond(&p.queueMtx)
return p
}
// Start starts all goroutines that packetQueue needs to perform its normal
// duties.
func (p *packetQueue) Start() {
p.wg.Add(1)
go p.packetCoordinator()
}
// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
close(p.quit)
// Now that we've closed the channel, we'll repeatedly signal the msg
// consumer until we've detected that it has exited.
for atomic.LoadInt32(&p.streamShutdown) == 0 {
p.queueCond.Signal()
time.Sleep(time.Millisecond * 100)
}
}
// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back pressure handling heuristics
// like reg congestion avoidance:
// * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
defer atomic.StoreInt32(&p.streamShutdown, 1)
for {
// First, we'll check our condition. If the queue of packets is
// empty, then we'll wait until a new item is added.
p.queueCond.L.Lock()
for len(p.queue) == 0 {
p.queueCond.Wait()
// If we were woke up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
}
nextPkt := p.queue[0]
p.queueCond.L.Unlock()
// If there aren't any further messages to sent (or the link
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case <-p.freeSlots:
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and
// slide down the reference one to re-position
// the head pointer. This will set us up for
// the next iteration. If the queue is empty
// at this point, then we'll block at the top.
p.queueCond.L.Lock()
p.queue[0] = nil
p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1)
atomic.AddInt64(&p.totalHtlcAmt, int64(-nextPkt.amount))
p.queueCond.L.Unlock()
case <-p.quit:
return
}
case <-p.quit:
return
default:
}
}
}
// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue, and increment the internal atomic for tracking
// the queue's length.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1)
atomic.AddInt64(&p.totalHtlcAmt, int64(pkt.amount))
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
p.queueCond.Signal()
}
// SignalFreeSlot signals to the queue that a new slot has opened up within the
// commitment transaction. The max amount of free slots has been defined when
// initially creating the packetQueue itself. This method, combined with AddPkt
// creates the following abstraction: a synchronized queue of infinite length
// which can be added to at will, which flows onto a commitment of fixed
// capacity.
func (p *packetQueue) SignalFreeSlot() {
// We'll only send over a free slot signal if the queue *is not* empty.
// Otherwise, it's possible that we attempt to overfill the free slots
// semaphore and block indefinitely below.
if atomic.LoadInt32(&p.queueLen) == 0 {
return
}
select {
case p.freeSlots <- struct{}{}:
case <-p.quit:
return
}
}
// Length returns the number of pending htlc packets present within the over
// flow queue.
func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen)
}
// TotalHtlcAmount is the total amount (in mSAT) of all HTLC's currently
// residing within the overflow queue.
func (p *packetQueue) TotalHtlcAmount() lnwire.MilliSatoshi {
// TODO(roasbeef): also factor in fee rate?
return lnwire.MilliSatoshi(atomic.LoadInt64(&p.totalHtlcAmt))
}