From cf3dd3fb945b57505bae7c50c50538c02746127e Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 3 Jun 2019 20:32:07 +0200 Subject: [PATCH 1/5] routing/test: create mission control test context --- routing/missioncontrol_test.go | 87 ++++++++++++++++++++++------------ 1 file changed, 57 insertions(+), 30 deletions(-) diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 19a28828..72c3b07a 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -8,9 +8,25 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) -// TestMissionControl tests mission control probability estimation. -func TestMissionControl(t *testing.T) { - now := testTime +var ( + mcTestNode = route.Vertex{} + mcTestEdge = EdgeLocator{ + ChannelID: 123, + } + mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) +) + +type mcTestContext struct { + t *testing.T + mc *MissionControl + now time.Time +} + +func createMcTestContext(t *testing.T) *mcTestContext { + ctx := &mcTestContext{ + t: t, + now: mcTestTime, + } mc := NewMissionControl( nil, nil, nil, &MissionControlConfig{ @@ -18,7 +34,30 @@ func TestMissionControl(t *testing.T) { AprioriHopProbability: 0.8, }, ) - mc.now = func() time.Time { return now } + + mc.now = func() time.Time { return ctx.now } + ctx.mc = mc + + return ctx +} + +// Assert that mission control returns a probability for an edge. +func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi, + expected float64) { + + ctx.t.Helper() + + p := ctx.mc.getEdgeProbability(mcTestNode, mcTestEdge, amt) + if p != expected { + ctx.t.Fatalf("unexpected probability %v", p) + } +} + +// TestMissionControl tests mission control probability estimation. +func TestMissionControl(t *testing.T) { + ctx := createMcTestContext(t) + + ctx.now = testTime testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) @@ -27,50 +66,38 @@ func TestMissionControl(t *testing.T) { channel: 123, } - expectP := func(amt lnwire.MilliSatoshi, expected float64) { - t.Helper() - - p := mc.getEdgeProbability( - testNode, EdgeLocator{ChannelID: testEdge.channel}, - amt, - ) - if p != expected { - t.Fatalf("unexpected probability %v", p) - } - } - // Initial probability is expected to be 1. - expectP(1000, 0.8) + ctx.expectP(1000, 0.8) // Expect probability to be zero after reporting the edge as failed. - mc.reportEdgeFailure(testEdge, 1000) - expectP(1000, 0) + ctx.mc.reportEdgeFailure(testEdge, 1000) + ctx.expectP(1000, 0) // As we reported with a min penalization amt, a lower amt than reported // should be unaffected. - expectP(500, 0.8) + ctx.expectP(500, 0.8) // Edge decay started. - now = testTime.Add(30 * time.Minute) - expectP(1000, 0.4) + ctx.now = testTime.Add(30 * time.Minute) + ctx.expectP(1000, 0.4) // Edge fails again, this time without a min penalization amt. The edge // should be penalized regardless of amount. - mc.reportEdgeFailure(testEdge, 0) - expectP(1000, 0) - expectP(500, 0) + ctx.mc.reportEdgeFailure(testEdge, 0) + ctx.expectP(1000, 0) + ctx.expectP(500, 0) // Edge decay started. - now = testTime.Add(60 * time.Minute) - expectP(1000, 0.4) + ctx.now = testTime.Add(60 * time.Minute) + ctx.expectP(1000, 0.4) // A node level failure should bring probability of every channel back // to zero. - mc.reportVertexFailure(testNode) - expectP(1000, 0) + ctx.mc.reportVertexFailure(testNode) + ctx.expectP(1000, 0) // Check whether history snapshot looks sane. 
- history := mc.GetHistorySnapshot() + history := ctx.mc.GetHistorySnapshot() if len(history.Nodes) != 1 { t.Fatal("unexpected number of nodes") } From 37e275169537d48094fd3051d46b6dcc40545050 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 18 Jun 2019 18:30:56 +0200 Subject: [PATCH 2/5] routing+routerrpc: isolate payment session source from mission control --- lnrpc/routerrpc/config.go | 28 ++++++ lnrpc/routerrpc/config_active.go | 7 +- lnrpc/routerrpc/config_default.go | 7 +- routing/missioncontrol.go | 157 +++--------------------------- routing/missioncontrol_test.go | 10 +- routing/mock_test.go | 17 ++++ routing/payment_session.go | 33 ++++--- routing/payment_session_source.go | 150 ++++++++++++++++++++++++++++ routing/payment_session_test.go | 14 ++- routing/router.go | 28 +++++- routing/router_test.go | 25 +++-- server.go | 23 ++++- 12 files changed, 308 insertions(+), 191 deletions(-) create mode 100644 lnrpc/routerrpc/config.go create mode 100644 routing/payment_session_source.go diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go new file mode 100644 index 00000000..15dd6c40 --- /dev/null +++ b/lnrpc/routerrpc/config.go @@ -0,0 +1,28 @@ +package routerrpc + +import ( + "time" + + "github.com/lightningnetwork/lnd/lnwire" +) + +// RoutingConfig contains the configurable parameters that control routing. +type RoutingConfig struct { + // PenaltyHalfLife defines after how much time a penalized node or + // channel is back at 50% probability. + PenaltyHalfLife time.Duration + + // PaymentAttemptPenalty is the virtual cost in path finding weight + // units of executing a payment attempt that fails. It is used to trade + // off potentially better routes against their probability of + // succeeding. + PaymentAttemptPenalty lnwire.MilliSatoshi + + // MinProbability defines the minimum success probability of the + // returned route. + MinRouteProbability float64 + + // AprioriHopProbability is the assumed success probability of a hop in + // a route when no other information is available. + AprioriHopProbability float64 +} diff --git a/lnrpc/routerrpc/config_active.go b/lnrpc/routerrpc/config_active.go index 36877c36..b919364e 100644 --- a/lnrpc/routerrpc/config_active.go +++ b/lnrpc/routerrpc/config_active.go @@ -72,10 +72,9 @@ func DefaultConfig() *Config { } } -// GetMissionControlConfig returns the mission control config based on this sub -// server config. -func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { - return &routing.MissionControlConfig{ +// GetRoutingConfig returns the routing config based on this sub server config. +func GetRoutingConfig(cfg *Config) *RoutingConfig { + return &RoutingConfig{ AprioriHopProbability: cfg.AprioriHopProbability, MinRouteProbability: cfg.MinRouteProbability, PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis( diff --git a/lnrpc/routerrpc/config_default.go b/lnrpc/routerrpc/config_default.go index 81c4a577..a3a95021 100644 --- a/lnrpc/routerrpc/config_default.go +++ b/lnrpc/routerrpc/config_default.go @@ -14,10 +14,9 @@ func DefaultConfig() *Config { return &Config{} } -// GetMissionControlConfig returns the mission control config based on this sub -// server config. -func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { - return &routing.MissionControlConfig{ +// GetRoutingConfig returns the routing config based on this sub server config. 
+func GetRoutingConfig(cfg *Config) *RoutingConfig { + return &RoutingConfig{ AprioriHopProbability: routing.DefaultAprioriHopProbability, MinRouteProbability: routing.DefaultMinRouteProbability, PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty, diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 17575a72..69a34b65 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,12 +5,10 @@ import ( "sync" "time" - "github.com/btcsuite/btcd/btcec" "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" - "github.com/lightningnetwork/lnd/zpay32" ) const ( @@ -32,12 +30,6 @@ const ( type MissionControl struct { history map[route.Vertex]*nodeHistory - graph *channeldb.ChannelGraph - - selfNode *channeldb.LightningNode - - queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi - // now is expected to return the current time. It is supplied as an // external function to enable deterministic unit tests. now func() time.Time @@ -52,10 +44,6 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } -// A compile time assertion to ensure MissionControl meets the -// PaymentSessionSource interface. -var _ PaymentSessionSource = (*MissionControl)(nil) - // MissionControlConfig defines parameters that control mission control // behaviour. type MissionControlConfig struct { @@ -63,16 +51,6 @@ type MissionControlConfig struct { // channel is back at 50% probability. PenaltyHalfLife time.Duration - // PaymentAttemptPenalty is the virtual cost in path finding weight - // units of executing a payment attempt that fails. It is used to trade - // off potentially better routes against their probability of - // succeeding. - PaymentAttemptPenalty lnwire.MilliSatoshi - - // MinProbability defines the minimum success probability of the - // returned route. - MinRouteProbability float64 - // AprioriHopProbability is the assumed success probability of a hop in // a route when no other information is available. AprioriHopProbability float64 @@ -143,126 +121,15 @@ type MissionControlChannelSnapshot struct { } // NewMissionControl returns a new instance of missionControl. -// -// TODO(roasbeef): persist memory -func NewMissionControl(g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode, - qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi, - cfg *MissionControlConfig) *MissionControl { - +func NewMissionControl(cfg *MissionControlConfig) *MissionControl { log.Debugf("Instantiating mission control with config: "+ - "PenaltyHalfLife=%v, PaymentAttemptPenalty=%v, "+ - "MinRouteProbability=%v, AprioriHopProbability=%v", - cfg.PenaltyHalfLife, - int64(cfg.PaymentAttemptPenalty.ToSatoshis()), - cfg.MinRouteProbability, cfg.AprioriHopProbability) + "PenaltyHalfLife=%v, AprioriHopProbability=%v", + cfg.PenaltyHalfLife, cfg.AprioriHopProbability) return &MissionControl{ - history: make(map[route.Vertex]*nodeHistory), - selfNode: selfNode, - queryBandwidth: qb, - graph: g, - now: time.Now, - cfg: cfg, - } -} - -// NewPaymentSession creates a new payment session backed by the latest prune -// view from Mission Control. An optional set of routing hints can be provided -// in order to populate additional edges to explore when finding a path to the -// payment's destination. 
-func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint, - target route.Vertex) (PaymentSession, error) { - - edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy) - - // Traverse through all of the available hop hints and include them in - // our edges map, indexed by the public key of the channel's starting - // node. - for _, routeHint := range routeHints { - // If multiple hop hints are provided within a single route - // hint, we'll assume they must be chained together and sorted - // in forward order in order to reach the target successfully. - for i, hopHint := range routeHint { - // In order to determine the end node of this hint, - // we'll need to look at the next hint's start node. If - // we've reached the end of the hints list, we can - // assume we've reached the destination. - endNode := &channeldb.LightningNode{} - if i != len(routeHint)-1 { - endNode.AddPubKey(routeHint[i+1].NodeID) - } else { - targetPubKey, err := btcec.ParsePubKey( - target[:], btcec.S256(), - ) - if err != nil { - return nil, err - } - endNode.AddPubKey(targetPubKey) - } - - // Finally, create the channel edge from the hop hint - // and add it to list of edges corresponding to the node - // at the start of the channel. - edge := &channeldb.ChannelEdgePolicy{ - Node: endNode, - ChannelID: hopHint.ChannelID, - FeeBaseMSat: lnwire.MilliSatoshi( - hopHint.FeeBaseMSat, - ), - FeeProportionalMillionths: lnwire.MilliSatoshi( - hopHint.FeeProportionalMillionths, - ), - TimeLockDelta: hopHint.CLTVExpiryDelta, - } - - v := route.NewVertex(hopHint.NodeID) - edges[v] = append(edges[v], edge) - } - } - - // We'll also obtain a set of bandwidthHints from the lower layer for - // each of our outbound channels. This will allow the path finding to - // skip any links that aren't active or just don't have enough - // bandwidth to carry the payment. - sourceNode, err := m.graph.SourceNode() - if err != nil { - return nil, err - } - bandwidthHints, err := generateBandwidthHints( - sourceNode, m.queryBandwidth, - ) - if err != nil { - return nil, err - } - - return &paymentSession{ - additionalEdges: edges, - bandwidthHints: bandwidthHints, - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - pathFinder: findPath, - }, nil -} - -// NewPaymentSessionForRoute creates a new paymentSession instance that is just -// used for failure reporting to missioncontrol. -func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { - return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - preBuiltRoute: preBuiltRoute, - } -} - -// NewPaymentSessionEmpty creates a new paymentSession instance that is empty, -// and will be exhausted immediately. Used for failure reporting to -// missioncontrol for resumed payment we don't want to make more attempts for. -func (m *MissionControl) NewPaymentSessionEmpty() PaymentSession { - return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - preBuiltRoute: &route.Route{}, - preBuiltRouteTried: true, + history: make(map[route.Vertex]*nodeHistory), + now: time.Now, + cfg: cfg, } } @@ -312,9 +179,9 @@ func (m *MissionControl) ResetHistory() { log.Debugf("Mission control history cleared") } -// getEdgeProbability is expected to return the success probability of a payment +// GetEdgeProbability is expected to return the success probability of a payment // from fromNode along edge. 
-func (m *MissionControl) getEdgeProbability(fromNode route.Vertex, +func (m *MissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, amt lnwire.MilliSatoshi) float64 { m.Lock() @@ -391,8 +258,8 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist return node } -// reportVertexFailure reports a node level failure. -func (m *MissionControl) reportVertexFailure(v route.Vertex) { +// ReportVertexFailure reports a node level failure. +func (m *MissionControl) ReportVertexFailure(v route.Vertex) { log.Debugf("Reporting vertex %v failure to Mission Control", v) now := m.now() @@ -404,10 +271,10 @@ func (m *MissionControl) reportVertexFailure(v route.Vertex) { history.lastFail = &now } -// reportEdgeFailure reports a channel level failure. +// ReportEdgeFailure reports a channel level failure. // // TODO(roasbeef): also add value attempted to send and capacity of channel -func (m *MissionControl) reportEdgeFailure(failedEdge edge, +func (m *MissionControl) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { log.Debugf("Reporting channel %v failure to Mission Control", diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 72c3b07a..f6f658b8 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -29,7 +29,7 @@ func createMcTestContext(t *testing.T) *mcTestContext { } mc := NewMissionControl( - nil, nil, nil, &MissionControlConfig{ + &MissionControlConfig{ PenaltyHalfLife: 30 * time.Minute, AprioriHopProbability: 0.8, }, @@ -47,7 +47,7 @@ func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi, ctx.t.Helper() - p := ctx.mc.getEdgeProbability(mcTestNode, mcTestEdge, amt) + p := ctx.mc.GetEdgeProbability(mcTestNode, mcTestEdge, amt) if p != expected { ctx.t.Fatalf("unexpected probability %v", p) } @@ -70,7 +70,7 @@ func TestMissionControl(t *testing.T) { ctx.expectP(1000, 0.8) // Expect probability to be zero after reporting the edge as failed. - ctx.mc.reportEdgeFailure(testEdge, 1000) + ctx.mc.ReportEdgeFailure(testEdge, 1000) ctx.expectP(1000, 0) // As we reported with a min penalization amt, a lower amt than reported @@ -83,7 +83,7 @@ func TestMissionControl(t *testing.T) { // Edge fails again, this time without a min penalization amt. The edge // should be penalized regardless of amount. - ctx.mc.reportEdgeFailure(testEdge, 0) + ctx.mc.ReportEdgeFailure(testEdge, 0) ctx.expectP(1000, 0) ctx.expectP(500, 0) @@ -93,7 +93,7 @@ func TestMissionControl(t *testing.T) { // A node level failure should bring probability of every channel back // to zero. - ctx.mc.reportVertexFailure(testNode) + ctx.mc.ReportVertexFailure(testNode) ctx.expectP(1000, 0) // Check whether history snapshot looks sane. 
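The expectations in the test above (an apriori probability of 0.8, a probability of 0 immediately after a failure, and 0.4 again after one 30-minute half-life) imply an exponential recovery of the edge success probability. The following self-contained sketch reproduces that rule with plain types; the function name edgeProbability and the exact formula are inferred from the test values rather than copied from the mission control implementation:

package main

import (
	"fmt"
	"math"
	"time"
)

// edgeProbability returns the assumed success probability of an edge that
// last failed at failTime, evaluated at now: zero right after the failure,
// recovering towards the apriori probability with the configured half-life.
func edgeProbability(apriori float64, failTime, now time.Time,
	halfLife time.Duration) float64 {

	age := now.Sub(failTime)
	if age < 0 {
		return apriori
	}

	return apriori * (1 - math.Pow(2, -float64(age)/float64(halfLife)))
}

func main() {
	fail := time.Date(2018, time.January, 9, 14, 0, 0, 0, time.UTC)
	halfLife := 30 * time.Minute

	// Mirrors TestMissionControl: 0 right after the failure, 0.4 after
	// one half-life with an apriori probability of 0.8.
	fmt.Println(edgeProbability(0.8, fail, fail, halfLife))
	fmt.Println(edgeProbability(0.8, fail, fail.Add(30*time.Minute), halfLife))
}

The later check in the test (0.4 again at 60 minutes) follows from the same curve restarted at the time of the second reported failure.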
diff --git a/routing/mock_test.go b/routing/mock_test.go index f2f7de78..cf0a3f3b 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -93,6 +93,23 @@ func (m *mockPaymentSessionSource) NewPaymentSessionEmpty() PaymentSession { return &mockPaymentSession{} } +type mockMissionControl struct { +} + +var _ MissionController = (*mockMissionControl)(nil) + +func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) { +} + +func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {} + +func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, + amt lnwire.MilliSatoshi) float64 { + + return 0 +} + type mockPaymentSession struct { routes []*route.Route } diff --git a/routing/payment_session.go b/routing/payment_session.go index 7508e30a..4a2c0a5b 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -58,7 +58,7 @@ type paymentSession struct { // require pruning, but any subsequent ones do. errFailedPolicyChans map[nodeChannel]struct{} - mc *MissionControl + sessionSource *SessionSource preBuiltRoute *route.Route preBuiltRouteTried bool @@ -78,7 +78,7 @@ var _ PaymentSession = (*paymentSession)(nil) // // NOTE: Part of the PaymentSession interface. func (p *paymentSession) ReportVertexFailure(v route.Vertex) { - p.mc.reportVertexFailure(v) + p.sessionSource.MissionControl.ReportVertexFailure(v) } // ReportEdgeFailure adds a channel to the graph prune view. The time the @@ -93,7 +93,9 @@ func (p *paymentSession) ReportVertexFailure(v route.Vertex) { func (p *paymentSession) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { - p.mc.reportEdgeFailure(failedEdge, minPenalizeAmt) + p.sessionSource.MissionControl.ReportEdgeFailure( + failedEdge, minPenalizeAmt, + ) } // ReportEdgePolicyFailure handles a failure message that relates to a @@ -169,21 +171,24 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, // Taking into account this prune view, we'll attempt to locate a path // to our destination, respecting the recommendations from // MissionControl. + ss := p.sessionSource + + restrictions := &RestrictParams{ + ProbabilitySource: ss.MissionControl.GetEdgeProbability, + FeeLimit: payment.FeeLimit, + OutgoingChannelID: payment.OutgoingChannelID, + CltvLimit: cltvLimit, + PaymentAttemptPenalty: ss.PaymentAttemptPenalty, + MinProbability: ss.MinRouteProbability, + } + path, err := p.pathFinder( &graphParams{ - graph: p.mc.graph, + graph: ss.Graph, additionalEdges: p.additionalEdges, bandwidthHints: p.bandwidthHints, }, - &RestrictParams{ - ProbabilitySource: p.mc.getEdgeProbability, - FeeLimit: payment.FeeLimit, - OutgoingChannelID: payment.OutgoingChannelID, - CltvLimit: cltvLimit, - PaymentAttemptPenalty: p.mc.cfg.PaymentAttemptPenalty, - MinProbability: p.mc.cfg.MinRouteProbability, - }, - p.mc.selfNode.PubKeyBytes, payment.Target, + restrictions, ss.SelfNode.PubKeyBytes, payment.Target, payment.Amount, ) if err != nil { @@ -192,7 +197,7 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, // With the next candidate path found, we'll attempt to turn this into // a route by applying the time-lock and fee requirements. 
- sourceVertex := route.Vertex(p.mc.selfNode.PubKeyBytes) + sourceVertex := route.Vertex(ss.SelfNode.PubKeyBytes) route, err := newRoute( payment.Amount, sourceVertex, path, height, finalCltvDelta, ) diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go new file mode 100644 index 00000000..69738026 --- /dev/null +++ b/routing/payment_session_source.go @@ -0,0 +1,150 @@ +package routing + +import ( + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/zpay32" +) + +// A compile time assertion to ensure MissionControl meets the +// PaymentSessionSource interface. +var _ PaymentSessionSource = (*SessionSource)(nil) + +// SessionSource defines a source for the router to retrieve new payment +// sessions. +type SessionSource struct { + // Graph is the channel graph that will be used to gather metrics from + // and also to carry out path finding queries. + Graph *channeldb.ChannelGraph + + // QueryBandwidth is a method that allows querying the lower link layer + // to determine the up to date available bandwidth at a prospective link + // to be traversed. If the link isn't available, then a value of zero + // should be returned. Otherwise, the current up to date knowledge of + // the available bandwidth of the link should be returned. + QueryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi + + // SelfNode is our own node. + SelfNode *channeldb.LightningNode + + // MissionControl is a shared memory of sorts that executions of payment + // path finding use in order to remember which vertexes/edges were + // pruned from prior attempts. During payment execution, errors sent by + // nodes are mapped into a vertex or edge to be pruned. Each run will + // then take into account this set of pruned vertexes/edges to reduce + // route failure and pass on graph information gained to the next + // execution. + MissionControl MissionController + + // PaymentAttemptPenalty is the virtual cost in path finding weight + // units of executing a payment attempt that fails. It is used to trade + // off potentially better routes against their probability of + // succeeding. + PaymentAttemptPenalty lnwire.MilliSatoshi + + // MinProbability defines the minimum success probability of the + // returned route. + MinRouteProbability float64 +} + +// NewPaymentSession creates a new payment session backed by the latest prune +// view from Mission Control. An optional set of routing hints can be provided +// in order to populate additional edges to explore when finding a path to the +// payment's destination. +func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, + target route.Vertex) (PaymentSession, error) { + + edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy) + + // Traverse through all of the available hop hints and include them in + // our edges map, indexed by the public key of the channel's starting + // node. + for _, routeHint := range routeHints { + // If multiple hop hints are provided within a single route + // hint, we'll assume they must be chained together and sorted + // in forward order in order to reach the target successfully. + for i, hopHint := range routeHint { + // In order to determine the end node of this hint, + // we'll need to look at the next hint's start node. If + // we've reached the end of the hints list, we can + // assume we've reached the destination. 
+ endNode := &channeldb.LightningNode{} + if i != len(routeHint)-1 { + endNode.AddPubKey(routeHint[i+1].NodeID) + } else { + targetPubKey, err := btcec.ParsePubKey( + target[:], btcec.S256(), + ) + if err != nil { + return nil, err + } + endNode.AddPubKey(targetPubKey) + } + + // Finally, create the channel edge from the hop hint + // and add it to list of edges corresponding to the node + // at the start of the channel. + edge := &channeldb.ChannelEdgePolicy{ + Node: endNode, + ChannelID: hopHint.ChannelID, + FeeBaseMSat: lnwire.MilliSatoshi( + hopHint.FeeBaseMSat, + ), + FeeProportionalMillionths: lnwire.MilliSatoshi( + hopHint.FeeProportionalMillionths, + ), + TimeLockDelta: hopHint.CLTVExpiryDelta, + } + + v := route.NewVertex(hopHint.NodeID) + edges[v] = append(edges[v], edge) + } + } + + // We'll also obtain a set of bandwidthHints from the lower layer for + // each of our outbound channels. This will allow the path finding to + // skip any links that aren't active or just don't have enough + // bandwidth to carry the payment. + sourceNode, err := m.Graph.SourceNode() + if err != nil { + return nil, err + } + bandwidthHints, err := generateBandwidthHints( + sourceNode, m.QueryBandwidth, + ) + if err != nil { + return nil, err + } + + return &paymentSession{ + additionalEdges: edges, + bandwidthHints: bandwidthHints, + errFailedPolicyChans: make(map[nodeChannel]struct{}), + sessionSource: m, + pathFinder: findPath, + }, nil +} + +// NewPaymentSessionForRoute creates a new paymentSession instance that is just +// used for failure reporting to missioncontrol. +func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { + return &paymentSession{ + errFailedPolicyChans: make(map[nodeChannel]struct{}), + sessionSource: m, + preBuiltRoute: preBuiltRoute, + } +} + +// NewPaymentSessionEmpty creates a new paymentSession instance that is empty, +// and will be exhausted immediately. Used for failure reporting to +// missioncontrol for resumed payment we don't want to make more attempts for. +func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession { + return &paymentSession{ + errFailedPolicyChans: make(map[nodeChannel]struct{}), + sessionSource: m, + preBuiltRoute: &route.Route{}, + preBuiltRouteTried: true, + } +} diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index a5ee7889..33070fbd 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -32,12 +32,16 @@ func TestRequestRoute(t *testing.T) { return path, nil } - session := &paymentSession{ - mc: &MissionControl{ - selfNode: &channeldb.LightningNode{}, - cfg: &MissionControlConfig{}, + sessionSource := &SessionSource{ + SelfNode: &channeldb.LightningNode{}, + MissionControl: &MissionControl{ + cfg: &MissionControlConfig{}, }, - pathFinder: findPath, + } + + session := &paymentSession{ + sessionSource: sessionSource, + pathFinder: findPath, } cltvLimit := uint32(30) diff --git a/routing/router.go b/routing/router.go index 87bb5c75..80cf4753 100644 --- a/routing/router.go +++ b/routing/router.go @@ -171,6 +171,22 @@ type PaymentSessionSource interface { NewPaymentSessionEmpty() PaymentSession } +// MissionController is an interface that exposes failure reporting and +// probability estimation. +type MissionController interface { + // ReportEdgeFailure reports a channel level failure. + ReportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) + + // ReportVertexFailure reports a node level failure. 
+ ReportVertexFailure(v route.Vertex) + + // GetEdgeProbability is expected to return the success probability of a + // payment from fromNode along edge. + GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, + amt lnwire.MilliSatoshi) float64 +} + // FeeSchema is the set fee configuration for a Lightning Node on the network. // Using the coefficients described within the schema, the required fee to // forward outgoing payments can be derived. @@ -234,7 +250,11 @@ type Config struct { // Each run will then take into account this set of pruned // vertexes/edges to reduce route failure and pass on graph information // gained to the next execution. - MissionControl PaymentSessionSource + MissionControl MissionController + + // SessionSource defines a source for the router to retrieve new payment + // sessions. + SessionSource PaymentSessionSource // ChannelPruneExpiry is the duration used to determine if a channel // should be pruned or not. If the delta between now and when the @@ -544,7 +564,7 @@ func (r *ChannelRouter) Start() error { // // PayAttemptTime doesn't need to be set, as there is // only a single attempt. - paySession := r.cfg.MissionControl.NewPaymentSessionEmpty() + paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() lPayment := &LightningPayment{ PaymentHash: payment.Info.PaymentHash, @@ -1651,7 +1671,7 @@ func (r *ChannelRouter) preparePayment(payment *LightningPayment) ( // Before starting the HTLC routing attempt, we'll create a fresh // payment session which will report our errors back to mission // control. - paySession, err := r.cfg.MissionControl.NewPaymentSession( + paySession, err := r.cfg.SessionSource.NewPaymentSession( payment.RouteHints, payment.Target, ) if err != nil { @@ -1682,7 +1702,7 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) ( lntypes.Preimage, error) { // Create a payment session for just this route. - paySession := r.cfg.MissionControl.NewPaymentSessionForRoute(route) + paySession := r.cfg.SessionSource.NewPaymentSessionForRoute(route) // Calculate amount paid to receiver. 
amt := route.TotalAmount - route.TotalFees() diff --git a/routing/router_test.go b/routing/router_test.go index c50dba1c..bc71cfc9 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -91,17 +91,23 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr } mc := NewMissionControl( - graphInstance.graph, selfNode, - func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { - return lnwire.NewMSatFromSatoshis(e.Capacity) - }, &MissionControlConfig{ - MinRouteProbability: 0.01, - PaymentAttemptPenalty: 100, PenaltyHalfLife: time.Hour, AprioriHopProbability: 0.9, }, ) + + sessionSource := &SessionSource{ + Graph: graphInstance.graph, + SelfNode: selfNode, + QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { + return lnwire.NewMSatFromSatoshis(e.Capacity) + }, + MinRouteProbability: 0.01, + PaymentAttemptPenalty: 100, + MissionControl: mc, + } + router, err := New(Config{ Graph: graphInstance.graph, Chain: chain, @@ -109,6 +115,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr Payer: &mockPaymentAttemptDispatcher{}, Control: makeMockControlTower(), MissionControl: mc, + SessionSource: sessionSource, ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { @@ -2940,7 +2947,7 @@ func TestRouterPaymentStateMachine(t *testing.T) { Chain: chain, ChainView: chainView, Control: control, - MissionControl: &mockPaymentSessionSource{}, + SessionSource: &mockPaymentSessionSource{}, Payer: payer, ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, @@ -3004,10 +3011,12 @@ func TestRouterPaymentStateMachine(t *testing.T) { copy(preImage[:], bytes.Repeat([]byte{9}, 32)) - router.cfg.MissionControl = &mockPaymentSessionSource{ + router.cfg.SessionSource = &mockPaymentSessionSource{ routes: test.routes, } + router.cfg.MissionControl = &mockMissionControl{} + // Send the payment. Since this is new payment hash, the // information should be registered with the ControlTower. paymentResult := make(chan error) diff --git a/server.go b/server.go index 88204d81..64b37666 100644 --- a/server.go +++ b/server.go @@ -652,11 +652,29 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, // // TODO(joostjager): When we are further in the process of moving to sub // servers, the mission control instance itself can be moved there too. 
+ routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) + s.missionControl = routing.NewMissionControl( - chanGraph, selfNode, queryBandwidth, - routerrpc.GetMissionControlConfig(cfg.SubRPCServers.RouterRPC), + &routing.MissionControlConfig{ + AprioriHopProbability: routingConfig.AprioriHopProbability, + PenaltyHalfLife: routingConfig.PenaltyHalfLife, + }, ) + srvrLog.Debugf("Instantiating payment session source with config: "+ + "PaymentAttemptPenalty=%v, MinRouteProbability=%v", + int64(routingConfig.PaymentAttemptPenalty.ToSatoshis()), + routingConfig.MinRouteProbability) + + paymentSessionSource := &routing.SessionSource{ + Graph: chanGraph, + MissionControl: s.missionControl, + QueryBandwidth: queryBandwidth, + SelfNode: selfNode, + PaymentAttemptPenalty: routingConfig.PaymentAttemptPenalty, + MinRouteProbability: routingConfig.MinRouteProbability, + } + paymentControl := channeldb.NewPaymentControl(chanDB) s.controlTower = routing.NewControlTower(paymentControl) @@ -668,6 +686,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, Payer: s.htlcSwitch, Control: s.controlTower, MissionControl: s.missionControl, + SessionSource: paymentSessionSource, ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), QueryBandwidth: queryBandwidth, From d31efddf1bd6e689a1b502aad045de5a6f49d50a Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 24 Jun 2019 09:08:04 +0200 Subject: [PATCH 3/5] routing: move generateBandwidthHints --- routing/missioncontrol.go | 34 ---------------------------------- routing/router.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 69a34b65..364a0e3a 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -133,40 +133,6 @@ func NewMissionControl(cfg *MissionControlConfig) *MissionControl { } } -// generateBandwidthHints is a helper function that's utilized the main -// findPath function in order to obtain hints from the lower layer w.r.t to the -// available bandwidth of edges on the network. Currently, we'll only obtain -// bandwidth hints for the edges we directly have open ourselves. Obtaining -// these hints allows us to reduce the number of extraneous attempts as we can -// skip channels that are inactive, or just don't have enough bandwidth to -// carry the payment. -func generateBandwidthHints(sourceNode *channeldb.LightningNode, - queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) { - - // First, we'll collect the set of outbound edges from the target - // source node. - var localChans []*channeldb.ChannelEdgeInfo - err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx, - edgeInfo *channeldb.ChannelEdgeInfo, - _, _ *channeldb.ChannelEdgePolicy) error { - - localChans = append(localChans, edgeInfo) - return nil - }) - if err != nil { - return nil, err - } - - // Now that we have all of our outbound edges, we'll populate the set - // of bandwidth hints, querying the lower switch layer for the most up - // to date values. - bandwidthHints := make(map[uint64]lnwire.MilliSatoshi) - for _, localChan := range localChans { - bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan) - } - - return bandwidthHints, nil -} // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. 
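The generateBandwidthHints helper moved in this patch collects, for each of our own channels, the bandwidth reported by the lower link layer (zero for an unavailable link, per the QueryBandwidth contract). The sketch below shows, with plain types rather than the lnd graph structures, how such hints are typically consumed during path finding; the function name usableChannel and the exact consumption rule are assumptions based on the doc comment, not taken from findPath:

package main

import "fmt"

// usableChannel decides whether a channel can be considered for an amount,
// given a map of bandwidth hints keyed by channel ID. Channels without a
// hint are not ours; for those the decision is left to the graph data.
func usableChannel(hints map[uint64]uint64, chanID, amt uint64) bool {
	bandwidth, ok := hints[chanID]
	if !ok {
		return true
	}

	// A zero hint marks an inactive link; otherwise the link must be able
	// to carry the payment amount.
	return bandwidth >= amt
}

func main() {
	hints := map[uint64]uint64{
		1: 0,      // local channel, link inactive
		2: 500000, // local channel with 500k msat available
	}

	fmt.Println(usableChannel(hints, 1, 1000)) // false: inactive link
	fmt.Println(usableChannel(hints, 2, 1000)) // true
	fmt.Println(usableChannel(hints, 3, 1000)) // true: not a local channel
}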
diff --git a/routing/router.go b/routing/router.go index 80cf4753..d84d5875 100644 --- a/routing/router.go +++ b/routing/router.go @@ -2430,3 +2430,38 @@ func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error { return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64()) } + +// generateBandwidthHints is a helper function that's utilized the main +// findPath function in order to obtain hints from the lower layer w.r.t to the +// available bandwidth of edges on the network. Currently, we'll only obtain +// bandwidth hints for the edges we directly have open ourselves. Obtaining +// these hints allows us to reduce the number of extraneous attempts as we can +// skip channels that are inactive, or just don't have enough bandwidth to +// carry the payment. +func generateBandwidthHints(sourceNode *channeldb.LightningNode, + queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) { + + // First, we'll collect the set of outbound edges from the target + // source node. + var localChans []*channeldb.ChannelEdgeInfo + err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx, + edgeInfo *channeldb.ChannelEdgeInfo, + _, _ *channeldb.ChannelEdgePolicy) error { + + localChans = append(localChans, edgeInfo) + return nil + }) + if err != nil { + return nil, err + } + + // Now that we have all of our outbound edges, we'll populate the set + // of bandwidth hints, querying the lower switch layer for the most up + // to date values. + bandwidthHints := make(map[uint64]lnwire.MilliSatoshi) + for _, localChan := range localChans { + bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan) + } + + return bandwidthHints, nil +} From dc13da5abbfa429273b516abd566f6c6fa5bb200 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 08:39:34 +0200 Subject: [PATCH 4/5] routing: move second chance logic into mission control If nodes return a channel policy related failure, they may get a second chance. Our graph may not be up to date. Previously this logic was contained in the payment session. This commit moves that into global mission control and thereby removes the last mission control state that was kept on the payment level. Because mission control is not aware of the relation between payment attempts and payments, the second chance logic is no longer based on tracking second chances given per payment. Instead, a time-based approach is used. If a node reports a policy failure that prevents forwarding to its peer, it will get a second chance. But it will get it only if the previous second chance was long enough ago. Also those second chances are no longer dependent on whether an associated channel update is valid. It will get the second chance regardless, to prevent creating a dependency between mission control and the graph. This would interfere with (future) replay of history, because the graph may not be the same anymore at that point.
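The time-based rule described above reduces to a small amount of state: the last time a second chance was granted per directed node pair. The following self-contained sketch shows that gate; the names nodePair, secondChanceGate and allow are illustrative, and the code is a simplification of the requestSecondChance logic added below (string identifiers instead of route.Vertex, no locking):

package main

import (
	"fmt"
	"time"
)

// nodePair is a directed pair of node identifiers.
type nodePair struct {
	from, to string
}

// secondChanceGate grants a policy-failure second chance for a directed node
// pair at most once per interval; a repeated policy failure within that
// interval should be penalized instead.
type secondChanceGate struct {
	interval  time.Duration
	lastGrant map[nodePair]time.Time
}

func (g *secondChanceGate) allow(now time.Time, from, to string) bool {
	pair := nodePair{from: from, to: to}

	last, ok := g.lastGrant[pair]
	if !ok || now.Sub(last) > g.interval {
		// No previous second chance, or it was long enough ago.
		g.lastGrant[pair] = now
		return true
	}

	return false
}

func main() {
	g := &secondChanceGate{
		interval:  time.Minute,
		lastGrant: make(map[nodePair]time.Time),
	}

	start := time.Now()

	// First policy failure for a->b: second chance granted.
	fmt.Println(g.allow(start, "a", "b"))

	// Another policy failure 10 seconds later: within the interval, so
	// the pair is penalized rather than given another chance.
	fmt.Println(g.allow(start.Add(10*time.Second), "a", "b"))

	// Two minutes later the previous grant is old enough again.
	fmt.Println(g.allow(start.Add(2*time.Minute), "a", "b"))
}

As the diff below shows, mission control keeps this map alongside the per-node failure history and clears it in ResetHistory.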
--- routing/missioncontrol.go | 90 +++++++++++++++-- routing/missioncontrol_test.go | 22 +++++ routing/mock_test.go | 2 + routing/nodepair.go | 10 ++ routing/payment_session.go | 26 +---- routing/payment_session_source.go | 21 ++-- routing/router.go | 155 ++++++++++++++++-------------- 7 files changed, 213 insertions(+), 113 deletions(-) create mode 100644 routing/nodepair.go diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 364a0e3a..627b1789 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,8 +5,6 @@ import ( "sync" "time" - "github.com/coreos/bbolt" - "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -16,6 +14,30 @@ const ( // half-life duration defines after how much time a penalized node or // channel is back at 50% probability. DefaultPenaltyHalfLife = time.Hour + + // minSecondChanceInterval is the minimum time required between + // second-chance failures. + // + // If nodes return a channel policy related failure, they may get a + // second chance to forward the payment. It could be that the channel + // policy that we are aware of is not up to date. This is especially + // important in case of mobile apps that are mostly offline. + // + // However, we don't want to give nodes the option to endlessly return + // new channel updates so that we are kept busy trying to route through + // that node until the payment loop times out. + // + // Therefore we only grant a second chance to a node if the previous + // second chance is sufficiently long ago. This is what + // minSecondChanceInterval defines. If a second policy failure comes in + // within that interval, we will apply a penalty. + // + // Second chances granted are tracked on the level of node pairs. This + // means that if a node has multiple channels to the same peer, they + // will only get a single second chance to route to that peer again. + // Nodes forward non-strict, so it isn't necessary to apply a less + // restrictive channel level tracking scheme here. + minSecondChanceInterval = time.Minute ) // MissionControl contains state which summarizes the past attempts of HTLC @@ -30,6 +52,10 @@ const ( type MissionControl struct { history map[route.Vertex]*nodeHistory + // lastSecondChance tracks the last time a second chance was granted for + // a directed node pair. + lastSecondChance map[DirectedNodePair]time.Time + // now is expected to return the current time. It is supplied as an // external function to enable deterministic unit tests. now func() time.Time @@ -127,13 +153,13 @@ func NewMissionControl(cfg *MissionControlConfig) *MissionControl { cfg.PenaltyHalfLife, cfg.AprioriHopProbability) return &MissionControl{ - history: make(map[route.Vertex]*nodeHistory), - now: time.Now, - cfg: cfg, + history: make(map[route.Vertex]*nodeHistory), + lastSecondChance: make(map[DirectedNodePair]time.Time), + now: time.Now, + cfg: cfg, } } - // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. 
func (m *MissionControl) ResetHistory() { @@ -141,6 +167,7 @@ func (m *MissionControl) ResetHistory() { defer m.Unlock() m.history = make(map[route.Vertex]*nodeHistory) + m.lastSecondChance = make(map[DirectedNodePair]time.Time) log.Debugf("Mission control history cleared") } @@ -209,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory, return probability } +// requestSecondChance checks whether the node fromNode can have a second chance +// at providing a channel update for its channel with toNode. +func (m *MissionControl) requestSecondChance(timestamp time.Time, + fromNode, toNode route.Vertex) bool { + + // Look up previous second chance time. + pair := DirectedNodePair{ + From: fromNode, + To: toNode, + } + lastSecondChance, ok := m.lastSecondChance[pair] + + // If the channel hasn't already be given a second chance or its last + // second chance was long ago, we give it another chance. + if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval { + m.lastSecondChance[pair] = timestamp + + log.Debugf("Second chance granted for %v->%v", fromNode, toNode) + + return true + } + + // Otherwise penalize the channel, because we don't allow channel + // updates that are that frequent. This is to prevent nodes from keeping + // us busy by continuously sending new channel updates. + log.Debugf("Second chance denied for %v->%v, remaining interval: %v", + fromNode, toNode, timestamp.Sub(lastSecondChance)) + + return false +} + // createHistoryIfNotExists returns the history for the given node. If the node // is yet unknown, it will create an empty history structure. func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { @@ -237,6 +295,26 @@ func (m *MissionControl) ReportVertexFailure(v route.Vertex) { history.lastFail = &now } +// ReportEdgePolicyFailure reports a policy related failure. +func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) { + now := m.now() + + m.Lock() + defer m.Unlock() + + // We may have an out of date graph. Therefore we don't always penalize + // immediately. If some time has passed since the last policy failure, + // we grant the node a second chance at forwarding the payment. + if m.requestSecondChance( + now, failedEdge.from, failedEdge.to, + ) { + return + } + + history := m.createHistoryIfNotExists(failedEdge.from) + history.lastFail = &now +} + // ReportEdgeFailure reports a channel level failure. // // TODO(roasbeef): also add value attempted to send and capacity of channel diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index f6f658b8..237f81df 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -106,3 +106,25 @@ func TestMissionControl(t *testing.T) { t.Fatal("unexpected number of channels") } } + +// TestMissionControlChannelUpdate tests that the first channel update is not +// penalizing the channel yet. +func TestMissionControlChannelUpdate(t *testing.T) { + ctx := createMcTestContext(t) + + testEdge := edge{ + channel: 123, + } + + // Report a policy related failure. Because it is the first, we don't + // expect a penalty. + ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0.8) + + // Report another failure for the same channel. We expect it to be + // pruned. 
+ ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0) +} diff --git a/routing/mock_test.go b/routing/mock_test.go index cf0a3f3b..4c4f97eb 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -102,6 +102,8 @@ func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { } +func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {} + func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {} func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, diff --git a/routing/nodepair.go b/routing/nodepair.go new file mode 100644 index 00000000..edec8e02 --- /dev/null +++ b/routing/nodepair.go @@ -0,0 +1,10 @@ +package routing + +import ( + "github.com/lightningnetwork/lnd/routing/route" +) + +// DirectedNodePair stores a directed pair of nodes. +type DirectedNodePair struct { + From, To route.Vertex +} diff --git a/routing/payment_session.go b/routing/payment_session.go index 4a2c0a5b..f138ae31 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -52,12 +52,6 @@ type paymentSession struct { bandwidthHints map[uint64]lnwire.MilliSatoshi - // errFailedFeeChans is a map of the short channel IDs that were the - // source of policy related routing failures during this payment attempt. - // We'll use this map to prune out channels when the first error may not - // require pruning, but any subsequent ones do. - errFailedPolicyChans map[nodeChannel]struct{} - sessionSource *SessionSource preBuiltRoute *route.Route @@ -109,25 +103,7 @@ func (p *paymentSession) ReportEdgeFailure(failedEdge edge, // // TODO(joostjager): Move this logic into global mission control. func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) { - key := nodeChannel{ - node: failedEdge.from, - channel: failedEdge.channel, - } - - // Check to see if we've already reported a policy related failure for - // this channel. If so, then we'll prune out the vertex. - _, ok := p.errFailedPolicyChans[key] - if ok { - // TODO(joostjager): is this aggressive pruning still necessary? - // Just pruning edges may also work unless there is a huge - // number of failing channels from that node? - p.ReportVertexFailure(key.node) - - return - } - - // Finally, we'll record a policy failure from this node and move on. - p.errFailedPolicyChans[key] = struct{}{} + p.sessionSource.MissionControl.ReportEdgePolicyFailure(failedEdge) } // RequestRoute returns a route which is likely to be capable for successfully diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go index 69738026..77dad51e 100644 --- a/routing/payment_session_source.go +++ b/routing/payment_session_source.go @@ -119,11 +119,10 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, } return &paymentSession{ - additionalEdges: edges, - bandwidthHints: bandwidthHints, - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - pathFinder: findPath, + additionalEdges: edges, + bandwidthHints: bandwidthHints, + sessionSource: m, + pathFinder: findPath, }, nil } @@ -131,9 +130,8 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, // used for failure reporting to missioncontrol. 
func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - preBuiltRoute: preBuiltRoute, + sessionSource: m, + preBuiltRoute: preBuiltRoute, } } @@ -142,9 +140,8 @@ func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) Pa // missioncontrol for resumed payment we don't want to make more attempts for. func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession { return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - sessionSource: m, - preBuiltRoute: &route.Route{}, - preBuiltRouteTried: true, + sessionSource: m, + preBuiltRoute: &route.Route{}, + preBuiltRouteTried: true, } } diff --git a/routing/router.go b/routing/router.go index d84d5875..0fd1caf9 100644 --- a/routing/router.go +++ b/routing/router.go @@ -178,6 +178,9 @@ type MissionController interface { ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) + // ReportEdgePolicyFailure reports a policy related failure. + ReportEdgePolicyFailure(failedEdge edge) + // ReportVertexFailure reports a node level failure. ReportVertexFailure(v route.Vertex) @@ -1826,6 +1829,47 @@ func (r *ChannelRouter) sendPayment( } +// tryApplyChannelUpdate tries to apply a channel update present in the failure +// message if any. +func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, + errorSourceIdx int, failure lnwire.FailureMessage) error { + + // It makes no sense to apply our own channel updates. + if errorSourceIdx == 0 { + log.Errorf("Channel update of ourselves received") + + return nil + } + + // Extract channel update if the error contains one. + update := r.extractChannelUpdate(failure) + if update == nil { + return nil + } + + // Parse pubkey to allow validation of the channel update. This should + // always succeed, otherwise there is something wrong in our + // implementation. Therefore return an error. + errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes + errSource, err := btcec.ParsePubKey( + errVertex[:], btcec.S256(), + ) + if err != nil { + log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v", + errorSourceIdx, errVertex) + + return err + } + + // Apply channel update. + if !r.applyChannelUpdate(update, errSource) { + log.Debugf("Invalid channel update received: node=%x", + errVertex) + } + + return nil +} + // processSendError analyzes the error for the payment attempt received from the // switch and updates mission control and/or channel policies. Depending on the // error type, this error is either the final outcome of the payment or we need @@ -1851,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, return true, channeldb.FailureReasonError } - var ( - failureSourceIdx = fErr.FailureSourceIdx + failureMessage := fErr.FailureMessage + failureSourceIdx := fErr.FailureSourceIdx - failureVertex route.Vertex - failureSource *btcec.PublicKey - err error - ) + // Apply channel update if the error contains one. For unknown + // failures, failureMessage is nil. + if failureMessage != nil { + err := r.tryApplyChannelUpdate( + rt, failureSourceIdx, failureMessage, + ) + if err != nil { + return true, channeldb.FailureReasonError + } + } + + var failureVertex route.Vertex // For any non-self failure, look up the source pub key in the hops // slice. Otherwise return the self node pubkey. 
if failureSourceIdx > 0 { failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes - failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256()) - if err != nil { - log.Errorf("Cannot parse pubkey %v: %v", - failureVertex, err) - - return true, channeldb.FailureReasonError - } } else { failureVertex = r.selfNode.PubKeyBytes - failureSource, err = r.selfNode.PubKey() - if err != nil { - log.Errorf("Cannot parse self pubkey: %v", err) - return true, channeldb.FailureReasonError - } } log.Tracef("Node %x (index %v) reported failure when sending htlc", failureVertex, failureSourceIdx) @@ -1885,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // update with id may not be available. failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) - // processChannelUpdateAndRetry is a closure that - // handles a failure message containing a channel - // update. This function always tries to apply the - // channel update and passes on the result to the - // payment session to adjust its view on the reliability - // of the network. - // - // As channel id, the locally determined channel id is - // used. It does not rely on the channel id that is part - // of the channel update message, because the remote - // node may lie to us or the update may be corrupt. - processChannelUpdateAndRetry := func( - update *lnwire.ChannelUpdate, - pubKey *btcec.PublicKey) { - - // Try to apply the channel update. - updateOk := r.applyChannelUpdate(update, pubKey) - - // If the update could not be applied, prune the - // edge. There is no reason to continue trying - // this channel. - // - // TODO: Could even prune the node completely? - // Or is there a valid reason for the channel - // update to fail? - if !updateOk { - paySession.ReportEdgeFailure( - failedEdge, 0, - ) - } - - paySession.ReportEdgePolicyFailure(failedEdge) - } - - switch onionErr := fErr.FailureMessage.(type) { + switch fErr.FailureMessage.(type) { // If the end destination didn't know the payment // hash or we sent the wrong payment amount to the @@ -1975,7 +1981,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // that sent us this error, as it doesn't now what the // correct block height is. case *lnwire.FailExpiryTooSoon: - r.applyChannelUpdate(&onionErr.Update, failureSource) paySession.ReportVertexFailure(failureVertex) return false, 0 @@ -1996,34 +2001,27 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // amount, we'll apply the new minimum amount and retry // routing. case *lnwire.FailAmountBelowMinimum: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get a failure due to a fee, we'll apply the // new fee update, and retry our attempt using the // newly updated fees. case *lnwire.FailFeeInsufficient: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get the failure for an intermediate node that // disagrees with our time lock values, then we'll // apply the new delta value and try it once more. case *lnwire.FailIncorrectCltvExpiry: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + paySession.ReportEdgePolicyFailure(failedEdge) return false, 0 // The outgoing channel that this node was meant to // forward one is currently disabled, so we'll apply // the update and continue. 
case *lnwire.FailChannelDisabled: - r.applyChannelUpdate(&onionErr.Update, failureSource) paySession.ReportEdgeFailure(failedEdge, 0) return false, 0 @@ -2031,7 +2029,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // sufficient capacity, so we'll prune this edge for // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: - r.applyChannelUpdate(onionErr.Update, failureSource) paySession.ReportEdgeFailure(failedEdge, failedAmt) return false, 0 @@ -2103,6 +2100,29 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, } } +// extractChannelUpdate examines the error and extracts the channel update. +func (r *ChannelRouter) extractChannelUpdate( + failure lnwire.FailureMessage) *lnwire.ChannelUpdate { + + var update *lnwire.ChannelUpdate + switch onionErr := failure.(type) { + case *lnwire.FailExpiryTooSoon: + update = &onionErr.Update + case *lnwire.FailAmountBelowMinimum: + update = &onionErr.Update + case *lnwire.FailFeeInsufficient: + update = &onionErr.Update + case *lnwire.FailIncorrectCltvExpiry: + update = &onionErr.Update + case *lnwire.FailChannelDisabled: + update = &onionErr.Update + case *lnwire.FailTemporaryChannelFailure: + update = onionErr.Update + } + + return update +} + // getFailedEdge tries to locate the failing channel given a route and the // pubkey of the node that sent the failure. It will assume that the failure is // associated with the outgoing channel of the failing node. As a second result, @@ -2147,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge, // database. It returns a bool indicating whether the updates was successful. func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, pubKey *btcec.PublicKey) bool { - // If we get passed a nil channel update (as it's optional with some - // onion errors), then we'll exit early with a success result. - if msg == nil { - return true - } ch, _, _, err := r.GetChannelByID(msg.ShortChannelID) if err != nil { From 8055bcf2e0496b3edbc9b0601108911f8a61f03f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 08:52:49 +0200 Subject: [PATCH 5/5] routing: report failures to mission control directly As there is no more state kept in the payment session, failure reporting can go straight to mission control. --- routing/payment_lifecycle.go | 2 +- routing/payment_session.go | 67 ------------------------------------ routing/router.go | 34 +++++++++--------- 3 files changed, 18 insertions(+), 85 deletions(-) diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 32f6883e..4de85b5d 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -343,7 +343,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID, func (p *paymentLifecycle) handleSendError(sendErr error) error { final, reason := p.router.processSendError( - p.paySession, &p.attempt.Route, sendErr, + &p.attempt.Route, sendErr, ) if !final { // Save the forwarding error so it can be returned if diff --git a/routing/payment_session.go b/routing/payment_session.go index f138ae31..44abeae9 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -16,27 +16,6 @@ type PaymentSession interface { // specified HTLC payment to the target node. 
 	RequestRoute(payment *LightningPayment, height uint32,
 		finalCltvDelta uint16) (*route.Route, error)
-
-	// ReportVertexFailure reports to the PaymentSession that the passsed
-	// vertex failed to route the previous payment attempt. The
-	// PaymentSession will use this information to produce a better next
-	// route.
-	ReportVertexFailure(v route.Vertex)
-
-	// ReportEdgeFailure reports to the PaymentSession that the passed edge
-	// failed to route the previous payment attempt. A minimum penalization
-	// amount is included to attenuate the failure. This is set to a
-	// non-zero value for channel balance failures. The PaymentSession will
-	// use this information to produce a better next route.
-	ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi)
-
-	// ReportEdgePolicyFailure reports to the PaymentSession that we
-	// received a failure message that relates to a channel policy. For
-	// these types of failures, the PaymentSession can decide whether to to
-	// keep the edge included in the next attempted route. The
-	// PaymentSession will use this information to produce a better next
-	// route.
-	ReportEdgePolicyFailure(failedEdge edge)
 }
 
 // paymentSession is used during an HTLC routings session to prune the local
@@ -60,52 +39,6 @@ type paymentSession struct {
 	pathFinder pathFinder
 }
 
-// A compile time assertion to ensure paymentSession meets the PaymentSession
-// interface.
-var _ PaymentSession = (*paymentSession)(nil)
-
-// ReportVertexFailure adds a vertex to the graph prune view after a client
-// reports a routing failure localized to the vertex. The time the vertex was
-// added is noted, as it'll be pruned from the shared view after a period of
-// vertexDecay. However, the vertex will remain pruned for the *local* session.
-// This ensures we don't retry this vertex during the payment attempt.
-//
-// NOTE: Part of the PaymentSession interface.
-func (p *paymentSession) ReportVertexFailure(v route.Vertex) {
-	p.sessionSource.MissionControl.ReportVertexFailure(v)
-}
-
-// ReportEdgeFailure adds a channel to the graph prune view. The time the
-// channel was added is noted, as it'll be pruned from the global view after a
-// period of edgeDecay. However, the edge will remain pruned for the duration
-// of the *local* session. This ensures that we don't flap by continually
-// retrying an edge after its pruning has expired.
-//
-// TODO(roasbeef): also add value attempted to send and capacity of channel
-//
-// NOTE: Part of the PaymentSession interface.
-func (p *paymentSession) ReportEdgeFailure(failedEdge edge,
-	minPenalizeAmt lnwire.MilliSatoshi) {
-
-	p.sessionSource.MissionControl.ReportEdgeFailure(
-		failedEdge, minPenalizeAmt,
-	)
-}
-
-// ReportEdgePolicyFailure handles a failure message that relates to a
-// channel policy. For these types of failures, the policy is updated and we
-// want to keep it included during path finding. This function does mark the
-// edge as 'policy failed once'. The next time it fails, the whole node will be
-// pruned. This is to prevent nodes from keeping us busy by continuously sending
-// new channel updates.
-//
-// NOTE: Part of the PaymentSession interface.
-//
-// TODO(joostjager): Move this logic into global mission control.
-func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) {
-	p.sessionSource.MissionControl.ReportEdgePolicyFailure(failedEdge)
-}
-
 // RequestRoute returns a route which is likely to be capable for successfully
 // routing the specified HTLC payment to the target node. Initially the first
 // set of paths returned from this method may encounter routing failure along
diff --git a/routing/router.go b/routing/router.go
index 0fd1caf9..4df236d5 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -1875,8 +1875,8 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
 // error type, this error is either the final outcome of the payment or we need
 // to continue with an alternative route. This is indicated by the boolean
 // return value.
-func (r *ChannelRouter) processSendError(paySession PaymentSession,
-	rt *route.Route, sendErr error) (bool, channeldb.FailureReason) {
+func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (
+	bool, channeldb.FailureReason) {
 
 	// If the failure message could not be decrypted, attribute the failure
 	// to our own outgoing channel.
@@ -1981,7 +1981,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 		// that sent us this error, as it doesn't now what the
 		// correct block height is.
 		case *lnwire.FailExpiryTooSoon:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		// If we hit an instance of onion payload corruption or an invalid
@@ -2001,49 +2001,49 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 		// amount, we'll apply the new minimum amount and retry
 		// routing.
 		case *lnwire.FailAmountBelowMinimum:
-			paySession.ReportEdgePolicyFailure(failedEdge)
+			r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
 			return false, 0
 
 		// If we get a failure due to a fee, we'll apply the
 		// new fee update, and retry our attempt using the
 		// newly updated fees.
 		case *lnwire.FailFeeInsufficient:
-			paySession.ReportEdgePolicyFailure(failedEdge)
+			r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
 			return false, 0
 
 		// If we get the failure for an intermediate node that
 		// disagrees with our time lock values, then we'll
 		// apply the new delta value and try it once more.
 		case *lnwire.FailIncorrectCltvExpiry:
-			paySession.ReportEdgePolicyFailure(failedEdge)
+			r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
 			return false, 0
 
 		// The outgoing channel that this node was meant to
 		// forward one is currently disabled, so we'll apply
 		// the update and continue.
 		case *lnwire.FailChannelDisabled:
-			paySession.ReportEdgeFailure(failedEdge, 0)
+			r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
 			return false, 0
 
 		// It's likely that the outgoing channel didn't have
 		// sufficient capacity, so we'll prune this edge for
 		// now, and continue onwards with our path finding.
 		case *lnwire.FailTemporaryChannelFailure:
-			paySession.ReportEdgeFailure(failedEdge, failedAmt)
+			r.cfg.MissionControl.ReportEdgeFailure(failedEdge, failedAmt)
 			return false, 0
 
 		// If the send fail due to a node not having the
 		// required features, then we'll note this error and
 		// continue.
 		case *lnwire.FailRequiredNodeFeatureMissing:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		// If the send fail due to a node not having the
 		// required features, then we'll note this error and
 		// continue.
 		case *lnwire.FailRequiredChannelFeatureMissing:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		// If the next hop in the route wasn't known or
 		// offline, we'll only the channel which we attempted
 		// to route over. This is conservative, and it can
 		// handle faulty channels between nodes properly.
 		// Additionally, this guards against routing nodes
 		// returning errors in order to attempt to black list
 		// another node.
 		case *lnwire.FailUnknownNextPeer:
-			paySession.ReportEdgeFailure(failedEdge, 0)
+			r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
 			return false, 0
 
 		// If the node wasn't able to forward for which ever
 		// reason, then we'll note this and continue with the
 		// routes.
 		case *lnwire.FailTemporaryNodeFailure:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		case *lnwire.FailPermanentNodeFailure:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		// If we crafted a route that contains a too long time
@@ -2078,15 +2078,15 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 		// that as a hint during future path finding through
 		// that node.
 		case *lnwire.FailExpiryTooFar:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 
 		// If we get a permanent channel or node failure, then
 		// we'll prune the channel in both directions and
 		// continue with the rest of the routes.
 		case *lnwire.FailPermanentChannelFailure:
-			paySession.ReportEdgeFailure(failedEdge, 0)
-			paySession.ReportEdgeFailure(edge{
+			r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
+			r.cfg.MissionControl.ReportEdgeFailure(edge{
 				from:    failedEdge.to,
 				to:      failedEdge.from,
 				channel: failedEdge.channel,
@@ -2095,7 +2095,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
 
 		// Any other failure or an empty failure will get the node pruned.
 		default:
-			paySession.ReportVertexFailure(failureVertex)
+			r.cfg.MissionControl.ReportVertexFailure(failureVertex)
 			return false, 0
 		}
 	}
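
Editor's note: the sketch below illustrates how the helpers touched in these patches interact once patch 5 is applied. processSendError now only classifies the onion failure and reports it to r.cfg.MissionControl, while any channel update embedded in the failure is pulled out with extractChannelUpdate and validated through applyChannelUpdate (presumably inside tryApplyChannelUpdate, whose signature is only partially visible in the hunk headers above). The helper name applyFailureChannelUpdate and its error handling are illustrative assumptions rather than code from the patch set; beyond that, the snippet only uses identifiers that appear in the diffs (route, lnwire, btcec, r.selfNode, r.extractChannelUpdate, r.applyChannelUpdate).

// applyFailureChannelUpdate is a sketch (not part of these patches) that
// combines the helpers above: extract the channel update carried by an onion
// failure, resolve the key of the node that reported it, and hand the update
// to applyChannelUpdate for validation and persistence in the graph.
func (r *ChannelRouter) applyFailureChannelUpdate(rt *route.Route,
	failureSourceIdx int, failure lnwire.FailureMessage) {

	// Not every failure type carries a channel update.
	update := r.extractChannelUpdate(failure)
	if update == nil {
		return
	}

	// Index zero means the failure originated locally; otherwise the
	// reporting node is the hop at failureSourceIdx-1 of the route.
	var (
		failureSource *btcec.PublicKey
		err           error
	)
	if failureSourceIdx > 0 {
		failureSource, err = btcec.ParsePubKey(
			rt.Hops[failureSourceIdx-1].PubKeyBytes[:], btcec.S256(),
		)
	} else {
		failureSource, err = r.selfNode.PubKey()
	}
	if err != nil {
		log.Errorf("Cannot parse pubkey: %v", err)
		return
	}

	// applyChannelUpdate verifies the update against the reporting node's
	// key and writes it to the graph; it returns false if it is rejected.
	if !r.applyChannelUpdate(update, failureSource) {
		log.Debugf("Invalid channel update received: node=%x",
			failureSource.SerializeCompressed())
	}
}

Keeping update application separate from failure reporting is what allows patch 5 to drop the reporting methods from PaymentSession entirely: the session only produces routes, while reliability data is accumulated in the shared mission control instance.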