rpcserver: use same grpc server for both services

This commit achieves what we have been building up to: running the
WalletUnlockerService and the LightningService on the same gRPC server
simultaneously!

To achieve this, we first create the RPC server in a "interface only"
way, only creating the struct and setting the dependencies we have
available before the wallet has been unlocked. After the wallet has been
unlocked and we have created all the subsystems we need, we add those to
the RPC server, and start the sub-servers.

This means that the WalletUnlockerService and the LightningService both
will be registered and available at all times on the gRPC server.
However, before the wallet has been unlocked, the LightningService
should not be used since the RPC server is not yet ready to handle the
calls. Similarly, after the wallet has been unlocked, the
WalletUnlockerService should not be used. This we will ensure in
following commits.
This commit is contained in:
Johan T. Halseth 2021-02-02 13:45:29 +01:00
parent 82fb22eda2
commit 4bbf5c4b6d
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 286 additions and 340 deletions

319
lnd.go
View File

@ -45,6 +45,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet" "github.com/lightningnetwork/lnd/lnwallet/btcwallet"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/rpcperms" "github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/tor" "github.com/lightningnetwork/lnd/tor"
@ -185,13 +186,6 @@ type ListenerCfg struct {
ExternalRestRegistrar RestRegistrar ExternalRestRegistrar RestRegistrar
} }
// rpcListeners is a function type used for closures that fetches a set of RPC
// listeners for the current configuration. If no custom listeners are present,
// this should return normal listeners from the RPC endpoints defined in the
// config. The second return value us a closure that will close the fetched
// listeners.
type rpcListeners func() ([]*ListenerWithSignal, func(), error)
// Main is the true entry point for lnd. It accepts a fully populated and // Main is the true entry point for lnd. It accepts a fully populated and
// validated main configuration struct and an optional listener config struct. // validated main configuration struct and an optional listener config struct.
// This function starts all main system components then blocks until a signal // This function starts all main system components then blocks until a signal
@ -280,22 +274,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
defer cleanUp() defer cleanUp()
// We use the first RPC listener as the destination for our REST proxy.
// If the listener is set to listen on all interfaces, we replace it
// with localhost, as we cannot dial it directly.
restProxyDest := cfg.RPCListeners[0].String()
switch {
case strings.Contains(restProxyDest, "0.0.0.0"):
restProxyDest = strings.Replace(
restProxyDest, "0.0.0.0", "127.0.0.1", 1,
)
case strings.Contains(restProxyDest, "[::]"):
restProxyDest = strings.Replace(
restProxyDest, "[::]", "[::1]", 1,
)
}
// Before starting the wallet, we'll create and start our Neutrino // Before starting the wallet, we'll create and start our Neutrino
// light client instance, if enabled, in order to allow it to sync // light client instance, if enabled, in order to allow it to sync
// while the rest of the daemon continues startup. // while the rest of the daemon continues startup.
@ -320,7 +298,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
var ( var (
walletInitParams WalletUnlockParams walletInitParams WalletUnlockParams
shutdownUnlocker = func() {}
privateWalletPw = lnwallet.DefaultPrivatePassphrase privateWalletPw = lnwallet.DefaultPrivatePassphrase
publicWalletPw = lnwallet.DefaultPublicPassphrase publicWalletPw = lnwallet.DefaultPublicPassphrase
) )
@ -330,11 +307,14 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
// this information. // this information.
walletInitParams.Birthday = time.Now() walletInitParams.Birthday = time.Now()
// getListeners is a closure that creates listeners from the // If we have chosen to start with a dedicated listener for the
// RPCListeners defined in the config. It also returns a cleanup // rpc server, we set it directly.
// closure and the server options to use for the GRPC server. var grpcListeners []*ListenerWithSignal
getListeners := func() ([]*ListenerWithSignal, func(), error) { if lisCfg.RPCListener != nil {
var grpcListeners []*ListenerWithSignal grpcListeners = []*ListenerWithSignal{lisCfg.RPCListener}
} else {
// Otherwise we create listeners from the RPCListeners defined
// in the config.
for _, grpcEndpoint := range cfg.RPCListeners { for _, grpcEndpoint := range cfg.RPCListeners {
// Start a gRPC server listening for HTTP/2 // Start a gRPC server listening for HTTP/2
// connections. // connections.
@ -342,38 +322,16 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
if err != nil { if err != nil {
ltndLog.Errorf("unable to listen on %s", ltndLog.Errorf("unable to listen on %s",
grpcEndpoint) grpcEndpoint)
return nil, nil, err return err
} }
defer lis.Close()
grpcListeners = append( grpcListeners = append(
grpcListeners, &ListenerWithSignal{ grpcListeners, &ListenerWithSignal{
Listener: lis, Listener: lis,
Ready: make(chan struct{}), Ready: make(chan struct{}),
}) })
} }
cleanup := func() {
for _, lis := range grpcListeners {
lis.Close()
}
}
return grpcListeners, cleanup, nil
}
// walletUnlockerListeners is a closure we'll hand to the wallet
// unlocker, that will be called when it needs listeners for its GPRC
// server.
walletUnlockerListeners := func() ([]*ListenerWithSignal, func(),
error) {
// If we have chosen to start with a dedicated listener for the
// wallet unlocker, we return it directly.
if lisCfg.WalletUnlocker != nil {
return []*ListenerWithSignal{lisCfg.WalletUnlocker},
func() {}, nil
}
// Otherwise we'll return the regular listeners.
return getListeners()
} }
// Create a new RPC interceptor chain that we'll add to the GRPC // Create a new RPC interceptor chain that we'll add to the GRPC
@ -385,14 +343,50 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
rpcServerOpts := interceptorChain.CreateServerOpts() rpcServerOpts := interceptorChain.CreateServerOpts()
serverOpts = append(serverOpts, rpcServerOpts...) serverOpts = append(serverOpts, rpcServerOpts...)
grpcServer := grpc.NewServer(serverOpts...)
defer grpcServer.Stop()
// We'll create the WalletUnlockerService and register this with the
// GRPC server.
pwService := createWalletUnlockerService(cfg)
lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService)
// Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer.
rpcServer := newRPCServer(
cfg, interceptorChain, lisCfg.ExternalRPCSubserverCfg,
lisCfg.ExternalRestRegistrar,
)
err = rpcServer.RegisterWithGrpcServer(grpcServer)
if err != nil {
return err
}
// Now that both the WalletUnlocker and LightningService have been
// registered with the GRPC server, we can start listening.
err = startGrpcListen(cfg, grpcServer, grpcListeners)
if err != nil {
return err
}
// Now start the REST proxy for our gRPC server above. We'll ensure
// we direct LND to connect to its loopback address rather than a
// wildcard to prevent certificate issues when accessing the proxy
// externally.
stopProxy, err := startRestProxy(
cfg, rpcServer, restDialOpts, restListen,
)
if err != nil {
return err
}
defer stopProxy()
// We wait until the user provides a password over RPC. In case lnd is // We wait until the user provides a password over RPC. In case lnd is
// started with the --noseedbackup flag, we use the default password // started with the --noseedbackup flag, we use the default password
// for wallet encryption. // for wallet encryption.
if !cfg.NoSeedBackup { if !cfg.NoSeedBackup {
params, shutdown, err := waitForWalletPassword( params, err := waitForWalletPassword(cfg, pwService)
cfg, cfg.RESTListeners, serverOpts, restDialOpts,
restProxyDest, restListen, walletUnlockerListeners,
)
if err != nil { if err != nil {
err := fmt.Errorf("unable to set up wallet password "+ err := fmt.Errorf("unable to set up wallet password "+
"listeners: %v", err) "listeners: %v", err)
@ -401,7 +395,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
} }
walletInitParams = *params walletInitParams = *params
shutdownUnlocker = shutdown
privateWalletPw = walletInitParams.Password privateWalletPw = walletInitParams.Password
publicWalletPw = walletInitParams.Password publicWalletPw = walletInitParams.Password
defer func() { defer func() {
@ -509,10 +502,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
interceptorChain.AddMacaroonService(macaroonService) interceptorChain.AddMacaroonService(macaroonService)
} }
// Now we're definitely done with the unlocker, shut it down so we can
// start the main RPC service later.
shutdownUnlocker()
// With the information parsed from the configuration, create valid // With the information parsed from the configuration, create valid
// instances of the pertinent interfaces required to operate the // instances of the pertinent interfaces required to operate the
// Lightning Network Daemon. // Lightning Network Daemon.
@ -732,31 +721,14 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
} }
defer atplManager.Stop() defer atplManager.Stop()
// rpcListeners is a closure we'll hand to the rpc server, that will be // Now we have created all dependencies necessary to populate and
// called when it needs listeners for its GPRC server. // start the RPC server.
rpcListeners := func() ([]*ListenerWithSignal, func(), error) { err = rpcServer.addDeps(
// If we have chosen to start with a dedicated listener for the server, macaroonService, cfg.SubRPCServers, atplManager,
// rpc server, we return it directly. server.invoices, tower, chainedAcceptor,
if lisCfg.RPCListener != nil {
return []*ListenerWithSignal{lisCfg.RPCListener},
func() {}, nil
}
// Otherwise we'll return the regular listeners.
return getListeners()
}
// Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer.
rpcServer, err := newRPCServer(
cfg, server, macaroonService, cfg.SubRPCServers, serverOpts,
restDialOpts, restProxyDest, atplManager, server.invoices,
tower, restListen, rpcListeners, chainedAcceptor,
interceptorChain, lisCfg.ExternalRPCSubserverCfg,
lisCfg.ExternalRestRegistrar,
) )
if err != nil { if err != nil {
err := fmt.Errorf("unable to create RPC server: %v", err) err := fmt.Errorf("unable to add deps to RPC server: %v", err)
ltndLog.Error(err) ltndLog.Error(err)
return err return err
} }
@ -1141,14 +1113,9 @@ type WalletUnlockParams struct {
MacResponseChan chan []byte MacResponseChan chan []byte
} }
// waitForWalletPassword will spin up gRPC and REST endpoints for the // createWalletUnlockerService creates a WalletUnlockerService from the passed
// WalletUnlocker server, and block until a password is provided by // config.
// the user to this RPC server. func createWalletUnlockerService(cfg *Config) *walletunlocker.UnlockerService {
func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
serverOpts []grpc.ServerOption, restDialOpts []grpc.DialOption,
restProxyDest string, restListen func(net.Addr) (net.Listener, error),
getListeners rpcListeners) (*WalletUnlockParams, func(), error) {
chainConfig := cfg.Bitcoin chainConfig := cfg.Bitcoin
if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain { if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain {
chainConfig = cfg.Litecoin chainConfig = cfg.Litecoin
@ -1160,36 +1127,16 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
macaroonFiles := []string{ macaroonFiles := []string{
cfg.AdminMacPath, cfg.ReadMacPath, cfg.InvoiceMacPath, cfg.AdminMacPath, cfg.ReadMacPath, cfg.InvoiceMacPath,
} }
pwService := walletunlocker.New( return walletunlocker.New(
chainConfig.ChainDir, cfg.ActiveNetParams.Params, chainConfig.ChainDir, cfg.ActiveNetParams.Params,
!cfg.SyncFreelist, macaroonFiles, cfg.DB.Bolt.DBTimeout, !cfg.SyncFreelist, macaroonFiles, cfg.DB.Bolt.DBTimeout,
cfg.ResetWalletTransactions, cfg.ResetWalletTransactions,
) )
}
// Set up a new PasswordService, which will listen for passwords // startGrpcListen starts the GRPC server on the passed listeners.
// provided over RPC. func startGrpcListen(cfg *Config, grpcServer *grpc.Server,
grpcServer := grpc.NewServer(serverOpts...) listeners []*ListenerWithSignal) error {
lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService)
var shutdownFuncs []func()
shutdown := func() {
// Make sure nothing blocks on reading on the macaroon channel,
// otherwise the GracefulStop below will never return.
close(pwService.MacResponseChan)
for _, shutdownFn := range shutdownFuncs {
shutdownFn()
}
}
shutdownFuncs = append(shutdownFuncs, grpcServer.GracefulStop)
// Start a gRPC server listening for HTTP/2 connections, solely used
// for getting the encryption password from the client.
listeners, cleanup, err := getListeners()
if err != nil {
return nil, shutdown, err
}
shutdownFuncs = append(shutdownFuncs, cleanup)
// Use a WaitGroup so we can be sure the instructions on how to input the // Use a WaitGroup so we can be sure the instructions on how to input the
// password is the last thing to be printed to the console. // password is the last thing to be printed to the console.
@ -1198,8 +1145,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
for _, lis := range listeners { for _, lis := range listeners {
wg.Add(1) wg.Add(1)
go func(lis *ListenerWithSignal) { go func(lis *ListenerWithSignal) {
rpcsLog.Infof("Password RPC server listening on %s", rpcsLog.Infof("RPC server listening on %s", lis.Addr())
lis.Addr())
// Close the ready chan to indicate we are listening. // Close the ready chan to indicate we are listening.
close(lis.Ready) close(lis.Ready)
@ -1209,29 +1155,102 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
}(lis) }(lis)
} }
// Start a REST proxy for our gRPC server above. // If Prometheus monitoring is enabled, start the Prometheus exporter.
if cfg.Prometheus.Enabled() {
err := monitoring.ExportPrometheusMetrics(
grpcServer, cfg.Prometheus,
)
if err != nil {
return err
}
}
// Wait for gRPC servers to be up running.
wg.Wait()
return nil
}
// startRestProxy starts the given REST proxy on the listeners found in the
// config.
func startRestProxy(cfg *Config, rpcServer *rpcServer, restDialOpts []grpc.DialOption,
restListen func(net.Addr) (net.Listener, error)) (func(), error) {
// We use the first RPC listener as the destination for our REST proxy.
// If the listener is set to listen on all interfaces, we replace it
// with localhost, as we cannot dial it directly.
restProxyDest := cfg.RPCListeners[0].String()
switch {
case strings.Contains(restProxyDest, "0.0.0.0"):
restProxyDest = strings.Replace(
restProxyDest, "0.0.0.0", "127.0.0.1", 1,
)
case strings.Contains(restProxyDest, "[::]"):
restProxyDest = strings.Replace(
restProxyDest, "[::]", "[::1]", 1,
)
}
var shutdownFuncs []func()
shutdown := func() {
for _, shutdownFn := range shutdownFuncs {
shutdownFn()
}
}
// Start a REST proxy for our gRPC server.
ctx := context.Background() ctx := context.Background()
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
shutdownFuncs = append(shutdownFuncs, cancel) shutdownFuncs = append(shutdownFuncs, cancel)
mux := proxy.NewServeMux() // We'll set up a proxy that will forward REST calls to the GRPC
// server.
//
// The default JSON marshaler of the REST proxy only sets OrigName to
// true, which instructs it to use the same field names as specified in
// the proto file and not switch to camel case. What we also want is
// that the marshaler prints all values, even if they are falsey.
customMarshalerOption := proxy.WithMarshalerOption(
proxy.MIMEWildcard, &proxy.JSONPb{
OrigName: true,
EmitDefaults: true,
},
)
mux := proxy.NewServeMux(customMarshalerOption)
err = lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( // Register both services with the REST proxy.
err := lnrpc.RegisterWalletUnlockerHandlerFromEndpoint(
ctx, mux, restProxyDest, restDialOpts, ctx, mux, restProxyDest, restDialOpts,
) )
if err != nil { if err != nil {
return nil, shutdown, err return nil, err
} }
srv := &http.Server{Handler: allowCORS(mux, cfg.RestCORS)} err = rpcServer.RegisterWithRestProxy(
ctx, mux, restDialOpts, restProxyDest,
)
if err != nil {
return nil, err
}
for _, restEndpoint := range restEndpoints { // Wrap the default grpc-gateway handler with the WebSocket handler.
restHandler := lnrpc.NewWebSocketProxy(mux, rpcsLog)
// Use a WaitGroup so we can be sure the instructions on how to input the
// password is the last thing to be printed to the console.
var wg sync.WaitGroup
// Now spin up a network listener for each requested port and start a
// goroutine that serves REST with the created mux there.
for _, restEndpoint := range cfg.RESTListeners {
lis, err := restListen(restEndpoint) lis, err := restListen(restEndpoint)
if err != nil { if err != nil {
ltndLog.Errorf("Password gRPC proxy unable to listen "+ ltndLog.Errorf("gRPC proxy unable to listen on %s",
"on %s", restEndpoint) restEndpoint)
return nil, shutdown, err return nil, err
} }
shutdownFuncs = append(shutdownFuncs, func() { shutdownFuncs = append(shutdownFuncs, func() {
err := lis.Close() err := lis.Close()
if err != nil { if err != nil {
@ -1242,16 +1261,38 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
wg.Add(1) wg.Add(1)
go func() { go func() {
rpcsLog.Infof("Password gRPC proxy started at %s", rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
lis.Addr())
// Create our proxy chain now. A request will pass
// through the following chain:
// req ---> CORS handler --> WS proxy --->
// REST proxy --> gRPC endpoint
corsHandler := allowCORS(restHandler, cfg.RestCORS)
wg.Done() wg.Done()
_ = srv.Serve(lis) err := http.Serve(lis, corsHandler)
if err != nil && !lnrpc.IsClosedConnError(err) {
rpcsLog.Error(err)
}
}() }()
} }
// Wait for gRPC and REST servers to be up running. // Wait for REST servers to be up running.
wg.Wait() wg.Wait()
return shutdown, nil
}
// waitForWalletPassword blocks until a password is provided by the user to
// this RPC server.
func waitForWalletPassword(cfg *Config,
pwService *walletunlocker.UnlockerService) (*WalletUnlockParams, error) {
chainConfig := cfg.Bitcoin
if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain {
chainConfig = cfg.Litecoin
}
// Wait for user to provide the password. // Wait for user to provide the password.
ltndLog.Infof("Waiting for wallet encryption password. Use `lncli " + ltndLog.Infof("Waiting for wallet encryption password. Use `lncli " +
"create` to create a wallet, `lncli unlock` to unlock an " + "create` to create a wallet, `lncli unlock` to unlock an " +
@ -1276,7 +1317,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
// version, then we'll return an error as we don't understand // version, then we'll return an error as we don't understand
// this. // this.
if cipherSeed.InternalVersion != keychain.KeyDerivationVersion { if cipherSeed.InternalVersion != keychain.KeyDerivationVersion {
return nil, shutdown, fmt.Errorf("invalid internal "+ return nil, fmt.Errorf("invalid internal "+
"seed version %v, current version is %v", "seed version %v, current version is %v",
cipherSeed.InternalVersion, cipherSeed.InternalVersion,
keychain.KeyDerivationVersion) keychain.KeyDerivationVersion)
@ -1303,7 +1344,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
ltndLog.Errorf("Could not unload new "+ ltndLog.Errorf("Could not unload new "+
"wallet: %v", err) "wallet: %v", err)
} }
return nil, shutdown, err return nil, err
} }
// For new wallets, the ResetWalletTransactions flag is a no-op. // For new wallets, the ResetWalletTransactions flag is a no-op.
@ -1321,7 +1362,7 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
UnloadWallet: loader.UnloadWallet, UnloadWallet: loader.UnloadWallet,
StatelessInit: initMsg.StatelessInit, StatelessInit: initMsg.StatelessInit,
MacResponseChan: pwService.MacResponseChan, MacResponseChan: pwService.MacResponseChan,
}, shutdown, nil }, nil
// The wallet has already been created in the past, and is simply being // The wallet has already been created in the past, and is simply being
// unlocked. So we'll just return these passphrases. // unlocked. So we'll just return these passphrases.
@ -1345,10 +1386,10 @@ func waitForWalletPassword(cfg *Config, restEndpoints []net.Addr,
UnloadWallet: unlockMsg.UnloadWallet, UnloadWallet: unlockMsg.UnloadWallet,
StatelessInit: unlockMsg.StatelessInit, StatelessInit: unlockMsg.StatelessInit,
MacResponseChan: pwService.MacResponseChan, MacResponseChan: pwService.MacResponseChan,
}, shutdown, nil }, nil
case <-signal.ShutdownChannel(): case <-signal.ShutdownChannel():
return nil, shutdown, fmt.Errorf("shutting down") return nil, fmt.Errorf("shutting down")
} }
} }

