htlcswitch: ensure the packet queue exits on stop

In this commit, we ensure that the packet queue will always exit, by
continually signalling the main goroutine until it atomically sets a
bool that indicates its has been fully shutdown. It has been observed
that at times the main goroutine will wake up (due to the signal), but
then bypass the select and actually miss the quit signal, as a result
another signal is required. We'll continue to signals in a lazy loop
until the goroutine has fully exited.
This commit is contained in:
Olaoluwa Osuntokun 2018-06-06 17:03:39 -07:00
parent 15f812b10f
commit b2e24f876b
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -3,6 +3,7 @@ package htlcswitch
import ( import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -20,7 +21,7 @@ type packetQueue struct {
// totalHtlcAmt is the sum of the value of all pending HTLC's currently // totalHtlcAmt is the sum of the value of all pending HTLC's currently
// residing within the overflow queue. This value should only read or // residing within the overflow queue. This value should only read or
// modified *atomically*. // modified *atomically*.
totalHtlcAmt int64 totalHtlcAmt int64 // To be used atomically.
// queueLen is an internal counter that reflects the size of the queue // queueLen is an internal counter that reflects the size of the queue
// at any given instance. This value is intended to be use atomically // at any given instance. This value is intended to be use atomically
@ -28,7 +29,9 @@ type packetQueue struct {
// the queue w/o grabbing the main lock. This allows callers to avoid a // the queue w/o grabbing the main lock. This allows callers to avoid a
// deadlock situation where the main goroutine is attempting a send // deadlock situation where the main goroutine is attempting a send
// with the lock held. // with the lock held.
queueLen int32 queueLen int32 // To be used atomically.
streamShutdown int32 // To be used atomically.
queue []*htlcPacket queue []*htlcPacket
@ -75,7 +78,12 @@ func (p *packetQueue) Start() {
func (p *packetQueue) Stop() { func (p *packetQueue) Stop() {
close(p.quit) close(p.quit)
// Now that we've closed the channel, we'll repeatedly signal the msg
// consumer until we've detected that it has exited.
for atomic.LoadInt32(&p.streamShutdown) == 0 {
p.queueCond.Signal() p.queueCond.Signal()
time.Sleep(time.Millisecond * 100)
}
} }
// packetCoordinator is a goroutine that handles the packet overflow queue. // packetCoordinator is a goroutine that handles the packet overflow queue.
@ -92,7 +100,7 @@ func (p *packetQueue) Stop() {
// like reg congestion avoidance: // like reg congestion avoidance:
// * random dropping, RED, etc // * random dropping, RED, etc
func (p *packetQueue) packetCoordinator() { func (p *packetQueue) packetCoordinator() {
defer p.wg.Done() defer atomic.StoreInt32(&p.streamShutdown, 1)
for { for {
// First, we'll check our condition. If the queue of packets is // First, we'll check our condition. If the queue of packets is