routerrpc: move query routes into sub server

This commit moves the query routes backend logic from the main
rpc server into the sub server. It is another step towards splitting up
the main rpc server code.

In addition to this, a unit test is added to verify rpc parameter
parsing.
This commit is contained in:
Joost Jager 2019-03-11 09:56:05 +01:00
parent 6cc82b4a34
commit 293971cd03
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
3 changed files with 387 additions and 178 deletions

View File

@ -0,0 +1,229 @@
package routerrpc
import (
"encoding/hex"
"errors"
"fmt"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
context "golang.org/x/net/context"
)
// RouterBackend contains the backend implementation of the router rpc sub
// server calls.
type RouterBackend struct {
MaxPaymentMSat lnwire.MilliSatoshi
SelfNode routing.Vertex
FetchChannelCapacity func(chanID uint64) (btcutil.Amount, error)
FindRoutes func(source, target routing.Vertex,
amt lnwire.MilliSatoshi, restrictions *routing.RestrictParams,
numPaths uint32, finalExpiry ...uint16) (
[]*routing.Route, error)
}
// QueryRoutes attempts to query the daemons' Channel Router for a possible
// route to a target destination capable of carrying a specific amount of
// satoshis within the route's flow. The retuned route contains the full
// details required to craft and send an HTLC, also including the necessary
// information that should be present within the Sphinx packet encapsulated
// within the HTLC.
//
// TODO(roasbeef): should return a slice of routes in reality
// * create separate PR to send based on well formatted route
func (r *RouterBackend) QueryRoutes(ctx context.Context,
in *lnrpc.QueryRoutesRequest) (*lnrpc.QueryRoutesResponse, error) {
parsePubKey := func(key string) (routing.Vertex, error) {
pubKeyBytes, err := hex.DecodeString(key)
if err != nil {
return routing.Vertex{}, err
}
if len(pubKeyBytes) != 33 {
return routing.Vertex{},
errors.New("invalid key length")
}
var v routing.Vertex
copy(v[:], pubKeyBytes)
return v, nil
}
// Parse the hex-encoded source and target public keys into full public
// key objects we can properly manipulate.
targetPubKey, err := parsePubKey(in.PubKey)
if err != nil {
return nil, err
}
var sourcePubKey routing.Vertex
if in.SourcePubKey != "" {
var err error
sourcePubKey, err = parsePubKey(in.SourcePubKey)
if err != nil {
return nil, err
}
} else {
// If no source is specified, use self.
sourcePubKey = r.SelfNode
}
// Currently, within the bootstrap phase of the network, we limit the
// largest payment size allotted to (2^32) - 1 mSAT or 4.29 million
// satoshis.
amt := btcutil.Amount(in.Amt)
amtMSat := lnwire.NewMSatFromSatoshis(amt)
if amtMSat > r.MaxPaymentMSat {
return nil, fmt.Errorf("payment of %v is too large, max payment "+
"allowed is %v", amt, r.MaxPaymentMSat.ToSatoshis())
}
// Unmarshall restrictions from request.
feeLimit := calculateFeeLimit(in.FeeLimit, amtMSat)
ignoredNodes := make(map[routing.Vertex]struct{})
for _, ignorePubKey := range in.IgnoredNodes {
if len(ignorePubKey) != 33 {
return nil, fmt.Errorf("invalid ignore node pubkey")
}
var ignoreVertex routing.Vertex
copy(ignoreVertex[:], ignorePubKey)
ignoredNodes[ignoreVertex] = struct{}{}
}
ignoredEdges := make(map[routing.EdgeLocator]struct{})
for _, ignoredEdge := range in.IgnoredEdges {
locator := routing.EdgeLocator{
ChannelID: ignoredEdge.ChannelId,
}
if ignoredEdge.DirectionReverse {
locator.Direction = 1
}
ignoredEdges[locator] = struct{}{}
}
restrictions := &routing.RestrictParams{
FeeLimit: feeLimit,
IgnoredNodes: ignoredNodes,
IgnoredEdges: ignoredEdges,
}
// numRoutes will default to 10 if not specified explicitly.
numRoutesIn := uint32(in.NumRoutes)
if numRoutesIn == 0 {
numRoutesIn = 10
}
// Query the channel router for a possible path to the destination that
// can carry `in.Amt` satoshis _including_ the total fee required on
// the route.
var (
routes []*routing.Route
findErr error
)
if in.FinalCltvDelta == 0 {
routes, findErr = r.FindRoutes(
sourcePubKey, targetPubKey, amtMSat, restrictions, numRoutesIn,
)
} else {
routes, findErr = r.FindRoutes(
sourcePubKey, targetPubKey, amtMSat, restrictions, numRoutesIn,
uint16(in.FinalCltvDelta),
)
}
if findErr != nil {
return nil, findErr
}
// As the number of returned routes can be less than the number of
// requested routes, we'll clamp down the length of the response to the
// minimum of the two.
numRoutes := uint32(len(routes))
if numRoutesIn < numRoutes {
numRoutes = numRoutesIn
}
// For each valid route, we'll convert the result into the format
// required by the RPC system.
routeResp := &lnrpc.QueryRoutesResponse{
Routes: make([]*lnrpc.Route, 0, in.NumRoutes),
}
for i := uint32(0); i < numRoutes; i++ {
routeResp.Routes = append(
routeResp.Routes,
r.MarshallRoute(routes[i]),
)
}
return routeResp, nil
}
// calculateFeeLimit returns the fee limit in millisatoshis. If a percentage
// based fee limit has been requested, we'll factor in the ratio provided with
// the amount of the payment.
func calculateFeeLimit(feeLimit *lnrpc.FeeLimit,
amount lnwire.MilliSatoshi) lnwire.MilliSatoshi {
switch feeLimit.GetLimit().(type) {
case *lnrpc.FeeLimit_Fixed:
return lnwire.NewMSatFromSatoshis(
btcutil.Amount(feeLimit.GetFixed()),
)
case *lnrpc.FeeLimit_Percent:
return amount * lnwire.MilliSatoshi(feeLimit.GetPercent()) / 100
default:
// If a fee limit was not specified, we'll use the payment's
// amount as an upper bound in order to avoid payment attempts
// from incurring fees higher than the payment amount itself.
return amount
}
}
// MarshallRoute marshalls an internal route to an rpc route struct.
func (r *RouterBackend) MarshallRoute(route *routing.Route) *lnrpc.Route {
resp := &lnrpc.Route{
TotalTimeLock: route.TotalTimeLock,
TotalFees: int64(route.TotalFees.ToSatoshis()),
TotalFeesMsat: int64(route.TotalFees),
TotalAmt: int64(route.TotalAmount.ToSatoshis()),
TotalAmtMsat: int64(route.TotalAmount),
Hops: make([]*lnrpc.Hop, len(route.Hops)),
}
incomingAmt := route.TotalAmount
for i, hop := range route.Hops {
fee := route.HopFee(i)
// Channel capacity is not a defining property of a route. For
// backwards RPC compatibility, we retrieve it here from the
// graph.
chanCapacity, err := r.FetchChannelCapacity(hop.ChannelID)
if err != nil {
// If capacity cannot be retrieved, this may be a
// not-yet-received or private channel. Then report
// amount that is sent through the channel as capacity.
chanCapacity = incomingAmt.ToSatoshis()
}
resp.Hops[i] = &lnrpc.Hop{
ChanId: hop.ChannelID,
ChanCapacity: int64(chanCapacity),
AmtToForward: int64(hop.AmtToForward.ToSatoshis()),
AmtToForwardMsat: int64(hop.AmtToForward),
Fee: int64(fee.ToSatoshis()),
FeeMsat: int64(fee),
Expiry: uint32(hop.OutgoingTimeLock),
PubKey: hex.EncodeToString(
hop.PubKeyBytes[:]),
}
incomingAmt = hop.AmtToForward
}
return resp
}

