lnd+rpc: modify rpcServer to fully manage listeners and gRPC, handle sub-servers

In this commit, we modify the existing rpcServer to fully manage the
macaroon service and the gRPC server, and to seek out and create all
sub-servers. With this change, the RPC server gains more
responsibility, as it becomes the "root" server in the hierarchy of
gRPC sub-servers.

In addition to creating each sub-server, it will also merge the set of
macaroon permissions for each sub-server with the permissions of the
rest of the RPC infrastructure. As a result, each sub-server is able to
independently specify what it needs w.r.t. macaroon permissions and
have that taken care of by the RPC server. In order to achieve this, we
need to unify the creation of the RPC interceptors, and also fully
manage the gRPC server ourselves.

Some examples with various build configs:
```
⛰i  make build
 Building debug lnd and lncli.
go build -v -tags="dev" -o lnd-debug -ldflags "-X github.com/lightningnetwork/lnd/build.Commit=v0.5-beta-143-gb2069914c4b76109b7c59320dc48f8a5f30deb75-dirty" github.com/lightningnetwork/lnd
go build -v -tags="dev" -o lncli-debug -ldflags "-X github.com/lightningnetwork/lnd/build.Commit=v0.5-beta-143-gb2069914c4b76109b7c59320dc48f8a5f30deb75-dirty" github.com/lightningnetwork/lnd/cmd/lncli

⛰i  ./lnd-debug --debuglevel=debug --signrpc.signermacaroonpath=~/sign.macaroon
unknown flag `signrpc.signermacaroonpath'
unknown flag `signrpc.signermacaroonpath'

⛰i  make build tags=signerrpc
 Building debug lnd and lncli.
go build -v -tags="dev signerrpc" -o lnd-debug -ldflags "-X github.com/lightningnetwork/lnd/build.Commit=v0.5-beta-143-gb2069914c4b76109b7c59320dc48f8a5f30deb75-dirty" github.com/lightningnetwork/lnd
go build -v -tags="dev signerrpc" -o lncli-debug -ldflags "-X github.com/lightningnetwork/lnd/build.Commit=v0.5-beta-143-gb2069914c4b76109b7c59320dc48f8a5f30deb75-dirty" github.com/lightningnetwork/lnd/cmd/lncli

⛰i  ./lnd-debug --debuglevel=debug --signrpc.signermacaroonpath=~/sign.macaroon
2018-10-22 17:31:01.132 [INF] LTND: Version: 0.5.0-beta commit=v0.5-beta-143-gb2069914c4b76109b7c59320dc48f8a5f30deb75-dirty, build=development, logging=default
2018-10-22 17:31:01.133 [INF] LTND: Active chain: Bitcoin (network=simnet)
2018-10-22 17:31:01.140 [INF] CHDB: Checking for schema update: latest_version=6, db_version=6
2018-10-22 17:31:01.236 [INF] LTND: Primary chain is set to: bitcoin
2018-10-22 17:31:02.391 [INF] LNWL: Opened wallet
2018-10-22 17:31:03.315 [INF] LNWL: The wallet has been unlocked without a time limit
2018-10-22 17:31:03.315 [INF] LTND: LightningWallet opened
2018-10-22 17:31:03.319 [INF] LNWL: Catching up block hashes to height 3060, this will take a while...
2018-10-22 17:31:03.320 [INF] HSWC: Restoring in-memory circuit state from disk
2018-10-22 17:31:03.320 [INF] LNWL: Done catching up block hashes
2018-10-22 17:31:03.320 [INF] HSWC: Payment circuits loaded: num_pending=0, num_open=0
2018-10-22 17:31:03.322 [DBG] LTND: Populating dependencies for sub RPC server: Signrpc
```

As for the config, an example is:
```
[signrpc]
signrpc.signermacaroonpath=~/signer.macaroon
```
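The `[signrpc]` section above ends up in the new sub-server config
container that lnd.go hands to newRPCServer as cfg.SubRpcServers. A
rough sketch of how such a container could be declared so the flag/ini
parser picks up the namespaced keys (the exact field layout here is an
assumption, not necessarily the upstream definition):
```
package main

import "github.com/lightningnetwork/lnd/lnrpc/signrpc"

// subRpcServerConfigs gathers the configuration of every sub-RPC server
// in one place. The group/namespace tags let go-flags map keys such as
// "signrpc.signermacaroonpath" in the [signrpc] section onto the signer
// sub-server's config. Only sub-servers compiled in via build tags
// contribute flags, which is why the option is unknown without the
// signerrpc tag in the example above.
type subRpcServerConfigs struct {
	// SignRPC houses the signer sub-server's config (assumed name).
	SignRPC *signrpc.Config `group:"signrpc" namespace:"signrpc"`
}
```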
Olaoluwa Osuntokun 2018-10-22 18:03:07 -07:00
parent 4002caec69
commit ff47ade13b
2 changed files with 227 additions and 62 deletions

lnd.go

@@ -312,67 +312,21 @@ func lndMain() error {
		return err
	}

	// Check macaroon authentication if macaroons aren't disabled.
	if macaroonService != nil {
		serverOpts = append(serverOpts,
			grpc.UnaryInterceptor(macaroonService.
				UnaryServerInterceptor(permissions)),
			grpc.StreamInterceptor(macaroonService.
				StreamServerInterceptor(permissions)),
		)
	}

	// Initialize, and register our implementation of the gRPC interface
	// exported by the rpcServer.
	rpcServer := newRPCServer(server)
	rpcServer, err := newRPCServer(
		server, macaroonService, cfg.SubRpcServers, serverOpts,
		proxyOpts, tlsConf,
	)
	if err != nil {
		srvrLog.Errorf("unable to start RPC server: %v", err)
		return err
	}
	if err := rpcServer.Start(); err != nil {
		return err
	}
	defer rpcServer.Stop()

	grpcServer := grpc.NewServer(serverOpts...)
	lnrpc.RegisterLightningServer(grpcServer, rpcServer)

	// Next, Start the gRPC server listening for HTTP/2 connections.
	for _, listener := range cfg.RPCListeners {
		lis, err := lncfg.ListenOnAddress(listener)
		if err != nil {
			ltndLog.Errorf(
				"RPC server unable to listen on %s", listener,
			)
			return err
		}
		defer lis.Close()

		go func() {
			rpcsLog.Infof("RPC server listening on %s", lis.Addr())
			grpcServer.Serve(lis)
		}()
	}

	// Finally, start the REST proxy for our gRPC server above.
	mux := proxy.NewServeMux()
	err = lnrpc.RegisterLightningHandlerFromEndpoint(
		ctx, mux, cfg.RPCListeners[0].String(), proxyOpts,
	)
	if err != nil {
		return err
	}
	for _, restEndpoint := range cfg.RESTListeners {
		lis, err := lncfg.TLSListenOnAddress(restEndpoint, tlsConf)
		if err != nil {
			ltndLog.Errorf(
				"gRPC proxy unable to listen on %s",
				restEndpoint,
			)
			return err
		}
		defer lis.Close()

		go func() {
			rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
			http.Serve(lis, mux)
		}()
	}

	// If we're not in simnet mode, We'll wait until we're fully synced to
	// continue the start up of the remainder of the daemon. This ensures
	// that we don't accept any possibly invalid state transitions, or
@@ -602,9 +556,9 @@ func genCertPair(certFile, keyFile string) error {
	return nil
}

// genMacaroons generates three macaroon files; one admin-level, one
// for invoice access and one read-only. These can also be used
// to generate more granular macaroons.
// genMacaroons generates three macaroon files; one admin-level, one for
// invoice access and one read-only. These can also be used to generate more
// granular macaroons.
func genMacaroons(ctx context.Context, svc *macaroons.Service,
	admFile, roFile, invoiceFile string) error {

rpcserver.go

@@ -4,11 +4,13 @@ import (
	"bytes"
	"crypto/rand"
	"crypto/sha256"
	"crypto/tls"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"sort"
	"strings"
	"sync"
@@ -24,17 +26,21 @@ import (
	"github.com/btcsuite/btcwallet/waddrmgr"
	"github.com/coreos/bbolt"
	"github.com/davecgh/go-spew/spew"
	proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
	"github.com/lightningnetwork/lnd/build"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/lncfg"
	"github.com/lightningnetwork/lnd/lnrpc"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/macaroons"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/lightningnetwork/lnd/signal"
	"github.com/lightningnetwork/lnd/zpay32"
	"github.com/tv42/zbase32"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"gopkg.in/macaroon-bakery.v2/bakery"
)
@@ -339,6 +345,31 @@ type rpcServer struct {
	wg sync.WaitGroup

	// subServers are a set of sub-RPC servers that use the same gRPC and
	// listening sockets as the main RPC server, but which maintain their
	// own independent service. This allows us to expose a set of
	// micro-service like abstractions to the outside world for users to
	// consume.
	subServers []lnrpc.SubServer

	// grpcServer is the main gRPC server that this RPC server, and all the
	// sub-servers will use to register themselves and accept client
	// requests from.
	grpcServer *grpc.Server

	// listenerCleanUp are a set of closure functions that will allow this
	// main RPC server to clean up all the listening sockets created for
	// the server.
	listenerCleanUp []func()

	// restServerOpts are a set of gRPC dial options that the REST server
	// proxy will use to connect to the main gRPC server.
	restServerOpts []grpc.DialOption

	// tlsCfg is the TLS config that allows the REST server proxy to
	// connect to the main gRPC server to proxy all incoming requests.
	tlsCfg *tls.Config

	quit chan struct{}
}
@@ -346,12 +377,106 @@ type rpcServer struct {
// LightningServer gRPC service.
var _ lnrpc.LightningServer = (*rpcServer)(nil)
// newRPCServer creates and returns a new instance of the rpcServer.
func newRPCServer(s *server) *rpcServer {
	return &rpcServer{
		server: s,
		quit:   make(chan struct{}, 1),
// newRPCServer creates and returns a new instance of the rpcServer. The
// rpcServer will handle creating all listening sockets needed by it, and any
// of the sub-servers that it maintains. The set of serverOpts should be the
base level options passed to the gRPC server. This typically includes things
// like requiring TLS, etc.
func newRPCServer(s *server, macService *macaroons.Service,
	subServerCgs *subRpcServerConfigs, serverOpts []grpc.ServerOption,
	restServerOpts []grpc.DialOption,
	tlsCfg *tls.Config) (*rpcServer, error) {

	var (
		subServers     []lnrpc.SubServer
		subServerPerms []lnrpc.MacaroonPerms
	)

	// Before we create any of the sub-servers, we need to ensure that all
	// the dependencies they need are properly populated within each
	// sub-server configuration struct.
	err := subServerCgs.PopulateDependancies(
		s.cc, networkDir, macService,
	)
	if err != nil {
		return nil, err
	}

	// Now that the sub-servers have all their dependencies in place, we
	// can create each sub-server!
	registeredSubServers := lnrpc.RegisteredSubServers()
	for _, subServer := range registeredSubServers {
		subServerInstance, macPerms, err := subServer.New(subServerCgs)
		if err != nil {
			return nil, err
		}

		// We'll collect the sub-server, and also the set of
		// permissions it needs for macaroons so we can apply the
		// interceptors below.
		subServers = append(subServers, subServerInstance)
		subServerPerms = append(subServerPerms, macPerms)
	}

	// Next, we need to merge the set of sub-server macaroon permissions
	// with the main RPC server permissions so we can unite them under a
	// single set of interceptors.
	for _, subServerPerm := range subServerPerms {
		for method, ops := range subServerPerm {
			// For each new method:ops combo, we also ensure that
			// none of the sub-servers try to override each other.
			if _, ok := permissions[method]; ok {
				return nil, fmt.Errorf("detected duplicate "+
					"macaroon constraints for path: %v",
					method)
			}

			permissions[method] = ops
		}
	}

	// If macaroons aren't disabled (a non-nil service), then we'll set up
	// our set of interceptors which will allow us to handle the macaroon
	// authentication in a single location.
	if macService != nil {
		unaryInterceptor := grpc.UnaryInterceptor(
			macService.UnaryServerInterceptor(permissions),
		)
		streamInterceptor := grpc.StreamInterceptor(
			macService.StreamServerInterceptor(permissions),
		)
		serverOpts = append(serverOpts,
			unaryInterceptor, streamInterceptor,
		)
	}

	// Finally, with all the pre-setup complete, we can create the main
	// gRPC server, and register the main lnrpc server alongside.
	grpcServer := grpc.NewServer(serverOpts...)
	rootRpcServer := &rpcServer{
		restServerOpts: restServerOpts,
		subServers:     subServers,
		tlsCfg:         tlsCfg,
		grpcServer:     grpcServer,
		server:         s,
		quit:           make(chan struct{}, 1),
	}
	lnrpc.RegisterLightningServer(grpcServer, rootRpcServer)

	// Now that the main RPC server has been registered, we'll iterate
	// through all the sub-RPC servers and register them to ensure that
	// requests are properly routed towards them.
	for _, subServer := range subServers {
		err := subServer.RegisterWithRootServer(grpcServer)
		if err != nil {
			return nil, fmt.Errorf("unable to register "+
				"sub-server %v with root: %v",
				subServer.Name(), err)
		}
	}

	return rootRpcServer, nil
}
// Start launches any helper goroutines required for the rpcServer to function.
@@ -360,6 +485,72 @@ func (r *rpcServer) Start() error {
		return nil
	}

	// First, we'll start all the sub-servers to ensure that they're ready
	// to take new requests in.
	//
	// TODO(roasbeef): some may require that the entire daemon be started
	// at that point
	for _, subServer := range r.subServers {
		rpcsLog.Debugf("Starting sub RPC server: %v", subServer.Name())

		if err := subServer.Start(); err != nil {
			return err
		}
	}

	// With all the sub-servers started, we'll spin up the listeners for
	// the main RPC server itself.
	for _, listener := range cfg.RPCListeners {
		lis, err := lncfg.ListenOnAddress(listener)
		if err != nil {
			ltndLog.Errorf(
				"RPC server unable to listen on %s", listener,
			)
			return err
		}

		r.listenerCleanUp = append(r.listenerCleanUp, func() {
			lis.Close()
		})

		go func() {
			rpcsLog.Infof("RPC server listening on %s", lis.Addr())
			r.grpcServer.Serve(lis)
		}()
	}

	// Finally, start the REST proxy for our gRPC server above.
	//
	// TODO(roasbeef): eventually also allow the sub-servers to themselves
	// have a REST proxy.
	mux := proxy.NewServeMux()
	err := lnrpc.RegisterLightningHandlerFromEndpoint(
		context.Background(), mux, cfg.RPCListeners[0].String(),
		r.restServerOpts,
	)
	if err != nil {
		return err
	}

	for _, restEndpoint := range cfg.RESTListeners {
		lis, err := lncfg.TLSListenOnAddress(restEndpoint, r.tlsCfg)
		if err != nil {
			ltndLog.Errorf(
				"gRPC proxy unable to listen on %s",
				restEndpoint,
			)
			return err
		}

		r.listenerCleanUp = append(r.listenerCleanUp, func() {
			lis.Close()
		})

		go func() {
			rpcsLog.Infof("gRPC proxy started at %s", lis.Addr())
			http.Serve(lis, mux)
		}()
	}

	return nil
}
@@ -369,8 +560,28 @@ func (r *rpcServer) Stop() error {
		return nil
	}

	rpcsLog.Infof("Stopping RPC Server")
	close(r.quit)

	// After we've signalled all of our active goroutines to exit, we'll
	// then do the same to signal a graceful shutdown of all the sub
	// servers.
	for _, subServer := range r.subServers {
		rpcsLog.Infof("Stopping %v Sub-RPC Server",
			subServer.Name())

		if err := subServer.Stop(); err != nil {
			continue
		}
	}

	// Finally, we can clean up all the listening sockets to ensure that we
	// give the file descriptors back to the OS.
	for _, cleanUp := range r.listenerCleanUp {
		cleanUp()
	}

	return nil
}
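
One piece not shown in this diff is where lnrpc.RegisteredSubServers()
gets its entries from. The way newRPCServer uses it (a driver object
whose New() builds the sub-server from the populated configs) suggests
an init-time registration pattern roughly like the sketch below, which
builds on the SubServer/MacaroonPerms sketch near the top of this
message. This is an approximation, not the exact upstream code:
```
package lnrpc

import (
	"fmt"
	"sync"
)

// SubServerDriver pairs a sub-server's name with a constructor that
// builds it from the populated sub-server configs, returning the
// instance and the macaroon permissions it requires.
type SubServerDriver struct {
	SubServerName string

	// New builds the sub-server. The config argument type is glossed
	// over here; upstream it is whatever carries each sub-server's
	// populated dependencies.
	New func(cfg interface{}) (SubServer, MacaroonPerms, error)
}

var (
	driverMtx sync.Mutex
	drivers   = make(map[string]*SubServerDriver)
)

// RegisterSubServer is called from a sub-server's init() (typically
// guarded by a build tag such as signerrpc) so the root server can
// discover it at startup.
func RegisterSubServer(driver *SubServerDriver) error {
	driverMtx.Lock()
	defer driverMtx.Unlock()

	if _, ok := drivers[driver.SubServerName]; ok {
		return fmt.Errorf("sub-server %v already registered",
			driver.SubServerName)
	}
	drivers[driver.SubServerName] = driver

	return nil
}

// RegisteredSubServers returns all drivers registered at init time.
func RegisteredSubServers() []*SubServerDriver {
	driverMtx.Lock()
	defer driverMtx.Unlock()

	regDrivers := make([]*SubServerDriver, 0, len(drivers))
	for _, driver := range drivers {
		regDrivers = append(regDrivers, driver)
	}

	return regDrivers
}
```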