rpcserver: add SendToRoute methods

rpcserver: add SendToRoute handler
This commit is contained in:
t4sk 2018-05-01 17:17:55 +09:00 committed by Olaoluwa Osuntokun
parent dac62e812c
commit 6ddd7b4d0d

@ -231,6 +231,14 @@ var (
Entity: "offchain", Entity: "offchain",
Action: "write", Action: "write",
}}, }},
"/lnrpc.Lightning/SendToRoute": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/SendToRouteSync": {{
Entity: "offchain",
Action: "write",
}},
"/lnrpc.Lightning/AddInvoice": {{ "/lnrpc.Lightning/AddInvoice": {{
Entity: "invoices", Entity: "invoices",
Action: "write", Action: "write",
@ -1740,11 +1748,82 @@ func validatePayReqExpiry(payReq *zpay32.Invoice) error {
return nil return nil
} }
// paymentStream enables different types of payment stream, such as:
// lnrpc.Lightning_SendPaymentServer
// lnrpc.Lightning_SendToRouteServer
// to execute sendPayment.
type paymentStream struct {
recv func() (*paymentRequest, error)
send func(*lnrpc.SendResponse) error
}
// paymentRequest wraps lnrpc.SendRequest so that routes from
// lnrpc.SendToRouteRequest can be passed to sendPayment
type paymentRequest struct {
*lnrpc.SendRequest
routes []*routing.Route
}
// SendPayment dispatches a bi-directional streaming RPC for sending payments // SendPayment dispatches a bi-directional streaming RPC for sending payments
// through the Lightning Network. A single RPC invocation creates a persistent // through the Lightning Network. A single RPC invocation creates a persistent
// bi-directional stream allowing clients to rapidly send payments through the // bi-directional stream allowing clients to rapidly send payments through the
// Lightning Network with a single persistent connection. // Lightning Network with a single persistent connection.
func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer) error { func (r *rpcServer) SendPayment(stream lnrpc.Lightning_SendPaymentServer) error {
return r.sendPayment(&paymentStream{
recv: func() (*paymentRequest, error) {
req, err := stream.Recv()
if err != nil {
return nil, err
}
return &paymentRequest{
SendRequest: req,
}, nil
},
send: stream.Send,
})
}
// SendToRoute dispatches a bi-directional streaming RPC for sending payments
// through the Lightning Network via predefined routes passed in. A single RPC
// invocation creates a persistent bi-directional stream allowing clients to
// rapidly send payments through the Lightning Network with a single
// persistent connection.
func (r *rpcServer) SendToRoute(stream lnrpc.Lightning_SendToRouteServer) error {
return r.sendPayment(&paymentStream{
recv: func() (*paymentRequest, error) {
req, err := stream.Recv()
if err != nil {
return nil, err
}
graph := r.server.chanDB.ChannelGraph()
if len(req.Routes) == 0 {
return nil, fmt.Errorf("unable to send, no routes provided")
}
routes := make([]*routing.Route, len(req.Routes))
for i, rpcroute := range req.Routes {
route, err := unmarshallRoute(rpcroute, graph)
if err != nil {
return nil, err
}
routes[i] = route
}
return &paymentRequest{
SendRequest: &lnrpc.SendRequest{
PaymentHash: req.PaymentHash,
},
routes: routes,
}, nil
},
send: stream.Send,
})
}
func (r *rpcServer) sendPayment(stream *paymentStream) error {
// For each payment we need to know the msat amount, the destination // For each payment we need to know the msat amount, the destination
// public key, the payment hash, and the optional route hints. // public key, the payment hash, and the optional route hints.
type payment struct { type payment struct {
@ -1753,6 +1832,7 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
pHash []byte pHash []byte
cltvDelta uint16 cltvDelta uint16
routeHints [][]routing.HopHint routeHints [][]routing.HopHint
routes []*routing.Route
} }
payChan := make(chan *payment) payChan := make(chan *payment)
errChan := make(chan error, 1) errChan := make(chan error, 1)
@ -1799,7 +1879,7 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
// stream sent by the client. If we read the // stream sent by the client. If we read the
// EOF sentinel, then the client has closed the // EOF sentinel, then the client has closed the
// stream, and we can exit normally. // stream, and we can exit normally.
nextPayment, err := paymentStream.Recv() nextPayment, err := stream.recv()
if err == io.EOF { if err == io.EOF {
errChan <- nil errChan <- nil
return return
@ -1817,64 +1897,69 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
// fields. // fields.
p := &payment{} p := &payment{}
// If the payment request field isn't blank, if len(nextPayment.routes) == 0 {
// then the details of the invoice are encoded // If the payment request field isn't blank,
// entirely within the encoded payReq. So we'll // then the details of the invoice are encoded
// attempt to decode it, populating the // entirely within the encoded payReq. So we'll
// payment accordingly. // attempt to decode it, populating the
if nextPayment.PaymentRequest != "" { // payment accordingly.
payReq, err := zpay32.Decode(nextPayment.PaymentRequest, if nextPayment.PaymentRequest != "" {
activeNetParams.Params) payReq, err := zpay32.Decode(nextPayment.PaymentRequest,
if err != nil { activeNetParams.Params)
select { if err != nil {
case errChan <- err: select {
case <-reqQuit: case errChan <- err:
case <-reqQuit:
}
return
} }
return
}
// TODO(roasbeef): eliminate necessary // TODO(roasbeef): eliminate necessary
// encode/decode // encode/decode
// We first check that this payment // We first check that this payment
// request has not expired. // request has not expired.
err = validatePayReqExpiry(payReq) err = validatePayReqExpiry(payReq)
if err != nil { if err != nil {
select { select {
case errChan <- err: case errChan <- err:
case <-reqQuit: case <-reqQuit:
}
return
} }
return p.dest = payReq.Destination.SerializeCompressed()
}
p.dest = payReq.Destination.SerializeCompressed()
// If the amount was not included in the // If the amount was not included in the
// invoice, then we let the payee // invoice, then we let the payee
// specify the amount of satoshis they // specify the amount of satoshis they
// wish to send. We override the amount // wish to send. We override the amount
// to pay with the amount provided from // to pay with the amount provided from
// the payment request. // the payment request.
if payReq.MilliSat == nil { if payReq.MilliSat == nil {
p.msat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt),
)
} else {
p.msat = *payReq.MilliSat
}
p.pHash = payReq.PaymentHash[:]
p.cltvDelta = uint16(payReq.MinFinalCLTVExpiry())
p.routeHints = payReq.RouteHints
} else {
// If the payment request field was not
// specified, construct the payment from
// the other fields.
p.msat = lnwire.NewMSatFromSatoshis( p.msat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt), btcutil.Amount(nextPayment.Amt),
) )
} else { p.dest = nextPayment.Dest
p.msat = *payReq.MilliSat p.pHash = nextPayment.PaymentHash
p.cltvDelta = uint16(nextPayment.FinalCltvDelta)
} }
p.pHash = payReq.PaymentHash[:]
p.cltvDelta = uint16(payReq.MinFinalCLTVExpiry())
p.routeHints = payReq.RouteHints
} else { } else {
// If the payment request field was not
// specified, construct the payment from
// the other fields.
p.msat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt),
)
p.dest = nextPayment.Dest
p.pHash = nextPayment.PaymentHash p.pHash = nextPayment.PaymentHash
p.cltvDelta = uint16(nextPayment.FinalCltvDelta) p.routes = nextPayment.routes
} }
select { select {
@ -1902,7 +1987,7 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
"large, max payment allowed is %v", "large, max payment allowed is %v",
p.msat, maxPaymentMSat) p.msat, maxPaymentMSat)
if err := paymentStream.Send(&lnrpc.SendResponse{ if err := stream.send(&lnrpc.SendResponse{
PaymentError: pErr.Error(), PaymentError: pErr.Error(),
}); err != nil { }); err != nil {
return err return err
@ -1912,9 +1997,13 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
// Parse the details of the payment which include the // Parse the details of the payment which include the
// pubkey of the destination and the payment amount. // pubkey of the destination and the payment amount.
destNode, err := btcec.ParsePubKey(p.dest, btcec.S256()) var destNode *btcec.PublicKey
if err != nil { var pubKeyErr error
return err if len(p.routes) == 0 {
destNode, pubKeyErr = btcec.ParsePubKey(p.dest, btcec.S256())
if pubKeyErr != nil {
return pubKeyErr
}
} }
// If we're in debug HTLC mode, then all outgoing HTLCs // If we're in debug HTLC mode, then all outgoing HTLCs
@ -1944,22 +2033,37 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
// successful, the route chosen will be // successful, the route chosen will be
// returned. Otherwise, we'll get a non-nil // returned. Otherwise, we'll get a non-nil
// error. // error.
payment := &routing.LightningPayment{ var (
Target: destNode, preImage [32]byte
Amount: p.msat, route *routing.Route
PaymentHash: rHash, routerErr error
RouteHints: p.routeHints, )
if len(p.routes) == 0 {
payment := &routing.LightningPayment{
Target: destNode,
Amount: p.msat,
PaymentHash: rHash,
RouteHints: p.routeHints,
}
if p.cltvDelta != 0 {
payment.FinalCLTVDelta = &p.cltvDelta
}
preImage, route, routerErr = r.server.chanRouter.SendPayment(payment)
} else {
payment := &routing.LightningPayment{
PaymentHash: rHash,
}
preImage, route, routerErr = r.server.chanRouter.SendToRoute(p.routes, payment)
} }
if p.cltvDelta != 0 {
payment.FinalCLTVDelta = &p.cltvDelta if routerErr != nil {
}
preImage, route, err := r.server.chanRouter.SendPayment(payment)
if err != nil {
// If we receive payment error than, // If we receive payment error than,
// instead of terminating the stream, // instead of terminating the stream,
// send error response to the user. // send error response to the user.
err := paymentStream.Send(&lnrpc.SendResponse{ err := stream.send(&lnrpc.SendResponse{
PaymentError: err.Error(), PaymentError: routerErr.Error(),
}) })
if err != nil { if err != nil {
errChan <- err errChan <- err
@ -1967,14 +2071,23 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
return return
} }
// Calculate the original amount of funds which was sent through routes
// without total fee.
var amt lnwire.MilliSatoshi
if len(p.routes) > 0 {
amt = route.TotalAmount - route.TotalFees
} else {
amt = p.msat
}
// Save the completed payment to the database // Save the completed payment to the database
// for record keeping purposes. // for record keeping purposes.
if err := r.savePayment(route, p.msat, preImage[:]); err != nil { if err := r.savePayment(route, amt, preImage[:]); err != nil {
errChan <- err errChan <- err
return return
} }
err = paymentStream.Send(&lnrpc.SendResponse{ err := stream.send(&lnrpc.SendResponse{
PaymentPreimage: preImage[:], PaymentPreimage: preImage[:],
PaymentRoute: marshallRoute(route), PaymentRoute: marshallRoute(route),
}) })
@ -1994,6 +2107,44 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
func (r *rpcServer) SendPaymentSync(ctx context.Context, func (r *rpcServer) SendPaymentSync(ctx context.Context,
nextPayment *lnrpc.SendRequest) (*lnrpc.SendResponse, error) { nextPayment *lnrpc.SendRequest) (*lnrpc.SendResponse, error) {
return r.sendPaymentSync(ctx, &paymentRequest{
SendRequest: nextPayment,
})
}
// SendToRouteSync is the synchronous non-streaming version of SendToRoute.
// This RPC is intended to be consumed by clients of the REST proxy.
// Additionally, this RPC expects the payment hash (if any) to be encoded
// as hex strings.
func (r *rpcServer) SendToRouteSync(ctx context.Context,
req *lnrpc.SendToRouteRequest) (*lnrpc.SendResponse, error) {
if len(req.Routes) == 0 {
return nil, fmt.Errorf("unable to send, no routes provided")
}
graph := r.server.chanDB.ChannelGraph()
routes := make([]*routing.Route, len(req.Routes))
for i, route := range req.Routes {
route, err := unmarshallRoute(route, graph)
if err != nil {
return nil, err
}
routes[i] = route
}
return r.sendPaymentSync(ctx, &paymentRequest{
SendRequest: &lnrpc.SendRequest{
PaymentHashString: req.PaymentHashString,
},
routes: routes,
})
}
func (r *rpcServer) sendPaymentSync(ctx context.Context,
nextPayment *paymentRequest) (*lnrpc.SendResponse, error) {
// TODO(roasbeef): enforce fee limits, pass into router, ditch if exceed limit // TODO(roasbeef): enforce fee limits, pass into router, ditch if exceed limit
// * limit either a %, or absolute, or iff more than sending // * limit either a %, or absolute, or iff more than sending
@ -2013,40 +2164,69 @@ func (r *rpcServer) SendPaymentSync(ctx context.Context,
routeHints [][]routing.HopHint routeHints [][]routing.HopHint
) )
// If the proto request has an encoded payment request, then we we'll if len(nextPayment.routes) == 0 {
// use that solely to dispatch the payment. // If the proto request has an encoded payment request, then we we'll
if nextPayment.PaymentRequest != "" { // use that solely to dispatch the payment.
payReq, err := zpay32.Decode(nextPayment.PaymentRequest, if nextPayment.PaymentRequest != "" {
activeNetParams.Params) payReq, err := zpay32.Decode(nextPayment.PaymentRequest,
if err != nil { activeNetParams.Params)
return nil, err if err != nil {
} return nil, err
}
// We first check that this payment request has not expired. // We first check that this payment request has not expired.
if err := validatePayReqExpiry(payReq); err != nil { if err := validatePayReqExpiry(payReq); err != nil {
return nil, err return nil, err
} }
destPub = payReq.Destination destPub = payReq.Destination
// If the amount was not included in the invoice, then we let
// the payee specify the amount of satoshis they wish to send.
// We override the amount to pay with the amount provided from
// the payment request.
if payReq.MilliSat == nil {
amtMSat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt),
)
} else {
amtMSat = *payReq.MilliSat
}
rHash = *payReq.PaymentHash
cltvDelta = uint16(payReq.MinFinalCLTVExpiry())
routeHints = payReq.RouteHints
// Otherwise, the payment conditions have been manually
// specified in the proto.
} else {
// If we're in debug HTLC mode, then all outgoing HTLCs will
// pay to the same debug rHash. Otherwise, we pay to the rHash
// specified within the RPC request.
if cfg.DebugHTLC && nextPayment.PaymentHashString == "" {
rHash = debugHash
} else {
paymentHash, err := hex.DecodeString(nextPayment.PaymentHashString)
if err != nil {
return nil, err
}
copy(rHash[:], paymentHash)
}
pubBytes, err := hex.DecodeString(nextPayment.DestString)
if err != nil {
return nil, err
}
destPub, err = btcec.ParsePubKey(pubBytes, btcec.S256())
if err != nil {
return nil, err
}
// If the amount was not included in the invoice, then we let
// the payee specify the amount of satoshis they wish to send.
// We override the amount to pay with the amount provided from
// the payment request.
if payReq.MilliSat == nil {
amtMSat = lnwire.NewMSatFromSatoshis( amtMSat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt), btcutil.Amount(nextPayment.Amt),
) )
} else {
amtMSat = *payReq.MilliSat
} }
rHash = *payReq.PaymentHash
cltvDelta = uint16(payReq.MinFinalCLTVExpiry())
routeHints = payReq.RouteHints
// Otherwise, the payment conditions have been manually
// specified in the proto.
} else { } else {
// If we're in debug HTLC mode, then all outgoing HTLCs will // If we're in debug HTLC mode, then all outgoing HTLCs will
// pay to the same debug rHash. Otherwise, we pay to the rHash // pay to the same debug rHash. Otherwise, we pay to the rHash
@ -2061,19 +2241,6 @@ func (r *rpcServer) SendPaymentSync(ctx context.Context,
copy(rHash[:], paymentHash) copy(rHash[:], paymentHash)
} }
pubBytes, err := hex.DecodeString(nextPayment.DestString)
if err != nil {
return nil, err
}
destPub, err = btcec.ParsePubKey(pubBytes, btcec.S256())
if err != nil {
return nil, err
}
amtMSat = lnwire.NewMSatFromSatoshis(
btcutil.Amount(nextPayment.Amt),
)
} }
// Currently, within the bootstrap phase of the network, we limit the // Currently, within the bootstrap phase of the network, we limit the
@ -2090,25 +2257,49 @@ func (r *rpcServer) SendPaymentSync(ctx context.Context,
// Finally, send a payment request to the channel router. If the // Finally, send a payment request to the channel router. If the
// payment succeeds, then the returned route will be that was used // payment succeeds, then the returned route will be that was used
// successfully within the payment. // successfully within the payment.
payment := &routing.LightningPayment{ var (
Target: destPub, preImage [32]byte
Amount: amtMSat, route *routing.Route
PaymentHash: rHash, routerErr error
RouteHints: routeHints, )
if len(nextPayment.routes) == 0 {
payment := &routing.LightningPayment{
Target: destPub,
Amount: amtMSat,
PaymentHash: rHash,
RouteHints: routeHints,
}
if cltvDelta != 0 {
payment.FinalCLTVDelta = &cltvDelta
}
preImage, route, routerErr = r.server.chanRouter.SendPayment(payment)
} else {
payment := &routing.LightningPayment{
PaymentHash: rHash,
}
preImage, route, routerErr = r.server.chanRouter.SendToRoute(
nextPayment.routes, payment)
} }
if cltvDelta != 0 {
payment.FinalCLTVDelta = &cltvDelta if routerErr != nil {
}
preImage, route, err := r.server.chanRouter.SendPayment(payment)
if err != nil {
return &lnrpc.SendResponse{ return &lnrpc.SendResponse{
PaymentError: err.Error(), PaymentError: routerErr.Error(),
}, nil }, nil
} }
// Calculate the original amount of funds which was sent through routes
// without total fee.
var amt lnwire.MilliSatoshi
if len(nextPayment.routes) > 0 {
amt = route.TotalAmount - route.TotalFees
} else {
amt = amtMSat
}
// With the payment completed successfully, we now ave the details of // With the payment completed successfully, we now ave the details of
// the completed payment to the database for historical record keeping. // the completed payment to the database for historical record keeping.
if err := r.savePayment(route, amtMSat, preImage[:]); err != nil { if err := r.savePayment(route, amt, preImage[:]); err != nil {
return nil, err return nil, err
} }
@ -2949,6 +3140,61 @@ func marshallRoute(route *routing.Route) *lnrpc.Route {
return resp return resp
} }
func unmarshallRoute(rpcroute *lnrpc.Route,
graph *channeldb.ChannelGraph) (*routing.Route, error) {
rpcsLog.Infof("rpcroute: %v", spew.Sdump(rpcroute))
route := &routing.Route{
TotalTimeLock: rpcroute.TotalTimeLock,
TotalFees: lnwire.MilliSatoshi(rpcroute.TotalFeesMsat),
TotalAmount: lnwire.MilliSatoshi(rpcroute.TotalAmtMsat),
Hops: make([]*routing.Hop, len(rpcroute.Hops)),
}
node, err := graph.SourceNode()
if err != nil {
return nil, fmt.Errorf("unable to fetch source node from graph "+
"while unmarshaling route. %v", err)
}
for i, hop := range rpcroute.Hops {
edgeInfo, c1, c2, err := graph.FetchChannelEdgesByID(hop.ChanId)
if err != nil {
return nil, fmt.Errorf("unable to fetch channel edges by "+
"channel ID for hop (%d): %v", i, err)
}
var channelEdgePolicy *channeldb.ChannelEdgePolicy
switch {
case bytes.Equal(node.PubKeyBytes[:], c1.Node.PubKeyBytes[:]):
channelEdgePolicy = c2
node = c2.Node
case bytes.Equal(node.PubKeyBytes[:], c2.Node.PubKeyBytes[:]):
channelEdgePolicy = c1
node = c1.Node
default:
return nil, fmt.Errorf("could not find channel edge for hop=%d", i)
}
routingHop := &routing.ChannelHop{
ChannelEdgePolicy: channelEdgePolicy,
Capacity: btcutil.Amount(hop.ChanCapacity),
Chain: edgeInfo.ChainHash,
}
route.Hops[i] = &routing.Hop{
Channel: routingHop,
OutgoingTimeLock: hop.Expiry,
AmtToForward: lnwire.MilliSatoshi(hop.AmtToForwardMsat),
Fee: lnwire.MilliSatoshi(hop.FeeMsat),
}
}
return route, nil
}
// GetNetworkInfo returns some basic stats about the known channel graph from // GetNetworkInfo returns some basic stats about the known channel graph from
// the PoV of the node. // the PoV of the node.
func (r *rpcServer) GetNetworkInfo(ctx context.Context, func (r *rpcServer) GetNetworkInfo(ctx context.Context,