rpcServer: wait for all goroutines to finish before exiting

Returning from the sendPayments method closes the underlying stream, so
we would risk payments still being in flight when returning.

We add the running goroutines to a waitgroup, such that we can wait for
them all to exit before exiting and closing the stream.

In case an error is encountered in the process, we will return directly,
which will close the `reqQuit` channel and prompt the gorpoutines to
shut down.
This commit is contained in:
Johan T. Halseth 2018-11-02 11:33:01 +01:00
parent 5b85721c04
commit dc23810118
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

View File

@ -3680,18 +3680,23 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
htlcSema <- struct{}{}
}
// We keep track of the running goroutines and set up a quit signal we
// can use to request them to exit if the method returns because of an
// encountered error.
var wg sync.WaitGroup
reqQuit := make(chan struct{})
defer close(reqQuit)
// Launch a new goroutine to handle reading new payment requests from
// the client. This way we can handle errors independently of blocking
// and waiting for the next payment request to come through.
reqQuit := make(chan struct{})
defer func() {
close(reqQuit)
}()
// TODO(joostjager): Callers expect result to come in in the same order
// as the request were sent, but this is far from guarantueed in the
// code below.
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-reqQuit:
@ -3778,11 +3783,18 @@ sendLoop:
// We launch a new goroutine to execute the current
// payment so we can continue to serve requests while
// this payment is being dispatched.
wg.Add(1)
go func() {
defer wg.Done()
// Attempt to grab a free semaphore slot, using
// a defer to eventually release the slot
// regardless of payment success.
<-htlcSema
select {
case <-htlcSema:
case <-reqQuit:
return
}
defer func() {
htlcSema <- struct{}{}
}()
@ -3853,6 +3865,10 @@ sendLoop:
}()
}
}
// Wait for all goroutines to finish before closing the stream.
wg.Wait()
return nil
}
// SendPaymentSync is the synchronous non-streaming version of SendPayment.