routing+channeldb: findPath uses Vertex instead of LightningNode

This commit modifies the nodeWithDist struct to use a route.Vertex
instead of a *channeldb.LightningNode. This change, coupled with
the new ForEachNodeChannel function, allows the findPath Djikstra's
algorithm to cut down on database lookups since we no longer need
to call the FetchOtherNode function.
This commit is contained in:
nsa 2019-06-18 22:19:37 -04:00
parent 0193034f4f
commit 61a1ab08fb
3 changed files with 85 additions and 60 deletions

@ -236,6 +236,28 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo, *ChannelEdgePoli
})
}
// ForEachNodeChannel iterates through all channels of a given node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (c *ChannelGraph) ForEachNodeChannel(tx *bbolt.Tx, nodePub []byte,
cb func(*bbolt.Tx, *ChannelEdgeInfo, *ChannelEdgePolicy,
*ChannelEdgePolicy) error) error {
db := c.db
return nodeTraversal(tx, nodePub, db, cb)
}
// ForEachNode iterates through all the stored vertices/nodes in the graph,
// executing the passed callback with each node encountered. If the callback
// returns an error, then the transaction is aborted and the iteration stops
@ -2183,24 +2205,11 @@ func (c *ChannelGraph) HasLightningNode(nodePub [33]byte) (time.Time, bool, erro
return updateTime, exists, nil
}
// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
// nodeTraversal is used to traverse all channels of a node given by its
// public key and passes channel information into the specified callback.
func nodeTraversal(tx *bbolt.Tx, nodePub []byte, db *DB,
cb func(*bbolt.Tx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
nodePub := l.PubKeyBytes[:]
traversal := func(tx *bbolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
@ -2241,7 +2250,7 @@ func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
if err != nil {
return err
}
edgeInfo.db = l.db
edgeInfo.db = db
outgoingPolicy, err := fetchChanEdgePolicy(
edges, chanID, nodePub, nodes,
@ -2256,7 +2265,7 @@ func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
}
incomingPolicy, err := fetchChanEdgePolicy(
edges, chanID, otherNode, nodes,
edges, chanID, otherNode[:], nodes,
)
if err != nil {
return err
@ -2275,7 +2284,7 @@ func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
// If no transaction was provided, then we'll create a new transaction
// to execute the transaction within.
if tx == nil {
return l.db.View(traversal)
return db.View(traversal)
}
// Otherwise, we re-use the existing transaction to execute the graph
@ -2283,6 +2292,28 @@ func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
return traversal(tx)
}
// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
// If the caller wishes to re-use an existing boltdb transaction, then it
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (l *LightningNode) ForEachChannel(tx *bbolt.Tx,
cb func(*bbolt.Tx, *ChannelEdgeInfo, *ChannelEdgePolicy, *ChannelEdgePolicy) error) error {
nodePub := l.PubKeyBytes[:]
db := l.db
return nodeTraversal(tx, nodePub, db, cb)
}
// ChannelEdgeInfo represents a fully authenticated channel along with all its
// unique attributes. Once an authenticated channel announcement has been
// processed on the network, then an instance of ChannelEdgeInfo encapsulating
@ -2450,15 +2481,15 @@ func (c *ChannelEdgeInfo) BitcoinKey2() (*btcec.PublicKey, error) {
// OtherNodeKeyBytes returns the node key bytes of the other end of
// the channel.
func (c *ChannelEdgeInfo) OtherNodeKeyBytes(thisNodeKey []byte) (
[]byte, error) {
[33]byte, error) {
switch {
case bytes.Equal(c.NodeKey1Bytes[:], thisNodeKey):
return c.NodeKey2Bytes[:], nil
return c.NodeKey2Bytes, nil
case bytes.Equal(c.NodeKey2Bytes[:], thisNodeKey):
return c.NodeKey1Bytes[:], nil
return c.NodeKey1Bytes, nil
default:
return nil, fmt.Errorf("node not participating in this channel")
return [33]byte{}, fmt.Errorf("node not participating in this channel")
}
}

@ -3,6 +3,7 @@ package routing
import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
// nodeWithDist is a helper struct that couples the distance from the current
@ -12,9 +13,9 @@ type nodeWithDist struct {
// current context.
dist int64
// node is the vertex itself. This pointer can be used to explore all
// the outgoing edges (channels) emanating from a node.
node *channeldb.LightningNode
// node is the vertex itself. This can be used to explore all the
// outgoing edges (channels) emanating from a node.
node route.Vertex
// amountToReceive is the amount that should be received by this node.
// Either as final payment to the final node or as an intermediate

@ -60,7 +60,7 @@ var (
// of a channel edge. ChannelEdgePolicy only contains to destination node
// of the edge.
type edgePolicyWithSource struct {
sourceNode *channeldb.LightningNode
sourceNode route.Vertex
edge *channeldb.ChannelEdgePolicy
}
@ -313,7 +313,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// with a visited map
distance[route.Vertex(node.PubKeyBytes)] = nodeWithDist{
dist: infinity,
node: node,
node: route.Vertex(node.PubKeyBytes),
}
return nil
}); err != nil {
@ -324,10 +324,9 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
for vertex, outgoingEdgePolicies := range g.additionalEdges {
// We'll also include all the nodes found within the additional
// edges that are not known to us yet in the distance map.
node := &channeldb.LightningNode{PubKeyBytes: vertex}
distance[vertex] = nodeWithDist{
dist: infinity,
node: node,
node: vertex,
}
// Build reverse lookup to find incoming edges. Needed because
@ -335,7 +334,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
for _, outgoingEdgePolicy := range outgoingEdgePolicies {
toVertex := outgoingEdgePolicy.Node.PubKeyBytes
incomingEdgePolicy := &edgePolicyWithSource{
sourceNode: node,
sourceNode: vertex,
edge: outgoingEdgePolicy,
}
@ -351,11 +350,10 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// charges no fee. Distance is set to 0, because this is the starting
// point of the graph traversal. We are searching backwards to get the
// fees first time right and correctly match channel bandwidth.
targetNode := &channeldb.LightningNode{PubKeyBytes: target}
distance[target] = nodeWithDist{
dist: 0,
weight: 0,
node: targetNode,
node: target,
amountToReceive: amt,
incomingCltv: 0,
probability: 1,
@ -368,11 +366,8 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// processEdge is a helper closure that will be used to make sure edges
// satisfy our specific requirements.
processEdge := func(fromNode *channeldb.LightningNode,
edge *channeldb.ChannelEdgePolicy,
bandwidth lnwire.MilliSatoshi, toNode route.Vertex) {
fromVertex := route.Vertex(fromNode.PubKeyBytes)
processEdge := func(fromVertex route.Vertex, bandwidth lnwire.MilliSatoshi,
edge *channeldb.ChannelEdgePolicy, toNode route.Vertex) {
// If this is not a local channel and it is disabled, we will
// skip it.
@ -434,7 +429,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
return
}
// Compute fee that fromNode is charging. It is based on the
// Compute fee that fromVertex is charging. It is based on the
// amount that needs to be sent to the next node in the route.
//
// Source node has no predecessor to pay a fee. Therefore set
@ -442,7 +437,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// limit check and edge weight.
//
// Also determine the time lock delta that will be added to the
// route if fromNode is selected. If fromNode is the source
// route if fromVertex is selected. If fromVertex is the source
// node, no additional timelock is required.
var fee lnwire.MilliSatoshi
var timeLockDelta uint16
@ -485,10 +480,10 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
return
}
// By adding fromNode in the route, there will be an extra
// By adding fromVertex in the route, there will be an extra
// weight composed of the fee that this node will charge and
// the amount that will be locked for timeLockDelta blocks in
// the HTLC that is handed out to fromNode.
// the HTLC that is handed out to fromVertex.
weight := edgeWeight(amountToReceive, fee, timeLockDelta)
// Compute the tentative weight to this new channel/edge
@ -524,7 +519,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
distance[fromVertex] = nodeWithDist{
dist: tempDist,
weight: tempWeight,
node: fromNode,
node: fromVertex,
amountToReceive: amountToReceive,
incomingCltv: incomingCltv,
probability: probability,
@ -548,22 +543,17 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// Fetch the node within the smallest distance from our source
// from the heap.
partialPath := heap.Pop(&nodeHeap).(nodeWithDist)
bestNode := partialPath.node
pivot := partialPath.node
// If we've reached our source (or we don't have any incoming
// edges), then we're done here and can exit the graph
// traversal early.
if bestNode.PubKeyBytes == source {
if pivot == source {
break
}
// Now that we've found the next potential step to take we'll
// examine all the incoming edges (channels) from this node to
// further our graph traversal.
pivot := route.Vertex(bestNode.PubKeyBytes)
err := bestNode.ForEachChannel(tx, func(tx *bbolt.Tx,
edgeInfo *channeldb.ChannelEdgeInfo,
_, inEdge *channeldb.ChannelEdgePolicy) error {
cb := func(_ *bbolt.Tx, edgeInfo *channeldb.ChannelEdgeInfo, _,
inEdge *channeldb.ChannelEdgePolicy) error {
// If there is no edge policy for this candidate
// node, skip. Note that we are searching backwards
@ -595,18 +585,21 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// the node on the _other_ end of this channel as we
// may later need to iterate over the incoming edges of
// this node if we explore it further.
channelSource, err := edgeInfo.FetchOtherNode(
tx, pivot[:],
)
chanSource, err := edgeInfo.OtherNodeKeyBytes(pivot[:])
if err != nil {
return err
}
// Check if this candidate node is better than what we
// already have.
processEdge(channelSource, inEdge, edgeBandwidth, pivot)
processEdge(route.Vertex(chanSource), edgeBandwidth, inEdge, pivot)
return nil
})
}
// Now that we've found the next potential step to take we'll
// examine all the incoming edges (channels) from this node to
// further our graph traversal.
err := g.graph.ForEachNodeChannel(tx, pivot[:], cb)
if err != nil {
return nil, err
}
@ -617,9 +610,9 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// routing hint due to having enough capacity for the payment
// and use the payment amount as its capacity.
bandWidth := partialPath.amountToReceive
for _, reverseEdge := range additionalEdgesWithSrc[bestNode.PubKeyBytes] {
processEdge(reverseEdge.sourceNode, reverseEdge.edge,
bandWidth, pivot)
for _, reverseEdge := range additionalEdgesWithSrc[pivot] {
processEdge(reverseEdge.sourceNode, bandWidth,
reverseEdge.edge, pivot)
}
}