lnd+rpcserver: extract listener setup into closure

This commit is contained in:
Johan T. Halseth 2019-07-09 11:09:19 +02:00
parent 187bd29d2d
commit 2562fdbdda
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 103 additions and 55 deletions

102
lnd.go

@ -93,6 +93,13 @@ var (
} }
) )
// rpcListeners is a function type used for closures that fetches a set of RPC
// listeners for the current configuration, and the GRPC server options to use
// with these listeners. If no custom listeners are present, this should return
// normal listeners from the RPC endpoints defined in the config, and server
// options specifying TLS.
type rpcListeners func() ([]net.Listener, func(), []grpc.ServerOption, error)
// Main is the true entry point for lnd. This function is required since defers // Main is the true entry point for lnd. This function is required since defers
// created in the top-level scope of a main method aren't executed if os.Exit() // created in the top-level scope of a main method aren't executed if os.Exit()
// is called. // is called.
@ -240,13 +247,49 @@ func Main() error {
// this information. // this information.
walletInitParams.Birthday = time.Now() walletInitParams.Birthday = time.Now()
// getListeners is a closure that creates listeners from the
// RPCListeners defined in the config. It also returns a cleanup
// closure and the server options to use for the GRPC server.
getListeners := func() ([]net.Listener, func(), []grpc.ServerOption,
error) {
var grpcListeners []net.Listener
for _, grpcEndpoint := range cfg.RPCListeners {
// Start a gRPC server listening for HTTP/2
// connections.
lis, err := lncfg.ListenOnAddress(grpcEndpoint)
if err != nil {
ltndLog.Errorf("unable to listen on %s",
grpcEndpoint)
return nil, nil, nil, err
}
grpcListeners = append(grpcListeners, lis)
}
cleanup := func() {
for _, lis := range grpcListeners {
lis.Close()
}
}
return grpcListeners, cleanup, serverOpts, nil
}
// walletUnlockerListeners is a closure we'll hand to the wallet
// unlocker, that will be called when it needs listeners for its GPRC
// server.
walletUnlockerListeners := func() ([]net.Listener, func(),
[]grpc.ServerOption, error) {
return getListeners()
}
// We wait until the user provides a password over RPC. In case lnd is // We wait until the user provides a password over RPC. In case lnd is
// started with the --noseedbackup flag, we use the default password // started with the --noseedbackup flag, we use the default password
// for wallet encryption. // for wallet encryption.
if !cfg.NoSeedBackup { if !cfg.NoSeedBackup {
params, err := waitForWalletPassword( params, err := waitForWalletPassword(
cfg.RPCListeners, cfg.RESTListeners, serverOpts, cfg.RESTListeners, restDialOpts, restProxyDest, tlsCfg,
restDialOpts, restProxyDest, tlsCfg, walletUnlockerListeners,
) )
if err != nil { if err != nil {
err := fmt.Errorf("Unable to set up wallet password "+ err := fmt.Errorf("Unable to set up wallet password "+
@ -457,12 +500,20 @@ func Main() error {
} }
defer atplManager.Stop() defer atplManager.Stop()
// rpcListeners is a closure we'll hand to the rpc server, that will be
// called when it needs listeners for its GPRC server.
rpcListeners := func() ([]net.Listener, func(), []grpc.ServerOption,
error) {
return getListeners()
}
// Initialize, and register our implementation of the gRPC interface // Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer. // exported by the rpcServer.
rpcServer, err := newRPCServer( rpcServer, err := newRPCServer(
server, macaroonService, cfg.SubRPCServers, serverOpts, server, macaroonService, cfg.SubRPCServers, restDialOpts,
restDialOpts, restProxyDest, atplManager, server.invoices, restProxyDest, atplManager, server.invoices, tower, tlsCfg,
tower, tlsCfg, rpcListeners,
) )
if err != nil { if err != nil {
err := fmt.Errorf("Unable to create RPC server: %v", err) err := fmt.Errorf("Unable to create RPC server: %v", err)
@ -880,9 +931,18 @@ type WalletUnlockParams struct {
// waitForWalletPassword will spin up gRPC and REST endpoints for the // waitForWalletPassword will spin up gRPC and REST endpoints for the
// WalletUnlocker server, and block until a password is provided by // WalletUnlocker server, and block until a password is provided by
// the user to this RPC server. // the user to this RPC server.
func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr, func waitForWalletPassword(restEndpoints []net.Addr,
serverOpts []grpc.ServerOption, restDialOpts []grpc.DialOption, restDialOpts []grpc.DialOption, restProxyDest string,
restProxyDest string, tlsConf *tls.Config) (*WalletUnlockParams, error) { tlsConf *tls.Config, getListeners rpcListeners) (
*WalletUnlockParams, error) {
// Start a gRPC server listening for HTTP/2 connections, solely used
// for getting the encryption password from the client.
listeners, cleanup, serverOpts, err := getListeners()
if err != nil {
return nil, err
}
defer cleanup()
// Set up a new PasswordService, which will listen for passwords // Set up a new PasswordService, which will listen for passwords
// provided over RPC. // provided over RPC.
@ -911,28 +971,14 @@ func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr,
// password is the last thing to be printed to the console. // password is the last thing to be printed to the console.
var wg sync.WaitGroup var wg sync.WaitGroup
for _, grpcEndpoint := range grpcEndpoints { for _, lis := range listeners {
// Start a gRPC server listening for HTTP/2 connections, solely
// used for getting the encryption password from the client.
lis, err := lncfg.ListenOnAddress(grpcEndpoint)
if err != nil {
ltndLog.Errorf(
"password RPC server unable to listen on %s",
grpcEndpoint,
)
return nil, err
}
defer lis.Close()
wg.Add(1) wg.Add(1)
go func() { go func(lis net.Listener) {
rpcsLog.Infof( rpcsLog.Infof("password RPC server listening on %s",
"password RPC server listening on %s", lis.Addr())
lis.Addr(),
)
wg.Done() wg.Done()
grpcServer.Serve(lis) grpcServer.Serve(lis)
}() }(lis)
} }
// Start a REST proxy for our gRPC server above. // Start a REST proxy for our gRPC server above.
@ -942,7 +988,7 @@ func waitForWalletPassword(grpcEndpoints, restEndpoints []net.Addr,
mux := proxy.NewServeMux() mux := proxy.NewServeMux()
err := lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( err = lnrpc.RegisterWalletUnlockerHandlerFromEndpoint(
ctx, mux, restProxyDest, restDialOpts, ctx, mux, restProxyDest, restDialOpts,
) )
if err != nil { if err != nil {

@ -8,6 +8,7 @@ import (
"fmt" "fmt"
"io" "io"
"math" "math"
"net"
"net/http" "net/http"
"sort" "sort"
"strings" "strings"
@ -409,6 +410,11 @@ type rpcServer struct {
// requests from. // requests from.
grpcServer *grpc.Server grpcServer *grpc.Server
// listeners is a list of listeners to use when starting the grpc
// server. We make it configurable such that the grpc server can listen
// on custom interfaces.
listeners []net.Listener
// listenerCleanUp are a set of closures functions that will allow this // listenerCleanUp are a set of closures functions that will allow this
// main RPC server to clean up all the listening socket created for the // main RPC server to clean up all the listening socket created for the
// server. // server.
@ -442,10 +448,10 @@ var _ lnrpc.LightningServer = (*rpcServer)(nil)
// base level options passed to the grPC server. This typically includes things // base level options passed to the grPC server. This typically includes things
// like requiring TLS, etc. // like requiring TLS, etc.
func newRPCServer(s *server, macService *macaroons.Service, func newRPCServer(s *server, macService *macaroons.Service,
subServerCgs *subRPCServerConfigs, serverOpts []grpc.ServerOption, subServerCgs *subRPCServerConfigs, restDialOpts []grpc.DialOption,
restDialOpts []grpc.DialOption, restProxyDest string, restProxyDest string, atpl *autopilot.Manager,
atpl *autopilot.Manager, invoiceRegistry *invoices.InvoiceRegistry, invoiceRegistry *invoices.InvoiceRegistry, tower *watchtower.Standalone,
tower *watchtower.Standalone, tlsCfg *tls.Config) (*rpcServer, error) { tlsCfg *tls.Config, getListeners rpcListeners) (*rpcServer, error) {
// Set up router rpc backend. // Set up router rpc backend.
channelGraph := s.chanDB.ChannelGraph() channelGraph := s.chanDB.ChannelGraph()
@ -570,6 +576,12 @@ func newRPCServer(s *server, macService *macaroons.Service,
strmInterceptors, errorLogStreamServerInterceptor(rpcsLog), strmInterceptors, errorLogStreamServerInterceptor(rpcsLog),
) )
// Get the listeners and server options to use for this rpc server.
listeners, cleanup, serverOpts, err := getListeners()
if err != nil {
return nil, err
}
// If any interceptors have been set up, add them to the server options. // If any interceptors have been set up, add them to the server options.
if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 { if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 {
chainedUnary := grpc_middleware.WithUnaryServerChain( chainedUnary := grpc_middleware.WithUnaryServerChain(
@ -585,14 +597,16 @@ func newRPCServer(s *server, macService *macaroons.Service,
// gRPC server, and register the main lnrpc server along side. // gRPC server, and register the main lnrpc server along side.
grpcServer := grpc.NewServer(serverOpts...) grpcServer := grpc.NewServer(serverOpts...)
rootRPCServer := &rpcServer{ rootRPCServer := &rpcServer{
restDialOpts: restDialOpts, restDialOpts: restDialOpts,
restProxyDest: restProxyDest, listeners: listeners,
subServers: subServers, listenerCleanUp: []func(){cleanup},
tlsCfg: tlsCfg, restProxyDest: restProxyDest,
grpcServer: grpcServer, subServers: subServers,
server: s, tlsCfg: tlsCfg,
routerBackend: routerBackend, grpcServer: grpcServer,
quit: make(chan struct{}, 1), server: s,
routerBackend: routerBackend,
quit: make(chan struct{}, 1),
} }
lnrpc.RegisterLightningServer(grpcServer, rootRPCServer) lnrpc.RegisterLightningServer(grpcServer, rootRPCServer)
@ -632,23 +646,11 @@ func (r *rpcServer) Start() error {
// With all the sub-servers started, we'll spin up the listeners for // With all the sub-servers started, we'll spin up the listeners for
// the main RPC server itself. // the main RPC server itself.
for _, listener := range cfg.RPCListeners { for _, lis := range r.listeners {
lis, err := lncfg.ListenOnAddress(listener) go func(lis net.Listener) {
if err != nil {
ltndLog.Errorf(
"RPC server unable to listen on %s", listener,
)
return err
}
r.listenerCleanUp = append(r.listenerCleanUp, func() {
lis.Close()
})
go func() {
rpcsLog.Infof("RPC server listening on %s", lis.Addr()) rpcsLog.Infof("RPC server listening on %s", lis.Addr())
r.grpcServer.Serve(lis) r.grpcServer.Serve(lis)
}() }(lis)
} }
// If Prometheus monitoring is enabled, start the Prometheus exporter. // If Prometheus monitoring is enabled, start the Prometheus exporter.