htlcswitch: ensure the packetQueue can handle total+partial commitment overflows

In this commit, we’ve moved away from the internal queryHandler within
the packetQueue entirely. We now use an internal queueLen variable
internally to allow callers to sample the queue’s size, and also for
synchronization purposes internally.

This commit also introduces a chan struct{} (freeSlots) that is used
internally as a semaphore. The current value of freeSlots reflects the
number of available slots within the commitment transaction. Within the
link, after an HTLC has been removed/modified, then a “slot” is freed
up. The main packetConsumer then interprets these messages as a signal
to attempt to free up a new slot within the queue itself by dumping off
to the commitment transaction.
This commit is contained in:
Olaoluwa Osuntokun 2017-09-25 12:47:28 -07:00
parent 210fc6e714
commit be5b2d46a5
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 73 additions and 27 deletions

@ -240,13 +240,13 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
upstream: make(chan lnwire.Message),
downstream: make(chan *htlcPacket),
linkControl: make(chan interface{}),
cancelReasons: make(map[uint64]lnwire.OpaqueReason),
logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(),
bestHeight: currentHeight,
quit: make(chan struct{}),
// TODO(roasbeef): just do reserve here?
availableBandwidth: uint64(channel.StateSnapshot().LocalBalance),
cancelReasons: make(map[uint64]lnwire.OpaqueReason),
logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2),
bestHeight: currentHeight,
quit: make(chan struct{}),
}
}
@ -1032,6 +1032,7 @@ func (l *channelLink) processLockedInHtlcs(
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
packetsToForward = append(packetsToForward, settlePacket)
l.overflowQueue.SignalFreeSlot()
// A failureCode message for a previously forwarded HTLC has been
// received. As a result a new slot will be freed up in our
@ -1053,6 +1054,7 @@ func (l *channelLink) processLockedInHtlcs(
// notify the overflow queue that a spare spot has been
// freed up within the commitment state.
packetsToForward = append(packetsToForward, failPacket)
l.overflowQueue.SignalFreeSlot()
// An incoming HTLC add has been full-locked in. As a result we
// can no examine the forwarding details of the HTLC, and the

@ -2,17 +2,27 @@ package htlcswitch
import (
"sync"
"github.com/lightningnetwork/lnd/lnwire"
"sync/atomic"
)
// packetQueue is an goroutine-safe queue of htlc packets which over flow the
// current commitment transaction. An HTLC will overflow the current commitment
// transaction if it is attempted to be added to a state machine which already
// has the max number of pending HTLC's present on the commitment transaction.
// Packets are removed from the queue by the channelLink itself as additional
// slots become available on the commitment transaction itself.
// transaction if one attempts to add a new HTLC to the state machine which
// already has the max number of pending HTLC's present on the commitment
// transaction. Packets are removed from the queue by the channelLink itself
// as additional slots become available on the commitment transaction itself.
// In order to synchronize properly we use a semaphore to allow the channelLink
// to signal the number of slots available, and a condition variable to allow
// the packetQueue to know when new items have been added to the queue.
type packetQueue struct {
// queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be use atomically
// as this value is used by internal methods to obtain the length of
// the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send
// with the lock held.
queueLen int32
queueCond *sync.Cond
queueMtx sync.Mutex
queue []*htlcPacket
@ -22,17 +32,21 @@ type packetQueue struct {
// commitment transaction.
outgoingPkts chan *htlcPacket
// freeSlots serves as a semaphore who's current value signals the
// number of available slots on the commitment transaction.
freeSlots chan struct{}
wg sync.WaitGroup
quit chan struct{}
}
// newPacketQueue returns a new instance of the packetQueue.
func newPacketQueue() *packetQueue {
// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots
// value should reflect the max number of HTLC's that we're allowed to have
// outstanding within the commitment transaction.
func newPacketQueue(maxFreeSlots int) *packetQueue {
p := &packetQueue{
outgoingPkts: make(chan *htlcPacket),
freeSlots: make(chan struct{}, maxFreeSlots),
quit: make(chan struct{}),
}
p.queueCond = sync.NewCond(&p.queueMtx)
@ -97,15 +111,22 @@ func (p *packetQueue) packetCoordinator() {
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case <-p.freeSlots:
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and slide
// down the reference one to re-position the head
// pointer. This will set us up for the next iteration.
// If the queue is empty at this point, then we'll
// block at the top.
// Pop the item off the front of the queue and
// slide down the reference one to re-position
// the head pointer. This will set us up for
// the next iteration. If the queue is empty
// at this point, then we'll block at the top.
p.queue[0] = nil
p.queue = p.queue[1:]
atomic.AddInt32(&p.queueLen, -1)
case <-p.quit:
p.queueCond.L.Unlock()
return
}
case <-p.quit:
p.queueCond.L.Unlock()
@ -122,9 +143,11 @@ func (p *packetQueue) packetCoordinator() {
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue.
// the message queue, and increment the internal atomic for tracking
// the queue's length.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
atomic.AddInt32(&p.queueLen, 1)
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
@ -132,11 +155,29 @@ func (p *packetQueue) AddPkt(pkt *htlcPacket) {
p.queueCond.Signal()
}
// SignalFreeSlot signals to the queue that a new slot has opened up within the
// commitment transaction. The max amount of free slots has been defined when
// initially creating the packetQueue itself. This method, combined with AddPkt
// creates the following abstraction: a synchronized queue of infinite length
// which can be added to at will, which flows onto a commitment of fixed
// capacity.
func (p *packetQueue) SignalFreeSlot() {
// We'll only send over a free slot signal if the queue *is not* empty.
// Otherwise, it's possible that we attempt to overfill the free slots
// semaphore and block indefinitely below.
if atomic.LoadInt32(&p.queueLen) == 0 {
return
}
select {
case p.freeSlots <- struct{}{}:
case <-p.quit:
return
}
}
// Length returns the number of pending htlc packets present within the over
// flow queue.
func (p *packetQueue) Length() int32 {
return atomic.LoadInt32(&p.queueLen)
}

@ -14,11 +14,12 @@ import (
func TestWaitingQueueThreadSafety(t *testing.T) {
t.Parallel()
q := newPacketQueue()
const numPkts = 1000
q := newPacketQueue(numPkts)
q.Start()
defer q.Stop()
const numPkts = 1000
a := make([]lnwire.MilliSatoshi, numPkts)
for i := 0; i < numPkts; i++ {
a[i] = lnwire.MilliSatoshi(i)
@ -30,6 +31,8 @@ func TestWaitingQueueThreadSafety(t *testing.T) {
var b []lnwire.MilliSatoshi
for i := 0; i < numPkts; i++ {
q.SignalFreeSlot()
select {
case packet := <-q.outgoingPkts:
b = append(b, packet.amount)