Merge pull request #5146 from ellemouton/context-aware-rpcserver

rpcserver+lnrpc: make Subscribe RPCs context aware
This commit is contained in:
Olaoluwa Osuntokun 2021-04-22 15:44:00 -07:00 committed by GitHub
commit 045b58891a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 0 deletions

@ -253,6 +253,9 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
return nil return nil
} }
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-s.quit: case <-s.quit:
return nil return nil
} }

@ -269,3 +269,7 @@
<time> [ERR] RPCS: WS: error writing message: websocket: close sent <time> [ERR] RPCS: WS: error writing message: websocket: close sent
<time> [ERR] RPCS: [/routerrpc.Router/XImportMissionControl]: pair: <hex> -> <hex>: invalid failure: msat: <amt> and sat: 0.0000002 BTC values not equal <time> [ERR] RPCS: [/routerrpc.Router/XImportMissionControl]: pair: <hex> -> <hex>: invalid failure: msat: <amt> and sat: 0.0000002 BTC values not equal
<time> [ERR] BTCN: utxo scan failed: neutrino shutting down <time> [ERR] BTCN: utxo scan failed: neutrino shutting down
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeChannelGraph]: context canceled
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeInvoices]: context canceled
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeChannelGraph]: context deadline exceeded
<time> [ERR] RPCS: [/invoicesrpc.Invoices/SubscribeSingleInvoice]: context canceled

@ -2756,6 +2756,10 @@ func (r *rpcServer) SubscribePeerEvents(req *lnrpc.PeerEventSubscription,
if err := eventStream.Send(event); err != nil { if err := eventStream.Send(event); err != nil {
return err return err
} }
case <-eventStream.Context().Done():
return eventStream.Context().Err()
case <-r.quit: case <-r.quit:
return nil return nil
} }
@ -4029,6 +4033,10 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
if err := updateStream.Send(update); err != nil { if err := updateStream.Send(update); err != nil {
return err return err
} }
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit: case <-r.quit:
return nil return nil
} }
@ -4946,6 +4954,9 @@ func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
return err return err
} }
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit: case <-r.quit:
return nil return nil
} }
@ -5003,6 +5014,9 @@ func (r *rpcServer) SubscribeTransactions(req *lnrpc.GetTransactionsRequest,
return err return err
} }
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit: case <-r.quit:
return nil return nil
} }
@ -5524,6 +5538,11 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
return err return err
} }
// The context was cancelled so we report a cancellation error
// and exit immediately.
case <-updateStream.Context().Done():
return updateStream.Context().Err()
// The server is quitting, so we'll exit immediately. Returning // The server is quitting, so we'll exit immediately. Returning
// nil will close the clients read end of the stream. // nil will close the clients read end of the stream.
case <-r.quit: case <-r.quit:
@ -6446,6 +6465,9 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
return err return err
} }
case <-updateStream.Context().Done():
return updateStream.Context().Err()
case <-r.quit: case <-r.quit:
return nil return nil
} }