diff --git a/lnd.go b/lnd.go index 082cddf8..6b5dd188 100644 --- a/lnd.go +++ b/lnd.go @@ -312,67 +312,21 @@ func lndMain() error { return err } - // Check macaroon authentication if macaroons aren't disabled. - if macaroonService != nil { - serverOpts = append(serverOpts, - grpc.UnaryInterceptor(macaroonService. - UnaryServerInterceptor(permissions)), - grpc.StreamInterceptor(macaroonService. - StreamServerInterceptor(permissions)), - ) - } - // Initialize, and register our implementation of the gRPC interface // exported by the rpcServer. - rpcServer := newRPCServer(server) + rpcServer, err := newRPCServer( + server, macaroonService, cfg.SubRpcServers, serverOpts, + proxyOpts, tlsConf, + ) + if err != nil { + srvrLog.Errorf("unable to start RPC server: %v", err) + return err + } if err := rpcServer.Start(); err != nil { return err } defer rpcServer.Stop() - grpcServer := grpc.NewServer(serverOpts...) - lnrpc.RegisterLightningServer(grpcServer, rpcServer) - - // Next, Start the gRPC server listening for HTTP/2 connections. - for _, listener := range cfg.RPCListeners { - lis, err := lncfg.ListenOnAddress(listener) - if err != nil { - ltndLog.Errorf( - "RPC server unable to listen on %s", listener, - ) - return err - } - defer lis.Close() - go func() { - rpcsLog.Infof("RPC server listening on %s", lis.Addr()) - grpcServer.Serve(lis) - }() - } - - // Finally, start the REST proxy for our gRPC server above. - mux := proxy.NewServeMux() - err = lnrpc.RegisterLightningHandlerFromEndpoint( - ctx, mux, cfg.RPCListeners[0].String(), proxyOpts, - ) - if err != nil { - return err - } - for _, restEndpoint := range cfg.RESTListeners { - lis, err := lncfg.TLSListenOnAddress(restEndpoint, tlsConf) - if err != nil { - ltndLog.Errorf( - "gRPC proxy unable to listen on %s", - restEndpoint, - ) - return err - } - defer lis.Close() - go func() { - rpcsLog.Infof("gRPC proxy started at %s", lis.Addr()) - http.Serve(lis, mux) - }() - } - // If we're not in simnet mode, We'll wait until we're fully synced to // continue the start up of the remainder of the daemon. This ensures // that we don't accept any possibly invalid state transitions, or @@ -602,9 +556,9 @@ func genCertPair(certFile, keyFile string) error { return nil } -// genMacaroons generates three macaroon files; one admin-level, one -// for invoice access and one read-only. These can also be used -// to generate more granular macaroons. +// genMacaroons generates three macaroon files; one admin-level, one for +// invoice access and one read-only. These can also be used to generate more +// granular macaroons. func genMacaroons(ctx context.Context, svc *macaroons.Service, admFile, roFile, invoiceFile string) error { diff --git a/rpcserver.go b/rpcserver.go index 1e7f7986..ccdcfc15 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4,11 +4,13 @@ import ( "bytes" "crypto/rand" "crypto/sha256" + "crypto/tls" "encoding/hex" "errors" "fmt" "io" "math" + "net/http" "sort" "strings" "sync" @@ -24,17 +26,21 @@ import ( "github.com/btcsuite/btcwallet/waddrmgr" "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" + proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" + "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/zpay32" "github.com/tv42/zbase32" "golang.org/x/net/context" + "google.golang.org/grpc" "gopkg.in/macaroon-bakery.v2/bakery" ) @@ -339,6 +345,31 @@ type rpcServer struct { wg sync.WaitGroup + // subServers are a set of sub-RPC servers that use the same gRPC and + // listening sockets as the main RPC server, but which maintain their + // own independent service. This allows us to expose a set of + // micro-service like abstractions to the outside world for users to + // consume. + subServers []lnrpc.SubServer + + // grpcServer is the main gRPC server that this RPC server, and all the + // sub-servers will use to register themselves and accept client + // requests from. + grpcServer *grpc.Server + + // listenerCleanUp are a set of closures functions that will allow this + // main RPC server to clean up all the listening socket created for the + // server. + listenerCleanUp []func() + + // restServerOpts are a set of gRPC dial options that the REST server + // proxy will use to connect to the main gRPC server. + restServerOpts []grpc.DialOption + + // tlsCfg is the TLS config that allows the REST server proxy to + // connect to the main gRPC server to proxy all incoming requests. + tlsCfg *tls.Config + quit chan struct{} } @@ -346,12 +377,106 @@ type rpcServer struct { // LightningServer gRPC service. var _ lnrpc.LightningServer = (*rpcServer)(nil) -// newRPCServer creates and returns a new instance of the rpcServer. -func newRPCServer(s *server) *rpcServer { - return &rpcServer{ - server: s, - quit: make(chan struct{}, 1), +// newRPCServer creates and returns a new instance of the rpcServer. The +// rpcServer will handle creating all listening sockets needed by it, and any +// of the sub-servers that it maintains. The set of serverOpts should be the +// base level options passed to the grPC server. This typically includes things +// like requiring TLS, etc. +func newRPCServer(s *server, macService *macaroons.Service, + subServerCgs *subRpcServerConfigs, serverOpts []grpc.ServerOption, + restServerOpts []grpc.DialOption, + tlsCfg *tls.Config) (*rpcServer, error) { + + var ( + subServers []lnrpc.SubServer + subServerPerms []lnrpc.MacaroonPerms + ) + + // Before we create any of the sub-servers, we need to ensure that all + // the dependencies they need are properly populated within each sub + // server configuration struct. + err := subServerCgs.PopulateDependancies( + s.cc, networkDir, macService, + ) + if err != nil { + return nil, err } + + // Now that the sub-servers have all their dependencies in place, we + // can create each sub-server! + registeredSubServers := lnrpc.RegisteredSubServers() + for _, subServer := range registeredSubServers { + subServerInstance, macPerms, err := subServer.New(subServerCgs) + if err != nil { + return nil, err + } + + // We'll collect the sub-server, and also the set of + // permissions it needs for macaroons so we can apply the + // interceptors below. + subServers = append(subServers, subServerInstance) + subServerPerms = append(subServerPerms, macPerms) + } + + // Next, we need to merge the set of sub server macaroon permissions + // with the main RPC server permissions so we can unite them under a + // single set of interceptors. + for _, subServerPerm := range subServerPerms { + for method, ops := range subServerPerm { + // For each new method:ops combo, we also ensure that + // non of the sub-servers try to override each other. + if _, ok := permissions[method]; ok { + return nil, fmt.Errorf("detected duplicate "+ + "macaroon constraints for path: %v", + method) + } + + permissions[method] = ops + } + } + + // If macaroons aren't disabled (a non-nil service), then we'll set up + // our set of interceptors which will allow us handle the macaroon + // authentication in a single location . + if macService != nil { + unaryInterceptor := grpc.UnaryInterceptor( + macService.UnaryServerInterceptor(permissions), + ) + streamInterceptor := grpc.StreamInterceptor( + macService.StreamServerInterceptor(permissions), + ) + + serverOpts = append(serverOpts, + unaryInterceptor, streamInterceptor, + ) + } + + // Finally, with all the pre-set up complete, we can create the main + // gRPC server, and register the main lnrpc server along side. + grpcServer := grpc.NewServer(serverOpts...) + rootRpcServer := &rpcServer{ + restServerOpts: restServerOpts, + subServers: subServers, + tlsCfg: tlsCfg, + grpcServer: grpcServer, + server: s, + quit: make(chan struct{}, 1), + } + lnrpc.RegisterLightningServer(grpcServer, rootRpcServer) + + // Now the main RPC server has been registered, we'll iterate through + // all the sub-RPC servers and register them to ensure that requests + // are properly routed towards them. + for _, subServer := range subServers { + err := subServer.RegisterWithRootServer(grpcServer) + if err != nil { + return nil, fmt.Errorf("unable to register "+ + "sub-server %v with root: %v", + subServer.Name(), err) + } + } + + return rootRpcServer, nil } // Start launches any helper goroutines required for the rpcServer to function. @@ -360,6 +485,72 @@ func (r *rpcServer) Start() error { return nil } + // First, we'll start all the sub-servers to ensure that they're ready + // to take new requests in. + // + // TODO(roasbeef): some may require that the entire daemon be started + // at that point + for _, subServer := range r.subServers { + rpcsLog.Debugf("Starting sub RPC server: %v", subServer.Name()) + + if err := subServer.Start(); err != nil { + return err + } + } + + // With all the sub-servers started, we'll spin up the listeners for + // the main RPC server itself. + for _, listener := range cfg.RPCListeners { + lis, err := lncfg.ListenOnAddress(listener) + if err != nil { + ltndLog.Errorf( + "RPC server unable to listen on %s", listener, + ) + return err + } + + r.listenerCleanUp = append(r.listenerCleanUp, func() { + lis.Close() + }) + + go func() { + rpcsLog.Infof("RPC server listening on %s", lis.Addr()) + r.grpcServer.Serve(lis) + }() + } + + // Finally, start the REST proxy for our gRPC server above. + // + // TODO(roasbeef): eventually also allow the sub-servers to themselves + // have a REST proxy. + mux := proxy.NewServeMux() + err := lnrpc.RegisterLightningHandlerFromEndpoint( + context.Background(), mux, cfg.RPCListeners[0].String(), + r.restServerOpts, + ) + if err != nil { + return err + } + for _, restEndpoint := range cfg.RESTListeners { + lis, err := lncfg.TLSListenOnAddress(restEndpoint, r.tlsCfg) + if err != nil { + ltndLog.Errorf( + "gRPC proxy unable to listen on %s", + restEndpoint, + ) + return err + } + + r.listenerCleanUp = append(r.listenerCleanUp, func() { + lis.Close() + }) + + go func() { + rpcsLog.Infof("gRPC proxy started at %s", lis.Addr()) + http.Serve(lis, mux) + }() + } + return nil } @@ -369,8 +560,28 @@ func (r *rpcServer) Stop() error { return nil } + rpcsLog.Infof("Stopping RPC Server") + close(r.quit) + // After we've signalled all of our active goroutines to exit, we'll + // then do the same to signal a graceful shutdown of all the sub + // servers. + for _, subServer := range r.subServers { + rpcsLog.Infof("Stopping %v Sub-RPC Server", + subServer.Name()) + + if err := subServer.Stop(); err != nil { + continue + } + } + + // Finally, we can clean up all the listening sockets to ensure that we + // give the file descriptors back to the OS. + for _, cleanUp := range r.listenerCleanUp { + cleanUp() + } + return nil }