Merge pull request #2137 from halseth/payintent-recv-eof

rpcserver: fix error handling in bidirectional sendPayment
This commit is contained in:
Johan T. Halseth 2020-01-23 11:53:56 +01:00 committed by GitHub
commit bebb6395ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -3680,25 +3680,28 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
htlcSema <- struct{}{}
}
// We keep track of the running goroutines and set up a quit signal we
// can use to request them to exit if the method returns because of an
// encountered error.
var wg sync.WaitGroup
reqQuit := make(chan struct{})
defer close(reqQuit)
// Launch a new goroutine to handle reading new payment requests from
// the client. This way we can handle errors independently of blocking
// and waiting for the next payment request to come through.
reqQuit := make(chan struct{})
defer func() {
close(reqQuit)
}()
// TODO(joostjager): Callers expect result to come in in the same order
// as the request were sent, but this is far from guarantueed in the
// code below.
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-reqQuit:
return
case <-r.quit:
errChan <- nil
return
default:
// Receive the next pending payment within the
// stream sent by the client. If we read the
@ -3706,13 +3709,15 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
// stream, and we can exit normally.
nextPayment, err := stream.recv()
if err == io.EOF {
errChan <- nil
close(payChan)
return
} else if err != nil {
rpcsLog.Errorf("Failed receiving from "+
"stream: %v", err)
select {
case errChan <- err:
case <-reqQuit:
return
default:
}
return
}
@ -3730,18 +3735,22 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
PaymentError: err.Error(),
PaymentHash: payIntent.rHash[:],
}); err != nil {
rpcsLog.Errorf("Failed "+
"sending on "+
"stream: %v", err)
select {
case errChan <- err:
case <-reqQuit:
return
default:
}
return
}
continue
}
// If the payment was well formed, then we'll
// send to the dispatch goroutine, or exit,
// which ever comes first
// which ever comes first.
select {
case payChan <- &payIntent:
case <-reqQuit:
@ -3751,20 +3760,41 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
}
}()
sendLoop:
for {
select {
// If we encounter and error either during sending or
// receiving, we return directly, closing the stream.
case err := <-errChan:
return err
case payIntent := <-payChan:
case <-r.quit:
return errors.New("rpc server shutting down")
case payIntent, ok := <-payChan:
// If the receive loop is done, we break the send loop
// and wait for the ongoing payments to finish before
// exiting.
if !ok {
break sendLoop
}
// We launch a new goroutine to execute the current
// payment so we can continue to serve requests while
// this payment is being dispatched.
wg.Add(1)
go func() {
defer wg.Done()
// Attempt to grab a free semaphore slot, using
// a defer to eventually release the slot
// regardless of payment success.
<-htlcSema
select {
case <-htlcSema:
case <-reqQuit:
return
}
defer func() {
htlcSema <- struct{}{}
}()
@ -3778,7 +3808,13 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
// payment, then we'll return the error to the
// user, and terminate.
case saveErr != nil:
errChan <- saveErr
rpcsLog.Errorf("Failed dispatching "+
"payment intent: %v", saveErr)
select {
case errChan <- saveErr:
default:
}
return
// If we receive payment error than, instead of
@ -3790,7 +3826,14 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
PaymentHash: payIntent.rHash[:],
})
if err != nil {
errChan <- err
rpcsLog.Errorf("Failed "+
"sending error "+
"response: %v", err)
select {
case errChan <- err:
default:
}
}
return
}
@ -3810,12 +3853,22 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
PaymentRoute: marshalledRouted,
})
if err != nil {
errChan <- err
rpcsLog.Errorf("Failed sending "+
"response: %v", err)
select {
case errChan <- err:
default:
}
return
}
}()
}
}
// Wait for all goroutines to finish before closing the stream.
wg.Wait()
return nil
}
// SendPaymentSync is the synchronous non-streaming version of SendPayment.