View File

@ -0,0 +1,126 @@
package routerrpc
import (
"bytes"
"context"
"encoding/hex"
"testing"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/lnrpc"
)
const (
destKey = "0286098b97bc843372b4426d4b276cea9aa2f48f0428d6f5b66ae101befc14f8b4"
ignoreNodeKey = "02f274f48f3c0d590449a6776e3ce8825076ac376e470e992246eebc565ef8bb2a"
)
var (
sourceKey = routing.Vertex{1, 2, 3}
)
// TestQueryRoutes asserts that query routes rpc parameters are properly parsed
// and passed onto path finding.
func TestQueryRoutes(t *testing.T) {
ignoreNodeBytes, err := hex.DecodeString(ignoreNodeKey)
if err != nil {
t.Fatal(err)
}
var ignoreNodeVertex routing.Vertex
copy(ignoreNodeVertex[:], ignoreNodeBytes)
destNodeBytes, err := hex.DecodeString(destKey)
if err != nil {
t.Fatal(err)
}
request := &lnrpc.QueryRoutesRequest{
PubKey: destKey,
Amt: 100000,
NumRoutes: 1,
FinalCltvDelta: 100,
FeeLimit: &lnrpc.FeeLimit{
Limit: &lnrpc.FeeLimit_Fixed{
Fixed: 250,
},
},
IgnoredNodes: [][]byte{ignoreNodeBytes},
IgnoredEdges: []*lnrpc.EdgeLocator{&lnrpc.EdgeLocator{
ChannelId: 555,
DirectionReverse: true,
}},
}
route := &routing.Route{}
findRoutes := func(source, target routing.Vertex,
amt lnwire.MilliSatoshi, restrictions *routing.RestrictParams,
numPaths uint32, finalExpiry ...uint16) (
[]*routing.Route, error) {
if int64(amt) != request.Amt*1000 {
t.Fatal("unexpected amount")
}
if numPaths != 1 {
t.Fatal("unexpected number of routes")
}
if source != sourceKey {
t.Fatal("unexpected source key")
}
if !bytes.Equal(target[:], destNodeBytes) {
t.Fatal("unexpected target key")
}
if restrictions.FeeLimit != 250*1000 {
t.Fatal("unexpected fee limit")
}
if len(restrictions.IgnoredEdges) != 1 {
t.Fatal("unexpected ignored edges map size")
}
if _, ok := restrictions.IgnoredEdges[routing.EdgeLocator{
ChannelID: 555, Direction: 1,
}]; !ok {
t.Fatal("unexpected ignored edge")
}
if len(restrictions.IgnoredNodes) != 1 {
t.Fatal("unexpected ignored nodes map size")
}
if _, ok := restrictions.IgnoredNodes[ignoreNodeVertex]; !ok {
t.Fatal("unexpected ignored node")
}
return []*routing.Route{
route,
}, nil
}
backend := &RouterBackend{
MaxPaymentMSat: lnwire.NewMSatFromSatoshis(1000000),
FindRoutes: findRoutes,
SelfNode: routing.Vertex{1, 2, 3},
FetchChannelCapacity: func(chanID uint64) (
btcutil.Amount, error) {
return 1, nil
},
}
resp, err := backend.QueryRoutes(context.Background(), request)
if err != nil {
t.Fatal(err)
}
if len(resp.Routes) != 1 {
t.Fatal("expected a single route response")
}
}

