routing: global probability based mission control

Previously every payment had its own local mission control state which
was in effect only for that payment. In this commit most of the local
state is removed and payments all tap into the global mission control
probability estimator.

Furthermore the decay time of pruned edges and nodes is extended, so
that observations about the network can better benefit future payment
processes.

Last, the probability function is transformed from a binary output to a
gradual curve, allowing for a better trade off between candidate routes.
This commit is contained in:
Joost Jager 2019-03-19 17:09:27 +01:00
parent 3349d517aa
commit 7133f37bb8
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
12 changed files with 338 additions and 220 deletions

View File

@ -125,7 +125,8 @@ func (r *RouterBackend) QueryRoutes(ctx context.Context,
restrictions := &routing.RestrictParams{
FeeLimit: feeLimit,
ProbabilitySource: func(node route.Vertex,
edge routing.EdgeLocator) float64 {
edge routing.EdgeLocator,
amt lnwire.MilliSatoshi) float64 {
if _, ok := ignoredNodes[node]; ok {
return 0

View File

@ -81,19 +81,19 @@ func TestQueryRoutes(t *testing.T) {
}
if restrictions.ProbabilitySource(route.Vertex{},
ignoredEdge,
ignoredEdge, 0,
) != 0 {
t.Fatal("expecting 0% probability for ignored edge")
}
if restrictions.ProbabilitySource(ignoreNodeVertex,
routing.EdgeLocator{},
routing.EdgeLocator{}, 0,
) != 0 {
t.Fatal("expecting 0% probability for ignored node")
}
if restrictions.ProbabilitySource(route.Vertex{},
routing.EdgeLocator{},
routing.EdgeLocator{}, 0,
) != 1 {
t.Fatal("expecting 100% probability")
}

View File

@ -32,6 +32,7 @@ import (
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire"
"golang.org/x/net/context"
@ -8382,8 +8383,14 @@ out:
// failed payment.
shutdownAndAssert(net, t, carol)
// TODO(roasbeef): mission control
time.Sleep(time.Second * 5)
// Reset mission control to forget the temporary channel failure above.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
_, err = net.Alice.RouterClient.ResetMissionControl(
ctxt, &routerrpc.ResetMissionControlRequest{},
)
if err != nil {
t.Fatalf("unable to reset mission control: %v", err)
}
sendReq = &lnrpc.SendRequest{
PaymentRequest: carolInvoice.PaymentRequest,

View File

@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/macaroons"
)
@ -248,6 +249,10 @@ type HarnessNode struct {
lnrpc.WalletUnlockerClient
invoicesrpc.InvoicesClient
// RouterClient cannot be embedded, because a name collision would occur
// on the main rpc SendPayment.
RouterClient routerrpc.RouterClient
}
// Assert *HarnessNode implements the lnrpc.LightningClient interface.
@ -497,6 +502,7 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error {
// HarnessNode directly for normal rpc operations.
hn.LightningClient = lnrpc.NewLightningClient(conn)
hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn)
hn.RouterClient = routerrpc.NewRouterClient(conn)
// Set the harness node's pubkey to what the node claims in GetInfo.
err := hn.FetchNodeInfo()

View File

@ -1,6 +1,7 @@
package routing
import (
"math"
"sync"
"time"
@ -13,48 +14,27 @@ import (
)
const (
// vertexDecay is the decay period of colored vertexes added to
// MissionControl. Once vertexDecay passes after an entry has been
// added to the prune view, it is garbage collected. This value is
// larger than edgeDecay as an edge failure typical indicates an
// unbalanced channel, while a vertex failure indicates a node is not
// online and active.
vertexDecay = time.Duration(time.Minute * 5)
// defaultPenaltyHalfLife is the default half-life duration. The
// half-life duration defines after how much time a penalized node or
// channel is back at 50% probability.
defaultPenaltyHalfLife = time.Hour
// edgeDecay is the decay period of colored edges added to
// MissionControl. Once edgeDecay passed after an entry has been added,
// it is garbage collected. This value is smaller than vertexDecay as
// an edge related failure during payment sending typically indicates
// that a channel was unbalanced, a condition which may quickly change.
//
// TODO(roasbeef): instead use random delay on each?
edgeDecay = time.Duration(time.Second * 5)
// aprioriHopProbability is the assumed success probability of a hop in
// a route when no other information is available.
aprioriHopProbability = 1
)
// MissionControl contains state which summarizes the past attempts of HTLC
// routing by external callers when sending payments throughout the network.
// MissionControl remembers the outcome of these past routing attempts (success
// and failure), and is able to provide hints/guidance to future HTLC routing
// attempts. MissionControl maintains a decaying network view of the
// edges/vertexes that should be marked as "pruned" during path finding. This
// graph view acts as a shared memory during HTLC payment routing attempts.
// With each execution, if an error is encountered, based on the type of error
// and the location of the error within the route, an edge or vertex is added
// to the view. Later sending attempts will then query the view for all the
// vertexes/edges that should be ignored. Items in the view decay after a set
// period of time, allowing the view to be dynamic w.r.t network changes.
// routing by external callers when sending payments throughout the network. It
// acts as a shared memory during routing attempts with the goal to optimize the
// payment attempt success rate.
//
// Failed payment attempts are reported to mission control. These reports are
// used to track the time of the last node or channel level failure. The time
// since the last failure is used to estimate a success probability that is fed
// into the path finding process for subsequent payment attempts.
type MissionControl struct {
// failedEdges maps a short channel ID to be pruned, to the time that
// it was added to the prune view. Edges are added to this map if a
// caller reports to MissionControl a failure localized to that edge
// when sending a payment.
failedEdges map[EdgeLocator]time.Time
// failedVertexes maps a node's public key that should be pruned, to
// the time that it was added to the prune view. Vertexes are added to
// this map if a caller reports to MissionControl a failure localized
// to that particular vertex.
failedVertexes map[route.Vertex]time.Time
history map[route.Vertex]*nodeHistory
graph *channeldb.ChannelGraph
@ -62,6 +42,14 @@ type MissionControl struct {
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
// now is expected to return the current time. It is supplied as an
// external function to enable deterministic unit tests.
now func() time.Time
// penaltyHalfLife defines after how much time a penalized node or
// channel is back at 50% probability.
penaltyHalfLife time.Duration
sync.Mutex
// TODO(roasbeef): further counters, if vertex continually unavailable,
@ -74,83 +62,41 @@ type MissionControl struct {
// PaymentSessionSource interface.
var _ PaymentSessionSource = (*MissionControl)(nil)
// NewMissionControl returns a new instance of MissionControl.
// nodeHistory contains a summary of payment attempt outcomes involving a
// particular node.
type nodeHistory struct {
// lastFail is the last time a node level failure occurred, if any.
lastFail *time.Time
// channelLastFail tracks history per channel, if available for that
// channel.
channelLastFail map[uint64]*channelHistory
}
// channelHistory contains a summary of payment attempt outcomes involving a
// particular channel.
type channelHistory struct {
// lastFail is the last time a channel level failure occurred.
lastFail time.Time
// minPenalizeAmt is the minimum amount for which to take this failure
// into account.
minPenalizeAmt lnwire.MilliSatoshi
}
// NewMissionControl returns a new instance of missionControl.
//
// TODO(roasbeef): persist memory
func NewMissionControl(g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode,
qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) *MissionControl {
return &MissionControl{
failedEdges: make(map[EdgeLocator]time.Time),
failedVertexes: make(map[route.Vertex]time.Time),
selfNode: selfNode,
queryBandwidth: qb,
graph: g,
}
}
// graphPruneView is a filter of sorts that path finding routines should
// consult during the execution. Any edges or vertexes within the view should
// be ignored during path finding. The contents of the view reflect the current
// state of the wider network from the PoV of mission control compiled via HTLC
// routing attempts in the past.
type graphPruneView struct {
edges map[EdgeLocator]struct{}
vertexes map[route.Vertex]struct{}
}
// graphPruneView returns a new graphPruneView instance which is to be
// consulted during path finding. If a vertex/edge is found within the returned
// prune view, it is to be ignored as a goroutine has had issues routing
// through it successfully. Within this method the main view of the
// MissionControl is garbage collected as entries are detected to be "stale".
func (m *MissionControl) graphPruneView() graphPruneView {
// First, we'll grab the current time, this value will be used to
// determine if an entry is stale or not.
now := time.Now()
m.Lock()
// For each of the vertexes that have been added to the prune view, if
// it is now "stale", then we'll ignore it and avoid adding it to the
// view we'll return.
vertexes := make(map[route.Vertex]struct{})
for vertex, pruneTime := range m.failedVertexes {
if now.Sub(pruneTime) >= vertexDecay {
log.Tracef("Pruning decayed failure report for vertex %v "+
"from Mission Control", vertex)
delete(m.failedVertexes, vertex)
continue
}
vertexes[vertex] = struct{}{}
}
// We'll also do the same for edges, but use the edgeDecay this time
// rather than the decay for vertexes.
edges := make(map[EdgeLocator]struct{})
for edge, pruneTime := range m.failedEdges {
if now.Sub(pruneTime) >= edgeDecay {
log.Tracef("Pruning decayed failure report for edge %v "+
"from Mission Control", edge)
delete(m.failedEdges, edge)
continue
}
edges[edge] = struct{}{}
}
m.Unlock()
log.Debugf("Mission Control returning prune view of %v edges, %v "+
"vertexes", len(edges), len(vertexes))
return graphPruneView{
edges: edges,
vertexes: vertexes,
history: make(map[route.Vertex]*nodeHistory),
selfNode: selfNode,
queryBandwidth: qb,
graph: g,
now: time.Now,
penaltyHalfLife: defaultPenaltyHalfLife,
}
}
@ -161,8 +107,6 @@ func (m *MissionControl) graphPruneView() graphPruneView {
func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint,
target route.Vertex) (PaymentSession, error) {
viewSnapshot := m.graphPruneView()
edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy)
// Traverse through all of the available hop hints and include them in
@ -226,10 +170,9 @@ func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint,
}
return &paymentSession{
pruneViewSnapshot: viewSnapshot,
additionalEdges: edges,
bandwidthHints: bandwidthHints,
errFailedPolicyChans: make(map[EdgeLocator]struct{}),
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
pathFinder: findPath,
}, nil
@ -239,8 +182,7 @@ func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint,
// used for failure reporting to missioncontrol.
func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession {
return &paymentSession{
pruneViewSnapshot: m.graphPruneView(),
errFailedPolicyChans: make(map[EdgeLocator]struct{}),
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
preBuiltRoute: preBuiltRoute,
}
@ -251,8 +193,7 @@ func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) P
// missioncontrol for resumed payment we don't want to make more attempts for.
func (m *MissionControl) NewPaymentSessionEmpty() PaymentSession {
return &paymentSession{
pruneViewSnapshot: m.graphPruneView(),
errFailedPolicyChans: make(map[EdgeLocator]struct{}),
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
preBuiltRoute: &route.Route{},
preBuiltRouteTried: true,
@ -298,7 +239,122 @@ func generateBandwidthHints(sourceNode *channeldb.LightningNode,
// if no payment attempts have been made.
func (m *MissionControl) ResetHistory() {
m.Lock()
m.failedEdges = make(map[EdgeLocator]time.Time)
m.failedVertexes = make(map[route.Vertex]time.Time)
m.Unlock()
defer m.Unlock()
m.history = make(map[route.Vertex]*nodeHistory)
log.Debugf("Mission control history cleared")
}
// getEdgeProbability is expected to return the success probability of a payment
// from fromNode along edge.
func (m *MissionControl) getEdgeProbability(fromNode route.Vertex,
edge EdgeLocator, amt lnwire.MilliSatoshi) float64 {
m.Lock()
defer m.Unlock()
// Get the history for this node. If there is no history available,
// assume that it's success probability is a constant a priori
// probability. After the attempt new information becomes available to
// adjust this probability.
nodeHistory, ok := m.history[fromNode]
if !ok {
return aprioriHopProbability
}
return m.getEdgeProbabilityForNode(nodeHistory, edge.ChannelID, amt)
}
// getEdgeProbabilityForNode estimates the probability of successfully
// traversing a channel based on the node history.
func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory,
channelID uint64, amt lnwire.MilliSatoshi) float64 {
// Calculate the last failure of the given edge. A node failure is
// considered a failure that would have affected every edge. Therefore
// we insert a node level failure into the history of every channel.
lastFailure := nodeHistory.lastFail
// Take into account a minimum penalize amount. For balance errors, a
// failure may be reported with such a minimum to prevent too aggresive
// penalization. We only take into account a previous failure if the
// amount that we currently get the probability for is greater or equal
// than the minPenalizeAmt of the previous failure.
channelHistory, ok := nodeHistory.channelLastFail[channelID]
if ok && channelHistory.minPenalizeAmt <= amt {
// If there is both a node level failure recorded and a channel
// level failure is applicable too, we take the most recent of
// the two.
if lastFailure == nil ||
channelHistory.lastFail.After(*lastFailure) {
lastFailure = &channelHistory.lastFail
}
}
if lastFailure == nil {
return aprioriHopProbability
}
timeSinceLastFailure := m.now().Sub(*lastFailure)
// Calculate success probability. It is an exponential curve that brings
// the probability down to zero when a failure occurs. From there it
// recovers asymptotically back to the a priori probability. The rate at
// which this happens is controlled by the penaltyHalfLife parameter.
exp := -timeSinceLastFailure.Hours() / m.penaltyHalfLife.Hours()
probability := aprioriHopProbability * (1 - math.Pow(2, exp))
return probability
}
// createHistoryIfNotExists returns the history for the given node. If the node
// is yet unknown, it will create an empty history structure.
func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory {
if node, ok := m.history[vertex]; ok {
return node
}
node := &nodeHistory{
channelLastFail: make(map[uint64]*channelHistory),
}
m.history[vertex] = node
return node
}
// reportVertexFailure reports a node level failure.
func (m *MissionControl) reportVertexFailure(v route.Vertex) {
log.Debugf("Reporting vertex %v failure to Mission Control", v)
now := m.now()
m.Lock()
defer m.Unlock()
history := m.createHistoryIfNotExists(v)
history.lastFail = &now
}
// reportEdgeFailure reports a channel level failure.
//
// TODO(roasbeef): also add value attempted to send and capacity of channel
func (m *MissionControl) reportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) {
log.Debugf("Reporting channel %v failure to Mission Control",
failedEdge.channel)
now := m.now()
m.Lock()
defer m.Unlock()
history := m.createHistoryIfNotExists(failedEdge.from)
history.channelLastFail[failedEdge.channel] = &channelHistory{
lastFail: now,
minPenalizeAmt: minPenalizeAmt,
}
}

View File

@ -0,0 +1,67 @@
package routing
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
// TestMissionControl tests mission control probability estimation.
func TestMissionControl(t *testing.T) {
now := testTime
mc := NewMissionControl(nil, nil, nil)
mc.now = func() time.Time { return now }
mc.penaltyHalfLife = 30 * time.Minute
testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
testNode := route.Vertex{}
testEdge := edge{
channel: 123,
}
expectP := func(amt lnwire.MilliSatoshi, expected float64) {
t.Helper()
p := mc.getEdgeProbability(
testNode, EdgeLocator{ChannelID: testEdge.channel},
amt,
)
if p != expected {
t.Fatalf("unexpected probability %v", p)
}
}
// Initial probability is expected to be 1.
expectP(1000, 1)
// Expect probability to be zero after reporting the edge as failed.
mc.reportEdgeFailure(testEdge, 1000)
expectP(1000, 0)
// As we reported with a min penalization amt, a lower amt than reported
// should be unaffected.
expectP(500, 1)
// Edge decay started.
now = testTime.Add(30 * time.Minute)
expectP(1000, 0.5)
// Edge fails again, this time without a min penalization amt. The edge
// should be penalized regardless of amount.
mc.reportEdgeFailure(testEdge, 0)
expectP(1000, 0)
expectP(500, 0)
// Edge decay started.
now = testTime.Add(60 * time.Minute)
expectP(1000, 0.5)
// A node level failure should bring probability of every channel back
// to zero.
mc.reportVertexFailure(testNode)
expectP(1000, 0)
}

View File

@ -112,10 +112,9 @@ func (m *mockPaymentSession) RequestRoute(payment *LightningPayment,
func (m *mockPaymentSession) ReportVertexFailure(v route.Vertex) {}
func (m *mockPaymentSession) ReportEdgeFailure(e *EdgeLocator) {}
func (m *mockPaymentSession) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) {}
func (m *mockPaymentSession) ReportEdgePolicyFailure(errSource route.Vertex, failedEdge *EdgeLocator) {
}
func (m *mockPaymentSession) ReportEdgePolicyFailure(failedEdge edge) {}
type mockPayer struct {
sendResult chan error

View File

@ -242,7 +242,8 @@ type graphParams struct {
type RestrictParams struct {
// ProbabilitySource is a callback that is expected to return the
// success probability of traversing the channel from the node.
ProbabilitySource func(route.Vertex, EdgeLocator) float64
ProbabilitySource func(route.Vertex, EdgeLocator,
lnwire.MilliSatoshi) float64
// FeeLimit is a maximum fee amount allowed to be used on the path from
// the source to the target.
@ -398,7 +399,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex,
// Request the success probability for this edge.
locator := newEdgeLocator(edge)
edgeProbability := r.ProbabilitySource(
fromVertex, *locator,
fromVertex, *locator, amountToSend,
)
log.Tracef("path finding probability: fromnode=%v, chanid=%v, "+

View File

@ -75,7 +75,7 @@ var (
// noProbabilitySource is used in testing to return the same probability 1 for
// all edges.
func noProbabilitySource(route.Vertex, EdgeLocator) float64 {
func noProbabilitySource(route.Vertex, EdgeLocator, lnwire.MilliSatoshi) float64 {
return 1
}
@ -2137,7 +2137,12 @@ func testProbabilityRouting(t *testing.T, p10, p11, p20, minProbability float64,
target := testGraphInstance.aliasMap["target"]
// Configure a probability source with the test parameters.
probabilitySource := func(node route.Vertex, edge EdgeLocator) float64 {
probabilitySource := func(node route.Vertex, edge EdgeLocator,
amt lnwire.MilliSatoshi) float64 {
if amt == 0 {
t.Fatal("expected non-zero amount")
}
switch edge.ChannelID {
case 10:

View File

@ -2,7 +2,6 @@ package routing
import (
"fmt"
"time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
@ -24,11 +23,12 @@ type PaymentSession interface {
// route.
ReportVertexFailure(v route.Vertex)
// ReportEdgeFailure reports to the PaymentSession that the passed
// channel failed to route the previous payment attempt. The
// PaymentSession will use this information to produce a better next
// route.
ReportEdgeFailure(e *EdgeLocator)
// ReportEdgeFailure reports to the PaymentSession that the passed edge
// failed to route the previous payment attempt. A minimum penalization
// amount is included to attenuate the failure. This is set to a
// non-zero value for channel balance failures. The PaymentSession will
// use this information to produce a better next route.
ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi)
// ReportEdgePolicyFailure reports to the PaymentSession that we
// received a failure message that relates to a channel policy. For
@ -36,7 +36,7 @@ type PaymentSession interface {
// keep the edge included in the next attempted route. The
// PaymentSession will use this information to produce a better next
// route.
ReportEdgePolicyFailure(errSource route.Vertex, failedEdge *EdgeLocator)
ReportEdgePolicyFailure(failedEdge edge)
}
// paymentSession is used during an HTLC routings session to prune the local
@ -48,8 +48,6 @@ type PaymentSession interface {
// loop if payment attempts take long enough. An additional set of edges can
// also be provided to assist in reaching the payment's destination.
type paymentSession struct {
pruneViewSnapshot graphPruneView
additionalEdges map[route.Vertex][]*channeldb.ChannelEdgePolicy
bandwidthHints map[uint64]lnwire.MilliSatoshi
@ -58,7 +56,7 @@ type paymentSession struct {
// source of policy related routing failures during this payment attempt.
// We'll use this map to prune out channels when the first error may not
// require pruning, but any subsequent ones do.
errFailedPolicyChans map[EdgeLocator]struct{}
errFailedPolicyChans map[nodeChannel]struct{}
mc *MissionControl
@ -80,17 +78,7 @@ var _ PaymentSession = (*paymentSession)(nil)
//
// NOTE: Part of the PaymentSession interface.
func (p *paymentSession) ReportVertexFailure(v route.Vertex) {
log.Debugf("Reporting vertex %v failure to Mission Control", v)
// First, we'll add the failed vertex to our local prune view snapshot.
p.pruneViewSnapshot.vertexes[v] = struct{}{}
// With the vertex added, we'll now report back to the global prune
// view, with this new piece of information so it can be utilized for
// new payment sessions.
p.mc.Lock()
p.mc.failedVertexes[v] = time.Now()
p.mc.Unlock()
p.mc.reportVertexFailure(v)
}
// ReportEdgeFailure adds a channel to the graph prune view. The time the
@ -102,18 +90,10 @@ func (p *paymentSession) ReportVertexFailure(v route.Vertex) {
// TODO(roasbeef): also add value attempted to send and capacity of channel
//
// NOTE: Part of the PaymentSession interface.
func (p *paymentSession) ReportEdgeFailure(e *EdgeLocator) {
log.Debugf("Reporting edge %v failure to Mission Control", e)
func (p *paymentSession) ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) {
// First, we'll add the failed edge to our local prune view snapshot.
p.pruneViewSnapshot.edges[*e] = struct{}{}
// With the edge added, we'll now report back to the global prune view,
// with this new piece of information so it can be utilized for new
// payment sessions.
p.mc.Lock()
p.mc.failedEdges[*e] = time.Now()
p.mc.Unlock()
p.mc.reportEdgeFailure(failedEdge, minPenalizeAmt)
}
// ReportEdgePolicyFailure handles a failure message that relates to a
@ -124,37 +104,28 @@ func (p *paymentSession) ReportEdgeFailure(e *EdgeLocator) {
// new channel updates.
//
// NOTE: Part of the PaymentSession interface.
func (p *paymentSession) ReportEdgePolicyFailure(
errSource route.Vertex, failedEdge *EdgeLocator) {
//
// TODO(joostjager): Move this logic into global mission control.
func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) {
key := nodeChannel{
node: failedEdge.from,
channel: failedEdge.channel,
}
// Check to see if we've already reported a policy related failure for
// this channel. If so, then we'll prune out the vertex.
_, ok := p.errFailedPolicyChans[*failedEdge]
_, ok := p.errFailedPolicyChans[key]
if ok {
// TODO(joostjager): is this aggressive pruning still necessary?
// Just pruning edges may also work unless there is a huge
// number of failing channels from that node?
p.ReportVertexFailure(errSource)
p.ReportVertexFailure(key.node)
return
}
// Finally, we'll record a policy failure from this node and move on.
p.errFailedPolicyChans[*failedEdge] = struct{}{}
}
func (p *paymentSession) getEdgeProbability(node route.Vertex,
edge EdgeLocator) float64 {
if _, ok := p.pruneViewSnapshot.vertexes[node]; ok {
return 0
}
if _, ok := p.pruneViewSnapshot.edges[edge]; ok {
return 0
}
return 1
p.errFailedPolicyChans[key] = struct{}{}
}
// RequestRoute returns a route which is likely to be capable for successfully
@ -183,15 +154,6 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
return nil, fmt.Errorf("pre-built route already tried")
}
// Otherwise we actually need to perform path finding, so we'll obtain
// our current prune view snapshot. This view will only ever grow
// during the duration of this payment session, never shrinking.
pruneView := p.pruneViewSnapshot
log.Debugf("Mission Control session using prune view of %v "+
"edges, %v vertexes", len(pruneView.edges),
len(pruneView.vertexes))
// If a route cltv limit was specified, we need to subtract the final
// delta before passing it into path finding. The optimal path is
// independent of the final cltv delta and the path finding algorithm is
@ -214,11 +176,12 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
bandwidthHints: p.bandwidthHints,
},
&RestrictParams{
ProbabilitySource: p.getEdgeProbability,
ProbabilitySource: p.mc.getEdgeProbability,
FeeLimit: payment.FeeLimit,
OutgoingChannelID: payment.OutgoingChannelID,
CltvLimit: cltvLimit,
PaymentAttemptPenalty: DefaultPaymentAttemptPenalty,
MinProbability: DefaultMinProbability,
},
p.mc.selfNode.PubKeyBytes, payment.Target,
payment.Amount,
@ -241,3 +204,9 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
return route, err
}
// nodeChannel is a combination of the node pubkey and one of its channels.
type nodeChannel struct {
node route.Vertex
channel uint64
}

View File

@ -36,8 +36,7 @@ func TestRequestRoute(t *testing.T) {
mc: &MissionControl{
selfNode: &channeldb.LightningNode{},
},
pruneViewSnapshot: graphPruneView{},
pathFinder: findPath,
pathFinder: findPath,
}
cltvLimit := uint32(30)

View File

@ -319,6 +319,13 @@ func (e *EdgeLocator) String() string {
return fmt.Sprintf("%v:%v", e.ChannelID, e.Direction)
}
// edge is a combination of a channel and the node pubkeys of both of its
// endpoints.
type edge struct {
from, to route.Vertex
channel uint64
}
// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
@ -1769,7 +1776,9 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// Always determine chan id ourselves, because a channel
// update with id may not be available.
failedEdge, err := getFailedEdge(rt, route.Vertex(errVertex))
failedEdge, failedAmt, err := getFailedEdge(
rt, route.Vertex(errVertex),
)
if err != nil {
return true
}
@ -1801,13 +1810,11 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// update to fail?
if !updateOk {
paySession.ReportEdgeFailure(
failedEdge,
failedEdge, 0,
)
}
paySession.ReportEdgePolicyFailure(
route.NewVertex(errSource), failedEdge,
)
paySession.ReportEdgePolicyFailure(failedEdge)
}
switch onionErr := fErr.FailureMessage.(type) {
@ -1898,7 +1905,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// the update and continue.
case *lnwire.FailChannelDisabled:
r.applyChannelUpdate(&onionErr.Update, errSource)
paySession.ReportEdgeFailure(failedEdge)
paySession.ReportEdgeFailure(failedEdge, 0)
return false
// It's likely that the outgoing channel didn't have
@ -1906,7 +1913,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// now, and continue onwards with our path finding.
case *lnwire.FailTemporaryChannelFailure:
r.applyChannelUpdate(onionErr.Update, errSource)
paySession.ReportEdgeFailure(failedEdge)
paySession.ReportEdgeFailure(failedEdge, failedAmt)
return false
// If the send fail due to a node not having the
@ -1931,7 +1938,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// returning errors in order to attempt to black list
// another node.
case *lnwire.FailUnknownNextPeer:
paySession.ReportEdgeFailure(failedEdge)
paySession.ReportEdgeFailure(failedEdge, 0)
return false
// If the node wasn't able to forward for which ever
@ -1962,14 +1969,12 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// we'll prune the channel in both directions and
// continue with the rest of the routes.
case *lnwire.FailPermanentChannelFailure:
paySession.ReportEdgeFailure(&EdgeLocator{
ChannelID: failedEdge.ChannelID,
Direction: 0,
})
paySession.ReportEdgeFailure(&EdgeLocator{
ChannelID: failedEdge.ChannelID,
Direction: 1,
})
paySession.ReportEdgeFailure(failedEdge, 0)
paySession.ReportEdgeFailure(edge{
from: failedEdge.to,
to: failedEdge.from,
channel: failedEdge.channel,
}, 0)
return false
default:
@ -1979,12 +1984,14 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// getFailedEdge tries to locate the failing channel given a route and the
// pubkey of the node that sent the error. It will assume that the error is
// associated with the outgoing channel of the error node.
func getFailedEdge(route *route.Route, errSource route.Vertex) (
*EdgeLocator, error) {
// associated with the outgoing channel of the error node. As a second result,
// it returns the amount sent over the edge.
func getFailedEdge(route *route.Route, errSource route.Vertex) (edge,
lnwire.MilliSatoshi, error) {
hopCount := len(route.Hops)
fromNode := route.SourcePubKey
amt := route.TotalAmount
for i, hop := range route.Hops {
toNode := hop.PubKeyBytes
@ -2003,17 +2010,18 @@ func getFailedEdge(route *route.Route, errSource route.Vertex) (
// If the errSource is the final hop, we assume that the failing
// channel is the incoming channel.
if errSource == fromNode || finalHopFailing {
return newEdgeLocatorByPubkeys(
hop.ChannelID,
&fromNode,
&toNode,
), nil
return edge{
from: fromNode,
to: toNode,
channel: hop.ChannelID,
}, amt, nil
}
fromNode = toNode
amt = hop.AmtToForward
}
return nil, fmt.Errorf("cannot find error source node in route")
return edge{}, 0, fmt.Errorf("cannot find error source node in route")
}
// applyChannelUpdate validates a channel update and if valid, applies it to the