diff --git a/Gopkg.lock b/Gopkg.lock index 69b66d0b..654a6da8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -189,14 +189,17 @@ revision = "a6af135bd4e28680facf08a3d206b454abc877a4" [[projects]] - digest = "1:19c3d5be42d5d503b94650e45bb2a01264bb36caff20f708e9cf67a6683b4c04" + digest = "1:52f82517e64acdc35d16c476fb8bd9191aa205eb36a450a3fc0dd5b3fe27a4ab" name = "github.com/golang/protobuf" packages = [ "jsonpb", "proto", "protoc-gen-go/descriptor", + "ptypes", "ptypes/any", + "ptypes/duration", "ptypes/struct", + "ptypes/timestamp", ] pruneopts = "UT" revision = "bbd03ef6da3a115852eaf24c8a1c46aeb39aa175" @@ -434,27 +437,38 @@ revision = "df60624c1e9b9d2973e889c7a1cff73155da81c4" [[projects]] - digest = "1:8d9ccf0a790b530f94357a5d27e9c86fb53e862b6e0f9133c792269c0b567218" + digest = "1:ab8e92d746fb5c4c18846b0879842ac8e53b3d352449423d0924a11f1020ae1b" name = "google.golang.org/grpc" packages = [ ".", + "balancer", + "balancer/base", + "balancer/roundrobin", "codes", "connectivity", "credentials", - "grpclb/grpc_lb_v1", + "encoding", + "encoding/proto", "grpclog", "internal", + "internal/backoff", + "internal/channelz", + "internal/envconfig", + "internal/grpcrand", + "internal/transport", "keepalive", "metadata", "naming", "peer", + "resolver", + "resolver/dns", + "resolver/passthrough", "stats", "status", "tap", - "transport", ] pruneopts = "UT" - revision = "b3ddf786825de56a4178401b7e174ee332173b66" + revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1" [[projects]] digest = "1:9f0c81ca4b497d3723d0a66495d8a1efe277068b77ef3ad2d6460e480bf09bb3" diff --git a/Gopkg.toml b/Gopkg.toml index 06cc4265..e49131b0 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -108,7 +108,7 @@ [[constraint]] name = "google.golang.org/grpc" - revision = "b3ddf786825de56a4178401b7e174ee332173b66" + revision = "8dea3dc473e90c8179e519d91302d0597c0ca1d1" [[override]] name = "gopkg.in/errgo.v1" diff --git a/rpcserver.go b/rpcserver.go index f3e3abfb..5a27ad44 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1978,6 +1978,8 @@ func calculateFeeLimit(feeLimit *lnrpc.FeeLimit, // bi-directional stream allowing clients to rapidly send payments through the // Lightning Network with a single persistent connection. func (r *rpcServer) SendPayment(stream lnrpc.Lightning_SendPaymentServer) error { + var lock sync.Mutex + return r.sendPayment(&paymentStream{ recv: func() (*rpcPaymentRequest, error) { req, err := stream.Recv() @@ -1989,7 +1991,12 @@ func (r *rpcServer) SendPayment(stream lnrpc.Lightning_SendPaymentServer) error SendRequest: req, }, nil }, - send: stream.Send, + send: func(r *lnrpc.SendResponse) error { + // Calling stream.Send concurrently is not safe. + lock.Lock() + defer lock.Unlock() + return stream.Send(r) + }, }) } @@ -1999,6 +2006,8 @@ func (r *rpcServer) SendPayment(stream lnrpc.Lightning_SendPaymentServer) error // rapidly send payments through the Lightning Network with a single persistent // connection. func (r *rpcServer) SendToRoute(stream lnrpc.Lightning_SendToRouteServer) error { + var lock sync.Mutex + return r.sendPayment(&paymentStream{ recv: func() (*rpcPaymentRequest, error) { req, err := stream.Recv() @@ -2028,7 +2037,12 @@ func (r *rpcServer) SendToRoute(stream lnrpc.Lightning_SendToRouteServer) error routes: routes, }, nil }, - send: stream.Send, + send: func(r *lnrpc.SendResponse) error { + // Calling stream.Send concurrently is not safe. + lock.Lock() + defer lock.Unlock() + return stream.Send(r) + }, }) } @@ -2319,6 +2333,10 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error { defer func() { close(reqQuit) }() + + // TODO(joostjager): Callers expect result to come in in the same order + // as the request were sent, but this is far from guarantueed in the + // code below. go func() { for { select {