From b2e24f876b230d6834db71cf74febefee9480564 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 6 Jun 2018 17:03:39 -0700 Subject: [PATCH] htlcswitch: ensure the packet queue exits on stop In this commit, we ensure that the packet queue will always exit, by continually signalling the main goroutine until it atomically sets a bool that indicates its has been fully shutdown. It has been observed that at times the main goroutine will wake up (due to the signal), but then bypass the select and actually miss the quit signal, as a result another signal is required. We'll continue to signals in a lazy loop until the goroutine has fully exited. --- htlcswitch/queue.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go index be95073f..420d1f9b 100644 --- a/htlcswitch/queue.go +++ b/htlcswitch/queue.go @@ -3,6 +3,7 @@ package htlcswitch import ( "sync" "sync/atomic" + "time" "github.com/lightningnetwork/lnd/lnwire" ) @@ -20,7 +21,7 @@ type packetQueue struct { // totalHtlcAmt is the sum of the value of all pending HTLC's currently // residing within the overflow queue. This value should only read or // modified *atomically*. - totalHtlcAmt int64 + totalHtlcAmt int64 // To be used atomically. // queueLen is an internal counter that reflects the size of the queue // at any given instance. This value is intended to be use atomically @@ -28,7 +29,9 @@ type packetQueue struct { // the queue w/o grabbing the main lock. This allows callers to avoid a // deadlock situation where the main goroutine is attempting a send // with the lock held. - queueLen int32 + queueLen int32 // To be used atomically. + + streamShutdown int32 // To be used atomically. queue []*htlcPacket @@ -75,7 +78,12 @@ func (p *packetQueue) Start() { func (p *packetQueue) Stop() { close(p.quit) - p.queueCond.Signal() + // Now that we've closed the channel, we'll repeatedly signal the msg + // consumer until we've detected that it has exited. + for atomic.LoadInt32(&p.streamShutdown) == 0 { + p.queueCond.Signal() + time.Sleep(time.Millisecond * 100) + } } // packetCoordinator is a goroutine that handles the packet overflow queue. @@ -92,7 +100,7 @@ func (p *packetQueue) Stop() { // like reg congestion avoidance: // * random dropping, RED, etc func (p *packetQueue) packetCoordinator() { - defer p.wg.Done() + defer atomic.StoreInt32(&p.streamShutdown, 1) for { // First, we'll check our condition. If the queue of packets is