Merge pull request #3418 from champo/routing_is_fast_now
routing, channeldb: several optimizations for path finding
This commit is contained in:
commit
5ae4f0eae4
@ -3,6 +3,7 @@ package routing
|
||||
import (
|
||||
"container/heap"
|
||||
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
)
|
||||
@ -35,12 +36,15 @@ type nodeWithDist struct {
|
||||
// Includes the routing fees and a virtual cost factor to account for
|
||||
// time locks.
|
||||
weight int64
|
||||
|
||||
// nextHop is the edge this route comes from.
|
||||
nextHop *channeldb.ChannelEdgePolicy
|
||||
}
|
||||
|
||||
// distanceHeap is a min-distance heap that's used within our path finding
|
||||
// algorithm to keep track of the "closest" node to our source node.
|
||||
type distanceHeap struct {
|
||||
nodes []nodeWithDist
|
||||
nodes []*nodeWithDist
|
||||
|
||||
// pubkeyIndices maps public keys of nodes to their respective index in
|
||||
// the heap. This is used as a way to avoid db lookups by using heap.Fix
|
||||
@ -50,9 +54,10 @@ type distanceHeap struct {
|
||||
|
||||
// newDistanceHeap initializes a new distance heap. This is required because
|
||||
// we must initialize the pubkeyIndices map for path-finding optimizations.
|
||||
func newDistanceHeap() distanceHeap {
|
||||
func newDistanceHeap(numNodes int) distanceHeap {
|
||||
distHeap := distanceHeap{
|
||||
pubkeyIndices: make(map[route.Vertex]int),
|
||||
pubkeyIndices: make(map[route.Vertex]int, numNodes),
|
||||
nodes: make([]*nodeWithDist, 0, numNodes),
|
||||
}
|
||||
|
||||
return distHeap
|
||||
@ -84,7 +89,7 @@ func (d *distanceHeap) Swap(i, j int) {
|
||||
//
|
||||
// NOTE: This is part of the heap.Interface implementation.
|
||||
func (d *distanceHeap) Push(x interface{}) {
|
||||
n := x.(nodeWithDist)
|
||||
n := x.(*nodeWithDist)
|
||||
d.nodes = append(d.nodes, n)
|
||||
d.pubkeyIndices[n.node] = len(d.nodes) - 1
|
||||
}
|
||||
@ -96,6 +101,7 @@ func (d *distanceHeap) Push(x interface{}) {
|
||||
func (d *distanceHeap) Pop() interface{} {
|
||||
n := len(d.nodes)
|
||||
x := d.nodes[n-1]
|
||||
d.nodes[n-1] = nil
|
||||
d.nodes = d.nodes[0 : n-1]
|
||||
delete(d.pubkeyIndices, x.node)
|
||||
return x
|
||||
@ -106,7 +112,7 @@ func (d *distanceHeap) Pop() interface{} {
|
||||
// modify its position and reorder the heap. If the vertex does not already
|
||||
// exist in the heap, then it is pushed onto the heap. Otherwise, we will end
|
||||
// up performing more db lookups on the same node in the pathfinding algorithm.
|
||||
func (d *distanceHeap) PushOrFix(dist nodeWithDist) {
|
||||
func (d *distanceHeap) PushOrFix(dist *nodeWithDist) {
|
||||
index, ok := d.pubkeyIndices[dist.node]
|
||||
if !ok {
|
||||
heap.Push(d, dist)
|
||||
|
@ -17,19 +17,19 @@ func TestHeapOrdering(t *testing.T) {
|
||||
|
||||
// First, create a blank heap, we'll use this to push on randomly
|
||||
// generated items.
|
||||
nodeHeap := newDistanceHeap()
|
||||
nodeHeap := newDistanceHeap(0)
|
||||
|
||||
prand.Seed(1)
|
||||
|
||||
// Create 100 random entries adding them to the heap created above, but
|
||||
// also a list that we'll sort with the entries.
|
||||
const numEntries = 100
|
||||
sortedEntries := make([]nodeWithDist, 0, numEntries)
|
||||
sortedEntries := make([]*nodeWithDist, 0, numEntries)
|
||||
for i := 0; i < numEntries; i++ {
|
||||
var pubKey [33]byte
|
||||
prand.Read(pubKey[:])
|
||||
|
||||
entry := nodeWithDist{
|
||||
entry := &nodeWithDist{
|
||||
node: route.Vertex(pubKey),
|
||||
dist: prand.Int63(),
|
||||
}
|
||||
@ -55,9 +55,9 @@ func TestHeapOrdering(t *testing.T) {
|
||||
|
||||
// One by one, pop of all the entries from the heap, they should come
|
||||
// out in sorted order.
|
||||
var poppedEntries []nodeWithDist
|
||||
var poppedEntries []*nodeWithDist
|
||||
for nodeHeap.Len() != 0 {
|
||||
e := heap.Pop(&nodeHeap).(nodeWithDist)
|
||||
e := heap.Pop(&nodeHeap).(*nodeWithDist)
|
||||
poppedEntries = append(poppedEntries, e)
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
"github.com/coreos/bbolt"
|
||||
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
@ -36,6 +37,11 @@ const (
|
||||
// some effect with smaller time lock values. The value may need
|
||||
// tweaking and/or be made configurable in the future.
|
||||
RiskFactorBillionths = 15
|
||||
|
||||
// estimatedNodeCount is used to preallocate the path finding structures
|
||||
// to avoid resizing and copies. It should be number on the same order as
|
||||
// the number of active nodes in the network.
|
||||
estimatedNodeCount = 10000
|
||||
)
|
||||
|
||||
// pathFinder defines the interface of a path finding algorithm.
|
||||
@ -319,61 +325,40 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
defer tx.Rollback()
|
||||
}
|
||||
|
||||
if r.DestPayloadTLV {
|
||||
// Check if the target has TLV enabled
|
||||
|
||||
targetKey, err := btcec.ParsePubKey(target[:], btcec.S256())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
targetNode, err := g.graph.FetchLightningNode(targetKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if targetNode.Features != nil {
|
||||
supportsTLV := targetNode.Features.HasFeature(
|
||||
lnwire.TLVOnionPayloadOptional,
|
||||
)
|
||||
if !supportsTLV {
|
||||
return nil, fmt.Errorf("destination hop doesn't " +
|
||||
"understand new TLV paylods")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// First we'll initialize an empty heap which'll help us to quickly
|
||||
// locate the next edge we should visit next during our graph
|
||||
// traversal.
|
||||
nodeHeap := newDistanceHeap()
|
||||
nodeHeap := newDistanceHeap(estimatedNodeCount)
|
||||
|
||||
// For each node in the graph, we create an entry in the distance map
|
||||
// for the node set with a distance of "infinity". graph.ForEachNode
|
||||
// also returns the source node, so there is no need to add the source
|
||||
// node explicitly.
|
||||
distance := make(map[route.Vertex]nodeWithDist)
|
||||
if err := g.graph.ForEachNode(tx, func(_ *bbolt.Tx,
|
||||
node *channeldb.LightningNode) error {
|
||||
// TODO(roasbeef): with larger graph can just use disk seeks
|
||||
// with a visited map
|
||||
vertex := route.Vertex(node.PubKeyBytes)
|
||||
distance[vertex] = nodeWithDist{
|
||||
dist: infinity,
|
||||
node: route.Vertex(node.PubKeyBytes),
|
||||
}
|
||||
|
||||
// If we don't have any features for this node, then we can
|
||||
// stop here.
|
||||
if node.Features == nil || !r.DestPayloadTLV {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We only need to perform this check for the final node, so we
|
||||
// can exit here if this isn't them.
|
||||
if vertex != target {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If we have any records for the final hop, then we'll check
|
||||
// not to ensure that they are actually able to interpret them.
|
||||
supportsTLV := node.Features.HasFeature(
|
||||
lnwire.TLVOnionPayloadOptional,
|
||||
)
|
||||
if !supportsTLV {
|
||||
return fmt.Errorf("destination hop doesn't " +
|
||||
"understand new TLV paylods")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Holds the current best distance for a given node.
|
||||
distance := make(map[route.Vertex]*nodeWithDist, estimatedNodeCount)
|
||||
|
||||
additionalEdgesWithSrc := make(map[route.Vertex][]*edgePolicyWithSource)
|
||||
for vertex, outgoingEdgePolicies := range g.additionalEdges {
|
||||
// We'll also include all the nodes found within the additional
|
||||
// edges that are not known to us yet in the distance map.
|
||||
distance[vertex] = nodeWithDist{
|
||||
dist: infinity,
|
||||
node: vertex,
|
||||
}
|
||||
|
||||
// Build reverse lookup to find incoming edges. Needed because
|
||||
// search is taken place from target to source.
|
||||
@ -391,12 +376,12 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
}
|
||||
|
||||
// We can't always assume that the end destination is publicly
|
||||
// advertised to the network and included in the graph.ForEachNode call
|
||||
// above, so we'll manually include the target node. The target node
|
||||
// charges no fee. Distance is set to 0, because this is the starting
|
||||
// point of the graph traversal. We are searching backwards to get the
|
||||
// fees first time right and correctly match channel bandwidth.
|
||||
distance[target] = nodeWithDist{
|
||||
// advertised to the network so we'll manually include the target node.
|
||||
// The target node charges no fee. Distance is set to 0, because this
|
||||
// is the starting point of the graph traversal. We are searching
|
||||
// backwards to get the fees first time right and correctly match
|
||||
// channel bandwidth.
|
||||
distance[target] = &nodeWithDist{
|
||||
dist: 0,
|
||||
weight: 0,
|
||||
node: target,
|
||||
@ -405,15 +390,10 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
probability: 1,
|
||||
}
|
||||
|
||||
// We'll use this map as a series of "next" hop pointers. So to get
|
||||
// from `Vertex` to the target node, we'll take the edge that it's
|
||||
// mapped to within `next`.
|
||||
next := make(map[route.Vertex]*channeldb.ChannelEdgePolicy)
|
||||
|
||||
// processEdge is a helper closure that will be used to make sure edges
|
||||
// satisfy our specific requirements.
|
||||
processEdge := func(fromVertex route.Vertex, bandwidth lnwire.MilliSatoshi,
|
||||
edge *channeldb.ChannelEdgePolicy, toNode route.Vertex) {
|
||||
edge *channeldb.ChannelEdgePolicy, toNodeDist *nodeWithDist) {
|
||||
|
||||
edgesExpanded++
|
||||
|
||||
@ -440,16 +420,18 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
|
||||
// Calculate amount that the candidate node would have to sent
|
||||
// out.
|
||||
toNodeDist := distance[toNode]
|
||||
amountToSend := toNodeDist.amountToReceive
|
||||
|
||||
// Request the success probability for this edge.
|
||||
edgeProbability := r.ProbabilitySource(
|
||||
fromVertex, toNode, amountToSend,
|
||||
fromVertex, toNodeDist.node, amountToSend,
|
||||
)
|
||||
|
||||
log.Tracef("path finding probability: fromnode=%v, tonode=%v, "+
|
||||
"probability=%v", fromVertex, toNode, edgeProbability)
|
||||
log.Trace(newLogClosure(func() string {
|
||||
return fmt.Sprintf("path finding probability: fromnode=%v,"+
|
||||
" tonode=%v, probability=%v", fromVertex, toNodeDist.node,
|
||||
edgeProbability)
|
||||
}))
|
||||
|
||||
// If the probability is zero, there is no point in trying.
|
||||
if edgeProbability == 0 {
|
||||
@ -548,7 +530,8 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
// route, return. It is important to also return if the distance
|
||||
// is equal, because otherwise the algorithm could run into an
|
||||
// endless loop.
|
||||
if tempDist >= distance[fromVertex].dist {
|
||||
current, ok := distance[fromVertex]
|
||||
if ok && tempDist >= current.dist {
|
||||
return
|
||||
}
|
||||
|
||||
@ -563,21 +546,21 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
// better than the current best known distance to this node.
|
||||
// The new better distance is recorded, and also our "next hop"
|
||||
// map is populated with this edge.
|
||||
distance[fromVertex] = nodeWithDist{
|
||||
withDist := &nodeWithDist{
|
||||
dist: tempDist,
|
||||
weight: tempWeight,
|
||||
node: fromVertex,
|
||||
amountToReceive: amountToReceive,
|
||||
incomingCltv: incomingCltv,
|
||||
probability: probability,
|
||||
nextHop: edge,
|
||||
}
|
||||
distance[fromVertex] = withDist
|
||||
|
||||
next[fromVertex] = edge
|
||||
|
||||
// Either push distance[fromVertex] onto the heap if the node
|
||||
// Either push withDist onto the heap if the node
|
||||
// represented by fromVertex is not already on the heap OR adjust
|
||||
// its position within the heap via heap.Fix.
|
||||
nodeHeap.PushOrFix(distance[fromVertex])
|
||||
nodeHeap.PushOrFix(withDist)
|
||||
}
|
||||
|
||||
// TODO(roasbeef): also add path caching
|
||||
@ -592,7 +575,7 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
|
||||
// Fetch the node within the smallest distance from our source
|
||||
// from the heap.
|
||||
partialPath := heap.Pop(&nodeHeap).(nodeWithDist)
|
||||
partialPath := heap.Pop(&nodeHeap).(*nodeWithDist)
|
||||
pivot := partialPath.node
|
||||
|
||||
// If we've reached our source (or we don't have any incoming
|
||||
@ -642,7 +625,7 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
|
||||
// Check if this candidate node is better than what we
|
||||
// already have.
|
||||
processEdge(route.Vertex(chanSource), edgeBandwidth, inEdge, pivot)
|
||||
processEdge(chanSource, edgeBandwidth, inEdge, partialPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -662,30 +645,28 @@ func findPath(g *graphParams, r *RestrictParams, cfg *PathFindingConfig,
|
||||
bandWidth := partialPath.amountToReceive
|
||||
for _, reverseEdge := range additionalEdgesWithSrc[pivot] {
|
||||
processEdge(reverseEdge.sourceNode, bandWidth,
|
||||
reverseEdge.edge, pivot)
|
||||
reverseEdge.edge, partialPath)
|
||||
}
|
||||
}
|
||||
|
||||
// If the source node isn't found in the next hop map, then a path
|
||||
// doesn't exist, so we terminate in an error.
|
||||
if _, ok := next[source]; !ok {
|
||||
return nil, newErrf(ErrNoPathFound, "unable to find a path to "+
|
||||
"destination")
|
||||
}
|
||||
|
||||
// Use the nextHop map to unravel the forward path from source to
|
||||
// Use the distance map to unravel the forward path from source to
|
||||
// target.
|
||||
pathEdges := make([]*channeldb.ChannelEdgePolicy, 0, len(next))
|
||||
var pathEdges []*channeldb.ChannelEdgePolicy
|
||||
currentNode := source
|
||||
for currentNode != target { // TODO(roasbeef): assumes no cycles
|
||||
// Determine the next hop forward using the next map.
|
||||
nextNode := next[currentNode]
|
||||
currentNodeWithDist, ok := distance[currentNode]
|
||||
if !ok {
|
||||
// If the node doesnt have a next hop it means we didn't find a path.
|
||||
return nil, newErrf(ErrNoPathFound, "unable to find a "+
|
||||
"path to destination")
|
||||
}
|
||||
|
||||
// Add the next hop to the list of path edges.
|
||||
pathEdges = append(pathEdges, nextNode)
|
||||
pathEdges = append(pathEdges, currentNodeWithDist.nextHop)
|
||||
|
||||
// Advance current node.
|
||||
currentNode = route.Vertex(nextNode.Node.PubKeyBytes)
|
||||
currentNode = currentNodeWithDist.nextHop.Node.PubKeyBytes
|
||||
}
|
||||
|
||||
// The route is invalid if it spans more than 20 hops. The current
|
||||
|
Loading…
Reference in New Issue
Block a user