multi: allow external subservers to register themselves

With two new callbacks we allow processes that use lnd as a library
to register additional gRPC and REST subservers to the main server
instances that lnd creates.
This commit is contained in:
Oliver Gugger 2020-05-04 18:15:03 +02:00
parent 620eaa3199
commit a7e78112b7
No known key found for this signature in database
GPG Key ID: 8E4256593F177720
2 changed files with 121 additions and 16 deletions

47
lnd.go

@ -119,6 +119,45 @@ func AdminAuthOptions() ([]grpc.DialOption, error) {
return opts, nil return opts, nil
} }
// GrpcRegistrar is an interface that must be satisfied by an external subserver
// that wants to be able to register its own gRPC server onto lnd's main
// grpc.Server instance.
type GrpcRegistrar interface {
// RegisterGrpcSubserver is called for each net.Listener on which lnd
// creates a grpc.Server instance. External subservers implementing this
// method can then register their own gRPC server structs to the main
// server instance.
RegisterGrpcSubserver(*grpc.Server) error
}
// RestRegistrar is an interface that must be satisfied by an external subserver
// that wants to be able to register its own REST mux onto lnd's main
// proxy.ServeMux instance.
type RestRegistrar interface {
// RegisterRestSubserver is called after lnd creates the main
// proxy.ServeMux instance. External subservers implementing this method
// can then register their own REST proxy stubs to the main server
// instance.
RegisterRestSubserver(context.Context, *proxy.ServeMux, string,
[]grpc.DialOption) error
}
// RPCSubserverConfig is a struct that can be used to register an external
// subserver with the custom permissions that map to the gRPC server that is
// going to be registered with the GrpcRegistrar.
type RPCSubserverConfig struct {
// Registrar is a callback that is invoked for each net.Listener on
// which lnd creates a grpc.Server instance.
Registrar GrpcRegistrar
// Permissions is the permissions required for the external subserver.
// It is a map between the full HTTP URI of each RPC and its required
// macaroon permissions. If multiple action/entity tuples are specified
// per URI, they are all required. See rpcserver.go for a list of valid
// action and entity values.
Permissions map[string][]bakery.Op
}
// ListenerWithSignal is a net.Listener that has an additional Ready channel that // ListenerWithSignal is a net.Listener that has an additional Ready channel that
// will be closed when a server starts listening. // will be closed when a server starts listening.
type ListenerWithSignal struct { type ListenerWithSignal struct {
@ -126,6 +165,14 @@ type ListenerWithSignal struct {
// Ready will be closed by the server listening on Listener. // Ready will be closed by the server listening on Listener.
Ready chan struct{} Ready chan struct{}
// ExternalRPCSubserverCfg is optional and specifies the registration
// callback and permissions to register external gRPC subservers.
ExternalRPCSubserverCfg *RPCSubserverConfig
// ExternalRestRegistrar is optional and specifies the registration
// callback to register external REST subservers.
ExternalRestRegistrar RestRegistrar
} }
// ListenerCfg is a wrapper around custom listeners that can be passed to lnd // ListenerCfg is a wrapper around custom listeners that can be passed to lnd

@ -628,6 +628,32 @@ func newRPCServer(s *server, macService *macaroons.Service,
} }
} }
// Get the listeners and server options to use for this rpc server.
listeners, cleanup, err := getListeners()
if err != nil {
return nil, err
}
// External subserver possibly need to register their own permissions.
for _, lis := range listeners {
extSubserver := lis.ExternalRPCSubserverCfg
if extSubserver != nil {
for method, ops := range extSubserver.Permissions {
// For each new method:ops combo, we also ensure
// that non of the sub-servers try to override
// each other.
if _, ok := permissions[method]; ok {
return nil, fmt.Errorf("detected "+
"duplicate macaroon "+
"constraints for path: %v",
method)
}
permissions[method] = ops
}
}
}
// If macaroons aren't disabled (a non-nil service), then we'll set up // If macaroons aren't disabled (a non-nil service), then we'll set up
// our set of interceptors which will allow us to handle the macaroon // our set of interceptors which will allow us to handle the macaroon
// authentication in a single location. // authentication in a single location.
@ -659,12 +685,6 @@ func newRPCServer(s *server, macService *macaroons.Service,
strmInterceptors, errorLogStreamServerInterceptor(rpcsLog), strmInterceptors, errorLogStreamServerInterceptor(rpcsLog),
) )
// Get the listeners and server options to use for this rpc server.
listeners, cleanup, err := getListeners()
if err != nil {
return nil, err
}
// If any interceptors have been set up, add them to the server options. // If any interceptors have been set up, add them to the server options.
if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 { if len(unaryInterceptors) != 0 && len(strmInterceptors) != 0 {
chainedUnary := grpc_middleware.WithUnaryServerChain( chainedUnary := grpc_middleware.WithUnaryServerChain(
@ -736,9 +756,29 @@ func (r *rpcServer) Start() error {
go func(lis *ListenerWithSignal) { go func(lis *ListenerWithSignal) {
rpcsLog.Infof("RPC server listening on %s", lis.Addr()) rpcsLog.Infof("RPC server listening on %s", lis.Addr())
// Before actually listening on the gRPC listener, give
// external subservers the chance to register to our
// gRPC server. Those external subservers (think GrUB)
// are responsible for starting/stopping on their own,
// we just let them register their services to the same
// server instance so all of them can be exposed on the
// same port/listener.
extSubCfg := lis.ExternalRPCSubserverCfg
if extSubCfg != nil && extSubCfg.Registrar != nil {
registerer := extSubCfg.Registrar
err := registerer.RegisterGrpcSubserver(
r.grpcServer,
)
if err != nil {
rpcsLog.Errorf("error registering "+
"external gRPC subserver: %v",
err)
}
}
// Close the ready chan to indicate we are listening. // Close the ready chan to indicate we are listening.
close(lis.Ready) close(lis.Ready)
r.grpcServer.Serve(lis) _ = r.grpcServer.Serve(lis)
}(lis) }(lis)
} }
@ -770,32 +810,50 @@ func (r *rpcServer) Start() error {
// //
// TODO(roasbeef): eventually also allow the sub-servers to themselves // TODO(roasbeef): eventually also allow the sub-servers to themselves
// have a REST proxy. // have a REST proxy.
mux := proxy.NewServeMux(customMarshalerOption) restMux := proxy.NewServeMux(customMarshalerOption)
restCtx, restCancel := context.WithCancel(context.Background())
r.listenerCleanUp = append(r.listenerCleanUp, restCancel)
err := lnrpc.RegisterLightningHandlerFromEndpoint( err := lnrpc.RegisterLightningHandlerFromEndpoint(
context.Background(), mux, r.restProxyDest, restCtx, restMux, r.restProxyDest, r.restDialOpts,
r.restDialOpts,
) )
if err != nil { if err != nil {
return err return err
} }
// Before listening on any of the interfaces, we also want to give the
// external subservers a chance to register their own REST proxy stub
// with our mux instance.
for _, lis := range r.listeners {
if lis.ExternalRestRegistrar != nil {
err := lis.ExternalRestRegistrar.RegisterRestSubserver(
restCtx, restMux, r.restProxyDest,
r.restDialOpts,
)
if err != nil {
rpcsLog.Errorf("error registering "+
"external REST subserver: %v", err)
}
}
}
// Now spin up a network listener for each requested port and start a
// goroutine that serves REST with the created mux there.
for _, restEndpoint := range cfg.RESTListeners { for _, restEndpoint := range cfg.RESTListeners {
lis, err := lncfg.TLSListenOnAddress(restEndpoint, r.tlsCfg) lis, err := lncfg.TLSListenOnAddress(restEndpoint, r.tlsCfg)
if err != nil { if err != nil {
ltndLog.Errorf( ltndLog.Errorf("gRPC proxy unable to listen on %s",
"gRPC proxy unable to listen on %s", restEndpoint)
restEndpoint,
)
return err return err
} }
r.listenerCleanUp = append(r.listenerCleanUp, func() { r.listenerCleanUp = append(r.listenerCleanUp, func() {
lis.Close() _ = lis.Close()
}) })
go func() { go func() {
rpcsLog.Infof("gRPC proxy started at %s", lis.Addr()) rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
http.Serve(lis, mux) _ = http.Serve(lis, restMux)
}() }()
} }