diff --git a/htlcswitch/link.go b/htlcswitch/link.go index d9433542..8eca6de7 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -240,13 +240,13 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, upstream: make(chan lnwire.Message), downstream: make(chan *htlcPacket), linkControl: make(chan interface{}), - cancelReasons: make(map[uint64]lnwire.OpaqueReason), - logCommitTimer: time.NewTimer(300 * time.Millisecond), - overflowQueue: newPacketQueue(), - bestHeight: currentHeight, - quit: make(chan struct{}), // TODO(roasbeef): just do reserve here? availableBandwidth: uint64(channel.StateSnapshot().LocalBalance), + cancelReasons: make(map[uint64]lnwire.OpaqueReason), + logCommitTimer: time.NewTimer(300 * time.Millisecond), + overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), + bestHeight: currentHeight, + quit: make(chan struct{}), } } @@ -1032,6 +1032,7 @@ func (l *channelLink) processLockedInHtlcs( // notify the overflow queue that a spare spot has been // freed up within the commitment state. packetsToForward = append(packetsToForward, settlePacket) + l.overflowQueue.SignalFreeSlot() // A failureCode message for a previously forwarded HTLC has been // received. As a result a new slot will be freed up in our @@ -1053,6 +1054,7 @@ func (l *channelLink) processLockedInHtlcs( // notify the overflow queue that a spare spot has been // freed up within the commitment state. packetsToForward = append(packetsToForward, failPacket) + l.overflowQueue.SignalFreeSlot() // An incoming HTLC add has been full-locked in. As a result we // can no examine the forwarding details of the HTLC, and the diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go index ec1e5d59..262824ed 100644 --- a/htlcswitch/queue.go +++ b/htlcswitch/queue.go @@ -2,17 +2,27 @@ package htlcswitch import ( "sync" - - "github.com/lightningnetwork/lnd/lnwire" + "sync/atomic" ) // packetQueue is an goroutine-safe queue of htlc packets which over flow the // current commitment transaction. An HTLC will overflow the current commitment -// transaction if it is attempted to be added to a state machine which already -// has the max number of pending HTLC's present on the commitment transaction. -// Packets are removed from the queue by the channelLink itself as additional -// slots become available on the commitment transaction itself. +// transaction if one attempts to add a new HTLC to the state machine which +// already has the max number of pending HTLC's present on the commitment +// transaction. Packets are removed from the queue by the channelLink itself +// as additional slots become available on the commitment transaction itself. +// In order to synchronize properly we use a semaphore to allow the channelLink +// to signal the number of slots available, and a condition variable to allow +// the packetQueue to know when new items have been added to the queue. type packetQueue struct { + // queueLen is an internal counter that reflects the size of the queue + // at any given instance. This value is intended to be use atomically + // as this value is used by internal methods to obtain the length of + // the queue w/o grabbing the main lock. This allows callers to avoid a + // deadlock situation where the main goroutine is attempting a send + // with the lock held. + queueLen int32 + queueCond *sync.Cond queueMtx sync.Mutex queue []*htlcPacket @@ -22,18 +32,22 @@ type packetQueue struct { // commitment transaction. outgoingPkts chan *htlcPacket + // freeSlots serves as a semaphore who's current value signals the + // number of available slots on the commitment transaction. + freeSlots chan struct{} wg sync.WaitGroup quit chan struct{} } -// newPacketQueue returns a new instance of the packetQueue. -func newPacketQueue() *packetQueue { +// newPacketQueue returns a new instance of the packetQueue. The maxFreeSlots +// value should reflect the max number of HTLC's that we're allowed to have +// outstanding within the commitment transaction. +func newPacketQueue(maxFreeSlots int) *packetQueue { p := &packetQueue{ outgoingPkts: make(chan *htlcPacket), - - - quit: make(chan struct{}), + freeSlots: make(chan struct{}, maxFreeSlots), + quit: make(chan struct{}), } p.queueCond = sync.NewCond(&p.queueMtx) @@ -98,14 +112,21 @@ func (p *packetQueue) packetCoordinator() { // wait for a new message to be sent into the overflow queue, // or for the link's htlcForwarder to wake up. select { - case p.outgoingPkts <- nextPkt: - // Pop the item off the front of the queue and slide - // down the reference one to re-position the head - // pointer. This will set us up for the next iteration. - // If the queue is empty at this point, then we'll - // block at the top. - p.queue[0] = nil - p.queue = p.queue[1:] + case <-p.freeSlots: + select { + case p.outgoingPkts <- nextPkt: + // Pop the item off the front of the queue and + // slide down the reference one to re-position + // the head pointer. This will set us up for + // the next iteration. If the queue is empty + // at this point, then we'll block at the top. + p.queue[0] = nil + p.queue = p.queue[1:] + atomic.AddInt32(&p.queueLen, -1) + case <-p.quit: + p.queueCond.L.Unlock() + return + } case <-p.quit: p.queueCond.L.Unlock() @@ -122,9 +143,11 @@ func (p *packetQueue) packetCoordinator() { // of the existing items. func (p *packetQueue) AddPkt(pkt *htlcPacket) { // First, we'll lock the condition, and add the message to the end of - // the message queue. + // the message queue, and increment the internal atomic for tracking + // the queue's length. p.queueCond.L.Lock() p.queue = append(p.queue, pkt) + atomic.AddInt32(&p.queueLen, 1) p.queueCond.L.Unlock() // With the message added, we signal to the msgConsumer that there are @@ -132,11 +155,29 @@ func (p *packetQueue) AddPkt(pkt *htlcPacket) { p.queueCond.Signal() } +// SignalFreeSlot signals to the queue that a new slot has opened up within the +// commitment transaction. The max amount of free slots has been defined when +// initially creating the packetQueue itself. This method, combined with AddPkt +// creates the following abstraction: a synchronized queue of infinite length +// which can be added to at will, which flows onto a commitment of fixed +// capacity. +func (p *packetQueue) SignalFreeSlot() { + // We'll only send over a free slot signal if the queue *is not* empty. + // Otherwise, it's possible that we attempt to overfill the free slots + // semaphore and block indefinitely below. + if atomic.LoadInt32(&p.queueLen) == 0 { + return } select { + case p.freeSlots <- struct{}{}: case <-p.quit: + return } } +// Length returns the number of pending htlc packets present within the over +// flow queue. +func (p *packetQueue) Length() int32 { + return atomic.LoadInt32(&p.queueLen) } diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go index 8b18e3d7..a8e59ec1 100644 --- a/htlcswitch/queue_test.go +++ b/htlcswitch/queue_test.go @@ -14,11 +14,12 @@ import ( func TestWaitingQueueThreadSafety(t *testing.T) { t.Parallel() - q := newPacketQueue() + const numPkts = 1000 + + q := newPacketQueue(numPkts) q.Start() defer q.Stop() - const numPkts = 1000 a := make([]lnwire.MilliSatoshi, numPkts) for i := 0; i < numPkts; i++ { a[i] = lnwire.MilliSatoshi(i) @@ -30,6 +31,8 @@ func TestWaitingQueueThreadSafety(t *testing.T) { var b []lnwire.MilliSatoshi for i := 0; i < numPkts; i++ { + q.SignalFreeSlot() + select { case packet := <-q.outgoingPkts: b = append(b, packet.amount)