View File

@ -8,7 +8,6 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"net"
"net/http" "net/http"
"runtime" "runtime"
"sort" "sort"
@ -60,7 +59,6 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chanfunding" "github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
"github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/peer" "github.com/lightningnetwork/lnd/peer"
"github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/record"
@ -495,33 +493,6 @@ type rpcServer struct {
subServers []lnrpc.SubServer subServers []lnrpc.SubServer
subGrpcHandlers []lnrpc.GrpcHandler subGrpcHandlers []lnrpc.GrpcHandler
// grpcServer is the main gRPC server that this RPC server, and all the
// sub-servers will use to register themselves and accept client
// requests from.
grpcServer *grpc.Server
// listeners is a list of listeners to use when starting the grpc
// server. We make it configurable such that the grpc server can listen
// on custom interfaces.
listeners []*ListenerWithSignal
// listenerCleanUp are a set of closures functions that will allow this
// main RPC server to clean up all the listening socket created for the
// server.
listenerCleanUp []func()
// restDialOpts are a set of gRPC dial options that the REST server
// proxy will use to connect to the main gRPC server.
restDialOpts []grpc.DialOption
// restProxyDest is the address to forward REST requests to.
restProxyDest string
// restListen is a function closure that allows the REST server proxy to
// connect to the main gRPC server to proxy all incoming requests,
// applying the current TLS configuration, if any.
restListen func(net.Addr) (net.Listener, error)
// routerBackend contains the backend implementation of the router // routerBackend contains the backend implementation of the router
// rpc sub server. // rpc sub server.
routerBackend *routerrpc.RouterBackend routerBackend *routerrpc.RouterBackend
@ -555,27 +526,48 @@ type rpcServer struct {
// LightningServer gRPC service. // LightningServer gRPC service.
var _ lnrpc.LightningServer = (*rpcServer)(nil) var _ lnrpc.LightningServer = (*rpcServer)(nil)
// newRPCServer creates and returns a new instance of the rpcServer. The // newRPCServer creates and returns a new instance of the rpcServer. Before
// rpcServer will handle creating all listening sockets needed by it, and any // dependencies are added, this will be an non-functioning RPC server only to
// of the sub-servers that it maintains. The set of serverOpts should be the // be used to register the LightningService with the gRPC server.
// base level options passed to the grPC server. This typically includes things func newRPCServer(cfg *Config, interceptorChain *rpcperms.InterceptorChain,
// like requiring TLS, etc. extSubserverCfg *RPCSubserverConfig,
func newRPCServer(cfg *Config, s *server, macService *macaroons.Service, extRestRegistrar RestRegistrar) *rpcServer {
subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption,
restDialOpts []grpc.DialOption, restProxyDest string, // We go trhough the list of registered sub-servers, and create a gRPC
atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry, // handler for each. These are used to register with the gRPC server
tower *watchtower.Standalone, // before all dependencies are available.
restListen func(net.Addr) (net.Listener, error), registeredSubServers := lnrpc.RegisteredSubServers()
getListeners rpcListeners, chanPredicate *chanacceptor.ChainedAcceptor,
interceptorChain *rpcperms.InterceptorChain, var subServerHandlers []lnrpc.GrpcHandler
extSubserverCfg *RPCSubserverConfig, extRestRegistrar RestRegistrar) ( for _, subServer := range registeredSubServers {
*rpcServer, error) { subServerHandlers = append(
subServerHandlers, subServer.NewGrpcHandler(),
)
}
return &rpcServer{
cfg: cfg,
subGrpcHandlers: subServerHandlers,
interceptorChain: interceptorChain,
extSubserverCfg: extSubserverCfg,
extRestRegistrar: extRestRegistrar,
quit: make(chan struct{}, 1),
}
}
// addDeps populates all dependencies needed by the RPC server, and any
// of the sub-servers that it maintains. When this is done, the RPC server can
// be started, and start accepting RPC calls.
func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
subServerCgs *subRPCServerConfigs, atpl *autopilot.Manager,
invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone,
chanPredicate *chanacceptor.ChainedAcceptor) error {
// Set up router rpc backend. // Set up router rpc backend.
channelGraph := s.localChanDB.ChannelGraph() channelGraph := s.localChanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode() selfNode, err := channelGraph.SourceNode()
if err != nil { if err != nil {
return nil, err return err
} }
graph := s.localChanDB.ChannelGraph() graph := s.localChanDB.ChannelGraph()
routerBackend := &routerrpc.RouterBackend{ routerBackend := &routerrpc.RouterBackend{
@ -606,10 +598,10 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
}, },
FindRoute: s.chanRouter.FindRoute, FindRoute: s.chanRouter.FindRoute,
MissionControl: s.missionControl, MissionControl: s.missionControl,
ActiveNetParams: cfg.ActiveNetParams.Params, ActiveNetParams: r.cfg.ActiveNetParams.Params,
Tower: s.controlTower, Tower: s.controlTower,
MaxTotalTimelock: cfg.MaxOutgoingCltvExpiry, MaxTotalTimelock: r.cfg.MaxOutgoingCltvExpiry,
DefaultFinalCltvDelta: uint16(cfg.Bitcoin.TimeLockDelta), DefaultFinalCltvDelta: uint16(r.cfg.Bitcoin.TimeLockDelta),
SubscribeHtlcEvents: s.htlcNotifier.SubscribeHtlcEvents, SubscribeHtlcEvents: s.htlcNotifier.SubscribeHtlcEvents,
InterceptableForwarder: s.interceptableSwitch, InterceptableForwarder: s.interceptableSwitch,
SetChannelEnabled: func(outpoint wire.OutPoint) error { SetChannelEnabled: func(outpoint wire.OutPoint) error {
@ -626,9 +618,8 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
} }
var ( var (
subServers []lnrpc.SubServer subServers []lnrpc.SubServer
subGrpcHandlers []lnrpc.GrpcHandler subServerPerms []lnrpc.MacaroonPerms
subServerPerms []lnrpc.MacaroonPerms
) )
// Before we create any of the sub-servers, we need to ensure that all // Before we create any of the sub-servers, we need to ensure that all
@ -637,31 +628,30 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
// //
// TODO(roasbeef): extend sub-sever config to have both (local vs remote) DB // TODO(roasbeef): extend sub-sever config to have both (local vs remote) DB
err = subServerCgs.PopulateDependencies( err = subServerCgs.PopulateDependencies(
cfg, s.cc, cfg.networkDir, macService, atpl, invoiceRegistry, r.cfg, s.cc, r.cfg.networkDir, macService, atpl, invoiceRegistry,
s.htlcSwitch, cfg.ActiveNetParams.Params, s.chanRouter, s.htlcSwitch, r.cfg.ActiveNetParams.Params, s.chanRouter,
routerBackend, s.nodeSigner, s.localChanDB, s.remoteChanDB, routerBackend, s.nodeSigner, s.localChanDB, s.remoteChanDB,
s.sweeper, tower, s.towerClient, s.anchorTowerClient, s.sweeper, tower, s.towerClient, s.anchorTowerClient,
cfg.net.ResolveTCPAddr, genInvoiceFeatures, rpcsLog, r.cfg.net.ResolveTCPAddr, genInvoiceFeatures, rpcsLog,
) )
if err != nil { if err != nil {
return nil, err return err
} }
// Now that the sub-servers have all their dependencies in place, we // Now that the sub-servers have all their dependencies in place, we
// can create each sub-server! // can create each sub-server!
registeredSubServers := lnrpc.RegisteredSubServers() for _, subServerInstance := range r.subGrpcHandlers {
for _, driver := range registeredSubServers { subServer, macPerms, err := subServerInstance.CreateSubServer(
handler := driver.NewGrpcHandler() subServerCgs,
subServer, macPerms, err := handler.CreateSubServer(subServerCgs) )
if err != nil { if err != nil {
return nil, err return err
} }
// We'll collect the sub-server, and also the set of // We'll collect the sub-server, and also the set of
// permissions it needs for macaroons so we can apply the // permissions it needs for macaroons so we can apply the
// interceptors below. // interceptors below.
subServers = append(subServers, subServer) subServers = append(subServers, subServer)
subGrpcHandlers = append(subGrpcHandlers, handler)
subServerPerms = append(subServerPerms, macPerms) subServerPerms = append(subServerPerms, macPerms)
} }
@ -669,35 +659,29 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
// with the main RPC server permissions so we can unite them under a // with the main RPC server permissions so we can unite them under a
// single set of interceptors. // single set of interceptors.
for m, ops := range MainRPCServerPermissions() { for m, ops := range MainRPCServerPermissions() {
err := interceptorChain.AddPermission(m, ops) err := r.interceptorChain.AddPermission(m, ops)
if err != nil { if err != nil {
return nil, err return err
} }
} }
for _, subServerPerm := range subServerPerms { for _, subServerPerm := range subServerPerms {
for method, ops := range subServerPerm { for method, ops := range subServerPerm {
err := interceptorChain.AddPermission(method, ops) err := r.interceptorChain.AddPermission(method, ops)
if err != nil { if err != nil {
return nil, err return err
} }
} }
} }
// Get the listeners and server options to use for this rpc server.
listeners, cleanup, err := getListeners()
if err != nil {
return nil, err
}
// External subserver possibly need to register their own permissions // External subserver possibly need to register their own permissions
// and macaroon validator. // and macaroon validator.
if extSubserverCfg != nil { if r.extSubserverCfg != nil {
macValidator := extSubserverCfg.MacaroonValidator macValidator := r.extSubserverCfg.MacaroonValidator
for method, ops := range extSubserverCfg.Permissions { for method, ops := range r.extSubserverCfg.Permissions {
err := interceptorChain.AddPermission(method, ops) err := r.interceptorChain.AddPermission(method, ops)
if err != nil { if err != nil {
return nil, err return err
} }
// Give the external subservers the possibility // Give the external subservers the possibility
@ -710,7 +694,7 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
method, macValidator, method, macValidator,
) )
if err != nil { if err != nil {
return nil, fmt.Errorf("could "+ return fmt.Errorf("could "+
"not register "+ "not register "+
"external macaroon "+ "external macaroon "+
"validator: %v", err) "validator: %v", err)
@ -719,43 +703,50 @@ func newRPCServer(cfg *Config, s *server, macService *macaroons.Service,
} }
} }
// Finally, with all the pre-set up complete, we can create the main // Finally, with all the set up complete, add the last dependencies to
// gRPC server, and register the main lnrpc server along side. // the rpc server.
grpcServer := grpc.NewServer(serverOpts...) r.server = s
rootRPCServer := &rpcServer{ r.subServers = subServers
cfg: cfg, r.routerBackend = routerBackend
restDialOpts: restDialOpts, r.chanPredicate = chanPredicate
listeners: listeners, r.macService = macService
listenerCleanUp: []func(){cleanup}, r.selfNode = selfNode.PubKeyBytes
restProxyDest: restProxyDest, return nil
subServers: subServers, }
subGrpcHandlers: subGrpcHandlers,
restListen: restListen, // RegisterWithGrpcServer registers the rpcServer and any subservers with the
grpcServer: grpcServer, // root gRPC server.
server: s, func (r *rpcServer) RegisterWithGrpcServer(grpcServer *grpc.Server) error {
routerBackend: routerBackend, // Register the main RPC server.
chanPredicate: chanPredicate, lnrpc.RegisterLightningServer(grpcServer, r)
quit: make(chan struct{}, 1),
macService: macService,
selfNode: selfNode.PubKeyBytes,
interceptorChain: interceptorChain,
extSubserverCfg: extSubserverCfg,
extRestRegistrar: extRestRegistrar,
}
lnrpc.RegisterLightningServer(grpcServer, rootRPCServer)
// Now the main RPC server has been registered, we'll iterate through // Now the main RPC server has been registered, we'll iterate through
// all the sub-RPC servers and register them to ensure that requests // all the sub-RPC servers and register them to ensure that requests
// are properly routed towards them. // are properly routed towards them.
for _, subServer := range subGrpcHandlers { for _, subServer := range r.subGrpcHandlers {
err := subServer.RegisterWithRootServer(grpcServer) err := subServer.RegisterWithRootServer(grpcServer)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to register "+ return fmt.Errorf("unable to register "+
"sub-server with root: %v", err) "sub-server with root: %v", err)
} }
} }
return rootRPCServer, nil // Before actually listening on the gRPC listener, give external
// subservers the chance to register to our gRPC server. Those external
// subservers (think GrUB) are responsible for starting/stopping on
// their own, we just let them register their services to the same
// server instance so all of them can be exposed on the same
// port/listener.
if r.extSubserverCfg != nil && r.extSubserverCfg.Registrar != nil {
registerer := r.extSubserverCfg.Registrar
err := registerer.RegisterGrpcSubserver(grpcServer)
if err != nil {
rpcsLog.Errorf("error registering external gRPC "+
"subserver: %v", err)
}
}
return nil
} }
// Start launches any helper goroutines required for the rpcServer to function. // Start launches any helper goroutines required for the rpcServer to function.
@ -777,76 +768,27 @@ func (r *rpcServer) Start() error {
} }
} }
// Before actually listening on the gRPC listener, give external return nil
// subservers the chance to register to our gRPC server. Those external }
// subservers (think GrUB) are responsible for starting/stopping on
// their own, we just let them register their services to the same
// server instance so all of them can be exposed on the same
// port/listener.
if r.extSubserverCfg != nil && r.extSubserverCfg.Registrar != nil {
registerer := r.extSubserverCfg.Registrar
err := registerer.RegisterGrpcSubserver(r.grpcServer)
if err != nil {
rpcsLog.Errorf("error registering external gRPC "+
"subserver: %v", err)
}
}
// With all the sub-servers started, we'll spin up the listeners for // RegisterWithRestProxy registers the RPC server and any subservers with the
// the main RPC server itself. // given REST proxy.
for _, lis := range r.listeners { func (r *rpcServer) RegisterWithRestProxy(restCtx context.Context,
go func(lis *ListenerWithSignal) { restMux *proxy.ServeMux, restDialOpts []grpc.DialOption,
rpcsLog.Infof("RPC server listening on %s", lis.Addr()) restProxyDest string) error {
// Close the ready chan to indicate we are listening.
close(lis.Ready)
_ = r.grpcServer.Serve(lis)
}(lis)
}
// If Prometheus monitoring is enabled, start the Prometheus exporter.
if r.cfg.Prometheus.Enabled() {
err := monitoring.ExportPrometheusMetrics(
r.grpcServer, r.cfg.Prometheus,
)
if err != nil {
return err
}
}
// The default JSON marshaler of the REST proxy only sets OrigName to
// true, which instructs it to use the same field names as specified in
// the proto file and not switch to camel case. What we also want is
// that the marshaler prints all values, even if they are falsey.
customMarshalerOption := proxy.WithMarshalerOption(
proxy.MIMEWildcard, &proxy.JSONPb{
OrigName: true,
EmitDefaults: true,
},
)
// Now start the REST proxy for our gRPC server above. We'll ensure
// we direct LND to connect to its loopback address rather than a
// wildcard to prevent certificate issues when accessing the proxy
// externally.
restMux := proxy.NewServeMux(customMarshalerOption)
restCtx, restCancel := context.WithCancel(context.Background())
r.listenerCleanUp = append(r.listenerCleanUp, restCancel)
// Wrap the default grpc-gateway handler with the WebSocket handler.
restHandler := lnrpc.NewWebSocketProxy(restMux, rpcsLog)
// With our custom REST proxy mux created, register our main RPC and // With our custom REST proxy mux created, register our main RPC and
// give all subservers a chance to register as well. // give all subservers a chance to register as well.
err := lnrpc.RegisterLightningHandlerFromEndpoint( err := lnrpc.RegisterLightningHandlerFromEndpoint(
restCtx, restMux, r.restProxyDest, r.restDialOpts, restCtx, restMux, restProxyDest, restDialOpts,
) )
if err != nil { if err != nil {
return err return err
} }
for _, subServer := range r.subGrpcHandlers { for _, subServer := range r.subGrpcHandlers {
err := subServer.RegisterWithRestServer( err := subServer.RegisterWithRestServer(
restCtx, restMux, r.restProxyDest, r.restDialOpts, restCtx, restMux, restProxyDest, restDialOpts,
) )
if err != nil { if err != nil {
return fmt.Errorf("unable to register REST sub-server "+ return fmt.Errorf("unable to register REST sub-server "+
@ -859,44 +801,13 @@ func (r *rpcServer) Start() error {
// with our mux instance. // with our mux instance.
if r.extRestRegistrar != nil { if r.extRestRegistrar != nil {
err := r.extRestRegistrar.RegisterRestSubserver( err := r.extRestRegistrar.RegisterRestSubserver(
restCtx, restMux, r.restProxyDest, restCtx, restMux, restProxyDest, restDialOpts,
r.restDialOpts,
) )
if err != nil { if err != nil {
rpcsLog.Errorf("error registering "+ rpcsLog.Errorf("error registering "+
"external REST subserver: %v", err) "external REST subserver: %v", err)
} }
} }
// Now spin up a network listener for each requested port and start a
// goroutine that serves REST with the created mux there.
for _, restEndpoint := range r.cfg.RESTListeners {
lis, err := r.restListen(restEndpoint)
if err != nil {
ltndLog.Errorf("gRPC proxy unable to listen on %s",
restEndpoint)
return err
}
r.listenerCleanUp = append(r.listenerCleanUp, func() {
_ = lis.Close()
})
go func() {
rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
// Create our proxy chain now. A request will pass
// through the following chain:
// req ---> CORS handler --> WS proxy --->
// REST proxy --> gRPC endpoint
corsHandler := allowCORS(restHandler, r.cfg.RestCORS)
err := http.Serve(lis, corsHandler)
if err != nil && !lnrpc.IsClosedConnError(err) {
rpcsLog.Error(err)
}
}()
}
return nil return nil
} }
@ -924,12 +835,6 @@ func (r *rpcServer) Stop() error {
} }
} }
// Finally, we can clean up all the listening sockets to ensure that we
// give the file descriptors back to the OS.
for _, cleanUp := range r.listenerCleanUp {
cleanUp()
}
return nil return nil
} }