diff --git a/lnrpc/routerrpc/router_backend.go b/lnrpc/routerrpc/router_backend.go index dfda2109..9e399e8f 100644 --- a/lnrpc/routerrpc/router_backend.go +++ b/lnrpc/routerrpc/router_backend.go @@ -125,7 +125,8 @@ func (r *RouterBackend) QueryRoutes(ctx context.Context, restrictions := &routing.RestrictParams{ FeeLimit: feeLimit, ProbabilitySource: func(node route.Vertex, - edge routing.EdgeLocator) float64 { + edge routing.EdgeLocator, + amt lnwire.MilliSatoshi) float64 { if _, ok := ignoredNodes[node]; ok { return 0 diff --git a/lnrpc/routerrpc/router_backend_test.go b/lnrpc/routerrpc/router_backend_test.go index fee5fb4b..49af5387 100644 --- a/lnrpc/routerrpc/router_backend_test.go +++ b/lnrpc/routerrpc/router_backend_test.go @@ -81,19 +81,19 @@ func TestQueryRoutes(t *testing.T) { } if restrictions.ProbabilitySource(route.Vertex{}, - ignoredEdge, + ignoredEdge, 0, ) != 0 { t.Fatal("expecting 0% probability for ignored edge") } if restrictions.ProbabilitySource(ignoreNodeVertex, - routing.EdgeLocator{}, + routing.EdgeLocator{}, 0, ) != 0 { t.Fatal("expecting 0% probability for ignored node") } if restrictions.ProbabilitySource(route.Vertex{}, - routing.EdgeLocator{}, + routing.EdgeLocator{}, 0, ) != 1 { t.Fatal("expecting 100% probability") } diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 94358536..6381c24f 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -32,6 +32,7 @@ import ( "github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwire" "golang.org/x/net/context" @@ -8382,8 +8383,14 @@ out: // failed payment. shutdownAndAssert(net, t, carol) - // TODO(roasbeef): mission control - time.Sleep(time.Second * 5) + // Reset mission control to forget the temporary channel failure above. 
+ ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) + _, err = net.Alice.RouterClient.ResetMissionControl( + ctxt, &routerrpc.ResetMissionControlRequest{}, + ) + if err != nil { + t.Fatalf("unable to reset mission control: %v", err) + } sendReq = &lnrpc.SendRequest{ PaymentRequest: carolInvoice.PaymentRequest, diff --git a/lntest/node.go b/lntest/node.go index 622a0e32..fd63afb5 100644 --- a/lntest/node.go +++ b/lntest/node.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/chanbackup" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" + "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/macaroons" ) @@ -248,6 +249,10 @@ type HarnessNode struct { lnrpc.WalletUnlockerClient invoicesrpc.InvoicesClient + + // RouterClient cannot be embedded, because a name collision would occur + // on the main rpc SendPayment. + RouterClient routerrpc.RouterClient } // Assert *HarnessNode implements the lnrpc.LightningClient interface. @@ -497,6 +502,7 @@ func (hn *HarnessNode) initLightningClient(conn *grpc.ClientConn) error { // HarnessNode directly for normal rpc operations. hn.LightningClient = lnrpc.NewLightningClient(conn) hn.InvoicesClient = invoicesrpc.NewInvoicesClient(conn) + hn.RouterClient = routerrpc.NewRouterClient(conn) // Set the harness node's pubkey to what the node claims in GetInfo. err := hn.FetchNodeInfo() diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 9cbaa911..727858e7 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -1,6 +1,7 @@ package routing import ( + "math" "sync" "time" @@ -13,48 +14,27 @@ import ( ) const ( - // vertexDecay is the decay period of colored vertexes added to - // MissionControl. Once vertexDecay passes after an entry has been - // added to the prune view, it is garbage collected. 
This value is - // larger than edgeDecay as an edge failure typical indicates an - // unbalanced channel, while a vertex failure indicates a node is not - // online and active. - vertexDecay = time.Duration(time.Minute * 5) + // defaultPenaltyHalfLife is the default half-life duration. The + // half-life duration defines after how much time a penalized node or + // channel is back at 50% probability. + defaultPenaltyHalfLife = time.Hour - // edgeDecay is the decay period of colored edges added to - // MissionControl. Once edgeDecay passed after an entry has been added, - // it is garbage collected. This value is smaller than vertexDecay as - // an edge related failure during payment sending typically indicates - // that a channel was unbalanced, a condition which may quickly change. - // - // TODO(roasbeef): instead use random delay on each? - edgeDecay = time.Duration(time.Second * 5) + // aprioriHopProbability is the assumed success probability of a hop in + // a route when no other information is available. + aprioriHopProbability = 1 ) // MissionControl contains state which summarizes the past attempts of HTLC -// routing by external callers when sending payments throughout the network. -// MissionControl remembers the outcome of these past routing attempts (success -// and failure), and is able to provide hints/guidance to future HTLC routing -// attempts. MissionControl maintains a decaying network view of the -// edges/vertexes that should be marked as "pruned" during path finding. This -// graph view acts as a shared memory during HTLC payment routing attempts. -// With each execution, if an error is encountered, based on the type of error -// and the location of the error within the route, an edge or vertex is added -// to the view. Later sending attempts will then query the view for all the -// vertexes/edges that should be ignored. Items in the view decay after a set -// period of time, allowing the view to be dynamic w.r.t network changes. 
+// routing by external callers when sending payments throughout the network. It +// acts as a shared memory during routing attempts with the goal to optimize the +// payment attempt success rate. +// +// Failed payment attempts are reported to mission control. These reports are +// used to track the time of the last node or channel level failure. The time +// since the last failure is used to estimate a success probability that is fed +// into the path finding process for subsequent payment attempts. type MissionControl struct { - // failedEdges maps a short channel ID to be pruned, to the time that - // it was added to the prune view. Edges are added to this map if a - // caller reports to MissionControl a failure localized to that edge - // when sending a payment. - failedEdges map[EdgeLocator]time.Time - - // failedVertexes maps a node's public key that should be pruned, to - // the time that it was added to the prune view. Vertexes are added to - // this map if a caller reports to MissionControl a failure localized - // to that particular vertex. - failedVertexes map[route.Vertex]time.Time + history map[route.Vertex]*nodeHistory graph *channeldb.ChannelGraph @@ -62,6 +42,14 @@ type MissionControl struct { queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi + // now is expected to return the current time. It is supplied as an + // external function to enable deterministic unit tests. + now func() time.Time + + // penaltyHalfLife defines after how much time a penalized node or + // channel is back at 50% probability. + penaltyHalfLife time.Duration + sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, @@ -74,83 +62,41 @@ type MissionControl struct { // PaymentSessionSource interface. var _ PaymentSessionSource = (*MissionControl)(nil) -// NewMissionControl returns a new instance of MissionControl. +// nodeHistory contains a summary of payment attempt outcomes involving a +// particular node. 
+type nodeHistory struct { + // lastFail is the last time a node level failure occurred, if any. + lastFail *time.Time + + // channelLastFail tracks history per channel, if available for that + // channel. + channelLastFail map[uint64]*channelHistory +} + +// channelHistory contains a summary of payment attempt outcomes involving a +// particular channel. +type channelHistory struct { + // lastFail is the last time a channel level failure occurred. + lastFail time.Time + + // minPenalizeAmt is the minimum amount for which to take this failure + // into account. + minPenalizeAmt lnwire.MilliSatoshi +} + +// NewMissionControl returns a new instance of MissionControl. // // TODO(roasbeef): persist memory func NewMissionControl(g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode, qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) *MissionControl { return &MissionControl{ - failedEdges: make(map[EdgeLocator]time.Time), - failedVertexes: make(map[route.Vertex]time.Time), - selfNode: selfNode, - queryBandwidth: qb, - graph: g, - } -} - -// graphPruneView is a filter of sorts that path finding routines should -// consult during the execution. Any edges or vertexes within the view should -// be ignored during path finding. The contents of the view reflect the current -// state of the wider network from the PoV of mission control compiled via HTLC -// routing attempts in the past. -type graphPruneView struct { - edges map[EdgeLocator]struct{} - - vertexes map[route.Vertex]struct{} -} - -// graphPruneView returns a new graphPruneView instance which is to be -// consulted during path finding. If a vertex/edge is found within the returned -// prune view, it is to be ignored as a goroutine has had issues routing -// through it successfully. Within this method the main view of the -// MissionControl is garbage collected as entries are detected to be "stale". 
-func (m *MissionControl) graphPruneView() graphPruneView { - // First, we'll grab the current time, this value will be used to - // determine if an entry is stale or not. - now := time.Now() - - m.Lock() - - // For each of the vertexes that have been added to the prune view, if - // it is now "stale", then we'll ignore it and avoid adding it to the - // view we'll return. - vertexes := make(map[route.Vertex]struct{}) - for vertex, pruneTime := range m.failedVertexes { - if now.Sub(pruneTime) >= vertexDecay { - log.Tracef("Pruning decayed failure report for vertex %v "+ - "from Mission Control", vertex) - - delete(m.failedVertexes, vertex) - continue - } - - vertexes[vertex] = struct{}{} - } - - // We'll also do the same for edges, but use the edgeDecay this time - // rather than the decay for vertexes. - edges := make(map[EdgeLocator]struct{}) - for edge, pruneTime := range m.failedEdges { - if now.Sub(pruneTime) >= edgeDecay { - log.Tracef("Pruning decayed failure report for edge %v "+ - "from Mission Control", edge) - - delete(m.failedEdges, edge) - continue - } - - edges[edge] = struct{}{} - } - - m.Unlock() - - log.Debugf("Mission Control returning prune view of %v edges, %v "+ - "vertexes", len(edges), len(vertexes)) - - return graphPruneView{ - edges: edges, - vertexes: vertexes, + history: make(map[route.Vertex]*nodeHistory), + selfNode: selfNode, + queryBandwidth: qb, + graph: g, + now: time.Now, + penaltyHalfLife: defaultPenaltyHalfLife, } } @@ -161,8 +107,6 @@ func (m *MissionControl) graphPruneView() graphPruneView { func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint, target route.Vertex) (PaymentSession, error) { - viewSnapshot := m.graphPruneView() - edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy) // Traverse through all of the available hop hints and include them in @@ -226,10 +170,9 @@ func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint, } return &paymentSession{ - pruneViewSnapshot: 
viewSnapshot, additionalEdges: edges, bandwidthHints: bandwidthHints, - errFailedPolicyChans: make(map[EdgeLocator]struct{}), + errFailedPolicyChans: make(map[nodeChannel]struct{}), mc: m, pathFinder: findPath, }, nil @@ -239,8 +182,7 @@ func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint, // used for failure reporting to missioncontrol. func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { return &paymentSession{ - pruneViewSnapshot: m.graphPruneView(), - errFailedPolicyChans: make(map[EdgeLocator]struct{}), + errFailedPolicyChans: make(map[nodeChannel]struct{}), mc: m, preBuiltRoute: preBuiltRoute, } @@ -251,8 +193,7 @@ func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) P // missioncontrol for resumed payment we don't want to make more attempts for. func (m *MissionControl) NewPaymentSessionEmpty() PaymentSession { return &paymentSession{ - pruneViewSnapshot: m.graphPruneView(), - errFailedPolicyChans: make(map[EdgeLocator]struct{}), + errFailedPolicyChans: make(map[nodeChannel]struct{}), mc: m, preBuiltRoute: &route.Route{}, preBuiltRouteTried: true, @@ -298,7 +239,122 @@ func generateBandwidthHints(sourceNode *channeldb.LightningNode, // if no payment attempts have been made. func (m *MissionControl) ResetHistory() { m.Lock() - m.failedEdges = make(map[EdgeLocator]time.Time) - m.failedVertexes = make(map[route.Vertex]time.Time) - m.Unlock() + defer m.Unlock() + + m.history = make(map[route.Vertex]*nodeHistory) + + log.Debugf("Mission control history cleared") +} + +// getEdgeProbability is expected to return the success probability of a payment +// from fromNode along edge. +func (m *MissionControl) getEdgeProbability(fromNode route.Vertex, + edge EdgeLocator, amt lnwire.MilliSatoshi) float64 { + + m.Lock() + defer m.Unlock() + + // Get the history for this node. 
If there is no history available, + // assume that its success probability is a constant a priori + // probability. After the attempt new information becomes available to + // adjust this probability. + nodeHistory, ok := m.history[fromNode] + if !ok { + return aprioriHopProbability + } + + return m.getEdgeProbabilityForNode(nodeHistory, edge.ChannelID, amt) +} + +// getEdgeProbabilityForNode estimates the probability of successfully +// traversing a channel based on the node history. +func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory, + channelID uint64, amt lnwire.MilliSatoshi) float64 { + + // Calculate the last failure of the given edge. A node failure is + // considered a failure that would have affected every edge. Therefore + // we insert a node level failure into the history of every channel. + lastFailure := nodeHistory.lastFail + + // Take into account a minimum penalize amount. For balance errors, a + // failure may be reported with such a minimum to prevent too aggressive + // penalization. We only take into account a previous failure if the + // amount that we currently get the probability for is greater or equal + // than the minPenalizeAmt of the previous failure. + channelHistory, ok := nodeHistory.channelLastFail[channelID] + if ok && channelHistory.minPenalizeAmt <= amt { + + // If there is both a node level failure recorded and a channel + // level failure is applicable too, we take the most recent of + // the two. + if lastFailure == nil || + channelHistory.lastFail.After(*lastFailure) { + + lastFailure = &channelHistory.lastFail + } + } + + if lastFailure == nil { + return aprioriHopProbability + } + + timeSinceLastFailure := m.now().Sub(*lastFailure) + + // Calculate success probability. It is an exponential curve that brings + // the probability down to zero when a failure occurs. From there it + // recovers asymptotically back to the a priori probability. 
The rate at + // which this happens is controlled by the penaltyHalfLife parameter. + exp := -timeSinceLastFailure.Hours() / m.penaltyHalfLife.Hours() + probability := aprioriHopProbability * (1 - math.Pow(2, exp)) + + return probability +} + +// createHistoryIfNotExists returns the history for the given node. If the node +// is yet unknown, it will create an empty history structure. +func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { + if node, ok := m.history[vertex]; ok { + return node + } + + node := &nodeHistory{ + channelLastFail: make(map[uint64]*channelHistory), + } + m.history[vertex] = node + + return node +} + +// reportVertexFailure reports a node level failure. +func (m *MissionControl) reportVertexFailure(v route.Vertex) { + log.Debugf("Reporting vertex %v failure to Mission Control", v) + + now := m.now() + + m.Lock() + defer m.Unlock() + + history := m.createHistoryIfNotExists(v) + history.lastFail = &now +} + +// reportEdgeFailure reports a channel level failure. +// +// TODO(roasbeef): also add value attempted to send and capacity of channel +func (m *MissionControl) reportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) { + + log.Debugf("Reporting channel %v failure to Mission Control", + failedEdge.channel) + + now := m.now() + + m.Lock() + defer m.Unlock() + + history := m.createHistoryIfNotExists(failedEdge.from) + history.channelLastFail[failedEdge.channel] = &channelHistory{ + lastFail: now, + minPenalizeAmt: minPenalizeAmt, + } } diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go new file mode 100644 index 00000000..39ebb5bd --- /dev/null +++ b/routing/missioncontrol_test.go @@ -0,0 +1,67 @@ +package routing + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +// TestMissionControl tests mission control probability estimation. 
+func TestMissionControl(t *testing.T) { + now := testTime + + mc := NewMissionControl(nil, nil, nil) + mc.now = func() time.Time { return now } + mc.penaltyHalfLife = 30 * time.Minute + + testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) + + testNode := route.Vertex{} + testEdge := edge{ + channel: 123, + } + + expectP := func(amt lnwire.MilliSatoshi, expected float64) { + t.Helper() + + p := mc.getEdgeProbability( + testNode, EdgeLocator{ChannelID: testEdge.channel}, + amt, + ) + if p != expected { + t.Fatalf("unexpected probability %v", p) + } + } + + // Initial probability is expected to be 1. + expectP(1000, 1) + + // Expect probability to be zero after reporting the edge as failed. + mc.reportEdgeFailure(testEdge, 1000) + expectP(1000, 0) + + // As we reported with a min penalization amt, a lower amt than reported + // should be unaffected. + expectP(500, 1) + + // Edge decay started. + now = testTime.Add(30 * time.Minute) + expectP(1000, 0.5) + + // Edge fails again, this time without a min penalization amt. The edge + // should be penalized regardless of amount. + mc.reportEdgeFailure(testEdge, 0) + expectP(1000, 0) + expectP(500, 0) + + // Edge decay started. + now = testTime.Add(60 * time.Minute) + expectP(1000, 0.5) + + // A node level failure should bring probability of every channel back + // to zero. 
+ mc.reportVertexFailure(testNode) + expectP(1000, 0) +} diff --git a/routing/mock_test.go b/routing/mock_test.go index 6d98dd6e..c5314b83 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -112,10 +112,9 @@ func (m *mockPaymentSession) RequestRoute(payment *LightningPayment, func (m *mockPaymentSession) ReportVertexFailure(v route.Vertex) {} -func (m *mockPaymentSession) ReportEdgeFailure(e *EdgeLocator) {} +func (m *mockPaymentSession) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) {} -func (m *mockPaymentSession) ReportEdgePolicyFailure(errSource route.Vertex, failedEdge *EdgeLocator) { -} +func (m *mockPaymentSession) ReportEdgePolicyFailure(failedEdge edge) {} type mockPayer struct { sendResult chan error diff --git a/routing/pathfind.go b/routing/pathfind.go index 4feb7265..3312a66e 100644 --- a/routing/pathfind.go +++ b/routing/pathfind.go @@ -242,7 +242,8 @@ type graphParams struct { type RestrictParams struct { // ProbabilitySource is a callback that is expected to return the // success probability of traversing the channel from the node. - ProbabilitySource func(route.Vertex, EdgeLocator) float64 + ProbabilitySource func(route.Vertex, EdgeLocator, + lnwire.MilliSatoshi) float64 // FeeLimit is a maximum fee amount allowed to be used on the path from // the source to the target. @@ -398,7 +399,7 @@ func findPath(g *graphParams, r *RestrictParams, source, target route.Vertex, // Request the success probability for this edge. locator := newEdgeLocator(edge) edgeProbability := r.ProbabilitySource( - fromVertex, *locator, + fromVertex, *locator, amountToSend, ) log.Tracef("path finding probability: fromnode=%v, chanid=%v, "+ diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index 433265b8..8a5cf068 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -75,7 +75,7 @@ var ( // noProbabilitySource is used in testing to return the same probability 1 for // all edges. 
-func noProbabilitySource(route.Vertex, EdgeLocator) float64 { +func noProbabilitySource(route.Vertex, EdgeLocator, lnwire.MilliSatoshi) float64 { return 1 } @@ -2137,7 +2137,12 @@ func testProbabilityRouting(t *testing.T, p10, p11, p20, minProbability float64, target := testGraphInstance.aliasMap["target"] // Configure a probability source with the test parameters. - probabilitySource := func(node route.Vertex, edge EdgeLocator) float64 { + probabilitySource := func(node route.Vertex, edge EdgeLocator, + amt lnwire.MilliSatoshi) float64 { + + if amt == 0 { + t.Fatal("expected non-zero amount") + } switch edge.ChannelID { case 10: diff --git a/routing/payment_session.go b/routing/payment_session.go index a75d1178..6865d1b3 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -2,7 +2,6 @@ package routing import ( "fmt" - "time" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" @@ -24,11 +23,12 @@ type PaymentSession interface { // route. ReportVertexFailure(v route.Vertex) - // ReportEdgeFailure reports to the PaymentSession that the passed - // channel failed to route the previous payment attempt. The - // PaymentSession will use this information to produce a better next - // route. - ReportEdgeFailure(e *EdgeLocator) + // ReportEdgeFailure reports to the PaymentSession that the passed edge + // failed to route the previous payment attempt. A minimum penalization + // amount is included to attenuate the failure. This is set to a + // non-zero value for channel balance failures. The PaymentSession will + // use this information to produce a better next route. + ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) // ReportEdgePolicyFailure reports to the PaymentSession that we // received a failure message that relates to a channel policy. For @@ -36,7 +36,7 @@ type PaymentSession interface { // keep the edge included in the next attempted route. 
The // PaymentSession will use this information to produce a better next // route. - ReportEdgePolicyFailure(errSource route.Vertex, failedEdge *EdgeLocator) + ReportEdgePolicyFailure(failedEdge edge) } // paymentSession is used during an HTLC routings session to prune the local @@ -48,8 +48,6 @@ type PaymentSession interface { // loop if payment attempts take long enough. An additional set of edges can // also be provided to assist in reaching the payment's destination. type paymentSession struct { - pruneViewSnapshot graphPruneView - additionalEdges map[route.Vertex][]*channeldb.ChannelEdgePolicy bandwidthHints map[uint64]lnwire.MilliSatoshi @@ -58,7 +56,7 @@ type paymentSession struct { // source of policy related routing failures during this payment attempt. // We'll use this map to prune out channels when the first error may not // require pruning, but any subsequent ones do. - errFailedPolicyChans map[EdgeLocator]struct{} + errFailedPolicyChans map[nodeChannel]struct{} mc *MissionControl @@ -80,17 +78,7 @@ var _ PaymentSession = (*paymentSession)(nil) // // NOTE: Part of the PaymentSession interface. func (p *paymentSession) ReportVertexFailure(v route.Vertex) { - log.Debugf("Reporting vertex %v failure to Mission Control", v) - - // First, we'll add the failed vertex to our local prune view snapshot. - p.pruneViewSnapshot.vertexes[v] = struct{}{} - - // With the vertex added, we'll now report back to the global prune - // view, with this new piece of information so it can be utilized for - // new payment sessions. - p.mc.Lock() - p.mc.failedVertexes[v] = time.Now() - p.mc.Unlock() + p.mc.reportVertexFailure(v) } // ReportEdgeFailure adds a channel to the graph prune view. The time the @@ -102,18 +90,10 @@ func (p *paymentSession) ReportVertexFailure(v route.Vertex) { // TODO(roasbeef): also add value attempted to send and capacity of channel // // NOTE: Part of the PaymentSession interface. 
-func (p *paymentSession) ReportEdgeFailure(e *EdgeLocator) { - log.Debugf("Reporting edge %v failure to Mission Control", e) +func (p *paymentSession) ReportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) { - // First, we'll add the failed edge to our local prune view snapshot. - p.pruneViewSnapshot.edges[*e] = struct{}{} - - // With the edge added, we'll now report back to the global prune view, - // with this new piece of information so it can be utilized for new - // payment sessions. - p.mc.Lock() - p.mc.failedEdges[*e] = time.Now() - p.mc.Unlock() + p.mc.reportEdgeFailure(failedEdge, minPenalizeAmt) } // ReportEdgePolicyFailure handles a failure message that relates to a @@ -124,37 +104,28 @@ func (p *paymentSession) ReportEdgeFailure(e *EdgeLocator) { // new channel updates. // // NOTE: Part of the PaymentSession interface. -func (p *paymentSession) ReportEdgePolicyFailure( - errSource route.Vertex, failedEdge *EdgeLocator) { +// +// TODO(joostjager): Move this logic into global mission control. +func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) { + key := nodeChannel{ + node: failedEdge.from, + channel: failedEdge.channel, + } // Check to see if we've already reported a policy related failure for // this channel. If so, then we'll prune out the vertex. - _, ok := p.errFailedPolicyChans[*failedEdge] + _, ok := p.errFailedPolicyChans[key] if ok { // TODO(joostjager): is this aggressive pruning still necessary? // Just pruning edges may also work unless there is a huge // number of failing channels from that node? - p.ReportVertexFailure(errSource) + p.ReportVertexFailure(key.node) return } // Finally, we'll record a policy failure from this node and move on. 
- p.errFailedPolicyChans[*failedEdge] = struct{}{} -} - -func (p *paymentSession) getEdgeProbability(node route.Vertex, - edge EdgeLocator) float64 { - - if _, ok := p.pruneViewSnapshot.vertexes[node]; ok { - return 0 - } - - if _, ok := p.pruneViewSnapshot.edges[edge]; ok { - return 0 - } - - return 1 + p.errFailedPolicyChans[key] = struct{}{} } // RequestRoute returns a route which is likely to be capable for successfully @@ -183,15 +154,6 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, return nil, fmt.Errorf("pre-built route already tried") } - // Otherwise we actually need to perform path finding, so we'll obtain - // our current prune view snapshot. This view will only ever grow - // during the duration of this payment session, never shrinking. - pruneView := p.pruneViewSnapshot - - log.Debugf("Mission Control session using prune view of %v "+ - "edges, %v vertexes", len(pruneView.edges), - len(pruneView.vertexes)) - // If a route cltv limit was specified, we need to subtract the final // delta before passing it into path finding. The optimal path is // independent of the final cltv delta and the path finding algorithm is @@ -214,11 +176,12 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, bandwidthHints: p.bandwidthHints, }, &RestrictParams{ - ProbabilitySource: p.getEdgeProbability, + ProbabilitySource: p.mc.getEdgeProbability, FeeLimit: payment.FeeLimit, OutgoingChannelID: payment.OutgoingChannelID, CltvLimit: cltvLimit, PaymentAttemptPenalty: DefaultPaymentAttemptPenalty, + MinProbability: DefaultMinProbability, }, p.mc.selfNode.PubKeyBytes, payment.Target, payment.Amount, @@ -241,3 +204,9 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, return route, err } + +// nodeChannel is a combination of the node pubkey and one of its channels. 
+type nodeChannel struct { + node route.Vertex + channel uint64 +} diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index f730bc57..3ac01f2a 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -36,8 +36,7 @@ func TestRequestRoute(t *testing.T) { mc: &MissionControl{ selfNode: &channeldb.LightningNode{}, }, - pruneViewSnapshot: graphPruneView{}, - pathFinder: findPath, + pathFinder: findPath, } cltvLimit := uint32(30) diff --git a/routing/router.go b/routing/router.go index 1e4ec706..a78791ea 100644 --- a/routing/router.go +++ b/routing/router.go @@ -319,6 +319,13 @@ func (e *EdgeLocator) String() string { return fmt.Sprintf("%v:%v", e.ChannelID, e.Direction) } +// edge is a combination of a channel and the node pubkeys of both of its +// endpoints. +type edge struct { + from, to route.Vertex + channel uint64 +} + // ChannelRouter is the layer 3 router within the Lightning stack. Below the // ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain // itself. The primary role of the ChannelRouter is to respond to queries for @@ -1769,7 +1776,9 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // Always determine chan id ourselves, because a channel // update with id may not be available. - failedEdge, err := getFailedEdge(rt, route.Vertex(errVertex)) + failedEdge, failedAmt, err := getFailedEdge( + rt, route.Vertex(errVertex), + ) if err != nil { return true } @@ -1801,13 +1810,11 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // update to fail? if !updateOk { paySession.ReportEdgeFailure( - failedEdge, + failedEdge, 0, ) } - paySession.ReportEdgePolicyFailure( - route.NewVertex(errSource), failedEdge, - ) + paySession.ReportEdgePolicyFailure(failedEdge) } switch onionErr := fErr.FailureMessage.(type) { @@ -1898,7 +1905,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // the update and continue. 
case *lnwire.FailChannelDisabled: r.applyChannelUpdate(&onionErr.Update, errSource) - paySession.ReportEdgeFailure(failedEdge) + paySession.ReportEdgeFailure(failedEdge, 0) return false // It's likely that the outgoing channel didn't have @@ -1906,7 +1913,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: r.applyChannelUpdate(onionErr.Update, errSource) - paySession.ReportEdgeFailure(failedEdge) + paySession.ReportEdgeFailure(failedEdge, failedAmt) return false // If the send fail due to a node not having the @@ -1931,7 +1938,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // returning errors in order to attempt to black list // another node. case *lnwire.FailUnknownNextPeer: - paySession.ReportEdgeFailure(failedEdge) + paySession.ReportEdgeFailure(failedEdge, 0) return false // If the node wasn't able to forward for which ever @@ -1962,14 +1969,12 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // we'll prune the channel in both directions and // continue with the rest of the routes. case *lnwire.FailPermanentChannelFailure: - paySession.ReportEdgeFailure(&EdgeLocator{ - ChannelID: failedEdge.ChannelID, - Direction: 0, - }) - paySession.ReportEdgeFailure(&EdgeLocator{ - ChannelID: failedEdge.ChannelID, - Direction: 1, - }) + paySession.ReportEdgeFailure(failedEdge, 0) + paySession.ReportEdgeFailure(edge{ + from: failedEdge.to, + to: failedEdge.from, + channel: failedEdge.channel, + }, 0) return false default: @@ -1979,12 +1984,14 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // getFailedEdge tries to locate the failing channel given a route and the // pubkey of the node that sent the error. It will assume that the error is -// associated with the outgoing channel of the error node. 
-func getFailedEdge(route *route.Route, errSource route.Vertex) ( - *EdgeLocator, error) { +// associated with the outgoing channel of the error node. As a second result, +// it returns the amount sent over the edge. +func getFailedEdge(route *route.Route, errSource route.Vertex) (edge, + lnwire.MilliSatoshi, error) { hopCount := len(route.Hops) fromNode := route.SourcePubKey + amt := route.TotalAmount for i, hop := range route.Hops { toNode := hop.PubKeyBytes @@ -2003,17 +2010,18 @@ func getFailedEdge(route *route.Route, errSource route.Vertex) ( // If the errSource is the final hop, we assume that the failing // channel is the incoming channel. if errSource == fromNode || finalHopFailing { - return newEdgeLocatorByPubkeys( - hop.ChannelID, - &fromNode, - &toNode, - ), nil + return edge{ + from: fromNode, + to: toNode, + channel: hop.ChannelID, + }, amt, nil } fromNode = toNode + amt = hop.AmtToForward } - return nil, fmt.Errorf("cannot find error source node in route") + return edge{}, 0, fmt.Errorf("cannot find error source node in route") } // applyChannelUpdate validates a channel update and if valid, applies it to the