routing+server: add new QueryBandwidth method to reduce outbound failures
In this commit, we introduce a new method to the channel router's config struct: QueryBandwidth. This method allows the channel router to query for the up-to-date available bandwidth of a particular link. In the case that this link emanates from/to us, we can query the switch to see if the link is active (if not, its bandwidth is zero), and return the current best estimate for the available bandwidth of the link. If the link isn't one of ours, then we can thread through the total maximal capacity of the link. In order to implement this, the missionControl struct will now query the switch upon creation of each payment session to obtain a fresh bandwidth snapshot. We take care to do this in a distinct db transaction in order to not introduce a circular waiting condition between the mutexes in bolt and the channel state machine. The aim of this change is to reduce the number of unnecessary failures during HTLC payment routing, as we'll now skip any links that are inactive or just don't have enough bandwidth for the payment. Nodes that have several hundred channels (all of which are in various states of activity and available bandwidth) should see a nice gain from this w.r.t. payment latency.
This commit is contained in:
parent
c96d07c1ae
commit
f494433cbf
@ -4,6 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/bbolt"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/roasbeef/btcd/btcec"
|
||||
@ -57,6 +58,8 @@ type missionControl struct {
|
||||
|
||||
selfNode *channeldb.LightningNode
|
||||
|
||||
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
|
||||
|
||||
sync.Mutex
|
||||
|
||||
// TODO(roasbeef): further counters, if vertex continually unavailable,
|
||||
@ -68,13 +71,15 @@ type missionControl struct {
|
||||
// newMissionControl returns a new instance of missionControl.
|
||||
//
|
||||
// TODO(roasbeef): persist memory
|
||||
func newMissionControl(g *channeldb.ChannelGraph,
|
||||
selfNode *channeldb.LightningNode) *missionControl {
|
||||
func newMissionControl(
|
||||
g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode,
|
||||
qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) *missionControl {
|
||||
|
||||
return &missionControl{
|
||||
failedEdges: make(map[uint64]time.Time),
|
||||
failedVertexes: make(map[Vertex]time.Time),
|
||||
selfNode: selfNode,
|
||||
queryBandwidth: qb,
|
||||
graph: g,
|
||||
}
|
||||
}
|
||||
@ -157,6 +162,8 @@ type paymentSession struct {
|
||||
|
||||
additionalEdges map[Vertex][]*channeldb.ChannelEdgePolicy
|
||||
|
||||
bandwidthHints map[uint64]lnwire.MilliSatoshi
|
||||
|
||||
mc *missionControl
|
||||
}
|
||||
|
||||
@ -165,7 +172,7 @@ type paymentSession struct {
|
||||
// in order to populate additional edges to explore when finding a path to the
|
||||
// payment's destination.
|
||||
func (m *missionControl) NewPaymentSession(routeHints [][]HopHint,
|
||||
target *btcec.PublicKey) *paymentSession {
|
||||
target *btcec.PublicKey) (*paymentSession, error) {
|
||||
|
||||
viewSnapshot := m.GraphPruneView()
|
||||
|
||||
@ -210,11 +217,62 @@ func (m *missionControl) NewPaymentSession(routeHints [][]HopHint,
|
||||
}
|
||||
}
|
||||
|
||||
// We'll also obtain a set of bandwidthHints from the lower layer for
|
||||
// each of our outbound channels. This will allow the path finding to
|
||||
// skip any links that aren't active or just don't have enough
|
||||
// bandwidth to carry the payment.
|
||||
sourceNode, err := m.graph.SourceNode()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bandwidthHints, err := generateBandwidthHints(
|
||||
sourceNode, m.queryBandwidth,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &paymentSession{
|
||||
pruneViewSnapshot: viewSnapshot,
|
||||
additionalEdges: edges,
|
||||
bandwidthHints: bandwidthHints,
|
||||
mc: m,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// generateBandwidthHints is a helper function that's used ahead of the main
|
||||
// findPath function in order to obtain hints from the lower layer w.r.t. the
|
||||
// available bandwidth of edges on the network. Currently, we'll only obtain
|
||||
// bandwidth hints for the edges we directly have open ourselves. Obtaining
|
||||
// these hints allows us to reduce the number of extraneous attempts as we can
|
||||
// skip channels that are inactive, or just don't have enough bandwidth to
|
||||
// carry the payment.
//
// The returned map is keyed by ChannelID and holds the value that
// queryBandwidth reported for each channel of sourceNode. If iterating over
// the node's channels fails, that error is returned along with a nil map.
|
||||
func generateBandwidthHints(sourceNode *channeldb.LightningNode,
|
||||
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) {
|
||||
|
||||
// First, we'll collect the set of channel edges that belong to the
|
||||
// source node.
// NOTE(review): passing a nil tx presumably makes ForEachChannel open
// its own read transaction -- confirm against channeldb.
|
||||
var localChans []*channeldb.ChannelEdgeInfo
|
||||
err := sourceNode.ForEachChannel(nil, func(tx *bolt.Tx,
|
||||
edgeInfo *channeldb.ChannelEdgeInfo,
|
||||
_, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
localChans = append(localChans, edgeInfo)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Now that we have all of our outbound edges, we'll populate the set
|
||||
// of bandwidth hints, querying the lower switch layer for the most up
|
||||
// to date values.
// The queries are issued only after the ForEachChannel iteration above
// has returned, rather than from inside its callback.
// NOTE(review): presumably this keeps the db transaction from being
// held while the lower layer takes its own locks -- confirm.
|
||||
bandwidthHints := make(map[uint64]lnwire.MilliSatoshi)
|
||||
for _, localChan := range localChans {
|
||||
bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan)
|
||||
}
|
||||
|
||||
return bandwidthHints, nil
|
||||
}
|
||||
|
||||
// ReportVertexFailure adds a vertex to the graph prune view after a client
|
||||
@ -285,7 +343,7 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
|
||||
path, err := findPath(
|
||||
nil, p.mc.graph, p.additionalEdges, p.mc.selfNode,
|
||||
payment.Target, pruneView.vertexes, pruneView.edges,
|
||||
payment.Amount,
|
||||
payment.Amount, p.bandwidthHints,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -458,7 +458,8 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
additionalEdges map[Vertex][]*channeldb.ChannelEdgePolicy,
|
||||
sourceNode *channeldb.LightningNode, target *btcec.PublicKey,
|
||||
ignoredNodes map[Vertex]struct{}, ignoredEdges map[uint64]struct{},
|
||||
amt lnwire.MilliSatoshi) ([]*ChannelHop, error) {
|
||||
amt lnwire.MilliSatoshi,
|
||||
bandwidthHints map[uint64]lnwire.MilliSatoshi) ([]*ChannelHop, error) {
|
||||
|
||||
var err error
|
||||
if tx == nil {
|
||||
@ -516,7 +517,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
// processEdge is a helper closure that will be used to make sure edges
|
||||
// satisfy our specific requirements.
|
||||
processEdge := func(edge *channeldb.ChannelEdgePolicy,
|
||||
capacity btcutil.Amount, pivot Vertex) {
|
||||
bandwidth lnwire.MilliSatoshi, pivot Vertex) {
|
||||
|
||||
v := Vertex(edge.Node.PubKeyBytes)
|
||||
|
||||
@ -547,7 +548,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
// this edge. We'll also shave off irrelevant edges by adding
|
||||
// the sufficient capacity of an edge and clearing their
|
||||
// min-htlc amount to our relaxation condition.
|
||||
if tempDist < distance[v].dist && capacity >= amt.ToSatoshis() &&
|
||||
if tempDist < distance[v].dist && bandwidth >= amt &&
|
||||
amt >= edge.MinHTLC && edge.TimeLockDelta != 0 {
|
||||
|
||||
distance[v] = nodeWithDist{
|
||||
@ -558,7 +559,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
prev[v] = edgeWithPrev{
|
||||
edge: &ChannelHop{
|
||||
ChannelEdgePolicy: edge,
|
||||
Capacity: capacity,
|
||||
Capacity: bandwidth.ToSatoshis(),
|
||||
},
|
||||
prevNode: pivot,
|
||||
}
|
||||
@ -606,7 +607,20 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
edgeInfo *channeldb.ChannelEdgeInfo,
|
||||
outEdge, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
processEdge(outEdge, edgeInfo.Capacity, pivot)
|
||||
// We'll query the lower layer to see if we can obtain
|
||||
// any more up to date information concerning the
|
||||
// bandwidth of this edge.
|
||||
edgeBandwidth, ok := bandwidthHints[edgeInfo.ChannelID]
|
||||
if !ok {
|
||||
// If we don't have a hint for this edge, then
|
||||
// we'll just use the known Capacity as the
|
||||
// available bandwidth.
|
||||
edgeBandwidth = lnwire.NewMSatFromSatoshis(
|
||||
edgeInfo.Capacity,
|
||||
)
|
||||
}
|
||||
|
||||
processEdge(outEdge, edgeBandwidth, pivot)
|
||||
|
||||
// TODO(roasbeef): return min HTLC as error in end?
|
||||
|
||||
@ -622,7 +636,7 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
// routing hint due to having enough capacity for the payment
|
||||
// and use the payment amount as its capacity.
|
||||
for _, edge := range additionalEdges[bestNode.PubKeyBytes] {
|
||||
processEdge(edge, amt.ToSatoshis(), pivot)
|
||||
processEdge(edge, amt, pivot)
|
||||
}
|
||||
}
|
||||
|
||||
@ -681,7 +695,8 @@ func findPath(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
// algorithm in a black box manner.
|
||||
func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
source *channeldb.LightningNode, target *btcec.PublicKey,
|
||||
amt lnwire.MilliSatoshi, numPaths uint32) ([][]*ChannelHop, error) {
|
||||
amt lnwire.MilliSatoshi, numPaths uint32,
|
||||
bandwidthHints map[uint64]lnwire.MilliSatoshi) ([][]*ChannelHop, error) {
|
||||
|
||||
ignoredEdges := make(map[uint64]struct{})
|
||||
ignoredVertexes := make(map[Vertex]struct{})
|
||||
@ -698,7 +713,7 @@ func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
// satoshis along the path before fees are calculated.
|
||||
startingPath, err := findPath(
|
||||
tx, graph, nil, source, target, ignoredVertexes, ignoredEdges,
|
||||
amt,
|
||||
amt, bandwidthHints,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to find path: %v", err)
|
||||
@ -773,6 +788,7 @@ func findPaths(tx *bolt.Tx, graph *channeldb.ChannelGraph,
|
||||
spurPath, err := findPath(
|
||||
tx, graph, nil, spurNode, target,
|
||||
ignoredVertexes, ignoredEdges, amt,
|
||||
bandwidthHints,
|
||||
)
|
||||
|
||||
// If we weren't able to find a path, we'll continue to
|
||||
|
@ -313,7 +313,7 @@ func TestBasicGraphPathFinding(t *testing.T) {
|
||||
target := aliases["sophon"]
|
||||
path, err := findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, paymentAmt,
|
||||
ignoredEdges, paymentAmt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find path: %v", err)
|
||||
@ -455,7 +455,7 @@ func TestBasicGraphPathFinding(t *testing.T) {
|
||||
target = aliases["luoji"]
|
||||
path, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, paymentAmt,
|
||||
ignoredEdges, paymentAmt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find route: %v", err)
|
||||
@ -541,7 +541,7 @@ func TestPathFindingWithAdditionalEdges(t *testing.T) {
|
||||
// We should now be able to find a path from roasbeef to doge.
|
||||
path, err := findPath(
|
||||
nil, graph, additionalEdges, sourceNode, dogePubKey, nil, nil,
|
||||
paymentAmt,
|
||||
paymentAmt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find private path to doge: %v", err)
|
||||
@ -578,6 +578,7 @@ func TestKShortestPathFinding(t *testing.T) {
|
||||
target := aliases["luoji"]
|
||||
paths, err := findPaths(
|
||||
nil, graph, sourceNode, target, paymentAmt, 100,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find paths between roasbeef and "+
|
||||
@ -629,7 +630,7 @@ func TestNewRoutePathTooLong(t *testing.T) {
|
||||
target := aliases["ursula"]
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, paymentAmt,
|
||||
ignoredEdges, paymentAmt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("path should have been found")
|
||||
@ -640,7 +641,7 @@ func TestNewRoutePathTooLong(t *testing.T) {
|
||||
target = aliases["vincent"]
|
||||
path, err := findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, paymentAmt,
|
||||
ignoredEdges, paymentAmt, nil,
|
||||
)
|
||||
if err == nil {
|
||||
t.Fatalf("should not have been able to find path, supposed to be "+
|
||||
@ -682,7 +683,7 @@ func TestPathNotAvailable(t *testing.T) {
|
||||
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, unknownNode, ignoredVertexes,
|
||||
ignoredEdges, 100,
|
||||
ignoredEdges, 100, nil,
|
||||
)
|
||||
if !IsError(err, ErrNoPathFound) {
|
||||
t.Fatalf("path shouldn't have been found: %v", err)
|
||||
@ -718,7 +719,7 @@ func TestPathInsufficientCapacity(t *testing.T) {
|
||||
payAmt := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin)
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, payAmt,
|
||||
ignoredEdges, payAmt, nil,
|
||||
)
|
||||
if !IsError(err, ErrNoPathFound) {
|
||||
t.Fatalf("graph shouldn't be able to support payment: %v", err)
|
||||
@ -748,7 +749,7 @@ func TestRouteFailMinHTLC(t *testing.T) {
|
||||
payAmt := lnwire.MilliSatoshi(10)
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, payAmt,
|
||||
ignoredEdges, payAmt, nil,
|
||||
)
|
||||
if !IsError(err, ErrNoPathFound) {
|
||||
t.Fatalf("graph shouldn't be able to support payment: %v", err)
|
||||
@ -778,7 +779,7 @@ func TestRouteFailDisabledEdge(t *testing.T) {
|
||||
payAmt := lnwire.NewMSatFromSatoshis(10000)
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, payAmt,
|
||||
ignoredEdges, payAmt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find path: %v", err)
|
||||
@ -799,7 +800,7 @@ func TestRouteFailDisabledEdge(t *testing.T) {
|
||||
// failure as it is no longer eligible.
|
||||
_, err = findPath(
|
||||
nil, graph, nil, sourceNode, target, ignoredVertexes,
|
||||
ignoredEdges, payAmt,
|
||||
ignoredEdges, payAmt, nil,
|
||||
)
|
||||
if !IsError(err, ErrNoPathFound) {
|
||||
t.Fatalf("graph shouldn't be able to support payment: %v", err)
|
||||
|
@ -163,6 +163,14 @@ type Config struct {
|
||||
// GraphPruneInterval is used as an interval to determine how often we
|
||||
// should examine the channel graph to garbage collect zombie channels.
|
||||
GraphPruneInterval time.Duration
|
||||
|
||||
// QueryBandwidth is a method that allows the router to query the lower
|
||||
// link layer to determine the up to date available bandwidth at a
|
||||
// prospective link to be traversed. If the link isn't available, then
|
||||
// a value of zero should be returned. Otherwise, the current up to
|
||||
// date knowledge of the available bandwidth of the link should be
|
||||
// returned.
|
||||
QueryBandwidth func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
|
||||
}
|
||||
|
||||
// routeTuple is an entry within the ChannelRouter's route cache. We cache
|
||||
@ -283,18 +291,23 @@ func New(cfg Config) (*ChannelRouter, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ChannelRouter{
|
||||
r := &ChannelRouter{
|
||||
cfg: &cfg,
|
||||
networkUpdates: make(chan *routingMsg),
|
||||
topologyClients: make(map[uint64]*topologyClient),
|
||||
ntfnClientUpdates: make(chan *topologyClientUpdate),
|
||||
missionControl: newMissionControl(cfg.Graph, selfNode),
|
||||
channelEdgeMtx: multimutex.NewMutex(),
|
||||
selfNode: selfNode,
|
||||
routeCache: make(map[routeTuple][]*Route),
|
||||
rejectCache: make(map[uint64]struct{}),
|
||||
quit: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
r.missionControl = newMissionControl(
|
||||
cfg.Graph, selfNode, cfg.QueryBandwidth,
|
||||
)
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Start launches all the goroutines the ChannelRouter requires to carry out
|
||||
@ -1352,6 +1365,16 @@ func (r *ChannelRouter) FindRoutes(target *btcec.PublicKey,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Before we open the db transaction below, we'll attempt to obtain a
|
||||
// set of bandwidth hints that can help us eliminate certain routes
|
||||
// early on in the path finding process.
|
||||
bandwidthHints, err := generateBandwidthHints(
|
||||
r.selfNode, r.cfg.QueryBandwidth,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tx, err := r.cfg.Graph.Database().Begin(false)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
@ -1363,6 +1386,7 @@ func (r *ChannelRouter) FindRoutes(target *btcec.PublicKey,
|
||||
// our source to the destination.
|
||||
shortestPaths, err := findPaths(
|
||||
tx, r.cfg.Graph, r.selfNode, target, amt, numPaths,
|
||||
bandwidthHints,
|
||||
)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
@ -1567,9 +1591,13 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route
|
||||
// Before starting the HTLC routing attempt, we'll create a fresh
|
||||
// payment session which will report our errors back to mission
|
||||
// control.
|
||||
paySession := r.missionControl.NewPaymentSession(
|
||||
paySession, err := r.missionControl.NewPaymentSession(
|
||||
payment.RouteHints, payment.Target,
|
||||
)
|
||||
if err != nil {
|
||||
return preImage, nil, fmt.Errorf("unable to create payment "+
|
||||
"session: %v", err)
|
||||
}
|
||||
|
||||
// We'll continue until either our payment succeeds, or we encounter a
|
||||
// critical error during path finding.
|
||||
|
@ -128,6 +128,9 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func()
|
||||
},
|
||||
ChannelPruneExpiry: time.Hour * 24,
|
||||
GraphPruneInterval: time.Hour * 2,
|
||||
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
|
||||
return lnwire.NewMSatFromSatoshis(e.Capacity)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to create router %v", err)
|
||||
@ -1617,7 +1620,7 @@ func TestFindPathFeeWeighting(t *testing.T) {
|
||||
// path even though the direct path has a higher potential time lock.
|
||||
path, err := findPath(
|
||||
nil, ctx.graph, nil, sourceNode, target, ignoreVertex,
|
||||
ignoreEdge, amt,
|
||||
ignoreEdge, amt, nil,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to find path: %v", err)
|
||||
|
29
server.go
29
server.go
@ -351,6 +351,35 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
|
||||
},
|
||||
ChannelPruneExpiry: time.Duration(time.Hour * 24 * 14),
|
||||
GraphPruneInterval: time.Duration(time.Hour),
|
||||
QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
|
||||
// If we aren't on either side of this edge, then we'll
|
||||
// just thread through the capacity of the edge as we
|
||||
// know it.
|
||||
if !bytes.Equal(edge.NodeKey1Bytes[:], selfNode.PubKeyBytes[:]) &&
|
||||
!bytes.Equal(edge.NodeKey2Bytes[:], selfNode.PubKeyBytes[:]) {
|
||||
|
||||
return lnwire.NewMSatFromSatoshis(edge.Capacity)
|
||||
}
|
||||
|
||||
cid := lnwire.NewChanIDFromOutPoint(&edge.ChannelPoint)
|
||||
link, err := s.htlcSwitch.GetLink(cid)
|
||||
if err != nil {
|
||||
// If the link isn't online, then we'll report
|
||||
// that it has zero bandwidth to the router.
|
||||
return 0
|
||||
}
|
||||
|
||||
// If the link is found within the switch, but it isn't
|
||||
// yet eligible to forward any HTLCs, then we'll treat
|
||||
// it as if it isn't online in the first place.
|
||||
if !link.EligibleToForward() {
|
||||
return 0
|
||||
}
|
||||
|
||||
// Otherwise, we'll return the current best estimate
|
||||
// for the available bandwidth for the link.
|
||||
return link.Bandwidth()
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create router: %v", err)
|
||||
|
Loading…
Reference in New Issue
Block a user