From 9ef556624eac1eba3848f8fe66609075efab60c9 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Tue, 9 Feb 2021 15:12:21 +0100 Subject: [PATCH] lnrpc: add StateService --- lnd.go | 24 ++- lnrpc/gen_protos.sh | 2 +- lnrpc/rest-annotations.yaml | 4 + lnrpc/stateservice.pb.go | 265 ++++++++++++++++++++++++++++++++ lnrpc/stateservice.pb.gw.go | 133 ++++++++++++++++ lnrpc/stateservice.proto | 46 ++++++ lnrpc/stateservice.swagger.json | 125 +++++++++++++++ rpcperms/interceptor.go | 118 ++++++++++++++ 8 files changed, 715 insertions(+), 2 deletions(-) create mode 100644 lnrpc/stateservice.pb.go create mode 100644 lnrpc/stateservice.pb.gw.go create mode 100644 lnrpc/stateservice.proto create mode 100644 lnrpc/stateservice.swagger.json diff --git a/lnd.go b/lnd.go index bd0c6d03..e3b4ef80 100644 --- a/lnd.go +++ b/lnd.go @@ -349,6 +349,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { interceptorChain := rpcperms.NewInterceptorChain( rpcsLog, cfg.NoMacaroons, walletExists, ) + if err := interceptorChain.Start(); err != nil { + return err + } + defer func() { + err := interceptorChain.Stop() + if err != nil { + ltndLog.Warnf("error stopping RPC interceptor "+ + "chain: %v", err) + } + }() + rpcServerOpts := interceptorChain.CreateServerOpts() serverOpts = append(serverOpts, rpcServerOpts...) @@ -358,6 +369,10 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error { // Register the WalletUnlockerService with the GRPC server. lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService) + // We'll also register the RPC interceptor chain as the StateServer, as + // it can be used to query for the current state of the wallet. + lnrpc.RegisterStateServer(grpcServer, interceptorChain) + // Initialize, and register our implementation of the gRPC interface // exported by the rpcServer. rpcServer := newRPCServer( @@ -1239,7 +1254,7 @@ func startRestProxy(cfg *Config, rpcServer *rpcServer, restDialOpts []grpc.DialO ) mux := proxy.NewServeMux(customMarshalerOption) - // Register both services with the REST proxy. + // Register our services with the REST proxy. err := lnrpc.RegisterWalletUnlockerHandlerFromEndpoint( ctx, mux, restProxyDest, restDialOpts, ) @@ -1247,6 +1262,13 @@ func startRestProxy(cfg *Config, rpcServer *rpcServer, restDialOpts []grpc.DialO return nil, err } + err = lnrpc.RegisterStateHandlerFromEndpoint( + ctx, mux, restProxyDest, restDialOpts, + ) + if err != nil { + return nil, err + } + err = rpcServer.RegisterWithRestProxy( ctx, mux, restDialOpts, restProxyDest, ) diff --git a/lnrpc/gen_protos.sh b/lnrpc/gen_protos.sh index 13892fee..17c86edf 100755 --- a/lnrpc/gen_protos.sh +++ b/lnrpc/gen_protos.sh @@ -6,7 +6,7 @@ set -e function generate() { echo "Generating root gRPC server protos" - PROTOS="rpc.proto walletunlocker.proto **/*.proto" + PROTOS="rpc.proto walletunlocker.proto stateservice.proto **/*.proto" # For each of the sub-servers, we then generate their protos, but a restricted # set as they don't yet require REST proxies, or swagger docs. diff --git a/lnrpc/rest-annotations.yaml b/lnrpc/rest-annotations.yaml index 10e68963..1913a6e0 100644 --- a/lnrpc/rest-annotations.yaml +++ b/lnrpc/rest-annotations.yaml @@ -247,6 +247,10 @@ http: post: "/v2/signer/sharedkey" body: "*" + # stateservice.proto + - selector: lnrpc.State.SubscribeState + get: "/v1/state/subscribe" + # verrpc/verrpc.proto - selector: verrpc.Versioner.GetVersion get: "/v2/versioner/version" diff --git a/lnrpc/stateservice.pb.go b/lnrpc/stateservice.pb.go new file mode 100644 index 00000000..4dca5aab --- /dev/null +++ b/lnrpc/stateservice.pb.go @@ -0,0 +1,265 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: stateservice.proto + +package lnrpc + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type WalletState int32 + +const ( + WalletState_NON_EXISTING WalletState = 0 + WalletState_LOCKED WalletState = 1 + WalletState_UNLOCKED WalletState = 2 + WalletState_RPC_ACTIVE WalletState = 3 +) + +var WalletState_name = map[int32]string{ + 0: "NON_EXISTING", + 1: "LOCKED", + 2: "UNLOCKED", + 3: "RPC_ACTIVE", +} + +var WalletState_value = map[string]int32{ + "NON_EXISTING": 0, + "LOCKED": 1, + "UNLOCKED": 2, + "RPC_ACTIVE": 3, +} + +func (x WalletState) String() string { + return proto.EnumName(WalletState_name, int32(x)) +} + +func (WalletState) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_9cd64a11048f05dd, []int{0} +} + +type SubscribeStateRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeStateRequest) Reset() { *m = SubscribeStateRequest{} } +func (m *SubscribeStateRequest) String() string { return proto.CompactTextString(m) } +func (*SubscribeStateRequest) ProtoMessage() {} +func (*SubscribeStateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_9cd64a11048f05dd, []int{0} +} + +func (m *SubscribeStateRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SubscribeStateRequest.Unmarshal(m, b) +} +func (m *SubscribeStateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SubscribeStateRequest.Marshal(b, m, deterministic) +} +func (m *SubscribeStateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeStateRequest.Merge(m, src) +} +func (m *SubscribeStateRequest) XXX_Size() int { + return xxx_messageInfo_SubscribeStateRequest.Size(m) +} +func (m *SubscribeStateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeStateRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeStateRequest proto.InternalMessageInfo + +type SubscribeStateResponse struct { + State WalletState `protobuf:"varint,1,opt,name=State,proto3,enum=lnrpc.WalletState" json:"State,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeStateResponse) Reset() { *m = SubscribeStateResponse{} } +func (m *SubscribeStateResponse) String() string { return proto.CompactTextString(m) } +func (*SubscribeStateResponse) ProtoMessage() {} +func (*SubscribeStateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_9cd64a11048f05dd, []int{1} +} + +func (m *SubscribeStateResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SubscribeStateResponse.Unmarshal(m, b) +} +func (m *SubscribeStateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SubscribeStateResponse.Marshal(b, m, deterministic) +} +func (m *SubscribeStateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeStateResponse.Merge(m, src) +} +func (m *SubscribeStateResponse) XXX_Size() int { + return xxx_messageInfo_SubscribeStateResponse.Size(m) +} +func (m *SubscribeStateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeStateResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeStateResponse proto.InternalMessageInfo + +func (m *SubscribeStateResponse) GetState() WalletState { + if m != nil { + return m.State + } + return WalletState_NON_EXISTING +} + +func init() { + proto.RegisterEnum("lnrpc.WalletState", WalletState_name, WalletState_value) + proto.RegisterType((*SubscribeStateRequest)(nil), "lnrpc.SubscribeStateRequest") + proto.RegisterType((*SubscribeStateResponse)(nil), "lnrpc.SubscribeStateResponse") +} + +func init() { proto.RegisterFile("stateservice.proto", fileDescriptor_9cd64a11048f05dd) } + +var fileDescriptor_9cd64a11048f05dd = []byte{ + // 244 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2a, 0x2e, 0x49, 0x2c, + 0x49, 0x2d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, + 0xcd, 0xc9, 0x2b, 0x2a, 0x48, 0x56, 0x12, 0xe7, 0x12, 0x0d, 0x2e, 0x4d, 0x2a, 0x4e, 0x2e, 0xca, + 0x4c, 0x4a, 0x0d, 0x06, 0xa9, 0x0a, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x51, 0x72, 0xe2, 0x12, + 0x43, 0x97, 0x28, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x15, 0xd2, 0xe0, 0x62, 0x05, 0x0b, 0x48, 0x30, + 0x2a, 0x30, 0x6a, 0xf0, 0x19, 0x09, 0xe9, 0x81, 0x4d, 0xd2, 0x0b, 0x4f, 0xcc, 0xc9, 0x49, 0x2d, + 0x81, 0x28, 0x85, 0x28, 0xd0, 0xf2, 0xe4, 0xe2, 0x46, 0x12, 0x15, 0x12, 0xe0, 0xe2, 0xf1, 0xf3, + 0xf7, 0x8b, 0x77, 0x8d, 0xf0, 0x0c, 0x0e, 0xf1, 0xf4, 0x73, 0x17, 0x60, 0x10, 0xe2, 0xe2, 0x62, + 0xf3, 0xf1, 0x77, 0xf6, 0x76, 0x75, 0x11, 0x60, 0x14, 0xe2, 0xe1, 0xe2, 0x08, 0xf5, 0x83, 0xf2, + 0x98, 0x84, 0xf8, 0xb8, 0xb8, 0x82, 0x02, 0x9c, 0xe3, 0x1d, 0x9d, 0x43, 0x3c, 0xc3, 0x5c, 0x05, + 0x98, 0x8d, 0x22, 0xa0, 0x96, 0x0a, 0xf9, 0x73, 0xf1, 0xa1, 0xba, 0x4b, 0x48, 0x06, 0xea, 0x00, + 0xac, 0xfe, 0x90, 0x92, 0xc5, 0x21, 0x0b, 0xf1, 0x8c, 0x01, 0xa3, 0x93, 0x7a, 0x94, 0x6a, 0x7a, + 0x66, 0x49, 0x46, 0x69, 0x92, 0x5e, 0x72, 0x7e, 0xae, 0x7e, 0x4e, 0x66, 0x7a, 0x46, 0x49, 0x5e, + 0x66, 0x5e, 0x7a, 0x5e, 0x6a, 0x49, 0x79, 0x7e, 0x51, 0xb6, 0x7e, 0x4e, 0x5e, 0x8a, 0x3e, 0xd8, + 0x84, 0x24, 0x36, 0x70, 0xc0, 0x19, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x05, 0xd6, 0xa9, 0xb4, + 0x4e, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// StateClient is the client API for State service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type StateClient interface { + // SubscribeState subscribes to the state of the wallet. The current wallet + // state will always be delivered immediately. + SubscribeState(ctx context.Context, in *SubscribeStateRequest, opts ...grpc.CallOption) (State_SubscribeStateClient, error) +} + +type stateClient struct { + cc *grpc.ClientConn +} + +func NewStateClient(cc *grpc.ClientConn) StateClient { + return &stateClient{cc} +} + +func (c *stateClient) SubscribeState(ctx context.Context, in *SubscribeStateRequest, opts ...grpc.CallOption) (State_SubscribeStateClient, error) { + stream, err := c.cc.NewStream(ctx, &_State_serviceDesc.Streams[0], "/lnrpc.State/SubscribeState", opts...) + if err != nil { + return nil, err + } + x := &stateSubscribeStateClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type State_SubscribeStateClient interface { + Recv() (*SubscribeStateResponse, error) + grpc.ClientStream +} + +type stateSubscribeStateClient struct { + grpc.ClientStream +} + +func (x *stateSubscribeStateClient) Recv() (*SubscribeStateResponse, error) { + m := new(SubscribeStateResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// StateServer is the server API for State service. +type StateServer interface { + // SubscribeState subscribes to the state of the wallet. The current wallet + // state will always be delivered immediately. + SubscribeState(*SubscribeStateRequest, State_SubscribeStateServer) error +} + +// UnimplementedStateServer can be embedded to have forward compatible implementations. +type UnimplementedStateServer struct { +} + +func (*UnimplementedStateServer) SubscribeState(req *SubscribeStateRequest, srv State_SubscribeStateServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeState not implemented") +} + +func RegisterStateServer(s *grpc.Server, srv StateServer) { + s.RegisterService(&_State_serviceDesc, srv) +} + +func _State_SubscribeState_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeStateRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StateServer).SubscribeState(m, &stateSubscribeStateServer{stream}) +} + +type State_SubscribeStateServer interface { + Send(*SubscribeStateResponse) error + grpc.ServerStream +} + +type stateSubscribeStateServer struct { + grpc.ServerStream +} + +func (x *stateSubscribeStateServer) Send(m *SubscribeStateResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _State_serviceDesc = grpc.ServiceDesc{ + ServiceName: "lnrpc.State", + HandlerType: (*StateServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeState", + Handler: _State_SubscribeState_Handler, + ServerStreams: true, + }, + }, + Metadata: "stateservice.proto", +} diff --git a/lnrpc/stateservice.pb.gw.go b/lnrpc/stateservice.pb.gw.go new file mode 100644 index 00000000..74b45a6b --- /dev/null +++ b/lnrpc/stateservice.pb.gw.go @@ -0,0 +1,133 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: stateservice.proto + +/* +Package lnrpc is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package lnrpc + +import ( + "context" + "io" + "net/http" + + "github.com/golang/protobuf/descriptor" + "github.com/golang/protobuf/proto" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = descriptor.ForMessage + +func request_State_SubscribeState_0(ctx context.Context, marshaler runtime.Marshaler, client StateClient, req *http.Request, pathParams map[string]string) (State_SubscribeStateClient, runtime.ServerMetadata, error) { + var protoReq SubscribeStateRequest + var metadata runtime.ServerMetadata + + stream, err := client.SubscribeState(ctx, &protoReq) + if err != nil { + return nil, metadata, err + } + header, err := stream.Header() + if err != nil { + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil + +} + +// RegisterStateHandlerServer registers the http handlers for service State to "mux". +// UnaryRPC :call StateServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +func RegisterStateHandlerServer(ctx context.Context, mux *runtime.ServeMux, server StateServer) error { + + mux.Handle("GET", pattern_State_SubscribeState_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + }) + + return nil +} + +// RegisterStateHandlerFromEndpoint is same as RegisterStateHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterStateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterStateHandler(ctx, mux, conn) +} + +// RegisterStateHandler registers the http handlers for service State to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterStateHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterStateHandlerClient(ctx, mux, NewStateClient(conn)) +} + +// RegisterStateHandlerClient registers the http handlers for service State +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "StateClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "StateClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "StateClient" to call the correct interceptors. +func RegisterStateHandlerClient(ctx context.Context, mux *runtime.ServeMux, client StateClient) error { + + mux.Handle("GET", pattern_State_SubscribeState_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_State_SubscribeState_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_State_SubscribeState_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_State_SubscribeState_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "state", "subscribe"}, "", runtime.AssumeColonVerbOpt(true))) +) + +var ( + forward_State_SubscribeState_0 = runtime.ForwardResponseStream +) diff --git a/lnrpc/stateservice.proto b/lnrpc/stateservice.proto new file mode 100644 index 00000000..a884e781 --- /dev/null +++ b/lnrpc/stateservice.proto @@ -0,0 +1,46 @@ +syntax = "proto3"; + +package lnrpc; + +option go_package = "github.com/lightningnetwork/lnd/lnrpc"; + +/* + * Comments in this file will be directly parsed into the API + * Documentation as descriptions of the associated method, message, or field. + * These descriptions should go right above the definition of the object, and + * can be in either block or // comment format. + * + * An RPC method can be matched to an lncli command by placing a line in the + * beginning of the description in exactly the following format: + * lncli: `methodname` + * + * Failure to specify the exact name of the command will cause documentation + * generation to fail. + * + * More information on how exactly the gRPC documentation is generated from + * this proto file can be found here: + * https://github.com/lightninglabs/lightning-api + */ + +// State service is a always running service that exposes the current state of +// the wallet and RPC server. +service State { + // SubscribeState subscribes to the state of the wallet. The current wallet + // state will always be delivered immediately. + rpc SubscribeState (SubscribeStateRequest) + returns (stream SubscribeStateResponse); +} + +enum WalletState { + NON_EXISTING = 0; + LOCKED = 1; + UNLOCKED = 2; + RPC_ACTIVE = 3; +} + +message SubscribeStateRequest { +} + +message SubscribeStateResponse { + WalletState State = 1; +} diff --git a/lnrpc/stateservice.swagger.json b/lnrpc/stateservice.swagger.json new file mode 100644 index 00000000..ef521fc4 --- /dev/null +++ b/lnrpc/stateservice.swagger.json @@ -0,0 +1,125 @@ +{ + "swagger": "2.0", + "info": { + "title": "stateservice.proto", + "version": "version not set" + }, + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "paths": { + "/v1/state/subscribe": { + "get": { + "summary": "SubscribeState subscribes to the state of the wallet. The current wallet\nstate will always be delivered immediately.", + "operationId": "SubscribeState", + "responses": { + "200": { + "description": "A successful response.(streaming responses)", + "schema": { + "type": "object", + "properties": { + "result": { + "$ref": "#/definitions/lnrpcSubscribeStateResponse" + }, + "error": { + "$ref": "#/definitions/runtimeStreamError" + } + }, + "title": "Stream result of lnrpcSubscribeStateResponse" + } + }, + "default": { + "description": "An unexpected error response", + "schema": { + "$ref": "#/definitions/runtimeError" + } + } + }, + "tags": [ + "State" + ] + } + } + }, + "definitions": { + "lnrpcSubscribeStateResponse": { + "type": "object", + "properties": { + "State": { + "$ref": "#/definitions/lnrpcWalletState" + } + } + }, + "lnrpcWalletState": { + "type": "string", + "enum": [ + "NON_EXISTING", + "LOCKED", + "UNLOCKED", + "RPC_ACTIVE" + ], + "default": "NON_EXISTING" + }, + "protobufAny": { + "type": "object", + "properties": { + "type_url": { + "type": "string" + }, + "value": { + "type": "string", + "format": "byte" + } + } + }, + "runtimeError": { + "type": "object", + "properties": { + "error": { + "type": "string" + }, + "code": { + "type": "integer", + "format": "int32" + }, + "message": { + "type": "string" + }, + "details": { + "type": "array", + "items": { + "$ref": "#/definitions/protobufAny" + } + } + } + }, + "runtimeStreamError": { + "type": "object", + "properties": { + "grpc_code": { + "type": "integer", + "format": "int32" + }, + "http_code": { + "type": "integer", + "format": "int32" + }, + "message": { + "type": "string" + }, + "http_status": { + "type": "string" + }, + "details": { + "type": "array", + "items": { + "$ref": "#/definitions/protobufAny" + } + } + } + } + } +} diff --git a/rpcperms/interceptor.go b/rpcperms/interceptor.go index 207344e3..3fd6ec0f 100644 --- a/rpcperms/interceptor.go +++ b/rpcperms/interceptor.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/monitoring" + "github.com/lightningnetwork/lnd/subscribe" "google.golang.org/grpc" "gopkg.in/macaroon-bakery.v2/bakery" ) @@ -66,6 +67,10 @@ var ( "/lnrpc.WalletUnlocker/InitWallet": {}, "/lnrpc.WalletUnlocker/UnlockWallet": {}, "/lnrpc.WalletUnlocker/ChangePassword": {}, + + // The State service must be available at all times, even + // before we can check macaroons, so we whitelist it. + "/lnrpc.State/SubscribeState": {}, } ) @@ -73,9 +78,16 @@ var ( // intercepting API calls. This is useful for logging, enforcing permissions // etc. type InterceptorChain struct { + started sync.Once + stopped sync.Once + // state is the current RPC state of our RPC server. state rpcState + // ntfnServer is a subscription server we use to notify clients of the + // State service when the state changes. + ntfnServer *subscribe.Server + // noMacaroons should be set true if we don't want to check macaroons. noMacaroons bool @@ -89,9 +101,14 @@ type InterceptorChain struct { // rpcsLog is the logger used to log calles to the RPCs intercepted. rpcsLog btclog.Logger + quit chan struct{} sync.RWMutex } +// A compile time check to ensure that InterceptorChain fully implements the +// StateServer gRPC service. +var _ lnrpc.StateServer = (*InterceptorChain)(nil) + // NewInterceptorChain creates a new InterceptorChain. func NewInterceptorChain(log btclog.Logger, noMacaroons, walletExists bool) *InterceptorChain { @@ -103,12 +120,36 @@ func NewInterceptorChain(log btclog.Logger, noMacaroons, return &InterceptorChain{ state: startState, + ntfnServer: subscribe.NewServer(), noMacaroons: noMacaroons, permissionMap: make(map[string][]bakery.Op), rpcsLog: log, + quit: make(chan struct{}), } } +// Start starts the InterceptorChain, which is needed to start the state +// subscription server it powers. +func (r *InterceptorChain) Start() error { + var err error + r.started.Do(func() { + err = r.ntfnServer.Start() + }) + + return err +} + +// Stop stops the InterceptorChain and its internal state subscription server. +func (r *InterceptorChain) Stop() error { + var err error + r.stopped.Do(func() { + close(r.quit) + err = r.ntfnServer.Stop() + }) + + return err +} + // SetWalletUnlocked moves the RPC state from either walletNotCreated or // walletLocked to walletUnlocked. func (r *InterceptorChain) SetWalletUnlocked() { @@ -116,6 +157,7 @@ func (r *InterceptorChain) SetWalletUnlocked() { defer r.Unlock() r.state = walletUnlocked + _ = r.ntfnServer.SendUpdate(r.state) } // SetRPCActive moves the RPC state from walletUnlocked to rpcActive. @@ -124,6 +166,75 @@ func (r *InterceptorChain) SetRPCActive() { defer r.Unlock() r.state = rpcActive + _ = r.ntfnServer.SendUpdate(r.state) +} + +// SubscribeState subscribes to the state of the wallet. The current wallet +// state will always be delivered immediately. +// +// NOTE: Part of the StateService interface. +func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest, + stream lnrpc.State_SubscribeStateServer) error { + + sendStateUpdate := func(state rpcState) error { + resp := &lnrpc.SubscribeStateResponse{} + switch state { + case walletNotCreated: + resp.State = lnrpc.WalletState_NON_EXISTING + case walletLocked: + resp.State = lnrpc.WalletState_LOCKED + case walletUnlocked: + resp.State = lnrpc.WalletState_UNLOCKED + case rpcActive: + resp.State = lnrpc.WalletState_RPC_ACTIVE + + default: + return fmt.Errorf("unknown wallet "+ + "state %v", state) + } + + return stream.Send(resp) + } + + // Subscribe to state updates. + client, err := r.ntfnServer.Subscribe() + if err != nil { + return err + } + defer client.Cancel() + + // Always start by sending the current state. + r.RLock() + state := r.state + r.RUnlock() + + if err := sendStateUpdate(state); err != nil { + return err + } + + for { + select { + case e := <-client.Updates(): + newState := e.(rpcState) + + // Ignore already sent state. + if newState == state { + continue + } + + state = newState + err := sendStateUpdate(state) + if err != nil { + return err + } + + case <-stream.Context().Done(): + return stream.Context().Err() + + case <-r.quit: + return fmt.Errorf("server exiting") + } + } } // AddMacaroonService adds a macaroon service to the interceptor. After this is @@ -335,6 +446,13 @@ func (r *InterceptorChain) MacaroonStreamServerInterceptor() grpc.StreamServerIn // checkRPCState checks whether a call to the given server is allowed in the // current RPC state. func (r *InterceptorChain) checkRPCState(srv interface{}) error { + // The StateService is being accessed, we allow the call regardless of + // the current state. + _, ok := srv.(lnrpc.StateServer) + if ok { + return nil + } + r.RLock() state := r.state r.RUnlock()