diff --git a/docs/rest/websockets.md b/docs/rest/websockets.md index 705a4c73..f2c85769 100644 --- a/docs/rest/websockets.md +++ b/docs/rest/websockets.md @@ -97,3 +97,55 @@ ws.on('message', function(body) { // "height": , // } ``` + +## Request-streaming RPCs + +Starting with `lnd v0.13.0-beta` all RPCs can be used through REST, even those +that are fully bi-directional (e.g. the client can also send multiple request +messages to the stream). + +**Example**: + +As an example we show how one can use the bi-directional channel acceptor RPC. +Through that RPC each incoming channel open request (another peer opening a +channel to our node) will be passed in for inspection. We can decide +programmatically whether to accept or reject the channel. + +```javascript +// -------------------------- +// Example with websockets: +// -------------------------- +const WebSocket = require('ws'); +const fs = require('fs'); +const macaroon = fs.readFileSync('LND_DIR/data/chain/bitcoin/simnet/admin.macaroon').toString('hex'); +let ws = new WebSocket('wss://localhost:8080/v1/channels/acceptor?method=POST', { + // Work-around for self-signed certificates. + rejectUnauthorized: false, + headers: { + 'Grpc-Metadata-Macaroon': macaroon, + }, +}); +ws.on('open', function() { + // We always _need_ to send an initial message to kickstart the request. + // This empty message will be ignored by the channel acceptor though, this + // is just for telling the grpc-gateway library that it can forward the + // request to the gRPC interface now. If this were an RPC where the client + // always sends the first message (for example the streaming payment RPC + // /v1/channels/transaction-stream), we'd simply send the first "real" + // message here when needed. + ws.send('{}'); +}); +ws.on('error', function(err) { + console.log('Error: ' + err); +}); +ws.on('ping', function ping(event) { + console.log('Received ping from server: ' + JSON.stringify(event)); +}); +ws.on('message', function incoming(event) { + console.log('New channel accept message: ' + event); + const result = JSON.parse(event).result; + + // Accept the channel after inspecting it. + ws.send(JSON.stringify({accept: true, pending_chan_id: result.pending_chan_id})); +}); +``` diff --git a/lnd.go b/lnd.go index b82d2f7c..589ec38c 100644 --- a/lnd.go +++ b/lnd.go @@ -1265,7 +1265,9 @@ func startRestProxy(cfg *Config, rpcServer *rpcServer, restDialOpts []grpc.DialO } // Wrap the default grpc-gateway handler with the WebSocket handler. - restHandler := lnrpc.NewWebSocketProxy(mux, rpcsLog) + restHandler := lnrpc.NewWebSocketProxy( + mux, rpcsLog, lnrpc.LndClientStreamingURIs, + ) // Use a WaitGroup so we can be sure the instructions on how to input the // password is the last thing to be printed to the console. diff --git a/lnrpc/metadata.go b/lnrpc/metadata.go new file mode 100644 index 00000000..fc54560b --- /dev/null +++ b/lnrpc/metadata.go @@ -0,0 +1,17 @@ +package lnrpc + +import "regexp" + +var ( + // LndClientStreamingURIs is a list of all lnd RPCs that use a request- + // streaming interface. Those request-streaming RPCs need to be handled + // differently in the WebsocketProxy because of how the request body + // parsing is implemented in the grpc-gateway library. Unfortunately + // there is no straightforward way of obtaining this information on + // runtime so we need to keep a hard coded list here. + LndClientStreamingURIs = []*regexp.Regexp{ + regexp.MustCompile("^/v1/channels/acceptor$"), + regexp.MustCompile("^/v1/channels/transaction-stream$"), + regexp.MustCompile("^/v2/router/htlcinterceptor$"), + } +) diff --git a/lnrpc/rest-annotations.yaml b/lnrpc/rest-annotations.yaml index c10b2605..7b388da0 100644 --- a/lnrpc/rest-annotations.yaml +++ b/lnrpc/rest-annotations.yaml @@ -1,6 +1,10 @@ type: google.api.Service config_version: 3 +# Mapping for the grpc-gateway REST proxy. +# Please make sure to also update the `metadata.go` file when editing this file +# and adding a new client-streaming RPC! + http: rules: # rpc.proto @@ -61,12 +65,15 @@ http: post: "/v1/funding/step" body: "*" - selector: lnrpc.Lightning.ChannelAcceptor - # request streaming RPC, REST not supported + post: "/v1/channels/acceptor" + body: "*" - selector: lnrpc.Lightning.CloseChannel delete: "/v1/channels/{channel_point.funding_txid_str}/{channel_point.output_index}" - selector: lnrpc.Lightning.AbandonChannel delete: "/v1/channels/abandon/{channel_point.funding_txid_str}/{channel_point.output_index}" - selector: lnrpc.Lightning.SendPayment + post: "/v1/channels/transaction-stream" + body: "*" - selector: lnrpc.Lightning.SendPaymentSync post: "/v1/channels/transactions" body: "*" @@ -228,7 +235,8 @@ http: - selector: routerrpc.Router.TrackPayment # deprecated, no REST endpoint - selector: routerrpc.HtlcInterceptor - # request streaming RPC, REST not supported + post: "/v2/router/htlcinterceptor" + body: "*" - selector: routerrpc.UpdateChanStatus post: "/v2/router/updatechanstatus" body: "*" diff --git a/lnrpc/rpc.pb.gw.go b/lnrpc/rpc.pb.gw.go index 41021334..ce69a3d6 100644 --- a/lnrpc/rpc.pb.gw.go +++ b/lnrpc/rpc.pb.gw.go @@ -731,6 +731,58 @@ func local_request_Lightning_FundingStateStep_0(ctx context.Context, marshaler r } +func request_Lightning_ChannelAcceptor_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (Lightning_ChannelAcceptorClient, runtime.ServerMetadata, error) { + var metadata runtime.ServerMetadata + stream, err := client.ChannelAcceptor(ctx) + if err != nil { + grpclog.Infof("Failed to start streaming: %v", err) + return nil, metadata, err + } + dec := marshaler.NewDecoder(req.Body) + handleSend := func() error { + var protoReq ChannelAcceptResponse + err := dec.Decode(&protoReq) + if err == io.EOF { + return err + } + if err != nil { + grpclog.Infof("Failed to decode request: %v", err) + return err + } + if err := stream.Send(&protoReq); err != nil { + grpclog.Infof("Failed to send request: %v", err) + return err + } + return nil + } + if err := handleSend(); err != nil { + if cerr := stream.CloseSend(); cerr != nil { + grpclog.Infof("Failed to terminate client stream: %v", cerr) + } + if err == io.EOF { + return stream, metadata, nil + } + return nil, metadata, err + } + go func() { + for { + if err := handleSend(); err != nil { + break + } + } + if err := stream.CloseSend(); err != nil { + grpclog.Infof("Failed to terminate client stream: %v", err) + } + }() + header, err := stream.Header() + if err != nil { + grpclog.Infof("Failed to get header from client: %v", err) + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil +} + var ( filter_Lightning_CloseChannel_0 = &utilities.DoubleArray{Encoding: map[string]int{"channel_point": 0, "funding_txid_str": 1, "output_index": 2}, Base: []int{1, 1, 1, 2, 0, 0}, Check: []int{0, 1, 2, 2, 3, 4}} ) @@ -879,6 +931,58 @@ func local_request_Lightning_AbandonChannel_0(ctx context.Context, marshaler run } +func request_Lightning_SendPayment_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (Lightning_SendPaymentClient, runtime.ServerMetadata, error) { + var metadata runtime.ServerMetadata + stream, err := client.SendPayment(ctx) + if err != nil { + grpclog.Infof("Failed to start streaming: %v", err) + return nil, metadata, err + } + dec := marshaler.NewDecoder(req.Body) + handleSend := func() error { + var protoReq SendRequest + err := dec.Decode(&protoReq) + if err == io.EOF { + return err + } + if err != nil { + grpclog.Infof("Failed to decode request: %v", err) + return err + } + if err := stream.Send(&protoReq); err != nil { + grpclog.Infof("Failed to send request: %v", err) + return err + } + return nil + } + if err := handleSend(); err != nil { + if cerr := stream.CloseSend(); cerr != nil { + grpclog.Infof("Failed to terminate client stream: %v", cerr) + } + if err == io.EOF { + return stream, metadata, nil + } + return nil, metadata, err + } + go func() { + for { + if err := handleSend(); err != nil { + break + } + } + if err := stream.CloseSend(); err != nil { + grpclog.Infof("Failed to terminate client stream: %v", err) + } + }() + header, err := stream.Header() + if err != nil { + grpclog.Infof("Failed to get header from client: %v", err) + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil +} + func request_Lightning_SendPaymentSync_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq SendRequest var metadata runtime.ServerMetadata @@ -2451,6 +2555,13 @@ func RegisterLightningHandlerServer(ctx context.Context, mux *runtime.ServeMux, }) + mux.Handle("POST", pattern_Lightning_ChannelAcceptor_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + }) + mux.Handle("DELETE", pattern_Lightning_CloseChannel_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) @@ -2478,6 +2589,13 @@ func RegisterLightningHandlerServer(ctx context.Context, mux *runtime.ServeMux, }) + mux.Handle("POST", pattern_Lightning_SendPayment_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + }) + mux.Handle("POST", pattern_Lightning_SendPaymentSync_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -3560,6 +3678,26 @@ func RegisterLightningHandlerClient(ctx context.Context, mux *runtime.ServeMux, }) + mux.Handle("POST", pattern_Lightning_ChannelAcceptor_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Lightning_ChannelAcceptor_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Lightning_ChannelAcceptor_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("DELETE", pattern_Lightning_CloseChannel_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -3600,6 +3738,26 @@ func RegisterLightningHandlerClient(ctx context.Context, mux *runtime.ServeMux, }) + mux.Handle("POST", pattern_Lightning_SendPayment_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Lightning_SendPayment_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Lightning_SendPayment_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + mux.Handle("POST", pattern_Lightning_SendPaymentSync_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -4252,10 +4410,14 @@ var ( pattern_Lightning_FundingStateStep_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "funding", "step"}, "", runtime.AssumeColonVerbOpt(true))) + pattern_Lightning_ChannelAcceptor_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "channels", "acceptor"}, "", runtime.AssumeColonVerbOpt(true))) + pattern_Lightning_CloseChannel_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 1, 0, 4, 1, 5, 3}, []string{"v1", "channels", "channel_point.funding_txid_str", "channel_point.output_index"}, "", runtime.AssumeColonVerbOpt(true))) pattern_Lightning_AbandonChannel_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 1, 0, 4, 1, 5, 3, 1, 0, 4, 1, 5, 4}, []string{"v1", "channels", "abandon", "channel_point.funding_txid_str", "channel_point.output_index"}, "", runtime.AssumeColonVerbOpt(true))) + pattern_Lightning_SendPayment_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "channels", "transaction-stream"}, "", runtime.AssumeColonVerbOpt(true))) + pattern_Lightning_SendPaymentSync_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "channels", "transactions"}, "", runtime.AssumeColonVerbOpt(true))) pattern_Lightning_SendToRouteSync_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 2, 3}, []string{"v1", "channels", "transactions", "route"}, "", runtime.AssumeColonVerbOpt(true))) @@ -4366,10 +4528,14 @@ var ( forward_Lightning_FundingStateStep_0 = runtime.ForwardResponseMessage + forward_Lightning_ChannelAcceptor_0 = runtime.ForwardResponseStream + forward_Lightning_CloseChannel_0 = runtime.ForwardResponseStream forward_Lightning_AbandonChannel_0 = runtime.ForwardResponseMessage + forward_Lightning_SendPayment_0 = runtime.ForwardResponseStream + forward_Lightning_SendPaymentSync_0 = runtime.ForwardResponseMessage forward_Lightning_SendToRouteSync_0 = runtime.ForwardResponseMessage diff --git a/lnrpc/rpc.swagger.json b/lnrpc/rpc.swagger.json index 8cbe8838..433eb887 100644 --- a/lnrpc/rpc.swagger.json +++ b/lnrpc/rpc.swagger.json @@ -204,6 +204,49 @@ ] } }, + "/v1/channels/acceptor": { + "post": { + "summary": "ChannelAcceptor dispatches a bi-directional streaming RPC in which\nOpenChannel requests are sent to the client and the client responds with\na boolean that tells LND whether or not to accept the channel. This allows\nnode operators to specify their own criteria for accepting inbound channels\nthrough a single persistent connection.", + "operationId": "ChannelAcceptor", + "responses": { + "200": { + "description": "A successful response.(streaming responses)", + "schema": { + "type": "object", + "properties": { + "result": { + "$ref": "#/definitions/lnrpcChannelAcceptRequest" + }, + "error": { + "$ref": "#/definitions/runtimeStreamError" + } + }, + "title": "Stream result of lnrpcChannelAcceptRequest" + } + }, + "default": { + "description": "An unexpected error response", + "schema": { + "$ref": "#/definitions/runtimeError" + } + } + }, + "parameters": [ + { + "name": "body", + "description": " (streaming inputs)", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/lnrpcChannelAcceptResponse" + } + } + ], + "tags": [ + "Lightning" + ] + } + }, "/v1/channels/backup": { "get": { "summary": "ExportAllChannelBackups returns static channel backups for all existing\nchannels known to lnd. A set of regular singular static channel backups for\neach channel are returned. Additionally, a multi-channel backup is returned\nas well, which contains a single encrypted blob containing the backups of\neach channel.", @@ -537,6 +580,49 @@ ] } }, + "/v1/channels/transaction-stream": { + "post": { + "summary": "lncli: `sendpayment`\nDeprecated, use routerrpc.SendPaymentV2. SendPayment dispatches a\nbi-directional streaming RPC for sending payments through the Lightning\nNetwork. A single RPC invocation creates a persistent bi-directional\nstream allowing clients to rapidly send payments through the Lightning\nNetwork with a single persistent connection.", + "operationId": "SendPayment", + "responses": { + "200": { + "description": "A successful response.(streaming responses)", + "schema": { + "type": "object", + "properties": { + "result": { + "$ref": "#/definitions/lnrpcSendResponse" + }, + "error": { + "$ref": "#/definitions/runtimeStreamError" + } + }, + "title": "Stream result of lnrpcSendResponse" + } + }, + "default": { + "description": "An unexpected error response", + "schema": { + "$ref": "#/definitions/runtimeError" + } + } + }, + "parameters": [ + { + "name": "body", + "description": " (streaming inputs)", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/lnrpcSendRequest" + } + } + ], + "tags": [ + "Lightning" + ] + } + }, "/v1/channels/transactions": { "post": { "summary": "SendPaymentSync is the synchronous non-streaming version of SendPayment.\nThis RPC is intended to be consumed by clients of the REST proxy.\nAdditionally, this RPC expects the destination's public key and the payment\nhash (if any) to be encoded as hex strings.", @@ -2929,6 +3015,59 @@ } } }, + "lnrpcChannelAcceptResponse": { + "type": "object", + "properties": { + "accept": { + "type": "boolean", + "format": "boolean", + "description": "Whether or not the client accepts the channel." + }, + "pending_chan_id": { + "type": "string", + "format": "byte", + "description": "The pending channel id to which this response applies." + }, + "error": { + "type": "string", + "description": "An optional error to send the initiating party to indicate why the channel\nwas rejected. This field *should not* contain sensitive information, it will\nbe sent to the initiating party. This field should only be set if accept is\nfalse, the channel will be rejected if an error is set with accept=true\nbecause the meaning of this response is ambiguous. Limited to 500\ncharacters." + }, + "upfront_shutdown": { + "type": "string", + "description": "The upfront shutdown address to use if the initiating peer supports option\nupfront shutdown script (see ListPeers for the features supported). Note\nthat the channel open will fail if this value is set for a peer that does\nnot support this feature bit." + }, + "csv_delay": { + "type": "integer", + "format": "int64", + "description": "The csv delay (in blocks) that we require for the remote party." + }, + "reserve_sat": { + "type": "string", + "format": "uint64", + "description": "The reserve amount in satoshis that we require the remote peer to adhere to.\nWe require that the remote peer always have some reserve amount allocated to\nthem so that there is always a disincentive to broadcast old state (if they\nhold 0 sats on their side of the channel, there is nothing to lose)." + }, + "in_flight_max_msat": { + "type": "string", + "format": "uint64", + "description": "The maximum amount of funds in millisatoshis that we allow the remote peer\nto have in outstanding htlcs." + }, + "max_htlc_count": { + "type": "integer", + "format": "int64", + "description": "The maximum number of htlcs that the remote peer can offer us." + }, + "min_htlc_in": { + "type": "string", + "format": "uint64", + "description": "The minimum value in millisatoshis for incoming htlcs on the channel." + }, + "min_accept_depth": { + "type": "integer", + "format": "int64", + "description": "The number of confirmations we require before we consider the channel open." + } + } + }, "lnrpcChannelBackup": { "type": "object", "properties": { diff --git a/lnrpc/websocket_proxy.go b/lnrpc/websocket_proxy.go index 3cb701be..9079fc5f 100644 --- a/lnrpc/websocket_proxy.go +++ b/lnrpc/websocket_proxy.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "net/textproto" + "regexp" "strings" "github.com/btcsuite/btclog" @@ -53,8 +54,13 @@ var ( // NewWebSocketProxy attempts to expose the underlying handler as a response- // streaming WebSocket stream with newline-delimited JSON as the content -// encoding. -func NewWebSocketProxy(h http.Handler, logger btclog.Logger) http.Handler { +// encoding. The clientStreamingURIs parameter can hold a list of all patterns +// for URIs that are mapped to client-streaming RPC methods. We need to keep +// track of those to make sure we initialize the request body correctly for the +// underlying grpc-gateway library. +func NewWebSocketProxy(h http.Handler, logger btclog.Logger, + clientStreamingURIs []*regexp.Regexp) http.Handler { + p := &WebsocketProxy{ backend: h, logger: logger, @@ -65,6 +71,7 @@ func NewWebSocketProxy(h http.Handler, logger btclog.Logger) http.Handler { return true }, }, + clientStreamingURIs: clientStreamingURIs, } return p } @@ -74,6 +81,12 @@ type WebsocketProxy struct { backend http.Handler logger btclog.Logger upgrader *websocket.Upgrader + + // clientStreamingURIs holds a list of all patterns for URIs that are + // mapped to client-streaming RPC methods. We need to keep track of + // those to make sure we initialize the request body correctly for the + // underlying grpc-gateway library. + clientStreamingURIs []*regexp.Regexp } // ServeHTTP handles the incoming HTTP request. If the request is an @@ -129,6 +142,14 @@ func (p *WebsocketProxy) upgradeToWebSocketProxy(w http.ResponseWriter, request.Method = m } + // Is this a call to a client-streaming RPC method? + clientStreaming := false + for _, pattern := range p.clientStreamingURIs { + if pattern.MatchString(r.URL.Path) { + clientStreaming = true + } + } + responseForwarder := newResponseForwardingWriter() go func() { <-ctx.Done() @@ -169,10 +190,13 @@ func (p *WebsocketProxy) upgradeToWebSocketProxy(w http.ResponseWriter, } _, _ = requestForwarder.Write([]byte{'\n'}) - // We currently only support server-streaming messages. - // Therefore we close the request body after the first - // incoming message to trigger a response. - requestForwarder.CloseWriter() + // The grpc-gateway library uses a different request + // reader depending on whether it is a client streaming + // RPC or not. For a non-streaming request we need to + // close with EOF to signal the request was completed. + if !clientStreaming { + requestForwarder.CloseWriter() + } } }()