htlcswitch: re-write link's packet overflow queue for readability+extensibility

This commit completes a full re-write of the link’s packet overflow
queue with the goals of the making the code itself more understandable
and also allowing it to be more extensible in the future with various
algorithms for handling HTLC congestion avoidance and persistent queue
back pressure.

The new design is simpler and consumes much less coroutines (no longer
a new goroutine for each active HLTC). We now implement a simple
synchronized queue using a standard condition variable.
This commit is contained in:
Olaoluwa Osuntokun 2017-09-22 15:54:10 -07:00
parent 94b54f0243
commit 6f5ef249e4
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
3 changed files with 219 additions and 99 deletions

@ -234,7 +234,7 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
linkControl: make(chan interface{}), linkControl: make(chan interface{}),
cancelReasons: make(map[uint64]lnwire.OpaqueReason), cancelReasons: make(map[uint64]lnwire.OpaqueReason),
logCommitTimer: time.NewTimer(300 * time.Millisecond), logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newWaitingQueue(), overflowQueue: newPacketQueue(),
bestHeight: currentHeight, bestHeight: currentHeight,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -256,6 +256,8 @@ func (l *channelLink) Start() error {
log.Infof("ChannelLink(%v) is starting", l) log.Infof("ChannelLink(%v) is starting", l)
l.overflowQueue.Start()
l.wg.Add(1) l.wg.Add(1)
go l.htlcManager() go l.htlcManager()
@ -274,6 +276,8 @@ func (l *channelLink) Stop() {
log.Infof("ChannelLink(%v) is stopping", l) log.Infof("ChannelLink(%v) is stopping", l)
l.overflowQueue.Stop()
close(l.quit) close(l.quit)
l.wg.Wait() l.wg.Wait()
@ -390,7 +394,7 @@ out:
// transaction is now eligible for processing once again. So // transaction is now eligible for processing once again. So
// we'll attempt to re-process the packet in order to allow it // we'll attempt to re-process the packet in order to allow it
// to continue propagating within the network. // to continue propagating within the network.
case packet := <-l.overflowQueue.pending: case packet := <-l.overflowQueue.outgoingPkts:
msg := packet.htlc.(*lnwire.UpdateAddHTLC) msg := packet.htlc.(*lnwire.UpdateAddHTLC)
log.Tracef("Reprocessing downstream add update "+ log.Tracef("Reprocessing downstream add update "+
"with payment hash(%x)", msg.PaymentHash[:]) "with payment hash(%x)", msg.PaymentHash[:])
@ -406,14 +410,14 @@ out:
// directly. Once an active HTLC is either settled or // directly. Once an active HTLC is either settled or
// failed, then we'll free up a new slot. // failed, then we'll free up a new slot.
htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC) htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
if ok && l.overflowQueue.length() != 0 { if ok && l.overflowQueue.Length() != 0 {
log.Infof("Downstream htlc add update with "+ log.Infof("Downstream htlc add update with "+
"payment hash(%x) have been added to "+ "payment hash(%x) have been added to "+
"reprocessing queue, batch: %v", "reprocessing queue, batch: %v",
htlc.PaymentHash[:], htlc.PaymentHash[:],
l.batchCounter) l.batchCounter)
l.overflowQueue.consume(pkt) l.overflowQueue.AddPkt(pkt)
continue continue
} }
l.handleDownStreamPkt(pkt) l.handleDownStreamPkt(pkt)
@ -483,7 +487,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
"reprocessing queue, batch: %v", "reprocessing queue, batch: %v",
htlc.PaymentHash[:], htlc.PaymentHash[:],
l.batchCounter) l.batchCounter)
l.overflowQueue.consume(pkt) l.overflowQueue.AddPkt(pkt)
return return
// The HTLC was unable to be added to the state // The HTLC was unable to be added to the state
@ -872,7 +876,7 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
// not be accurate. // not be accurate.
func (l *channelLink) getBandwidth() lnwire.MilliSatoshi { func (l *channelLink) getBandwidth() lnwire.MilliSatoshi {
// TODO(roasbeef): factor in reserve, just grab mutex // TODO(roasbeef): factor in reserve, just grab mutex
return l.channel.LocalAvailableBalance() - l.overflowQueue.pendingAmount() return l.channel.LocalAvailableBalance() - l.overflowQueue.PendingAmount()
} }
// policyUpdate is a message sent to a channel link when an outside sub-system // policyUpdate is a message sent to a channel link when an outside sub-system
@ -994,7 +998,6 @@ func (l *channelLink) processLockedInHtlcs(
// notify the overflow queue that a spare spot has been // notify the overflow queue that a spare spot has been
// freed up within the commitment state. // freed up within the commitment state.
packetsToForward = append(packetsToForward, settlePacket) packetsToForward = append(packetsToForward, settlePacket)
l.overflowQueue.release()
// A failureCode message for a previously forwarded HTLC has been // A failureCode message for a previously forwarded HTLC has been
// received. As a result a new slot will be freed up in our // received. As a result a new slot will be freed up in our
@ -1016,7 +1019,6 @@ func (l *channelLink) processLockedInHtlcs(
// notify the overflow queue that a spare spot has been // notify the overflow queue that a spare spot has been
// freed up within the commitment state. // freed up within the commitment state.
packetsToForward = append(packetsToForward, failPacket) packetsToForward = append(packetsToForward, failPacket)
l.overflowQueue.release()
// An incoming HTLC add has been full-locked in. As a result we // An incoming HTLC add has been full-locked in. As a result we
// can no examine the forwarding details of the HTLC, and the // can no examine the forwarding details of the HTLC, and the

@ -1,117 +1,234 @@
package htlcswitch package htlcswitch
import ( import (
"container/list"
"sync" "sync"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// packetQueue represent the wrapper around the original queue plus the // packetQueue is an goroutine-safe queue of htlc packets which over flow the
// functionality for releasing the queue objects in object channel. Such // current commitment transaction. An HTLC will overflow the current commitment
// structures allows storing of all pending object in queue before the moment // transaction if it is attempted to be added to a state machine which already
// of actual releasing. // has the max number of pending HTLC's present on the commitment transaction.
// // Packets are removed from the queue by the channelLink itself as additional
// TODO(andrew.shvv) structure not preserve the order if object failed second // slots become available on the commitment transaction itself.
// time.
type packetQueue struct { type packetQueue struct {
*list.List queueCond *sync.Cond
sync.Mutex queueMtx sync.Mutex
queue []*htlcPacket
// pending channel is needed in order to send the object which should // outgoingPkts is a channel that the channelLink will receive on in
// be re-proceed. // order to drain the packetQueue as new slots become available on the
pending chan *htlcPacket // commitment transaction.
outgoingPkts chan *htlcPacket
// grab channel represents the channel-lock which is needed in order to queries chan interface{}
// make "release" goroutines block during other release goroutine
// processing. wg sync.WaitGroup
grab chan struct{} quit chan struct{}
} }
func newWaitingQueue() *packetQueue { // newPacketQueue returns a new instance of the packetQueue.
// Initialize grab channel and release one slot, otherwise release func newPacketQueue() *packetQueue {
// function will block. p := &packetQueue{
done := make(chan struct{}, 1) outgoingPkts: make(chan *htlcPacket),
done <- struct{}{}
return &packetQueue{ queries: make(chan interface{}),
pending: make(chan *htlcPacket),
grab: done, quit: make(chan struct{}),
List: list.New(), }
p.queueCond = sync.NewCond(&p.queueMtx)
return p
}
// Start starts all goroutines that packetQueue needs to perform its normal
// duties.
func (p *packetQueue) Start() {
p.wg.Add(2)
go p.packetCoordinator()
go p.queryHandler()
}
// Stop signals the packetQueue for a graceful shutdown, and waits for all
// goroutines to exit.
func (p *packetQueue) Stop() {
close(p.quit)
p.queueCond.Signal()
p.wg.Wait()
}
// packetCoordinator is a goroutine that handles the packet overflow queue.
// Using a synchronized queue, outside callers are able to append to the end of
// the queue, waking up the coordinator when the queue transitions from empty
// to non-empty. The packetCoordinator will then aggressively try to empty out
// the queue, passing new htlcPackets to the channelLink as slots within the
// commitment transaction become available.
//
// Future iterations of the packetCoordinator will implement congestion
// avoidance logic in the face of persistent htlcPacket back-pressure.
//
// TODO(roasbeef): later will need to add back pressure handling heuristics
// like reg congestion avoidance:
// * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() {
defer p.wg.Done()
for {
// First, we'll check our condition. If the queue of packets is
// empty, then we'll wait until a new item is added.
p.queueCond.L.Lock()
for len(p.queue) == 0 {
p.queueCond.Wait()
// If we were woke up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
}
nextPkt := p.queue[0]
// If there aren't any further messages to sent (or the link
// didn't immediately read our message), then we'll block and
// wait for a new message to be sent into the overflow queue,
// or for the link's htlcForwarder to wake up.
select {
case p.outgoingPkts <- nextPkt:
// Pop the item off the front of the queue and slide
// down the reference one to re-position the head
// pointer. This will set us up for the next iteration.
// If the queue is empty at this point, then we'll
// block at the top.
p.queue[0] = nil
p.queue = p.queue[1:]
case <-p.quit:
p.queueCond.L.Unlock()
return
default:
}
p.queueCond.L.Unlock()
} }
} }
// consume function take the given packet and store it in queue till release // queueLenReq is a request sent to the queryHandler to query for the length of
// function will be executed. // the current pending packet queue.
func (q *packetQueue) consume(packet *htlcPacket) { type queueLenReq struct {
q.Lock() resp chan int
defer q.Unlock()
q.PushBack(packet)
} }
// release function spawn the goroutine which grab the object from pending // totalPendingReq is a request sent to the queryHandler to query for the total
// queue and pass it in processing channel. // amount of satoshis pending in the queue at a given instant.
func (q *packetQueue) release() { type totalPendingReq struct {
q.Lock() resp chan lnwire.MilliSatoshi
defer q.Unlock() }
if q.Len() == 0 { // queryHandler is a dedicated goroutine for handling queries from outside
// sub-systems to the packetQueue itself.
func (p *packetQueue) queryHandler() {
defer p.wg.Done()
for {
select {
case query := <-p.queries:
switch req := query.(type) {
case *queueLenReq:
p.queueCond.L.Lock()
select {
case req.resp <- len(p.queue):
case <-p.quit:
p.queueCond.L.Unlock()
return return
} }
go func() { p.queueCond.L.Unlock()
// Grab the pending mutex so that other goroutines waits before case *totalPendingReq:
// grabbing the object, otherwise the objects will be send in p.queueCond.L.Lock()
// the pending channel in random sequence.
<-q.grab
defer func() {
// Release the channel-lock and give other goroutines
// the ability to
q.grab <- struct{}{}
}()
// Fetch object after releasing the pending mutex in order to
// preserver the order of the stored objects.
q.Lock()
e := q.Front()
q.Unlock()
if e != nil {
// Send the object in object queue and wait it to be
// processed by other side.
q.pending <- e.Value.(*htlcPacket)
// After object have been preprocessed remove it from
// the queue.
q.Lock()
q.Remove(e)
q.Unlock()
return
}
}()
}
// length returns the number of pending htlc packets which is stored in queue.
func (q *packetQueue) length() int {
q.Lock()
defer q.Unlock()
return q.Len()
}
// pendingAmount returns the amount of money which is stored in pending queue.
func (q *packetQueue) pendingAmount() lnwire.MilliSatoshi {
q.Lock()
defer q.Unlock()
var amount lnwire.MilliSatoshi var amount lnwire.MilliSatoshi
for e := q.Front(); e != nil; e = e.Next() { for _, pkt := range p.queue {
packet := e.Value.(*htlcPacket) amount += pkt.amount
htlc := packet.htlc.(*lnwire.UpdateAddHTLC)
amount += htlc.Amount
} }
return amount select {
case req.resp <- amount:
case <-p.quit:
p.queueCond.L.Unlock()
return
}
p.queueCond.L.Unlock()
}
case <-p.quit:
return
}
}
}
// AddPkt adds the referenced packet to the overflow queue, preserving ordering
// of the existing items.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
// First, we'll lock the condition, and add the message to the end of
// the message queue.
p.queueCond.L.Lock()
p.queue = append(p.queue, pkt)
p.queueCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
p.queueCond.Signal()
}
// Length returns the number of pending htlc packets present within the over
// flow queue.
func (p *packetQueue) Length() int {
lenReq := &queueLenReq{
resp: make(chan int, 1),
}
select {
case p.queries <- lenReq:
case <-p.quit:
return 0
}
select {
case len := <-lenReq.resp:
return len
case <-p.quit:
return 0
}
}
// PendingAmount returns the total sum in satoshis of all the pending
// htlcPackets within the queue.
func (p *packetQueue) PendingAmount() lnwire.MilliSatoshi {
amtReq := &totalPendingReq{
resp: make(chan lnwire.MilliSatoshi, 1),
}
select {
case p.queries <- amtReq:
case <-p.quit:
return 0
}
select {
case amt := <-amtReq.resp:
return amt
case <-p.quit:
return 0
}
} }

