From f20f29696aa82aac0a91ba5bde9956b43a3a0928 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 2 Nov 2018 11:33:01 +0100 Subject: [PATCH 1/4] rpcserver: let sendPayment sendLoop listen for shutdown Intead of checking for shutdown in the receive loop, we let the sendLoop handle it, as it can return the error directly. This works since the returning sendLoop will trigger a close of the `reqQuit` channel, which will ensure the receive loop exits. --- rpcserver.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 11227fe6..ff02ea0d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3696,9 +3696,7 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { select { case <-reqQuit: return - case <-r.quit: - errChan <- nil - return + default: // Receive the next pending payment within the // stream sent by the client. If we read the @@ -3756,6 +3754,9 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { case err := <-errChan: return err + case <-r.quit: + return errors.New("rpc server shutting down") + case payIntent := <-payChan: // We launch a new goroutine to execute the current // payment so we can continue to serve requests while From c49ba0c5cb53bc55d48e374e33f2268c72b54231 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 2 Nov 2018 11:33:01 +0100 Subject: [PATCH 2/4] rpcServer: ensure we don't get blocked on bidirectional payment errors This commit fixes a potential issue in the bidirectional sendPayment case, where multiple goroutines could be sending on an errChan with buffer 1. Instead we select on default as well, as it is enough to handle the first error that was received. --- rpcserver.go | 44 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index ff02ea0d..43f7f762 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3707,10 +3707,12 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { errChan <- nil return } else if err != nil { + rpcsLog.Errorf("Failed receiving from "+ + "stream: %v", err) + select { case errChan <- err: - case <-reqQuit: - return + default: } return } @@ -3728,18 +3730,22 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentError: err.Error(), PaymentHash: payIntent.rHash[:], }); err != nil { + rpcsLog.Errorf("Failed "+ + "sending on "+ + "stream: %v", err) + select { case errChan <- err: - case <-reqQuit: - return + default: } + return } continue } // If the payment was well formed, then we'll // send to the dispatch goroutine, or exit, - // which ever comes first + // which ever comes first. select { case payChan <- &payIntent: case <-reqQuit: @@ -3751,6 +3757,9 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { for { select { + + // If we encounter and error either during sending or + // receiving, we return directly, closing the stream. case err := <-errChan: return err @@ -3779,7 +3788,13 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { // payment, then we'll return the error to the // user, and terminate. case saveErr != nil: - errChan <- saveErr + rpcsLog.Errorf("Failed dispatching "+ + "payment intent: %v", saveErr) + + select { + case errChan <- saveErr: + default: + } return // If we receive payment error than, instead of @@ -3791,7 +3806,14 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentHash: payIntent.rHash[:], }) if err != nil { - errChan <- err + rpcsLog.Errorf("Failed "+ + "sending error "+ + "response: %v", err) + + select { + case errChan <- err: + default: + } } return } @@ -3811,7 +3833,13 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { PaymentRoute: marshalledRouted, }) if err != nil { - errChan <- err + rpcsLog.Errorf("Failed sending "+ + "response: %v", err) + + select { + case errChan <- err: + default: + } return } }() From 5b85721c04028f69999a29834cc137437473fe75 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 2 Nov 2018 11:33:01 +0100 Subject: [PATCH 3/4] rpcServer: ensure all payIntents are handled before exiting This commit fixes a problem that could arise when handling a continuous stream of payIntents. We would risk the client sending a set of payment intents and closing the stream, but we wouldn't be sure the sendLoop had read all messages on the `payChan` before exiting with the nil error from the `errChan`. Instead, we close the `payChan` to indicate that no more payments are going to be received. --- rpcserver.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 43f7f762..7dd1ef5e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3704,7 +3704,7 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { // stream, and we can exit normally. nextPayment, err := stream.recv() if err == io.EOF { - errChan <- nil + close(payChan) return } else if err != nil { rpcsLog.Errorf("Failed receiving from "+ @@ -3755,6 +3755,7 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { } }() +sendLoop: for { select { @@ -3766,7 +3767,14 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { case <-r.quit: return errors.New("rpc server shutting down") - case payIntent := <-payChan: + case payIntent, ok := <-payChan: + // If the receive loop is done, we break the send loop + // and wait for the ongoing payments to finish before + // exiting. + if !ok { + break sendLoop + } + // We launch a new goroutine to execute the current // payment so we can continue to serve requests while // this payment is being dispatched. From dc238101183fae60ef5ea1799727ea529160349d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Fri, 2 Nov 2018 11:33:01 +0100 Subject: [PATCH 4/4] rpcServer: wait for all goroutines to finish before exiting Returning from the sendPayments method closes the underlying stream, so we would risk payments still being in flight when returning. We add the running goroutines to a waitgroup, such that we can wait for them all to exit before exiting and closing the stream. In case an error is encountered in the process, we will return directly, which will close the `reqQuit` channel and prompt the gorpoutines to shut down. --- rpcserver.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 7dd1ef5e..35eec952 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3680,18 +3680,23 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { htlcSema <- struct{}{} } + // We keep track of the running goroutines and set up a quit signal we + // can use to request them to exit if the method returns because of an + // encountered error. + var wg sync.WaitGroup + reqQuit := make(chan struct{}) + defer close(reqQuit) + // Launch a new goroutine to handle reading new payment requests from // the client. This way we can handle errors independently of blocking // and waiting for the next payment request to come through. - reqQuit := make(chan struct{}) - defer func() { - close(reqQuit) - }() - // TODO(joostjager): Callers expect result to come in in the same order // as the request were sent, but this is far from guarantueed in the // code below. + wg.Add(1) go func() { + defer wg.Done() + for { select { case <-reqQuit: @@ -3778,11 +3783,18 @@ sendLoop: // We launch a new goroutine to execute the current // payment so we can continue to serve requests while // this payment is being dispatched. + wg.Add(1) go func() { + defer wg.Done() + // Attempt to grab a free semaphore slot, using // a defer to eventually release the slot // regardless of payment success. - <-htlcSema + select { + case <-htlcSema: + case <-reqQuit: + return + } defer func() { htlcSema <- struct{}{} }() @@ -3853,6 +3865,10 @@ sendLoop: }() } } + + // Wait for all goroutines to finish before closing the stream. + wg.Wait() + return nil } // SendPaymentSync is the synchronous non-streaming version of SendPayment.