From be5b2d46a57fde27f9984132a1b81717018d08b8 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 25 Sep 2017 12:47:28 -0700 Subject: [PATCH] htlcswitch: ensure the packetQueue can handle total+partial commitment overflows MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we’ve moved away from the internal queryHandler within the packetQueue entirely. We now use an internal queueLen variable internally to allow callers to sample the queue’s size, and also for synchronization purposes internally. This commit also introduces a chan struct{} (freeSlots) that is used internally as a semaphore. The current value of freeSlots reflects the number of available slots within the commitment transaction. Within the link, after an HTLC has been removed/modified, then a “slot” is freed up. The main packetConsumer then interprets these messages as a signal to attempt to free up a new slot within the queue itself by dumping off to the commitment transaction. --- htlcswitch/link.go | 12 +++--- htlcswitch/queue.go | 81 ++++++++++++++++++++++++++++++---------- htlcswitch/queue_test.go | 7 +++- 3 files changed, 73 insertions(+), 27 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d9433542..8eca6de7 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -240,13 +240,13 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, upstream: make(chan lnwire.Message), downstream: make(chan *htlcPacket), linkControl: make(chan interface{}), - cancelReasons: make(map[uint64]lnwire.OpaqueReason), - logCommitTimer: time.NewTimer(300 * time.Millisecond), - overflowQueue: newPacketQueue(), - bestHeight: currentHeight, - quit: make(chan struct{}), // TODO(roasbeef): just do reserve here? availableBandwidth: uint64(channel.StateSnapshot().LocalBalance), + cancelReasons: make(map[uint64]lnwire.OpaqueReason), + logCommitTimer: time.NewTimer(300 * time.Millisecond), + overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), + bestHeight: currentHeight, + quit: make(chan struct{}), } } @@ -1032,6 +1032,7 @@ func (l *channelLink) processLockedInHtlcs( // notify the overflow queue that a spare spot has been // freed up within the commitment state. packetsToForward = append(packetsToForward, settlePacket) + l.overflowQueue.SignalFreeSlot() // A failureCode message for a previously forwarded HTLC has been // received. As a result a new slot will be freed up in our @@ -1053,6 +1054,7 @@ func (l *channelLink) processLockedInHtlcs( // notify the overflow queue that a spare spot has been // freed up within the commitment state. packetsToForward = append(packetsToForward, failPacket) + l.overflowQueue.SignalFreeSlot() // An incoming HTLC add has been full-locked in. As a result we // can no examine the forwarding details of the HTLC, and the diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go index ec1e5d59..262824ed 100644 --- a/htlcswitch/queue.go +++ b/htlcswitch/queue.go @@ -2,17 +2,27 @@ package htlcswitch import ( "sync" - - "github.com/lightningnetwork/lnd/lnwire" + "sync/atomic" ) // packetQueue is an goroutine-safe queue of htlc packets which over flow the // current commitment transaction. An HTLC will overflow the current commitment -// transaction if it is attempted to be added to a state machine which already -// has the max number of pending HTLC's present on the commitment transaction. -// Packets are removed from the queue by the channelLink itself as additional -// slots become available on the commitment transaction itself. +// transaction if one attempts to add a new HTLC to the state machine which +// already has the max number of pending HTLC's present on the commitment +// transaction. Packets are removed from the queue by the channelLink itself +// as additional slots become available on the commitment transaction itself. +// In order to synchronize properly we use a semaphore to allow the channelLink +// to signal the number of slots available, and a condition variable to allow +// the packetQueue to know when new items have been added to the queue. type packetQueue struct { + // queueLen is an internal counter that reflects the size of the queue + // at any given instance. This value is intended to be use atomically + // as this value is used by internal methods to obtain the length of + // the queue w/o grabbing the main lock. This allows callers to avoid a + // deadlock situation where the main goroutine is attempting a send + // with the lock held. + queueLen int32 + queueCond *sync.Cond queueMtx sync.Mutex queue []*htlcPacket @@ -22,18 +32,22 @@ type packetQueue struct { // commitment transaction. outgoingPkts chan *htlcPacket + // freeSlots serves as a semaphore who's current value signals the + // number of available slots on the commitment transaction. + freeSlots chan struct{} wg sync.WaitGroup quit chan struct{} } -// newPacketQueue returns a new instance of the packetQueue. -func newPacketQueue() *packetQueue { +// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots +// value should reflect the max number of HTLC's that we're allowed to have +// outstanding within the commitment transaction. +func newPacketQueue(maxFreeSlots int) *packetQueue { p := &packetQueue{ outgoingPkts: make(chan *htlcPacket), - - - quit: make(chan struct{}), + freeSlots: make(chan struct{}, maxFreeSlots), + quit: make(chan struct{}), } p.queueCond = sync.NewCond(&p.queueMtx) @@ -98,14 +112,21 @@ func (p *packetQueue) packetCoordinator() { // wait for a new message to be sent into the overflow queue, // or for the link's htlcForwarder to wake up. select { - case p.outgoingPkts <- nextPkt: - // Pop the item off the front of the queue and slide - // down the reference one to re-position the head - // pointer. This will set us up for the next iteration. - // If the queue is empty at this point, then we'll - // block at the top. - p.queue[0] = nil - p.queue = p.queue[1:] + case <-p.freeSlots: + select { + case p.outgoingPkts <- nextPkt: + // Pop the item off the front of the queue and + // slide down the reference one to re-position + // the head pointer. This will set us up for + // the next iteration. If the queue is empty + // at this point, then we'll block at the top. + p.queue[0] = nil + p.queue = p.queue[1:] + atomic.AddInt32(&p.queueLen, -1) + case <-p.quit: + p.queueCond.L.Unlock() + return + } case <-p.quit: p.queueCond.L.Unlock() @@ -122,9 +143,11 @@ func (p *packetQueue) packetCoordinator() { // of the existing items. func (p *packetQueue) AddPkt(pkt *htlcPacket) { // First, we'll lock the condition, and add the message to the end of - // the message queue. + // the message queue, and increment the internal atomic for tracking + // the queue's length. p.queueCond.L.Lock() p.queue = append(p.queue, pkt) + atomic.AddInt32(&p.queueLen, 1) p.queueCond.L.Unlock() // With the message added, we signal to the msgConsumer that there are @@ -132,11 +155,29 @@ func (p *packetQueue) AddPkt(pkt *htlcPacket) { p.queueCond.Signal() } +// SignalFreeSlot signals to the queue that a new slot has opened up within the +// commitment transaction. The max amount of free slots has been defined when +// initially creating the packetQueue itself. This method, combined with AddPkt +// creates the following abstraction: a synchronized queue of infinite length +// which can be added to at will, which flows onto a commitment of fixed +// capacity. +func (p *packetQueue) SignalFreeSlot() { + // We'll only send over a free slot signal if the queue *is not* empty. + // Otherwise, it's possible that we attempt to overfill the free slots + // semaphore and block indefinitely below. + if atomic.LoadInt32(&p.queueLen) == 0 { + return } select { + case p.freeSlots <- struct{}{}: case <-p.quit: + return } } +// Length returns the number of pending htlc packets present within the over +// flow queue. +func (p *packetQueue) Length() int32 { + return atomic.LoadInt32(&p.queueLen) } diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go index 8b18e3d7..a8e59ec1 100644 --- a/htlcswitch/queue_test.go +++ b/htlcswitch/queue_test.go @@ -14,11 +14,12 @@ import ( func TestWaitingQueueThreadSafety(t *testing.T) { t.Parallel() - q := newPacketQueue() + const numPkts = 1000 + + q := newPacketQueue(numPkts) q.Start() defer q.Stop() - const numPkts = 1000 a := make([]lnwire.MilliSatoshi, numPkts) for i := 0; i < numPkts; i++ { a[i] = lnwire.MilliSatoshi(i) @@ -30,6 +31,8 @@ func TestWaitingQueueThreadSafety(t *testing.T) { var b []lnwire.MilliSatoshi for i := 0; i < numPkts; i++ { + q.SignalFreeSlot() + select { case packet := <-q.outgoingPkts: b = append(b, packet.amount)