View File

@ -17,6 +17,8 @@ import (
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -388,6 +390,10 @@ type rpcServer struct {
// connect to the main gRPC server to proxy all incoming requests.
tlsCfg *tls.Config
// RouterBackend contains the backend implementation of the router
// rpc sub server.
RouterBackend *routerrpc.RouterBackend
quit chan struct{}
}
@ -471,6 +477,28 @@ func newRPCServer(s *server, macService *macaroons.Service,
)
}
// Set up router rpc backend.
channelGraph := s.chanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
graph := s.chanDB.ChannelGraph()
RouterBackend := &routerrpc.RouterBackend{
MaxPaymentMSat: maxPaymentMSat,
SelfNode: selfNode.PubKeyBytes,
FetchChannelCapacity: func(chanID uint64) (btcutil.Amount,
error) {
info, _, _, err := graph.FetchChannelEdgesByID(chanID)
if err != nil {
return 0, err
}
return info.Capacity, nil
},
FindRoutes: s.chanRouter.FindRoutes,
}
// Finally, with all the pre-set up complete, we can create the main
// gRPC server, and register the main lnrpc server along side.
grpcServer := grpc.NewServer(serverOpts...)
@ -480,6 +508,7 @@ func newRPCServer(s *server, macService *macaroons.Service,
tlsCfg: tlsCfg,
grpcServer: grpcServer,
server: s,
RouterBackend: RouterBackend,
quit: make(chan struct{}, 1),
}
lnrpc.RegisterLightningServer(grpcServer, rootRPCServer)
@ -3153,7 +3182,7 @@ func (r *rpcServer) sendPayment(stream *paymentStream) error {
return
}
marshalledRouted := r.marshallRoute(resp.Route)
marshalledRouted := r.RouterBackend.MarshallRoute(resp.Route)
err := stream.send(&lnrpc.SendResponse{
PaymentHash: payIntent.rHash[:],
PaymentPreimage: resp.Preimage[:],
@ -3238,7 +3267,7 @@ func (r *rpcServer) sendPaymentSync(ctx context.Context,
return &lnrpc.SendResponse{
PaymentHash: payIntent.rHash[:],
PaymentPreimage: resp.Preimage[:],
PaymentRoute: r.marshallRoute(resp.Route),
PaymentRoute: r.RouterBackend.MarshallRoute(resp.Route),
}, nil
}
@ -3997,182 +4026,7 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
func (r *rpcServer) QueryRoutes(ctx context.Context,
in *lnrpc.QueryRoutesRequest) (*lnrpc.QueryRoutesResponse, error) {
parsePubKey := func(key string) (routing.Vertex, error) {
pubKeyBytes, err := hex.DecodeString(key)
if err != nil {
return routing.Vertex{}, err
}
if len(pubKeyBytes) != 33 {
return routing.Vertex{},
errors.New("invalid key length")
}
var v routing.Vertex
copy(v[:], pubKeyBytes)
return v, nil
}
// Parse the hex-encoded source and target public keys into full public
// key objects we can properly manipulate.
targetPubKey, err := parsePubKey(in.PubKey)
if err != nil {
return nil, err
}
var sourcePubKey routing.Vertex
if in.SourcePubKey != "" {
var err error
sourcePubKey, err = parsePubKey(in.SourcePubKey)
if err != nil {
return nil, err
}
} else {
// If no source is specified, use self.
channelGraph := r.server.chanDB.ChannelGraph()
selfNode, err := channelGraph.SourceNode()
if err != nil {
return nil, err
}
sourcePubKey = selfNode.PubKeyBytes
}
// Currently, within the bootstrap phase of the network, we limit the
// largest payment size allotted to (2^32) - 1 mSAT or 4.29 million
// satoshis.
amt := btcutil.Amount(in.Amt)
amtMSat := lnwire.NewMSatFromSatoshis(amt)
if amtMSat > maxPaymentMSat {
return nil, fmt.Errorf("payment of %v is too large, max payment "+
"allowed is %v", amt, maxPaymentMSat.ToSatoshis())
}
// Unmarshall restrictions from request.
feeLimit := calculateFeeLimit(in.FeeLimit, amtMSat)
ignoredNodes := make(map[routing.Vertex]struct{})
for _, ignorePubKey := range in.IgnoredNodes {
if len(ignorePubKey) != 33 {
return nil, fmt.Errorf("invalid ignore node pubkey")
}
var ignoreVertex routing.Vertex
copy(ignoreVertex[:], ignorePubKey)
ignoredNodes[ignoreVertex] = struct{}{}
}
ignoredEdges := make(map[routing.EdgeLocator]struct{})
for _, ignoredEdge := range in.IgnoredEdges {
locator := routing.EdgeLocator{
ChannelID: ignoredEdge.ChannelId,
}
if ignoredEdge.DirectionReverse {
locator.Direction = 1
}
ignoredEdges[locator] = struct{}{}
}
restrictions := &routing.RestrictParams{
FeeLimit: feeLimit,
IgnoredNodes: ignoredNodes,
IgnoredEdges: ignoredEdges,
}
// numRoutes will default to 10 if not specified explicitly.
numRoutesIn := uint32(in.NumRoutes)
if numRoutesIn == 0 {
numRoutesIn = 10
}
// Query the channel router for a possible path to the destination that
// can carry `in.Amt` satoshis _including_ the total fee required on
// the route.
var (
routes []*routing.Route
findErr error
)
if in.FinalCltvDelta == 0 {
routes, findErr = r.server.chanRouter.FindRoutes(
sourcePubKey, targetPubKey, amtMSat, restrictions, numRoutesIn,
)
} else {
routes, findErr = r.server.chanRouter.FindRoutes(
sourcePubKey, targetPubKey, amtMSat, restrictions, numRoutesIn,
uint16(in.FinalCltvDelta),
)
}
if findErr != nil {
return nil, findErr
}
// As the number of returned routes can be less than the number of
// requested routes, we'll clamp down the length of the response to the
// minimum of the two.
numRoutes := uint32(len(routes))
if numRoutesIn < numRoutes {
numRoutes = numRoutesIn
}
// For each valid route, we'll convert the result into the format
// required by the RPC system.
routeResp := &lnrpc.QueryRoutesResponse{
Routes: make([]*lnrpc.Route, 0, in.NumRoutes),
}
for i := uint32(0); i < numRoutes; i++ {
routeResp.Routes = append(
routeResp.Routes, r.marshallRoute(routes[i]),
)
}
return routeResp, nil
}
func (r *rpcServer) marshallRoute(route *routing.Route) *lnrpc.Route {
resp := &lnrpc.Route{
TotalTimeLock: route.TotalTimeLock,
TotalFees: int64(route.TotalFees.ToSatoshis()),
TotalFeesMsat: int64(route.TotalFees),
TotalAmt: int64(route.TotalAmount.ToSatoshis()),
TotalAmtMsat: int64(route.TotalAmount),
Hops: make([]*lnrpc.Hop, len(route.Hops)),
}
graph := r.server.chanDB.ChannelGraph()
incomingAmt := route.TotalAmount
for i, hop := range route.Hops {
fee := route.HopFee(i)
// Channel capacity is not a defining property of a route. For
// backwards RPC compatibility, we retrieve it here from the
// graph.
var chanCapacity btcutil.Amount
info, _, _, err := graph.FetchChannelEdgesByID(hop.ChannelID)
if err == nil {
chanCapacity = info.Capacity
} else {
// If capacity cannot be retrieved, this may be a
// not-yet-received or private channel. Then report
// amount that is sent through the channel as capacity.
chanCapacity = incomingAmt.ToSatoshis()
}
resp.Hops[i] = &lnrpc.Hop{
ChanId: hop.ChannelID,
ChanCapacity: int64(chanCapacity),
AmtToForward: int64(hop.AmtToForward.ToSatoshis()),
AmtToForwardMsat: int64(hop.AmtToForward),
Fee: int64(fee.ToSatoshis()),
FeeMsat: int64(fee),
Expiry: uint32(hop.OutgoingTimeLock),
PubKey: hex.EncodeToString(
hop.PubKeyBytes[:]),
}
incomingAmt = hop.AmtToForward
}
return resp
return r.RouterBackend.QueryRoutes(ctx, in)
}
// unmarshallHopByChannelLookup unmarshalls an rpc hop for which the pub key is