@ -9,28 +9,29 @@ import (
) )
// TestWaitingQueueThreadSafety test the thread safety properties of the // TestWaitingQueueThreadSafety test the thread safety properties of the
// waiting queue, by executing methods in seprate goroutines which operates // waiting queue, by executing methods in separate goroutines which operates
// with the same data. // with the same data.
func TestWaitingQueueThreadSafety(t *testing.T) { func TestWaitingQueueThreadSafety(t *testing.T) {
t.Parallel() t.Parallel()
q := newWaitingQueue() q := newPacketQueue()
q.Start()
defer q.Stop()
a := make([]lnwire.MilliSatoshi, 1000) const numPkts = 1000
for i := 0; i < len(a); i++ { a := make([]lnwire.MilliSatoshi, numPkts)
for i := 0; i < numPkts; i++ {
a[i] = lnwire.MilliSatoshi(i) a[i] = lnwire.MilliSatoshi(i)
q.consume(&htlcPacket{ q.AddPkt(&htlcPacket{
amount: lnwire.MilliSatoshi(i), amount: lnwire.MilliSatoshi(i),
htlc: &lnwire.UpdateAddHTLC{}, htlc: &lnwire.UpdateAddHTLC{},
}) })
} }
var b []lnwire.MilliSatoshi var b []lnwire.MilliSatoshi
for i := 0; i < len(a); i++ { for i := 0; i < numPkts; i++ {
q.release()
select { select {
case packet := <-q.pending: case packet := <-q.outgoingPkts:
b = append(b, packet.amount) b = append(b, packet.amount)
case <-time.After(2 * time.Second): case <-time.After(2 * time.Second):