From 2562fdbddaadae369dd43c546e90cc315b421106 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 9 Jul 2019 11:09:19 +0200 Subject: [PATCH] lnd+rpcserver: extract listener setup into closure --- lnd.go | 102 +++++++++++++++++++++++++++++++++++++-------------- rpcserver.go | 56 ++++++++++++++-------------- 2 files changed, 103 insertions(+), 55 deletions(-) diff --git a/lnd.go b/lnd.go index ec21f4a9..d550407a 100644 --- a/lnd.go +++ b/lnd.go @@ -93,6 +93,13 @@ var ( } ) +// rpcListeners is a function type used for closures that fetches a set of RPC +// listeners for the current configuration, and the GRPC server options to use +// with these listeners. If no custom listeners are present, this should return +// normal listeners from the RPC endpoints defined in the config, and server +// options specifying TLS. +type rpcListeners func() ([]net.Listener, func(), []grpc.ServerOption, error) + // Main is the true entry point for lnd. This function is required since defers // created in the top-level scope of a main method aren't executed if os.Exit() // is called. @@ -240,13 +247,49 @@ func Main() error { // this information. walletInitParams.Birthday = time.Now() + // getListeners is a closure that creates listeners from the + // RPCListeners defined in the config. It also returns a cleanup + // closure and the server options to use for the GRPC server. + getListeners := func() ([]net.Listener, func(), []grpc.ServerOption, + error) { + + var grpcListeners []net.Listener + for _, grpcEndpoint := range cfg.RPCListeners { + // Start a gRPC server listening for HTTP/2 + // connections. + lis, err := lncfg.ListenOnAddress(grpcEndpoint) + if err != nil { + ltndLog.Errorf("unable to listen on %s", + grpcEndpoint) + return nil, nil, nil, err + } + grpcListeners = append(grpcListeners, lis) + } + + cleanup := func() { + for _, lis := range grpcListeners { + lis.Close() + } + } + return grpcListeners, cleanup, serverOpts, nil + } + + // walletUnlockerListeners is a closure we'll hand to the wallet + // unlocker, that will be called when it needs listeners for its GPRC + // server. + walletUnlockerListeners := func() ([]net.Listener, func(), + []grpc.ServerOption, error) { + + return getListeners() + } + // We wait until the user provides a password over RPC. In case lnd is // started with the --noseedbackup flag, we use the default password // for wallet encryption. if !cfg.NoSeedBackup { params, err := waitForWalletPassword( - cfg.RPCListeners, cfg.RESTListeners, serverOpts, - restDialOpts, restProxyDest, tlsCfg, + cfg.RESTListeners, restDialOpts, restProxyDest, tlsCfg, + walletUnlockerListeners, ) if err != nil { err := fmt.Errorf("Unable to set up wallet password "+ @@ -457,12 +500,20 @@ func Main() error { } defer atplManager.Stop() + // rpcListeners is a closure we'll hand to the rpc server, that will be + // called when it needs listeners for its GPRC server. + rpcListeners := func() ([]net.Listener, func(), []grpc.ServerOption, + error) { + + return getListeners() + } + // Initialize, and register our implementation of the gRPC interface // exported by the rpcServer. rpcServer, err := newRPCServer( - server, macaroonService, cfg.SubRPCServers, serverOpts, - restDialOpts, restProxyDest, atplManager, server.invoices, - tower, tlsCfg, + server, macaroonService, cfg.SubRPCServers, restDialOpts, + restProxyDest, atplManager, server.invoices, tower, tlsCfg, + rpcListeners, ) if err != nil { err := fmt.Errorf("Unable to create RPC server: %v", err) @@ -880,9 +931,18 @@ type WalletUnlockParams struct { // waitForWalletPassword will spin up gRPC and REST endpoints for the // WalletUnlocker server, and block until a password is provided by // the user to this RPC server. -func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr, - serverOpts []grpc.ServerOption, restDialOpts []grpc.DialOption, - restProxyDest string, tlsConf *tls.Config) (*WalletUnlockParams, error) { +func waitForWalletPassword(restEndpoints []net.Addr, + restDialOpts []grpc.DialOption, restProxyDest string, + tlsConf *tls.Config, getListeners rpcListeners) ( + *WalletUnlockParams, error) { + + // Start a gRPC server listening for HTTP/2 connections, solely used + // for getting the encryption password from the client. + listeners, cleanup, serverOpts, err := getListeners() + if err != nil { + return nil, err + } + defer cleanup() // Set up a new PasswordService, which will listen for passwords // provided over RPC. @@ -911,28 +971,14 @@ func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr, // password is the last thing to be printed to the console. var wg sync.WaitGroup - for _, grpcEndpoint := range grpcEndpoints { - // Start a gRPC server listening for HTTP/2 connections, solely - // used for getting the encryption password from the client. - lis, err := lncfg.ListenOnAddress(grpcEndpoint) - if err != nil { - ltndLog.Errorf( - "password RPC server unable to listen on %s", - grpcEndpoint, - ) - return nil, err - } - defer lis.Close() - + for _, lis := range listeners { wg.Add(1) - go func() { - rpcsLog.Infof( - "password RPC server listening on %s", - lis.Addr(), - ) + go func(lis net.Listener) { + rpcsLog.Infof("password RPC server listening on %s", + lis.Addr()) wg.Done() grpcServer.Serve(lis) - }() + }(lis) } // Start a REST proxy for our gRPC server above. @@ -942,7 +988,7 @@ func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr, mux := proxy.NewServeMux() - err := lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( + err = lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( ctx, mux, restProxyDest, restDialOpts, ) if err != nil { diff --git a/rpcserver.go b/rpcserver.go index d362bbcc..e11a13f4 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "math" + "net" "net/http" "sort" "strings" @@ -409,6 +410,11 @@ type rpcServer struct { // requests from. grpcServer *grpc.Server + // listeners is a list of listeners to use when starting the grpc + // server. We make it configurable such that the grpc server can listen + // on custom interfaces. + listeners []net.Listener + // listenerCleanUp are a set of closures functions that will allow this // main RPC server to clean up all the listening socket created for the // server. @@ -442,10 +448,10 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil) // base level options passed to the grPC server. This typically includes things // like requiring TLS, etc. func newRPCServer(s *server, macService *macaroons.Service, - subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, - restDialOpts []grpc.DialOption, restProxyDest string, - atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry, - tower *watchtower.Standalone, tlsCfg *tls.Config) (*rpcServer, error) { + subServerCgs *subRPCServerConfigs, restDialOpts []grpc.DialOption, + restProxyDest string, atpl *autopilot.Manager, + invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone, + tlsCfg *tls.Config, getListeners rpcListeners) (*rpcServer, error) { // Set up router rpc backend. channelGraph := s.chanDB.ChannelGraph() @@ -570,6 +576,12 @@ func newRPCServer(s *server, macService *macaroons.Service, strmInterceptors, errorLogStreamServerInterceptor(rpcsLog), ) + // Get the listeners and server options to use for this rpc server. + listeners, cleanup, serverOpts, err := getListeners() + if err != nil { + return nil, err + } + // If any interceptors have been set up, add them to the server options. if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 { chainedUnary := grpc_middleware.WithUnaryServerChain( @@ -585,14 +597,16 @@ func newRPCServer(s *server, macService *macaroons.Service, // gRPC server, and register the main lnrpc server along side. grpcServer := grpc.NewServer(serverOpts...) rootRPCServer := &rpcServer{ - restDialOpts: restDialOpts, - restProxyDest: restProxyDest, - subServers: subServers, - tlsCfg: tlsCfg, - grpcServer: grpcServer, - server: s, - routerBackend: routerBackend, - quit: make(chan struct{}, 1), + restDialOpts: restDialOpts, + listeners: listeners, + listenerCleanUp: []func(){cleanup}, + restProxyDest: restProxyDest, + subServers: subServers, + tlsCfg: tlsCfg, + grpcServer: grpcServer, + server: s, + routerBackend: routerBackend, + quit: make(chan struct{}, 1), } lnrpc.RegisterLightningServer(grpcServer, rootRPCServer) @@ -632,23 +646,11 @@ func (r *rpcServer) Start() error { // With all the sub-servers started, we'll spin up the listeners for // the main RPC server itself. - for _, listener := range cfg.RPCListeners { - lis, err := lncfg.ListenOnAddress(listener) - if err != nil { - ltndLog.Errorf( - "RPC server unable to listen on %s", listener, - ) - return err - } - - r.listenerCleanUp = append(r.listenerCleanUp, func() { - lis.Close() - }) - - go func() { + for _, lis := range r.listeners { + go func(lis net.Listener) { rpcsLog.Infof("RPC server listening on %s", lis.Addr()) r.grpcServer.Serve(lis) - }() + }(lis) } // If Prometheus monitoring is enabled, start the Prometheus exporter.