diff --git a/htlcswitch/link.go b/htlcswitch/link.go
index 4cd62592..40278553 100644
--- a/htlcswitch/link.go
+++ b/htlcswitch/link.go
@@ -234,7 +234,7 @@ func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel,
 		linkControl:    make(chan interface{}),
 		cancelReasons:  make(map[uint64]lnwire.OpaqueReason),
 		logCommitTimer: time.NewTimer(300 * time.Millisecond),
-		overflowQueue:  newWaitingQueue(),
+		overflowQueue:  newPacketQueue(),
 		bestHeight:     currentHeight,
 		quit:           make(chan struct{}),
 	}
@@ -256,6 +256,8 @@ func (l *channelLink) Start() error {

 	log.Infof("ChannelLink(%v) is starting", l)

+	l.overflowQueue.Start()
+
 	l.wg.Add(1)
 	go l.htlcManager()

@@ -274,6 +276,8 @@ func (l *channelLink) Stop() {

 	log.Infof("ChannelLink(%v) is stopping", l)

+	l.overflowQueue.Stop()
+
 	close(l.quit)
 	l.wg.Wait()

@@ -390,7 +394,7 @@ out:
 		// transaction is now eligible for processing once again. So
 		// we'll attempt to re-process the packet in order to allow it
 		// to continue propagating within the network.
-		case packet := <-l.overflowQueue.pending:
+		case packet := <-l.overflowQueue.outgoingPkts:
 			msg := packet.htlc.(*lnwire.UpdateAddHTLC)
 			log.Tracef("Reprocessing downstream add update "+
 				"with payment hash(%x)", msg.PaymentHash[:])
@@ -406,14 +410,14 @@ out:
 			// directly. Once an active HTLC is either settled or
 			// failed, then we'll free up a new slot.
 			htlc, ok := pkt.htlc.(*lnwire.UpdateAddHTLC)
-			if ok && l.overflowQueue.length() != 0 {
+			if ok && l.overflowQueue.Length() != 0 {
 				log.Infof("Downstream htlc add update with "+
 					"payment hash(%x) have been added to "+
 					"reprocessing queue, batch: %v",
 					htlc.PaymentHash[:],
 					l.batchCounter)

-				l.overflowQueue.consume(pkt)
+				l.overflowQueue.AddPkt(pkt)
 				continue
 			}
 			l.handleDownStreamPkt(pkt)
@@ -483,7 +487,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
 				"reprocessing queue, batch: %v",
 				htlc.PaymentHash[:],
 				l.batchCounter)
-			l.overflowQueue.consume(pkt)
+			l.overflowQueue.AddPkt(pkt)
 			return

 		// The HTLC was unable to be added to the state
@@ -872,7 +876,7 @@ func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
 // not be accurate.
 func (l *channelLink) getBandwidth() lnwire.MilliSatoshi {
 	// TODO(roasbeef): factor in reserve, just grab mutex
-	return l.channel.LocalAvailableBalance() - l.overflowQueue.pendingAmount()
+	return l.channel.LocalAvailableBalance() - l.overflowQueue.PendingAmount()
 }

 // policyUpdate is a message sent to a channel link when an outside sub-system
@@ -994,7 +998,6 @@ func (l *channelLink) processLockedInHtlcs(
 			// notify the overflow queue that a spare spot has been
 			// freed up within the commitment state.
 			packetsToForward = append(packetsToForward, settlePacket)
-			l.overflowQueue.release()

 		// A failureCode message for a previously forwarded HTLC has been
 		// received. As a result a new slot will be freed up in our
@@ -1016,7 +1019,6 @@ func (l *channelLink) processLockedInHtlcs(
 			// notify the overflow queue that a spare spot has been
 			// freed up within the commitment state.
 			packetsToForward = append(packetsToForward, failPacket)
-			l.overflowQueue.release()

 		// An incoming HTLC add has been full-locked in. As a result we
 		// can no examine the forwarding details of the HTLC, and the
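
Note: the link.go hunks above touch several call sites at once. The sketch below condenses them into a single loop to show the intended interaction with the new queue API (AddPkt in, outgoingPkts out, Length for the overflow check). It assumes it sits in the htlcswitch package; overflowSketch, downstream, commitmentFull, forward and quit are hypothetical stand-ins for the real htlcManager plumbing, not code from this change.

package htlcswitch

import "github.com/lightningnetwork/lnd/lnwire"

// overflowSketch is a hypothetical, condensed version of the link's select
// loop after this change: overflowed HTLC adds are buffered via AddPkt and
// re-emitted on outgoingPkts once the link is ready to retry them.
func overflowSketch(q *packetQueue, downstream <-chan *htlcPacket,
	commitmentFull func() bool, forward func(*htlcPacket),
	quit <-chan struct{}) {

	q.Start()
	defer q.Stop()

	for {
		select {
		// A previously overflowed add is ready to be retried.
		case pkt := <-q.outgoingPkts:
			forward(pkt)

		// A fresh downstream packet arrived. If the commitment
		// transaction is out of HTLC slots, or older adds are still
		// waiting, buffer it so the original ordering is preserved.
		case pkt := <-downstream:
			_, isAdd := pkt.htlc.(*lnwire.UpdateAddHTLC)
			if isAdd && (commitmentFull() || q.Length() != 0) {
				q.AddPkt(pkt)
				continue
			}
			forward(pkt)

		case <-quit:
			return
		}
	}
}
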
diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go
index 0d729f61..db6e8199 100644
--- a/htlcswitch/queue.go
+++ b/htlcswitch/queue.go
@@ -1,117 +1,234 @@
 package htlcswitch

 import (
-	"container/list"
 	"sync"

 	"github.com/lightningnetwork/lnd/lnwire"
 )

-// packetQueue represent the wrapper around the original queue plus the
-// functionality for releasing the queue objects in object channel. Such
-// structures allows storing of all pending object in queue before the moment
-// of actual releasing.
-//
-// TODO(andrew.shvv) structure not preserve the order if object failed second
-// time.
+// packetQueue is a goroutine-safe queue of htlc packets which overflow the
+// current commitment transaction. An HTLC will overflow the current commitment
+// transaction if an attempt is made to add it to a state machine which already
+// has the maximum number of pending HTLCs present on the commitment
+// transaction. Packets are removed from the queue by the channelLink itself as
+// additional slots become available on the commitment transaction.
 type packetQueue struct {
-	*list.List
-	sync.Mutex
+	queueCond *sync.Cond
+	queueMtx  sync.Mutex
+	queue     []*htlcPacket

-	// pending channel is needed in order to send the object which should
-	// be re-proceed.
-	pending chan *htlcPacket
+	// outgoingPkts is a channel that the channelLink will receive on in
+	// order to drain the packetQueue as new slots become available on the
+	// commitment transaction.
+	outgoingPkts chan *htlcPacket

-	// grab channel represents the channel-lock which is needed in order to
-	// make "release" goroutines block during other release goroutine
-	// processing.
-	grab chan struct{}
+	queries chan interface{}
+
+	wg   sync.WaitGroup
+	quit chan struct{}
 }

-func newWaitingQueue() *packetQueue {
-	// Initialize grab channel and release one slot, otherwise release
-	// function will block.
-	done := make(chan struct{}, 1)
-	done <- struct{}{}
+// newPacketQueue returns a new instance of the packetQueue.
+func newPacketQueue() *packetQueue {
+	p := &packetQueue{
+		outgoingPkts: make(chan *htlcPacket),

-	return &packetQueue{
-		pending: make(chan *htlcPacket),
-		grab:    done,
-		List:    list.New(),
+		queries: make(chan interface{}),
+
+		quit: make(chan struct{}),
+	}
+	p.queueCond = sync.NewCond(&p.queueMtx)
+
+	return p
+}
+
+// Start starts all goroutines that packetQueue needs to perform its normal
+// duties.
+func (p *packetQueue) Start() {
+	p.wg.Add(2)
+	go p.packetCoordinator()
+	go p.queryHandler()
+}
+
+// Stop signals the packetQueue for a graceful shutdown, and waits for all
+// goroutines to exit.
+func (p *packetQueue) Stop() {
+	close(p.quit)
+
+	p.queueCond.Signal()
+
+	p.wg.Wait()
+}
+
+// packetCoordinator is a goroutine that handles the packet overflow queue.
+// Using a synchronized queue, outside callers are able to append to the end of
+// the queue, waking up the coordinator when the queue transitions from empty
+// to non-empty. The packetCoordinator will then aggressively try to empty out
+// the queue, passing new htlcPackets to the channelLink as slots within the
+// commitment transaction become available.
+//
+// Future iterations of the packetCoordinator will implement congestion
+// avoidance logic in the face of persistent htlcPacket back-pressure.
+//
+// TODO(roasbeef): later will need to add back pressure handling heuristics
+// like reg congestion avoidance:
+//  * random dropping, RED, etc
+func (p *packetQueue) packetCoordinator() {
+	defer p.wg.Done()
+
+	for {
+		// First, we'll check our condition. If the queue of packets is
+		// empty, then we'll wait until a new item is added.
+		p.queueCond.L.Lock()
+		for len(p.queue) == 0 {
+			p.queueCond.Wait()
+
+			// If we were woken up in order to exit, then we'll do
+			// so. Otherwise, we'll check the message queue for any
+			// new items.
+			select {
+			case <-p.quit:
+				p.queueCond.L.Unlock()
+				return
+			default:
+			}
+		}
+
+		nextPkt := p.queue[0]
+
+		// If there aren't any further messages to send (or the link
+		// didn't immediately read our message), then we'll block and
+		// wait for a new message to be sent into the overflow queue,
+		// or for the link's htlcForwarder to wake up.
+		select {
+		case p.outgoingPkts <- nextPkt:
+			// Pop the item off the front of the queue and slide
+			// the remaining references down by one to re-position
+			// the head pointer. This will set us up for the next
+			// iteration. If the queue is empty at this point, then
+			// we'll block at the top.
+			p.queue[0] = nil
+			p.queue = p.queue[1:]
+
+		case <-p.quit:
+			p.queueCond.L.Unlock()
+			return
+
+		default:
+		}
+
+		p.queueCond.L.Unlock()
 	}
 }

-// consume function take the given packet and store it in queue till release
-// function will be executed.
-func (q *packetQueue) consume(packet *htlcPacket) {
-	q.Lock()
-	defer q.Unlock()
-
-	q.PushBack(packet)
+// queueLenReq is a request sent to the queryHandler to query for the length of
+// the current pending packet queue.
+type queueLenReq struct {
+	resp chan int
 }

-// release function spawn the goroutine which grab the object from pending
-// queue and pass it in processing channel.
-func (q *packetQueue) release() {
-	q.Lock()
-	defer q.Unlock()
+// totalPendingReq is a request sent to the queryHandler to query for the total
+// amount of satoshis pending in the queue at a given instant.
+type totalPendingReq struct {
+	resp chan lnwire.MilliSatoshi
+}

-	if q.Len() == 0 {
-		return
-	}
+// queryHandler is a dedicated goroutine for handling queries from outside
+// sub-systems to the packetQueue itself.
+func (p *packetQueue) queryHandler() {
+	defer p.wg.Done()

-	go func() {
-		// Grab the pending mutex so that other goroutines waits before
-		// grabbing the object, otherwise the objects will be send in
-		// the pending channel in random sequence.
-		<-q.grab
+	for {
+		select {
+		case query := <-p.queries:
+			switch req := query.(type) {
+			case *queueLenReq:
+				p.queueCond.L.Lock()

-		defer func() {
-			// Release the channel-lock and give other goroutines
-			// the ability to
-			q.grab <- struct{}{}
-		}()
+				select {
+				case req.resp <- len(p.queue):
+				case <-p.quit:
+					p.queueCond.L.Unlock()
+					return
+				}

-		// Fetch object after releasing the pending mutex in order to
-		// preserver the order of the stored objects.
-		q.Lock()
-		e := q.Front()
-		q.Unlock()
+				p.queueCond.L.Unlock()
+			case *totalPendingReq:
+				p.queueCond.L.Lock()

-		if e != nil {
-			// Send the object in object queue and wait it to be
-			// processed by other side.
-			q.pending <- e.Value.(*htlcPacket)
+				var amount lnwire.MilliSatoshi
+				for _, pkt := range p.queue {
+					amount += pkt.amount
+				}

-			// After object have been preprocessed remove it from
-			// the queue.
-			q.Lock()
-			q.Remove(e)
-			q.Unlock()
+				select {
+				case req.resp <- amount:
+				case <-p.quit:
+					p.queueCond.L.Unlock()
+					return
+				}
+
+				p.queueCond.L.Unlock()
+			}
+
+		case <-p.quit:
 			return
 		}
-	}()
+	}
 }

-// length returns the number of pending htlc packets which is stored in queue.
-func (q *packetQueue) length() int {
-	q.Lock()
-	defer q.Unlock()
+// AddPkt adds the referenced packet to the overflow queue, preserving ordering
+// of the existing items.
+func (p *packetQueue) AddPkt(pkt *htlcPacket) {
+	// First, we'll lock the condition, and add the message to the end of
+	// the message queue.
+	p.queueCond.L.Lock()
+	p.queue = append(p.queue, pkt)
+	p.queueCond.L.Unlock()

-	return q.Len()
+	// With the message added, we signal to the packetCoordinator that
+	// there are additional messages to consume.
+	p.queueCond.Signal()
 }

-// pendingAmount returns the amount of money which is stored in pending queue.
-func (q *packetQueue) pendingAmount() lnwire.MilliSatoshi {
-	q.Lock()
-	defer q.Unlock()
-
-	var amount lnwire.MilliSatoshi
-	for e := q.Front(); e != nil; e = e.Next() {
-		packet := e.Value.(*htlcPacket)
-		htlc := packet.htlc.(*lnwire.UpdateAddHTLC)
-		amount += htlc.Amount
+// Length returns the number of pending htlc packets present within the
+// overflow queue.
+func (p *packetQueue) Length() int {
+	lenReq := &queueLenReq{
+		resp: make(chan int, 1),
 	}

-	return amount
+	select {
+	case p.queries <- lenReq:
+	case <-p.quit:
+		return 0
+	}
+
+	select {
+	case len := <-lenReq.resp:
+		return len
+	case <-p.quit:
+		return 0
+	}
+}
+
+// PendingAmount returns the total sum in satoshis of all the pending
+// htlcPackets within the queue.
+func (p *packetQueue) PendingAmount() lnwire.MilliSatoshi {
+	amtReq := &totalPendingReq{
+		resp: make(chan lnwire.MilliSatoshi, 1),
+	}
+
+	select {
+	case p.queries <- amtReq:
+	case <-p.quit:
+		return 0
+	}
+
+	select {
+	case amt := <-amtReq.resp:
+		return amt
+	case <-p.quit:
+		return 0
+	}
 }
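
Note: Stop above closes quit and then calls queueCond.Signal so that a packetCoordinator parked in Wait can observe the shutdown. The standalone sketch below illustrates that condition-variable idiom in isolation; it closes quit while holding the lock, which avoids a lost wake-up. condQueue and all of its methods are invented for the example and are not part of this change.

package main

import (
	"fmt"
	"sync"
)

// condQueue is a stripped-down illustration of the coordination idiom used by
// packetQueue: a sync.Cond guards a slice, the consumer Wait()s while the
// slice is empty, and shutdown wakes the consumer so it can exit cleanly.
type condQueue struct {
	cond  *sync.Cond
	items []string

	wg   sync.WaitGroup
	quit chan struct{}
}

func newCondQueue() *condQueue {
	return &condQueue{
		cond: sync.NewCond(&sync.Mutex{}),
		quit: make(chan struct{}),
	}
}

// start launches the consumer, which drains items into out in FIFO order.
func (c *condQueue) start(out chan<- string) {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			c.cond.L.Lock()
			for len(c.items) == 0 && !c.quitting() {
				// Wait releases the lock while parked and
				// re-acquires it before returning.
				c.cond.Wait()
			}
			if c.quitting() {
				c.cond.L.Unlock()
				return
			}
			next := c.items[0]
			c.items = c.items[1:]
			c.cond.L.Unlock()

			// Hand the item off, but never block past shutdown.
			select {
			case out <- next:
			case <-c.quit:
				return
			}
		}
	}()
}

// quitting reports whether stop has been called. It is only relied upon while
// holding the condition's lock, so a wake-up cannot be lost.
func (c *condQueue) quitting() bool {
	select {
	case <-c.quit:
		return true
	default:
		return false
	}
}

// add appends an item and wakes the consumer if it is parked in Wait.
func (c *condQueue) add(s string) {
	c.cond.L.Lock()
	c.items = append(c.items, s)
	c.cond.L.Unlock()

	c.cond.Signal()
}

// stop closes quit under the lock (so the consumer cannot slip into Wait
// between observing "not quitting" and parking), wakes the consumer, and
// waits for it to exit.
func (c *condQueue) stop() {
	c.cond.L.Lock()
	close(c.quit)
	c.cond.L.Unlock()

	c.cond.Signal()
	c.wg.Wait()
}

func main() {
	out := make(chan string)
	q := newCondQueue()
	q.start(out)

	q.add("first")
	q.add("second")
	fmt.Println(<-out, <-out) // first second

	q.stop()
}
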
diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go
index 6963b17e..8b18e3d7 100644
--- a/htlcswitch/queue_test.go
+++ b/htlcswitch/queue_test.go
@@ -9,28 +9,29 @@ import (
 )

 // TestWaitingQueueThreadSafety test the thread safety properties of the
-// waiting queue, by executing methods in seprate goroutines which operates
+// waiting queue, by executing methods in separate goroutines which operate
 // with the same data.
 func TestWaitingQueueThreadSafety(t *testing.T) {
 	t.Parallel()

-	q := newWaitingQueue()
+	q := newPacketQueue()
+	q.Start()
+	defer q.Stop()

-	a := make([]lnwire.MilliSatoshi, 1000)
-	for i := 0; i < len(a); i++ {
+	const numPkts = 1000
+	a := make([]lnwire.MilliSatoshi, numPkts)
+	for i := 0; i < numPkts; i++ {
 		a[i] = lnwire.MilliSatoshi(i)
-		q.consume(&htlcPacket{
+		q.AddPkt(&htlcPacket{
 			amount: lnwire.MilliSatoshi(i),
 			htlc:   &lnwire.UpdateAddHTLC{},
 		})
 	}

 	var b []lnwire.MilliSatoshi
-	for i := 0; i < len(a); i++ {
-		q.release()
-
+	for i := 0; i < numPkts; i++ {
 		select {
-		case packet := <-q.pending:
+		case packet := <-q.outgoingPkts:
 			b = append(b, packet.amount)
 		case <-time.After(2 * time.Second):
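
Note: for reference, a short usage sketch of the exported surface exercised by the test above, assuming it lives in the htlcswitch package; examplePacketQueueUsage is a hypothetical helper and not part of this change.

package htlcswitch

import (
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/lnwire"
)

// examplePacketQueueUsage shows the lifecycle of the queue: overflowed HTLC
// adds go in via AddPkt, stay accounted for by Length and PendingAmount while
// nothing is draining them, and come back out on outgoingPkts in FIFO order.
func examplePacketQueueUsage() {
	q := newPacketQueue()
	q.Start()
	defer q.Stop()

	// Queue three overflowed HTLC adds.
	for i := 1; i <= 3; i++ {
		q.AddPkt(&htlcPacket{
			amount: lnwire.MilliSatoshi(i * 1000),
			htlc:   &lnwire.UpdateAddHTLC{},
		})
	}

	// Nothing has read outgoingPkts yet, so all three packets are still
	// buffered and counted.
	fmt.Println("pending packets:", q.Length())       // 3
	fmt.Println("pending amount:", q.PendingAmount()) // 1000 + 2000 + 3000 mSAT

	// Drain them as "slots" free up; ordering is preserved.
	for i := 0; i < 3; i++ {
		select {
		case pkt := <-q.outgoingPkts:
			fmt.Println("re-forwarding", pkt.amount)
		case <-time.After(time.Second):
			fmt.Println("timed out waiting for packet")
			return
		}
	}
}
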