diff --git a/lnd.go b/lnd.go index 14016b9e..8aa31739 100644 --- a/lnd.go +++ b/lnd.go @@ -45,6 +45,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/btcwallet" "github.com/lightningnetwork/lnd/macaroons" + "github.com/lightningnetwork/lnd/monitoring" "github.com/lightningnetwork/lnd/rpcperms" "github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/tor" @@ -185,13 +186,6 @@ type ListenerCfg struct { ExternalRestRegistrar RestRegistrar } -// rpcListeners is a function type used for closures that fetches a set of RPC -// listeners for the current configuration. If no custom listeners are present, -// this should return normal listeners from the RPC endpoints defined in the -// config. The second return value us a closure that will close the fetched -// listeners. -type rpcListeners func() ([]*ListenerWithSignal, func(), error) - // Main is the true entry point for lnd. It accepts a fully populated and // validated main configuration struct and an optional listener config struct. // This function starts all main system components then blocks until a signal @@ -280,22 +274,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { defer cleanUp() - // We use the first RPC listener as the destination for our REST proxy. - // If the listener is set to listen on all interfaces, we replace it - // with localhost, as we cannot dial it directly. - restProxyDest := cfg.RPCListeners[0].String() - switch { - case strings.Contains(restProxyDest, "0.0.0.0"): - restProxyDest = strings.Replace( - restProxyDest, "0.0.0.0", "127.0.0.1", 1, - ) - - case strings.Contains(restProxyDest, "[::]"): - restProxyDest = strings.Replace( - restProxyDest, "[::]", "[::1]", 1, - ) - } - // Before starting the wallet, we'll create and start our Neutrino // light client instance, if enabled, in order to allow it to sync // while the rest of the daemon continues startup. @@ -320,7 +298,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { var ( walletInitParams WalletUnlockParams - shutdownUnlocker = func() {} privateWalletPw = lnwallet.DefaultPrivatePassphrase publicWalletPw = lnwallet.DefaultPublicPassphrase ) @@ -330,11 +307,14 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { // this information. walletInitParams.Birthday = time.Now() - // getListeners is a closure that creates listeners from the - // RPCListeners defined in the config. It also returns a cleanup - // closure and the server options to use for the GRPC server. - getListeners := func() ([]*ListenerWithSignal, func(), error) { - var grpcListeners []*ListenerWithSignal + // If we have chosen to start with a dedicated listener for the + // rpc server, we set it directly. + var grpcListeners []*ListenerWithSignal + if lisCfg.RPCListener != nil { + grpcListeners = []*ListenerWithSignal{lisCfg.RPCListener} + } else { + // Otherwise we create listeners from the RPCListeners defined + // in the config. for _, grpcEndpoint := range cfg.RPCListeners { // Start a gRPC server listening for HTTP/2 // connections. @@ -342,38 +322,16 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { if err != nil { ltndLog.Errorf("unable to listen on %s", grpcEndpoint) - return nil, nil, err + return err } + defer lis.Close() + grpcListeners = append( grpcListeners, &ListenerWithSignal{ Listener: lis, Ready: make(chan struct{}), }) } - - cleanup := func() { - for _, lis := range grpcListeners { - lis.Close() - } - } - return grpcListeners, cleanup, nil - } - - // walletUnlockerListeners is a closure we'll hand to the wallet - // unlocker, that will be called when it needs listeners for its GPRC - // server. - walletUnlockerListeners := func() ([]*ListenerWithSignal, func(), - error) { - - // If we have chosen to start with a dedicated listener for the - // wallet unlocker, we return it directly. - if lisCfg.WalletUnlocker != nil { - return []*ListenerWithSignal{lisCfg.WalletUnlocker}, - func() {}, nil - } - - // Otherwise we'll return the regular listeners. - return getListeners() } // Create a new RPC interceptor chain that we'll add to the GRPC @@ -385,14 +343,50 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { rpcServerOpts := interceptorChain.CreateServerOpts() serverOpts = append(serverOpts, rpcServerOpts...) + grpcServer := grpc.NewServer(serverOpts...) + defer grpcServer.Stop() + + // We'll create the WalletUnlockerService and register this with the + // GRPC server. + pwService := createWalletUnlockerService(cfg) + lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService) + + // Initialize, and register our implementation of the gRPC interface + // exported by the rpcServer. + rpcServer := newRPCServer( + cfg, interceptorChain, lisCfg.ExternalRPCSubserverCfg, + lisCfg.ExternalRestRegistrar, + ) + + err = rpcServer.RegisterWithGrpcServer(grpcServer) + if err != nil { + return err + } + + // Now that both the WalletUnlocker and LightningService have been + // registered with the GRPC server, we can start listening. + err = startGrpcListen(cfg, grpcServer, grpcListeners) + if err != nil { + return err + } + + // Now start the REST proxy for our gRPC server above. We'll ensure + // we direct LND to connect to its loopback address rather than a + // wildcard to prevent certificate issues when accessing the proxy + // externally. + stopProxy, err := startRestProxy( + cfg, rpcServer, restDialOpts, restListen, + ) + if err != nil { + return err + } + defer stopProxy() + // We wait until the user provides a password over RPC. In case lnd is // started with the --noseedbackup flag, we use the default password // for wallet encryption. if !cfg.NoSeedBackup { - params, shutdown, err := waitForWalletPassword( - cfg, cfg.RESTListeners, serverOpts, restDialOpts, - restProxyDest, restListen, walletUnlockerListeners, - ) + params, err := waitForWalletPassword(cfg, pwService) if err != nil { err := fmt.Errorf("unable to set up wallet password "+ "listeners: %v", err) @@ -401,7 +395,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { } walletInitParams = *params - shutdownUnlocker = shutdown privateWalletPw = walletInitParams.Password publicWalletPw = walletInitParams.Password defer func() { @@ -509,10 +502,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { interceptorChain.AddMacaroonService(macaroonService) } - // Now we're definitely done with the unlocker, shut it down so we can - // start the main RPC service later. - shutdownUnlocker() - // With the information parsed from the configuration, create valid // instances of the pertinent interfaces required to operate the // Lightning Network Daemon. @@ -732,31 +721,14 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { } defer atplManager.Stop() - // rpcListeners is a closure we'll hand to the rpc server, that will be - // called when it needs listeners for its GPRC server. - rpcListeners := func() ([]*ListenerWithSignal, func(), error) { - // If we have chosen to start with a dedicated listener for the - // rpc server, we return it directly. - if lisCfg.RPCListener != nil { - return []*ListenerWithSignal{lisCfg.RPCListener}, - func() {}, nil - } - - // Otherwise we'll return the regular listeners. - return getListeners() - } - - // Initialize, and register our implementation of the gRPC interface - // exported by the rpcServer. - rpcServer, err := newRPCServer( - cfg, server, macaroonService, cfg.SubRPCServers, serverOpts, - restDialOpts, restProxyDest, atplManager, server.invoices, - tower, restListen, rpcListeners, chainedAcceptor, - interceptorChain, lisCfg.ExternalRPCSubserverCfg, - lisCfg.ExternalRestRegistrar, + // Now we have created all dependencies necessary to populate and + // start the RPC server. + err = rpcServer.addDeps( + server, macaroonService, cfg.SubRPCServers, atplManager, + server.invoices, tower, chainedAcceptor, ) if err != nil { - err := fmt.Errorf("unable to create RPC server: %v", err) + err := fmt.Errorf("unable to add deps to RPC server: %v", err) ltndLog.Error(err) return err } @@ -1141,14 +1113,9 @@ type WalletUnlockParams struct { MacResponseChan chan []byte } -// waitForWalletPassword will spin up gRPC and REST endpoints for the -// WalletUnlocker server, and block until a password is provided by -// the user to this RPC server. -func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, - serverOpts []grpc.ServerOption, restDialOpts []grpc.DialOption, - restProxyDest string, restListen func(net.Addr) (net.Listener, error), - getListeners rpcListeners) (*WalletUnlockParams, func(), error) { - +// createWalletUnlockerService creates a WalletUnlockerService from the passed +// config. +func createWalletUnlockerService(cfg *Config) *walletunlocker.UnlockerService { chainConfig := cfg.Bitcoin if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain { chainConfig = cfg.Litecoin @@ -1160,36 +1127,16 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, macaroonFiles := []string{ cfg.AdminMacPath, cfg.ReadMacPath, cfg.InvoiceMacPath, } - pwService := walletunlocker.New( + return walletunlocker.New( chainConfig.ChainDir, cfg.ActiveNetParams.Params, !cfg.SyncFreelist, macaroonFiles, cfg.DB.Bolt.DBTimeout, cfg.ResetWalletTransactions, ) +} - // Set up a new PasswordService, which will listen for passwords - // provided over RPC. - grpcServer := grpc.NewServer(serverOpts...) - lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService) - - var shutdownFuncs []func() - shutdown := func() { - // Make sure nothing blocks on reading on the macaroon channel, - // otherwise the GracefulStop below will never return. - close(pwService.MacResponseChan) - - for _, shutdownFn := range shutdownFuncs { - shutdownFn() - } - } - shutdownFuncs = append(shutdownFuncs, grpcServer.GracefulStop) - - // Start a gRPC server listening for HTTP/2 connections, solely used - // for getting the encryption password from the client. - listeners, cleanup, err := getListeners() - if err != nil { - return nil, shutdown, err - } - shutdownFuncs = append(shutdownFuncs, cleanup) +// startGrpcListen starts the GRPC server on the passed listeners. +func startGrpcListen(cfg *Config, grpcServer *grpc.Server, + listeners []*ListenerWithSignal) error { // Use a WaitGroup so we can be sure the instructions on how to input the // password is the last thing to be printed to the console. @@ -1198,8 +1145,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, for _, lis := range listeners { wg.Add(1) go func(lis *ListenerWithSignal) { - rpcsLog.Infof("Password RPC server listening on %s", - lis.Addr()) + rpcsLog.Infof("RPC server listening on %s", lis.Addr()) // Close the ready chan to indicate we are listening. close(lis.Ready) @@ -1209,29 +1155,102 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, }(lis) } - // Start a REST proxy for our gRPC server above. + // If Prometheus monitoring is enabled, start the Prometheus exporter. + if cfg.Prometheus.Enabled() { + err := monitoring.ExportPrometheusMetrics( + grpcServer, cfg.Prometheus, + ) + if err != nil { + return err + } + } + + // Wait for gRPC servers to be up running. + wg.Wait() + + return nil +} + +// startRestProxy starts the given REST proxy on the listeners found in the +// config. +func startRestProxy(cfg *Config, rpcServer *rpcServer, restDialOpts []grpc.DialOption, + restListen func(net.Addr) (net.Listener, error)) (func(), error) { + + // We use the first RPC listener as the destination for our REST proxy. + // If the listener is set to listen on all interfaces, we replace it + // with localhost, as we cannot dial it directly. + restProxyDest := cfg.RPCListeners[0].String() + switch { + case strings.Contains(restProxyDest, "0.0.0.0"): + restProxyDest = strings.Replace( + restProxyDest, "0.0.0.0", "127.0.0.1", 1, + ) + + case strings.Contains(restProxyDest, "[::]"): + restProxyDest = strings.Replace( + restProxyDest, "[::]", "[::1]", 1, + ) + } + + var shutdownFuncs []func() + shutdown := func() { + for _, shutdownFn := range shutdownFuncs { + shutdownFn() + } + } + + // Start a REST proxy for our gRPC server. ctx := context.Background() ctx, cancel := context.WithCancel(ctx) shutdownFuncs = append(shutdownFuncs, cancel) - mux := proxy.NewServeMux() + // We'll set up a proxy that will forward REST calls to the GRPC + // server. + // + // The default JSON marshaler of the REST proxy only sets OrigName to + // true, which instructs it to use the same field names as specified in + // the proto file and not switch to camel case. What we also want is + // that the marshaler prints all values, even if they are falsey. + customMarshalerOption := proxy.WithMarshalerOption( + proxy.MIMEWildcard, &proxy.JSONPb{ + OrigName: true, + EmitDefaults: true, + }, + ) + mux := proxy.NewServeMux(customMarshalerOption) - err = lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( + // Register both services with the REST proxy. + err := lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( ctx, mux, restProxyDest, restDialOpts, ) if err != nil { - return nil, shutdown, err + return nil, err } - srv := &http.Server{Handler: allowCORS(mux, cfg.RestCORS)} + err = rpcServer.RegisterWithRestProxy( + ctx, mux, restDialOpts, restProxyDest, + ) + if err != nil { + return nil, err + } - for _, restEndpoint := range restEndpoints { + // Wrap the default grpc-gateway handler with the WebSocket handler. + restHandler := lnrpc.NewWebSocketProxy(mux, rpcsLog) + + // Use a WaitGroup so we can be sure the instructions on how to input the + // password is the last thing to be printed to the console. + var wg sync.WaitGroup + + // Now spin up a network listener for each requested port and start a + // goroutine that serves REST with the created mux there. + for _, restEndpoint := range cfg.RESTListeners { lis, err := restListen(restEndpoint) if err != nil { - ltndLog.Errorf("Password gRPC proxy unable to listen "+ - "on %s", restEndpoint) - return nil, shutdown, err + ltndLog.Errorf("gRPC proxy unable to listen on %s", + restEndpoint) + return nil, err } + shutdownFuncs = append(shutdownFuncs, func() { err := lis.Close() if err != nil { @@ -1242,16 +1261,38 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, wg.Add(1) go func() { - rpcsLog.Infof("Password gRPC proxy started at %s", - lis.Addr()) + rpcsLog.Infof("gRPC proxy started at %s", lis.Addr()) + + // Create our proxy chain now. A request will pass + // through the following chain: + // req ---> CORS handler --> WS proxy ---> + // REST proxy --> gRPC endpoint + corsHandler := allowCORS(restHandler, cfg.RestCORS) + wg.Done() - _ = srv.Serve(lis) + err := http.Serve(lis, corsHandler) + if err != nil && !lnrpc.IsClosedConnError(err) { + rpcsLog.Error(err) + } }() } - // Wait for gRPC and REST servers to be up running. + // Wait for REST servers to be up running. wg.Wait() + return shutdown, nil +} + +// waitForWalletPassword blocks until a password is provided by the user to +// this RPC server. +func waitForWalletPassword(cfg *Config, + pwService *walletunlocker.UnlockerService) (*WalletUnlockParams, error) { + + chainConfig := cfg.Bitcoin + if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain { + chainConfig = cfg.Litecoin + } + // Wait for user to provide the password. ltndLog.Infof("Waiting for wallet encryption password. Use `lncli " + "create` to create a wallet, `lncli unlock` to unlock an " + @@ -1276,7 +1317,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, // version, then we'll return an error as we don't understand // this. if cipherSeed.InternalVersion != keychain.KeyDerivationVersion { - return nil, shutdown, fmt.Errorf("invalid internal "+ + return nil, fmt.Errorf("invalid internal "+ "seed version %v, current version is %v", cipherSeed.InternalVersion, keychain.KeyDerivationVersion) @@ -1303,7 +1344,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, ltndLog.Errorf("Could not unload new "+ "wallet: %v", err) } - return nil, shutdown, err + return nil, err } // For new wallets, the ResetWalletTransactions flag is a no-op. @@ -1321,7 +1362,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, UnloadWallet: loader.UnloadWallet, StatelessInit: initMsg.StatelessInit, MacResponseChan: pwService.MacResponseChan, - }, shutdown, nil + }, nil // The wallet has already been created in the past, and is simply being // unlocked. So we'll just return these passphrases. @@ -1345,10 +1386,10 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr, UnloadWallet: unlockMsg.UnloadWallet, StatelessInit: unlockMsg.StatelessInit, MacResponseChan: pwService.MacResponseChan, - }, shutdown, nil + }, nil case <-signal.ShutdownChannel(): - return nil, shutdown, fmt.Errorf("shutting down") + return nil, fmt.Errorf("shutting down") } } diff --git a/rpcserver.go b/rpcserver.go index 0d355e00..123da961 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "math" - "net" "net/http" "runtime" "sort" @@ -60,7 +59,6 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/chanfunding" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/macaroons" - "github.com/lightningnetwork/lnd/monitoring" "github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/record" @@ -495,33 +493,6 @@ type rpcServer struct { subServers []lnrpc.SubServer subGrpcHandlers []lnrpc.GrpcHandler - // grpcServer is the main gRPC server that this RPC server, and all the - // sub-servers will use to register themselves and accept client - // requests from. - grpcServer *grpc.Server - - // listeners is a list of listeners to use when starting the grpc - // server. We make it configurable such that the grpc server can listen - // on custom interfaces. - listeners []*ListenerWithSignal - - // listenerCleanUp are a set of closures functions that will allow this - // main RPC server to clean up all the listening socket created for the - // server. - listenerCleanUp []func() - - // restDialOpts are a set of gRPC dial options that the REST server - // proxy will use to connect to the main gRPC server. - restDialOpts []grpc.DialOption - - // restProxyDest is the address to forward REST requests to. - restProxyDest string - - // restListen is a function closure that allows the REST server proxy to - // connect to the main gRPC server to proxy all incoming requests, - // applying the current TLS configuration, if any. - restListen func(net.Addr) (net.Listener, error) - // routerBackend contains the backend implementation of the router // rpc sub server. routerBackend *routerrpc.RouterBackend @@ -555,27 +526,48 @@ type rpcServer struct { // LightningServer gRPC service. var _ lnrpc.LightningServer = (*rpcServer)(nil) -// newRPCServer creates and returns a new instance of the rpcServer. The -// rpcServer will handle creating all listening sockets needed by it, and any -// of the sub-servers that it maintains. The set of serverOpts should be the -// base level options passed to the grPC server. This typically includes things -// like requiring TLS, etc. -func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, - subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, - restDialOpts []grpc.DialOption, restProxyDest string, - atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry, - tower *watchtower.Standalone, - restListen func(net.Addr) (net.Listener, error), - getListeners rpcListeners, chanPredicate *chanacceptor.ChainedAcceptor, - interceptorChain *rpcperms.InterceptorChain, - extSubserverCfg *RPCSubserverConfig, extRestRegistrar RestRegistrar) ( - *rpcServer, error) { +// newRPCServer creates and returns a new instance of the rpcServer. Before +// dependencies are added, this will be an non-functioning RPC server only to +// be used to register the LightningService with the gRPC server. +func newRPCServer(cfg *Config, interceptorChain *rpcperms.InterceptorChain, + extSubserverCfg *RPCSubserverConfig, + extRestRegistrar RestRegistrar) *rpcServer { + + // We go trhough the list of registered sub-servers, and create a gRPC + // handler for each. These are used to register with the gRPC server + // before all dependencies are available. + registeredSubServers := lnrpc.RegisteredSubServers() + + var subServerHandlers []lnrpc.GrpcHandler + for _, subServer := range registeredSubServers { + subServerHandlers = append( + subServerHandlers, subServer.NewGrpcHandler(), + ) + } + + return &rpcServer{ + cfg: cfg, + subGrpcHandlers: subServerHandlers, + interceptorChain: interceptorChain, + extSubserverCfg: extSubserverCfg, + extRestRegistrar: extRestRegistrar, + quit: make(chan struct{}, 1), + } +} + +// addDeps populates all dependencies needed by the RPC server, and any +// of the sub-servers that it maintains. When this is done, the RPC server can +// be started, and start accepting RPC calls. +func (r *rpcServer) addDeps(s *server, macService *macaroons.Service, + subServerCgs *subRPCServerConfigs, atpl *autopilot.Manager, + invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone, + chanPredicate *chanacceptor.ChainedAcceptor) error { // Set up router rpc backend. channelGraph := s.localChanDB.ChannelGraph() selfNode, err := channelGraph.SourceNode() if err != nil { - return nil, err + return err } graph := s.localChanDB.ChannelGraph() routerBackend := &routerrpc.RouterBackend{ @@ -606,10 +598,10 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, }, FindRoute: s.chanRouter.FindRoute, MissionControl: s.missionControl, - ActiveNetParams: cfg.ActiveNetParams.Params, + ActiveNetParams: r.cfg.ActiveNetParams.Params, Tower: s.controlTower, - MaxTotalTimelock: cfg.MaxOutgoingCltvExpiry, - DefaultFinalCltvDelta: uint16(cfg.Bitcoin.TimeLockDelta), + MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry, + DefaultFinalCltvDelta: uint16(r.cfg.Bitcoin.TimeLockDelta), SubscribeHtlcEvents: s.htlcNotifier.SubscribeHtlcEvents, InterceptableForwarder: s.interceptableSwitch, SetChannelEnabled: func(outpoint wire.OutPoint) error { @@ -626,9 +618,8 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, } var ( - subServers []lnrpc.SubServer - subGrpcHandlers []lnrpc.GrpcHandler - subServerPerms []lnrpc.MacaroonPerms + subServers []lnrpc.SubServer + subServerPerms []lnrpc.MacaroonPerms ) // Before we create any of the sub-servers, we need to ensure that all @@ -637,31 +628,30 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, // // TODO(roasbeef): extend sub-sever config to have both (local vs remote) DB err = subServerCgs.PopulateDependencies( - cfg, s.cc, cfg.networkDir, macService, atpl, invoiceRegistry, - s.htlcSwitch, cfg.ActiveNetParams.Params, s.chanRouter, + r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry, + s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter, routerBackend, s.nodeSigner, s.localChanDB, s.remoteChanDB, s.sweeper, tower, s.towerClient, s.anchorTowerClient, - cfg.net.ResolveTCPAddr, genInvoiceFeatures, rpcsLog, + r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, rpcsLog, ) if err != nil { - return nil, err + return err } // Now that the sub-servers have all their dependencies in place, we // can create each sub-server! - registeredSubServers := lnrpc.RegisteredSubServers() - for _, driver := range registeredSubServers { - handler := driver.NewGrpcHandler() - subServer, macPerms, err := handler.CreateSubServer(subServerCgs) + for _, subServerInstance := range r.subGrpcHandlers { + subServer, macPerms, err := subServerInstance.CreateSubServer( + subServerCgs, + ) if err != nil { - return nil, err + return err } // We'll collect the sub-server, and also the set of // permissions it needs for macaroons so we can apply the // interceptors below. subServers = append(subServers, subServer) - subGrpcHandlers = append(subGrpcHandlers, handler) subServerPerms = append(subServerPerms, macPerms) } @@ -669,35 +659,29 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, // with the main RPC server permissions so we can unite them under a // single set of interceptors. for m, ops := range MainRPCServerPermissions() { - err := interceptorChain.AddPermission(m, ops) + err := r.interceptorChain.AddPermission(m, ops) if err != nil { - return nil, err + return err } } for _, subServerPerm := range subServerPerms { for method, ops := range subServerPerm { - err := interceptorChain.AddPermission(method, ops) + err := r.interceptorChain.AddPermission(method, ops) if err != nil { - return nil, err + return err } } } - // Get the listeners and server options to use for this rpc server. - listeners, cleanup, err := getListeners() - if err != nil { - return nil, err - } - // External subserver possibly need to register their own permissions // and macaroon validator. - if extSubserverCfg != nil { - macValidator := extSubserverCfg.MacaroonValidator - for method, ops := range extSubserverCfg.Permissions { - err := interceptorChain.AddPermission(method, ops) + if r.extSubserverCfg != nil { + macValidator := r.extSubserverCfg.MacaroonValidator + for method, ops := range r.extSubserverCfg.Permissions { + err := r.interceptorChain.AddPermission(method, ops) if err != nil { - return nil, err + return err } // Give the external subservers the possibility @@ -710,7 +694,7 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, method, macValidator, ) if err != nil { - return nil, fmt.Errorf("could "+ + return fmt.Errorf("could "+ "not register "+ "external macaroon "+ "validator: %v", err) @@ -719,43 +703,50 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, } } - // Finally, with all the pre-set up complete, we can create the main - // gRPC server, and register the main lnrpc server along side. - grpcServer := grpc.NewServer(serverOpts...) - rootRPCServer := &rpcServer{ - cfg: cfg, - restDialOpts: restDialOpts, - listeners: listeners, - listenerCleanUp: []func(){cleanup}, - restProxyDest: restProxyDest, - subServers: subServers, - subGrpcHandlers: subGrpcHandlers, - restListen: restListen, - grpcServer: grpcServer, - server: s, - routerBackend: routerBackend, - chanPredicate: chanPredicate, - quit: make(chan struct{}, 1), - macService: macService, - selfNode: selfNode.PubKeyBytes, - interceptorChain: interceptorChain, - extSubserverCfg: extSubserverCfg, - extRestRegistrar: extRestRegistrar, - } - lnrpc.RegisterLightningServer(grpcServer, rootRPCServer) + // Finally, with all the set up complete, add the last dependencies to + // the rpc server. + r.server = s + r.subServers = subServers + r.routerBackend = routerBackend + r.chanPredicate = chanPredicate + r.macService = macService + r.selfNode = selfNode.PubKeyBytes + return nil +} + +// RegisterWithGrpcServer registers the rpcServer and any subservers with the +// root gRPC server. +func (r *rpcServer) RegisterWithGrpcServer(grpcServer *grpc.Server) error { + // Register the main RPC server. + lnrpc.RegisterLightningServer(grpcServer, r) // Now the main RPC server has been registered, we'll iterate through // all the sub-RPC servers and register them to ensure that requests // are properly routed towards them. - for _, subServer := range subGrpcHandlers { + for _, subServer := range r.subGrpcHandlers { err := subServer.RegisterWithRootServer(grpcServer) if err != nil { - return nil, fmt.Errorf("unable to register "+ + return fmt.Errorf("unable to register "+ "sub-server with root: %v", err) } } - return rootRPCServer, nil + // Before actually listening on the gRPC listener, give external + // subservers the chance to register to our gRPC server. Those external + // subservers (think GrUB) are responsible for starting/stopping on + // their own, we just let them register their services to the same + // server instance so all of them can be exposed on the same + // port/listener. + if r.extSubserverCfg != nil && r.extSubserverCfg.Registrar != nil { + registerer := r.extSubserverCfg.Registrar + err := registerer.RegisterGrpcSubserver(grpcServer) + if err != nil { + rpcsLog.Errorf("error registering external gRPC "+ + "subserver: %v", err) + } + } + + return nil } // Start launches any helper goroutines required for the rpcServer to function. @@ -777,76 +768,27 @@ func (r *rpcServer) Start() error { } } - // Before actually listening on the gRPC listener, give external - // subservers the chance to register to our gRPC server. Those external - // subservers (think GrUB) are responsible for starting/stopping on - // their own, we just let them register their services to the same - // server instance so all of them can be exposed on the same - // port/listener. - if r.extSubserverCfg != nil && r.extSubserverCfg.Registrar != nil { - registerer := r.extSubserverCfg.Registrar - err := registerer.RegisterGrpcSubserver(r.grpcServer) - if err != nil { - rpcsLog.Errorf("error registering external gRPC "+ - "subserver: %v", err) - } - } + return nil +} - // With all the sub-servers started, we'll spin up the listeners for - // the main RPC server itself. - for _, lis := range r.listeners { - go func(lis *ListenerWithSignal) { - rpcsLog.Infof("RPC server listening on %s", lis.Addr()) - - // Close the ready chan to indicate we are listening. - close(lis.Ready) - _ = r.grpcServer.Serve(lis) - }(lis) - } - - // If Prometheus monitoring is enabled, start the Prometheus exporter. - if r.cfg.Prometheus.Enabled() { - err := monitoring.ExportPrometheusMetrics( - r.grpcServer, r.cfg.Prometheus, - ) - if err != nil { - return err - } - } - - // The default JSON marshaler of the REST proxy only sets OrigName to - // true, which instructs it to use the same field names as specified in - // the proto file and not switch to camel case. What we also want is - // that the marshaler prints all values, even if they are falsey. - customMarshalerOption := proxy.WithMarshalerOption( - proxy.MIMEWildcard, &proxy.JSONPb{ - OrigName: true, - EmitDefaults: true, - }, - ) - - // Now start the REST proxy for our gRPC server above. We'll ensure - // we direct LND to connect to its loopback address rather than a - // wildcard to prevent certificate issues when accessing the proxy - // externally. - restMux := proxy.NewServeMux(customMarshalerOption) - restCtx, restCancel := context.WithCancel(context.Background()) - r.listenerCleanUp = append(r.listenerCleanUp, restCancel) - - // Wrap the default grpc-gateway handler with the WebSocket handler. - restHandler := lnrpc.NewWebSocketProxy(restMux, rpcsLog) +// RegisterWithRestProxy registers the RPC server and any subservers with the +// given REST proxy. +func (r *rpcServer) RegisterWithRestProxy(restCtx context.Context, + restMux *proxy.ServeMux, restDialOpts []grpc.DialOption, + restProxyDest string) error { // With our custom REST proxy mux created, register our main RPC and // give all subservers a chance to register as well. err := lnrpc.RegisterLightningHandlerFromEndpoint( - restCtx, restMux, r.restProxyDest, r.restDialOpts, + restCtx, restMux, restProxyDest, restDialOpts, ) if err != nil { return err } + for _, subServer := range r.subGrpcHandlers { err := subServer.RegisterWithRestServer( - restCtx, restMux, r.restProxyDest, r.restDialOpts, + restCtx, restMux, restProxyDest, restDialOpts, ) if err != nil { return fmt.Errorf("unable to register REST sub-server "+ @@ -859,44 +801,13 @@ func (r *rpcServer) Start() error { // with our mux instance. if r.extRestRegistrar != nil { err := r.extRestRegistrar.RegisterRestSubserver( - restCtx, restMux, r.restProxyDest, - r.restDialOpts, + restCtx, restMux, restProxyDest, restDialOpts, ) if err != nil { rpcsLog.Errorf("error registering "+ "external REST subserver: %v", err) } } - - // Now spin up a network listener for each requested port and start a - // goroutine that serves REST with the created mux there. - for _, restEndpoint := range r.cfg.RESTListeners { - lis, err := r.restListen(restEndpoint) - if err != nil { - ltndLog.Errorf("gRPC proxy unable to listen on %s", - restEndpoint) - return err - } - - r.listenerCleanUp = append(r.listenerCleanUp, func() { - _ = lis.Close() - }) - - go func() { - rpcsLog.Infof("gRPC proxy started at %s", lis.Addr()) - - // Create our proxy chain now. A request will pass - // through the following chain: - // req ---> CORS handler --> WS proxy ---> - // REST proxy --> gRPC endpoint - corsHandler := allowCORS(restHandler, r.cfg.RestCORS) - err := http.Serve(lis, corsHandler) - if err != nil && !lnrpc.IsClosedConnError(err) { - rpcsLog.Error(err) - } - }() - } - return nil } @@ -924,12 +835,6 @@ func (r *rpcServer) Stop() error { } } - // Finally, we can clean up all the listening sockets to ensure that we - // give the file descriptors back to the OS. - for _, cleanUp := range r.listenerCleanUp { - cleanUp() - } - return nil }