diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go new file mode 100644 index 00000000..15dd6c40 --- /dev/null +++ b/lnrpc/routerrpc/config.go @@ -0,0 +1,28 @@ +package routerrpc + +import ( + "time" + + "github.com/lightningnetwork/lnd/lnwire" +) + +// RoutingConfig contains the configurable parameters that control routing. +type RoutingConfig struct { + // PenaltyHalfLife defines after how much time a penalized node or + // channel is back at 50% probability. + PenaltyHalfLife time.Duration + + // PaymentAttemptPenalty is the virtual cost in path finding weight + // units of executing a payment attempt that fails. It is used to trade + // off potentially better routes against their probability of + // succeeding. + PaymentAttemptPenalty lnwire.MilliSatoshi + + // MinProbability defines the minimum success probability of the + // returned route. + MinRouteProbability float64 + + // AprioriHopProbability is the assumed success probability of a hop in + // a route when no other information is available. + AprioriHopProbability float64 +} diff --git a/lnrpc/routerrpc/config_active.go b/lnrpc/routerrpc/config_active.go index 36877c36..b919364e 100644 --- a/lnrpc/routerrpc/config_active.go +++ b/lnrpc/routerrpc/config_active.go @@ -72,10 +72,9 @@ func DefaultConfig() *Config { } } -// GetMissionControlConfig returns the mission control config based on this sub -// server config. -func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { - return &routing.MissionControlConfig{ +// GetRoutingConfig returns the routing config based on this sub server config. +func GetRoutingConfig(cfg *Config) *RoutingConfig { + return &RoutingConfig{ AprioriHopProbability: cfg.AprioriHopProbability, MinRouteProbability: cfg.MinRouteProbability, PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis( diff --git a/lnrpc/routerrpc/config_default.go b/lnrpc/routerrpc/config_default.go index 81c4a577..a3a95021 100644 --- a/lnrpc/routerrpc/config_default.go +++ b/lnrpc/routerrpc/config_default.go @@ -14,10 +14,9 @@ func DefaultConfig() *Config { return &Config{} } -// GetMissionControlConfig returns the mission control config based on this sub -// server config. -func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { - return &routing.MissionControlConfig{ +// GetRoutingConfig returns the routing config based on this sub server config. +func GetRoutingConfig(cfg *Config) *RoutingConfig { + return &RoutingConfig{ AprioriHopProbability: routing.DefaultAprioriHopProbability, MinRouteProbability: routing.DefaultMinRouteProbability, PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty, diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 17575a72..627b1789 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,12 +5,8 @@ import ( "sync" "time" - "github.com/btcsuite/btcd/btcec" - "github.com/coreos/bbolt" - "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" - "github.com/lightningnetwork/lnd/zpay32" ) const ( @@ -18,6 +14,30 @@ const ( // half-life duration defines after how much time a penalized node or // channel is back at 50% probability. DefaultPenaltyHalfLife = time.Hour + + // minSecondChanceInterval is the minimum time required between + // second-chance failures. + // + // If nodes return a channel policy related failure, they may get a + // second chance to forward the payment. It could be that the channel + // policy that we are aware of is not up to date. This is especially + // important in case of mobile apps that are mostly offline. + // + // However, we don't want to give nodes the option to endlessly return + // new channel updates so that we are kept busy trying to route through + // that node until the payment loop times out. + // + // Therefore we only grant a second chance to a node if the previous + // second chance is sufficiently long ago. This is what + // minSecondChanceInterval defines. If a second policy failure comes in + // within that interval, we will apply a penalty. + // + // Second chances granted are tracked on the level of node pairs. This + // means that if a node has multiple channels to the same peer, they + // will only get a single second chance to route to that peer again. + // Nodes forward non-strict, so it isn't necessary to apply a less + // restrictive channel level tracking scheme here. + minSecondChanceInterval = time.Minute ) // MissionControl contains state which summarizes the past attempts of HTLC @@ -32,11 +52,9 @@ const ( type MissionControl struct { history map[route.Vertex]*nodeHistory - graph *channeldb.ChannelGraph - - selfNode *channeldb.LightningNode - - queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi + // lastSecondChance tracks the last time a second chance was granted for + // a directed node pair. + lastSecondChance map[DirectedNodePair]time.Time // now is expected to return the current time. It is supplied as an // external function to enable deterministic unit tests. @@ -52,10 +70,6 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } -// A compile time assertion to ensure MissionControl meets the -// PaymentSessionSource interface. -var _ PaymentSessionSource = (*MissionControl)(nil) - // MissionControlConfig defines parameters that control mission control // behaviour. type MissionControlConfig struct { @@ -63,16 +77,6 @@ type MissionControlConfig struct { // channel is back at 50% probability. PenaltyHalfLife time.Duration - // PaymentAttemptPenalty is the virtual cost in path finding weight - // units of executing a payment attempt that fails. It is used to trade - // off potentially better routes against their probability of - // succeeding. - PaymentAttemptPenalty lnwire.MilliSatoshi - - // MinProbability defines the minimum success probability of the - // returned route. - MinRouteProbability float64 - // AprioriHopProbability is the assumed success probability of a hop in // a route when no other information is available. AprioriHopProbability float64 @@ -143,164 +147,19 @@ type MissionControlChannelSnapshot struct { } // NewMissionControl returns a new instance of missionControl. -// -// TODO(roasbeef): persist memory -func NewMissionControl(g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode, - qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi, - cfg *MissionControlConfig) *MissionControl { - +func NewMissionControl(cfg *MissionControlConfig) *MissionControl { log.Debugf("Instantiating mission control with config: "+ - "PenaltyHalfLife=%v, PaymentAttemptPenalty=%v, "+ - "MinRouteProbability=%v, AprioriHopProbability=%v", - cfg.PenaltyHalfLife, - int64(cfg.PaymentAttemptPenalty.ToSatoshis()), - cfg.MinRouteProbability, cfg.AprioriHopProbability) + "PenaltyHalfLife=%v, AprioriHopProbability=%v", + cfg.PenaltyHalfLife, cfg.AprioriHopProbability) return &MissionControl{ - history: make(map[route.Vertex]*nodeHistory), - selfNode: selfNode, - queryBandwidth: qb, - graph: g, - now: time.Now, - cfg: cfg, + history: make(map[route.Vertex]*nodeHistory), + lastSecondChance: make(map[DirectedNodePair]time.Time), + now: time.Now, + cfg: cfg, } } -// NewPaymentSession creates a new payment session backed by the latest prune -// view from Mission Control. An optional set of routing hints can be provided -// in order to populate additional edges to explore when finding a path to the -// payment's destination. -func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint, - target route.Vertex) (PaymentSession, error) { - - edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy) - - // Traverse through all of the available hop hints and include them in - // our edges map, indexed by the public key of the channel's starting - // node. - for _, routeHint := range routeHints { - // If multiple hop hints are provided within a single route - // hint, we'll assume they must be chained together and sorted - // in forward order in order to reach the target successfully. - for i, hopHint := range routeHint { - // In order to determine the end node of this hint, - // we'll need to look at the next hint's start node. If - // we've reached the end of the hints list, we can - // assume we've reached the destination. - endNode := &channeldb.LightningNode{} - if i != len(routeHint)-1 { - endNode.AddPubKey(routeHint[i+1].NodeID) - } else { - targetPubKey, err := btcec.ParsePubKey( - target[:], btcec.S256(), - ) - if err != nil { - return nil, err - } - endNode.AddPubKey(targetPubKey) - } - - // Finally, create the channel edge from the hop hint - // and add it to list of edges corresponding to the node - // at the start of the channel. - edge := &channeldb.ChannelEdgePolicy{ - Node: endNode, - ChannelID: hopHint.ChannelID, - FeeBaseMSat: lnwire.MilliSatoshi( - hopHint.FeeBaseMSat, - ), - FeeProportionalMillionths: lnwire.MilliSatoshi( - hopHint.FeeProportionalMillionths, - ), - TimeLockDelta: hopHint.CLTVExpiryDelta, - } - - v := route.NewVertex(hopHint.NodeID) - edges[v] = append(edges[v], edge) - } - } - - // We'll also obtain a set of bandwidthHints from the lower layer for - // each of our outbound channels. This will allow the path finding to - // skip any links that aren't active or just don't have enough - // bandwidth to carry the payment. - sourceNode, err := m.graph.SourceNode() - if err != nil { - return nil, err - } - bandwidthHints, err := generateBandwidthHints( - sourceNode, m.queryBandwidth, - ) - if err != nil { - return nil, err - } - - return &paymentSession{ - additionalEdges: edges, - bandwidthHints: bandwidthHints, - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - pathFinder: findPath, - }, nil -} - -// NewPaymentSessionForRoute creates a new paymentSession instance that is just -// used for failure reporting to missioncontrol. -func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { - return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - preBuiltRoute: preBuiltRoute, - } -} - -// NewPaymentSessionEmpty creates a new paymentSession instance that is empty, -// and will be exhausted immediately. Used for failure reporting to -// missioncontrol for resumed payment we don't want to make more attempts for. -func (m *MissionControl) NewPaymentSessionEmpty() PaymentSession { - return &paymentSession{ - errFailedPolicyChans: make(map[nodeChannel]struct{}), - mc: m, - preBuiltRoute: &route.Route{}, - preBuiltRouteTried: true, - } -} - -// generateBandwidthHints is a helper function that's utilized the main -// findPath function in order to obtain hints from the lower layer w.r.t to the -// available bandwidth of edges on the network. Currently, we'll only obtain -// bandwidth hints for the edges we directly have open ourselves. Obtaining -// these hints allows us to reduce the number of extraneous attempts as we can -// skip channels that are inactive, or just don't have enough bandwidth to -// carry the payment. -func generateBandwidthHints(sourceNode *channeldb.LightningNode, - queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) { - - // First, we'll collect the set of outbound edges from the target - // source node. - var localChans []*channeldb.ChannelEdgeInfo - err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx, - edgeInfo *channeldb.ChannelEdgeInfo, - _, _ *channeldb.ChannelEdgePolicy) error { - - localChans = append(localChans, edgeInfo) - return nil - }) - if err != nil { - return nil, err - } - - // Now that we have all of our outbound edges, we'll populate the set - // of bandwidth hints, querying the lower switch layer for the most up - // to date values. - bandwidthHints := make(map[uint64]lnwire.MilliSatoshi) - for _, localChan := range localChans { - bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan) - } - - return bandwidthHints, nil -} - // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. func (m *MissionControl) ResetHistory() { @@ -308,13 +167,14 @@ func (m *MissionControl) ResetHistory() { defer m.Unlock() m.history = make(map[route.Vertex]*nodeHistory) + m.lastSecondChance = make(map[DirectedNodePair]time.Time) log.Debugf("Mission control history cleared") } -// getEdgeProbability is expected to return the success probability of a payment +// GetEdgeProbability is expected to return the success probability of a payment // from fromNode along edge. -func (m *MissionControl) getEdgeProbability(fromNode route.Vertex, +func (m *MissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, amt lnwire.MilliSatoshi) float64 { m.Lock() @@ -376,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory, return probability } +// requestSecondChance checks whether the node fromNode can have a second chance +// at providing a channel update for its channel with toNode. +func (m *MissionControl) requestSecondChance(timestamp time.Time, + fromNode, toNode route.Vertex) bool { + + // Look up previous second chance time. + pair := DirectedNodePair{ + From: fromNode, + To: toNode, + } + lastSecondChance, ok := m.lastSecondChance[pair] + + // If the channel hasn't already be given a second chance or its last + // second chance was long ago, we give it another chance. + if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval { + m.lastSecondChance[pair] = timestamp + + log.Debugf("Second chance granted for %v->%v", fromNode, toNode) + + return true + } + + // Otherwise penalize the channel, because we don't allow channel + // updates that are that frequent. This is to prevent nodes from keeping + // us busy by continuously sending new channel updates. + log.Debugf("Second chance denied for %v->%v, remaining interval: %v", + fromNode, toNode, timestamp.Sub(lastSecondChance)) + + return false +} + // createHistoryIfNotExists returns the history for the given node. If the node // is yet unknown, it will create an empty history structure. func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { @@ -391,8 +282,8 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist return node } -// reportVertexFailure reports a node level failure. -func (m *MissionControl) reportVertexFailure(v route.Vertex) { +// ReportVertexFailure reports a node level failure. +func (m *MissionControl) ReportVertexFailure(v route.Vertex) { log.Debugf("Reporting vertex %v failure to Mission Control", v) now := m.now() @@ -404,10 +295,30 @@ func (m *MissionControl) reportVertexFailure(v route.Vertex) { history.lastFail = &now } -// reportEdgeFailure reports a channel level failure. +// ReportEdgePolicyFailure reports a policy related failure. +func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) { + now := m.now() + + m.Lock() + defer m.Unlock() + + // We may have an out of date graph. Therefore we don't always penalize + // immediately. If some time has passed since the last policy failure, + // we grant the node a second chance at forwarding the payment. + if m.requestSecondChance( + now, failedEdge.from, failedEdge.to, + ) { + return + } + + history := m.createHistoryIfNotExists(failedEdge.from) + history.lastFail = &now +} + +// ReportEdgeFailure reports a channel level failure. // // TODO(roasbeef): also add value attempted to send and capacity of channel -func (m *MissionControl) reportEdgeFailure(failedEdge edge, +func (m *MissionControl) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { log.Debugf("Reporting channel %v failure to Mission Control", diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 19a28828..237f81df 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -8,17 +8,56 @@ import ( "github.com/lightningnetwork/lnd/routing/route" ) -// TestMissionControl tests mission control probability estimation. -func TestMissionControl(t *testing.T) { - now := testTime +var ( + mcTestNode = route.Vertex{} + mcTestEdge = EdgeLocator{ + ChannelID: 123, + } + mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) +) + +type mcTestContext struct { + t *testing.T + mc *MissionControl + now time.Time +} + +func createMcTestContext(t *testing.T) *mcTestContext { + ctx := &mcTestContext{ + t: t, + now: mcTestTime, + } mc := NewMissionControl( - nil, nil, nil, &MissionControlConfig{ + &MissionControlConfig{ PenaltyHalfLife: 30 * time.Minute, AprioriHopProbability: 0.8, }, ) - mc.now = func() time.Time { return now } + + mc.now = func() time.Time { return ctx.now } + ctx.mc = mc + + return ctx +} + +// Assert that mission control returns a probability for an edge. +func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi, + expected float64) { + + ctx.t.Helper() + + p := ctx.mc.GetEdgeProbability(mcTestNode, mcTestEdge, amt) + if p != expected { + ctx.t.Fatalf("unexpected probability %v", p) + } +} + +// TestMissionControl tests mission control probability estimation. +func TestMissionControl(t *testing.T) { + ctx := createMcTestContext(t) + + ctx.now = testTime testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) @@ -27,50 +66,38 @@ func TestMissionControl(t *testing.T) { channel: 123, } - expectP := func(amt lnwire.MilliSatoshi, expected float64) { - t.Helper() - - p := mc.getEdgeProbability( - testNode, EdgeLocator{ChannelID: testEdge.channel}, - amt, - ) - if p != expected { - t.Fatalf("unexpected probability %v", p) - } - } - // Initial probability is expected to be 1. - expectP(1000, 0.8) + ctx.expectP(1000, 0.8) // Expect probability to be zero after reporting the edge as failed. - mc.reportEdgeFailure(testEdge, 1000) - expectP(1000, 0) + ctx.mc.ReportEdgeFailure(testEdge, 1000) + ctx.expectP(1000, 0) // As we reported with a min penalization amt, a lower amt than reported // should be unaffected. - expectP(500, 0.8) + ctx.expectP(500, 0.8) // Edge decay started. - now = testTime.Add(30 * time.Minute) - expectP(1000, 0.4) + ctx.now = testTime.Add(30 * time.Minute) + ctx.expectP(1000, 0.4) // Edge fails again, this time without a min penalization amt. The edge // should be penalized regardless of amount. - mc.reportEdgeFailure(testEdge, 0) - expectP(1000, 0) - expectP(500, 0) + ctx.mc.ReportEdgeFailure(testEdge, 0) + ctx.expectP(1000, 0) + ctx.expectP(500, 0) // Edge decay started. - now = testTime.Add(60 * time.Minute) - expectP(1000, 0.4) + ctx.now = testTime.Add(60 * time.Minute) + ctx.expectP(1000, 0.4) // A node level failure should bring probability of every channel back // to zero. - mc.reportVertexFailure(testNode) - expectP(1000, 0) + ctx.mc.ReportVertexFailure(testNode) + ctx.expectP(1000, 0) // Check whether history snapshot looks sane. - history := mc.GetHistorySnapshot() + history := ctx.mc.GetHistorySnapshot() if len(history.Nodes) != 1 { t.Fatal("unexpected number of nodes") } @@ -79,3 +106,25 @@ func TestMissionControl(t *testing.T) { t.Fatal("unexpected number of channels") } } + +// TestMissionControlChannelUpdate tests that the first channel update is not +// penalizing the channel yet. +func TestMissionControlChannelUpdate(t *testing.T) { + ctx := createMcTestContext(t) + + testEdge := edge{ + channel: 123, + } + + // Report a policy related failure. Because it is the first, we don't + // expect a penalty. + ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0.8) + + // Report another failure for the same channel. We expect it to be + // pruned. + ctx.mc.ReportEdgePolicyFailure(testEdge) + + ctx.expectP(0, 0) +} diff --git a/routing/mock_test.go b/routing/mock_test.go index f2f7de78..4c4f97eb 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -93,6 +93,25 @@ func (m *mockPaymentSessionSource) NewPaymentSessionEmpty() PaymentSession { return &mockPaymentSession{} } +type mockMissionControl struct { +} + +var _ MissionController = (*mockMissionControl)(nil) + +func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) { +} + +func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {} + +func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {} + +func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, + amt lnwire.MilliSatoshi) float64 { + + return 0 +} + type mockPaymentSession struct { routes []*route.Route } diff --git a/routing/nodepair.go b/routing/nodepair.go new file mode 100644 index 00000000..edec8e02 --- /dev/null +++ b/routing/nodepair.go @@ -0,0 +1,10 @@ +package routing + +import ( + "github.com/lightningnetwork/lnd/routing/route" +) + +// DirectedNodePair stores a directed pair of nodes. +type DirectedNodePair struct { + From, To route.Vertex +} diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 32f6883e..4de85b5d 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -343,7 +343,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID, func (p *paymentLifecycle) handleSendError(sendErr error) error { final, reason := p.router.processSendError( - p.paySession, &p.attempt.Route, sendErr, + &p.attempt.Route, sendErr, ) if !final { // Save the forwarding error so it can be returned if diff --git a/routing/payment_session.go b/routing/payment_session.go index 7508e30a..44abeae9 100644 --- a/routing/payment_session.go +++ b/routing/payment_session.go @@ -16,27 +16,6 @@ type PaymentSession interface { // specified HTLC payment to the target node. RequestRoute(payment *LightningPayment, height uint32, finalCltvDelta uint16) (*route.Route, error) - - // ReportVertexFailure reports to the PaymentSession that the passsed - // vertex failed to route the previous payment attempt. The - // PaymentSession will use this information to produce a better next - // route. - ReportVertexFailure(v route.Vertex) - - // ReportEdgeFailure reports to the PaymentSession that the passed edge - // failed to route the previous payment attempt. A minimum penalization - // amount is included to attenuate the failure. This is set to a - // non-zero value for channel balance failures. The PaymentSession will - // use this information to produce a better next route. - ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) - - // ReportEdgePolicyFailure reports to the PaymentSession that we - // received a failure message that relates to a channel policy. For - // these types of failures, the PaymentSession can decide whether to to - // keep the edge included in the next attempted route. The - // PaymentSession will use this information to produce a better next - // route. - ReportEdgePolicyFailure(failedEdge edge) } // paymentSession is used during an HTLC routings session to prune the local @@ -52,13 +31,7 @@ type paymentSession struct { bandwidthHints map[uint64]lnwire.MilliSatoshi - // errFailedFeeChans is a map of the short channel IDs that were the - // source of policy related routing failures during this payment attempt. - // We'll use this map to prune out channels when the first error may not - // require pruning, but any subsequent ones do. - errFailedPolicyChans map[nodeChannel]struct{} - - mc *MissionControl + sessionSource *SessionSource preBuiltRoute *route.Route preBuiltRouteTried bool @@ -66,68 +39,6 @@ type paymentSession struct { pathFinder pathFinder } -// A compile time assertion to ensure paymentSession meets the PaymentSession -// interface. -var _ PaymentSession = (*paymentSession)(nil) - -// ReportVertexFailure adds a vertex to the graph prune view after a client -// reports a routing failure localized to the vertex. The time the vertex was -// added is noted, as it'll be pruned from the shared view after a period of -// vertexDecay. However, the vertex will remain pruned for the *local* session. -// This ensures we don't retry this vertex during the payment attempt. -// -// NOTE: Part of the PaymentSession interface. -func (p *paymentSession) ReportVertexFailure(v route.Vertex) { - p.mc.reportVertexFailure(v) -} - -// ReportEdgeFailure adds a channel to the graph prune view. The time the -// channel was added is noted, as it'll be pruned from the global view after a -// period of edgeDecay. However, the edge will remain pruned for the duration -// of the *local* session. This ensures that we don't flap by continually -// retrying an edge after its pruning has expired. -// -// TODO(roasbeef): also add value attempted to send and capacity of channel -// -// NOTE: Part of the PaymentSession interface. -func (p *paymentSession) ReportEdgeFailure(failedEdge edge, - minPenalizeAmt lnwire.MilliSatoshi) { - - p.mc.reportEdgeFailure(failedEdge, minPenalizeAmt) -} - -// ReportEdgePolicyFailure handles a failure message that relates to a -// channel policy. For these types of failures, the policy is updated and we -// want to keep it included during path finding. This function does mark the -// edge as 'policy failed once'. The next time it fails, the whole node will be -// pruned. This is to prevent nodes from keeping us busy by continuously sending -// new channel updates. -// -// NOTE: Part of the PaymentSession interface. -// -// TODO(joostjager): Move this logic into global mission control. -func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) { - key := nodeChannel{ - node: failedEdge.from, - channel: failedEdge.channel, - } - - // Check to see if we've already reported a policy related failure for - // this channel. If so, then we'll prune out the vertex. - _, ok := p.errFailedPolicyChans[key] - if ok { - // TODO(joostjager): is this aggressive pruning still necessary? - // Just pruning edges may also work unless there is a huge - // number of failing channels from that node? - p.ReportVertexFailure(key.node) - - return - } - - // Finally, we'll record a policy failure from this node and move on. - p.errFailedPolicyChans[key] = struct{}{} -} - // RequestRoute returns a route which is likely to be capable for successfully // routing the specified HTLC payment to the target node. Initially the first // set of paths returned from this method may encounter routing failure along @@ -169,21 +80,24 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, // Taking into account this prune view, we'll attempt to locate a path // to our destination, respecting the recommendations from // MissionControl. + ss := p.sessionSource + + restrictions := &RestrictParams{ + ProbabilitySource: ss.MissionControl.GetEdgeProbability, + FeeLimit: payment.FeeLimit, + OutgoingChannelID: payment.OutgoingChannelID, + CltvLimit: cltvLimit, + PaymentAttemptPenalty: ss.PaymentAttemptPenalty, + MinProbability: ss.MinRouteProbability, + } + path, err := p.pathFinder( &graphParams{ - graph: p.mc.graph, + graph: ss.Graph, additionalEdges: p.additionalEdges, bandwidthHints: p.bandwidthHints, }, - &RestrictParams{ - ProbabilitySource: p.mc.getEdgeProbability, - FeeLimit: payment.FeeLimit, - OutgoingChannelID: payment.OutgoingChannelID, - CltvLimit: cltvLimit, - PaymentAttemptPenalty: p.mc.cfg.PaymentAttemptPenalty, - MinProbability: p.mc.cfg.MinRouteProbability, - }, - p.mc.selfNode.PubKeyBytes, payment.Target, + restrictions, ss.SelfNode.PubKeyBytes, payment.Target, payment.Amount, ) if err != nil { @@ -192,7 +106,7 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment, // With the next candidate path found, we'll attempt to turn this into // a route by applying the time-lock and fee requirements. - sourceVertex := route.Vertex(p.mc.selfNode.PubKeyBytes) + sourceVertex := route.Vertex(ss.SelfNode.PubKeyBytes) route, err := newRoute( payment.Amount, sourceVertex, path, height, finalCltvDelta, ) diff --git a/routing/payment_session_source.go b/routing/payment_session_source.go new file mode 100644 index 00000000..77dad51e --- /dev/null +++ b/routing/payment_session_source.go @@ -0,0 +1,147 @@ +package routing + +import ( + "github.com/btcsuite/btcd/btcec" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/zpay32" +) + +// A compile time assertion to ensure MissionControl meets the +// PaymentSessionSource interface. +var _ PaymentSessionSource = (*SessionSource)(nil) + +// SessionSource defines a source for the router to retrieve new payment +// sessions. +type SessionSource struct { + // Graph is the channel graph that will be used to gather metrics from + // and also to carry out path finding queries. + Graph *channeldb.ChannelGraph + + // QueryBandwidth is a method that allows querying the lower link layer + // to determine the up to date available bandwidth at a prospective link + // to be traversed. If the link isn't available, then a value of zero + // should be returned. Otherwise, the current up to date knowledge of + // the available bandwidth of the link should be returned. + QueryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi + + // SelfNode is our own node. + SelfNode *channeldb.LightningNode + + // MissionControl is a shared memory of sorts that executions of payment + // path finding use in order to remember which vertexes/edges were + // pruned from prior attempts. During payment execution, errors sent by + // nodes are mapped into a vertex or edge to be pruned. Each run will + // then take into account this set of pruned vertexes/edges to reduce + // route failure and pass on graph information gained to the next + // execution. + MissionControl MissionController + + // PaymentAttemptPenalty is the virtual cost in path finding weight + // units of executing a payment attempt that fails. It is used to trade + // off potentially better routes against their probability of + // succeeding. + PaymentAttemptPenalty lnwire.MilliSatoshi + + // MinProbability defines the minimum success probability of the + // returned route. + MinRouteProbability float64 +} + +// NewPaymentSession creates a new payment session backed by the latest prune +// view from Mission Control. An optional set of routing hints can be provided +// in order to populate additional edges to explore when finding a path to the +// payment's destination. +func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint, + target route.Vertex) (PaymentSession, error) { + + edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy) + + // Traverse through all of the available hop hints and include them in + // our edges map, indexed by the public key of the channel's starting + // node. + for _, routeHint := range routeHints { + // If multiple hop hints are provided within a single route + // hint, we'll assume they must be chained together and sorted + // in forward order in order to reach the target successfully. + for i, hopHint := range routeHint { + // In order to determine the end node of this hint, + // we'll need to look at the next hint's start node. If + // we've reached the end of the hints list, we can + // assume we've reached the destination. + endNode := &channeldb.LightningNode{} + if i != len(routeHint)-1 { + endNode.AddPubKey(routeHint[i+1].NodeID) + } else { + targetPubKey, err := btcec.ParsePubKey( + target[:], btcec.S256(), + ) + if err != nil { + return nil, err + } + endNode.AddPubKey(targetPubKey) + } + + // Finally, create the channel edge from the hop hint + // and add it to list of edges corresponding to the node + // at the start of the channel. + edge := &channeldb.ChannelEdgePolicy{ + Node: endNode, + ChannelID: hopHint.ChannelID, + FeeBaseMSat: lnwire.MilliSatoshi( + hopHint.FeeBaseMSat, + ), + FeeProportionalMillionths: lnwire.MilliSatoshi( + hopHint.FeeProportionalMillionths, + ), + TimeLockDelta: hopHint.CLTVExpiryDelta, + } + + v := route.NewVertex(hopHint.NodeID) + edges[v] = append(edges[v], edge) + } + } + + // We'll also obtain a set of bandwidthHints from the lower layer for + // each of our outbound channels. This will allow the path finding to + // skip any links that aren't active or just don't have enough + // bandwidth to carry the payment. + sourceNode, err := m.Graph.SourceNode() + if err != nil { + return nil, err + } + bandwidthHints, err := generateBandwidthHints( + sourceNode, m.QueryBandwidth, + ) + if err != nil { + return nil, err + } + + return &paymentSession{ + additionalEdges: edges, + bandwidthHints: bandwidthHints, + sessionSource: m, + pathFinder: findPath, + }, nil +} + +// NewPaymentSessionForRoute creates a new paymentSession instance that is just +// used for failure reporting to missioncontrol. +func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { + return &paymentSession{ + sessionSource: m, + preBuiltRoute: preBuiltRoute, + } +} + +// NewPaymentSessionEmpty creates a new paymentSession instance that is empty, +// and will be exhausted immediately. Used for failure reporting to +// missioncontrol for resumed payment we don't want to make more attempts for. +func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession { + return &paymentSession{ + sessionSource: m, + preBuiltRoute: &route.Route{}, + preBuiltRouteTried: true, + } +} diff --git a/routing/payment_session_test.go b/routing/payment_session_test.go index a5ee7889..33070fbd 100644 --- a/routing/payment_session_test.go +++ b/routing/payment_session_test.go @@ -32,12 +32,16 @@ func TestRequestRoute(t *testing.T) { return path, nil } - session := &paymentSession{ - mc: &MissionControl{ - selfNode: &channeldb.LightningNode{}, - cfg: &MissionControlConfig{}, + sessionSource := &SessionSource{ + SelfNode: &channeldb.LightningNode{}, + MissionControl: &MissionControl{ + cfg: &MissionControlConfig{}, }, - pathFinder: findPath, + } + + session := &paymentSession{ + sessionSource: sessionSource, + pathFinder: findPath, } cltvLimit := uint32(30) diff --git a/routing/router.go b/routing/router.go index 87bb5c75..4df236d5 100644 --- a/routing/router.go +++ b/routing/router.go @@ -171,6 +171,25 @@ type PaymentSessionSource interface { NewPaymentSessionEmpty() PaymentSession } +// MissionController is an interface that exposes failure reporting and +// probability estimation. +type MissionController interface { + // ReportEdgeFailure reports a channel level failure. + ReportEdgeFailure(failedEdge edge, + minPenalizeAmt lnwire.MilliSatoshi) + + // ReportEdgePolicyFailure reports a policy related failure. + ReportEdgePolicyFailure(failedEdge edge) + + // ReportVertexFailure reports a node level failure. + ReportVertexFailure(v route.Vertex) + + // GetEdgeProbability is expected to return the success probability of a + // payment from fromNode along edge. + GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, + amt lnwire.MilliSatoshi) float64 +} + // FeeSchema is the set fee configuration for a Lightning Node on the network. // Using the coefficients described within the schema, the required fee to // forward outgoing payments can be derived. @@ -234,7 +253,11 @@ type Config struct { // Each run will then take into account this set of pruned // vertexes/edges to reduce route failure and pass on graph information // gained to the next execution. - MissionControl PaymentSessionSource + MissionControl MissionController + + // SessionSource defines a source for the router to retrieve new payment + // sessions. + SessionSource PaymentSessionSource // ChannelPruneExpiry is the duration used to determine if a channel // should be pruned or not. If the delta between now and when the @@ -544,7 +567,7 @@ func (r *ChannelRouter) Start() error { // // PayAttemptTime doesn't need to be set, as there is // only a single attempt. - paySession := r.cfg.MissionControl.NewPaymentSessionEmpty() + paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() lPayment := &LightningPayment{ PaymentHash: payment.Info.PaymentHash, @@ -1651,7 +1674,7 @@ func (r *ChannelRouter) preparePayment(payment *LightningPayment) ( // Before starting the HTLC routing attempt, we'll create a fresh // payment session which will report our errors back to mission // control. - paySession, err := r.cfg.MissionControl.NewPaymentSession( + paySession, err := r.cfg.SessionSource.NewPaymentSession( payment.RouteHints, payment.Target, ) if err != nil { @@ -1682,7 +1705,7 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) ( lntypes.Preimage, error) { // Create a payment session for just this route. - paySession := r.cfg.MissionControl.NewPaymentSessionForRoute(route) + paySession := r.cfg.SessionSource.NewPaymentSessionForRoute(route) // Calculate amount paid to receiver. amt := route.TotalAmount - route.TotalFees() @@ -1806,13 +1829,54 @@ func (r *ChannelRouter) sendPayment( } +// tryApplyChannelUpdate tries to apply a channel update present in the failure +// message if any. +func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, + errorSourceIdx int, failure lnwire.FailureMessage) error { + + // It makes no sense to apply our own channel updates. + if errorSourceIdx == 0 { + log.Errorf("Channel update of ourselves received") + + return nil + } + + // Extract channel update if the error contains one. + update := r.extractChannelUpdate(failure) + if update == nil { + return nil + } + + // Parse pubkey to allow validation of the channel update. This should + // always succeed, otherwise there is something wrong in our + // implementation. Therefore return an error. + errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes + errSource, err := btcec.ParsePubKey( + errVertex[:], btcec.S256(), + ) + if err != nil { + log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v", + errorSourceIdx, errVertex) + + return err + } + + // Apply channel update. + if !r.applyChannelUpdate(update, errSource) { + log.Debugf("Invalid channel update received: node=%x", + errVertex) + } + + return nil +} + // processSendError analyzes the error for the payment attempt received from the // switch and updates mission control and/or channel policies. Depending on the // error type, this error is either the final outcome of the payment or we need // to continue with an alternative route. This is indicated by the boolean // return value. -func (r *ChannelRouter) processSendError(paySession PaymentSession, - rt *route.Route, sendErr error) (bool, channeldb.FailureReason) { +func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) ( + bool, channeldb.FailureReason) { // If the failure message could not be decrypted, attribute the failure // to our own outgoing channel. @@ -1831,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, return true, channeldb.FailureReasonError } - var ( - failureSourceIdx = fErr.FailureSourceIdx + failureMessage := fErr.FailureMessage + failureSourceIdx := fErr.FailureSourceIdx - failureVertex route.Vertex - failureSource *btcec.PublicKey - err error - ) + // Apply channel update if the error contains one. For unknown + // failures, failureMessage is nil. + if failureMessage != nil { + err := r.tryApplyChannelUpdate( + rt, failureSourceIdx, failureMessage, + ) + if err != nil { + return true, channeldb.FailureReasonError + } + } + + var failureVertex route.Vertex // For any non-self failure, look up the source pub key in the hops // slice. Otherwise return the self node pubkey. if failureSourceIdx > 0 { failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes - failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256()) - if err != nil { - log.Errorf("Cannot parse pubkey %v: %v", - failureVertex, err) - - return true, channeldb.FailureReasonError - } } else { failureVertex = r.selfNode.PubKeyBytes - failureSource, err = r.selfNode.PubKey() - if err != nil { - log.Errorf("Cannot parse self pubkey: %v", err) - return true, channeldb.FailureReasonError - } } log.Tracef("Node %x (index %v) reported failure when sending htlc", failureVertex, failureSourceIdx) @@ -1865,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // update with id may not be available. failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) - // processChannelUpdateAndRetry is a closure that - // handles a failure message containing a channel - // update. This function always tries to apply the - // channel update and passes on the result to the - // payment session to adjust its view on the reliability - // of the network. - // - // As channel id, the locally determined channel id is - // used. It does not rely on the channel id that is part - // of the channel update message, because the remote - // node may lie to us or the update may be corrupt. - processChannelUpdateAndRetry := func( - update *lnwire.ChannelUpdate, - pubKey *btcec.PublicKey) { - - // Try to apply the channel update. - updateOk := r.applyChannelUpdate(update, pubKey) - - // If the update could not be applied, prune the - // edge. There is no reason to continue trying - // this channel. - // - // TODO: Could even prune the node completely? - // Or is there a valid reason for the channel - // update to fail? - if !updateOk { - paySession.ReportEdgeFailure( - failedEdge, 0, - ) - } - - paySession.ReportEdgePolicyFailure(failedEdge) - } - - switch onionErr := fErr.FailureMessage.(type) { + switch fErr.FailureMessage.(type) { // If the end destination didn't know the payment // hash or we sent the wrong payment amount to the @@ -1955,8 +1981,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // that sent us this error, as it doesn't now what the // correct block height is. case *lnwire.FailExpiryTooSoon: - r.applyChannelUpdate(&onionErr.Update, failureSource) - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 // If we hit an instance of onion payload corruption or an invalid @@ -1976,57 +2001,49 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // amount, we'll apply the new minimum amount and retry // routing. case *lnwire.FailAmountBelowMinimum: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get a failure due to a fee, we'll apply the // new fee update, and retry our attempt using the // newly updated fees. case *lnwire.FailFeeInsufficient: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) return false, 0 // If we get the failure for an intermediate node that // disagrees with our time lock values, then we'll // apply the new delta value and try it once more. case *lnwire.FailIncorrectCltvExpiry: - processChannelUpdateAndRetry( - &onionErr.Update, failureSource, - ) + r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) return false, 0 // The outgoing channel that this node was meant to // forward one is currently disabled, so we'll apply // the update and continue. case *lnwire.FailChannelDisabled: - r.applyChannelUpdate(&onionErr.Update, failureSource) - paySession.ReportEdgeFailure(failedEdge, 0) + r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) return false, 0 // It's likely that the outgoing channel didn't have // sufficient capacity, so we'll prune this edge for // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: - r.applyChannelUpdate(onionErr.Update, failureSource) - paySession.ReportEdgeFailure(failedEdge, failedAmt) + r.cfg.MissionControl.ReportEdgeFailure(failedEdge, failedAmt) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. case *lnwire.FailRequiredNodeFeatureMissing: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. case *lnwire.FailRequiredChannelFeatureMissing: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 // If the next hop in the route wasn't known or @@ -2037,18 +2054,18 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // returning errors in order to attempt to black list // another node. case *lnwire.FailUnknownNextPeer: - paySession.ReportEdgeFailure(failedEdge, 0) + r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) return false, 0 // If the node wasn't able to forward for which ever // reason, then we'll note this and continue with the // routes. case *lnwire.FailTemporaryNodeFailure: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 case *lnwire.FailPermanentNodeFailure: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 // If we crafted a route that contains a too long time @@ -2061,15 +2078,15 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // that as a hint during future path finding through // that node. case *lnwire.FailExpiryTooFar: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 // If we get a permanent channel or node failure, then // we'll prune the channel in both directions and // continue with the rest of the routes. case *lnwire.FailPermanentChannelFailure: - paySession.ReportEdgeFailure(failedEdge, 0) - paySession.ReportEdgeFailure(edge{ + r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) + r.cfg.MissionControl.ReportEdgeFailure(edge{ from: failedEdge.to, to: failedEdge.from, channel: failedEdge.channel, @@ -2078,11 +2095,34 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession, // Any other failure or an empty failure will get the node pruned. default: - paySession.ReportVertexFailure(failureVertex) + r.cfg.MissionControl.ReportVertexFailure(failureVertex) return false, 0 } } +// extractChannelUpdate examines the error and extracts the channel update. +func (r *ChannelRouter) extractChannelUpdate( + failure lnwire.FailureMessage) *lnwire.ChannelUpdate { + + var update *lnwire.ChannelUpdate + switch onionErr := failure.(type) { + case *lnwire.FailExpiryTooSoon: + update = &onionErr.Update + case *lnwire.FailAmountBelowMinimum: + update = &onionErr.Update + case *lnwire.FailFeeInsufficient: + update = &onionErr.Update + case *lnwire.FailIncorrectCltvExpiry: + update = &onionErr.Update + case *lnwire.FailChannelDisabled: + update = &onionErr.Update + case *lnwire.FailTemporaryChannelFailure: + update = onionErr.Update + } + + return update +} + // getFailedEdge tries to locate the failing channel given a route and the // pubkey of the node that sent the failure. It will assume that the failure is // associated with the outgoing channel of the failing node. As a second result, @@ -2127,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge, // database. It returns a bool indicating whether the updates was successful. func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, pubKey *btcec.PublicKey) bool { - // If we get passed a nil channel update (as it's optional with some - // onion errors), then we'll exit early with a success result. - if msg == nil { - return true - } ch, _, _, err := r.GetChannelByID(msg.ShortChannelID) if err != nil { @@ -2410,3 +2445,38 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID, func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error { return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64()) } + +// generateBandwidthHints is a helper function that's utilized the main +// findPath function in order to obtain hints from the lower layer w.r.t to the +// available bandwidth of edges on the network. Currently, we'll only obtain +// bandwidth hints for the edges we directly have open ourselves. Obtaining +// these hints allows us to reduce the number of extraneous attempts as we can +// skip channels that are inactive, or just don't have enough bandwidth to +// carry the payment. +func generateBandwidthHints(sourceNode *channeldb.LightningNode, + queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) { + + // First, we'll collect the set of outbound edges from the target + // source node. + var localChans []*channeldb.ChannelEdgeInfo + err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx, + edgeInfo *channeldb.ChannelEdgeInfo, + _, _ *channeldb.ChannelEdgePolicy) error { + + localChans = append(localChans, edgeInfo) + return nil + }) + if err != nil { + return nil, err + } + + // Now that we have all of our outbound edges, we'll populate the set + // of bandwidth hints, querying the lower switch layer for the most up + // to date values. + bandwidthHints := make(map[uint64]lnwire.MilliSatoshi) + for _, localChan := range localChans { + bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan) + } + + return bandwidthHints, nil +} diff --git a/routing/router_test.go b/routing/router_test.go index c50dba1c..bc71cfc9 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -91,17 +91,23 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr } mc := NewMissionControl( - graphInstance.graph, selfNode, - func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { - return lnwire.NewMSatFromSatoshis(e.Capacity) - }, &MissionControlConfig{ - MinRouteProbability: 0.01, - PaymentAttemptPenalty: 100, PenaltyHalfLife: time.Hour, AprioriHopProbability: 0.9, }, ) + + sessionSource := &SessionSource{ + Graph: graphInstance.graph, + SelfNode: selfNode, + QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { + return lnwire.NewMSatFromSatoshis(e.Capacity) + }, + MinRouteProbability: 0.01, + PaymentAttemptPenalty: 100, + MissionControl: mc, + } + router, err := New(Config{ Graph: graphInstance.graph, Chain: chain, @@ -109,6 +115,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr Payer: &mockPaymentAttemptDispatcher{}, Control: makeMockControlTower(), MissionControl: mc, + SessionSource: sessionSource, ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { @@ -2940,7 +2947,7 @@ func TestRouterPaymentStateMachine(t *testing.T) { Chain: chain, ChainView: chainView, Control: control, - MissionControl: &mockPaymentSessionSource{}, + SessionSource: &mockPaymentSessionSource{}, Payer: payer, ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, @@ -3004,10 +3011,12 @@ func TestRouterPaymentStateMachine(t *testing.T) { copy(preImage[:], bytes.Repeat([]byte{9}, 32)) - router.cfg.MissionControl = &mockPaymentSessionSource{ + router.cfg.SessionSource = &mockPaymentSessionSource{ routes: test.routes, } + router.cfg.MissionControl = &mockMissionControl{} + // Send the payment. Since this is new payment hash, the // information should be registered with the ControlTower. paymentResult := make(chan error) diff --git a/server.go b/server.go index 88204d81..64b37666 100644 --- a/server.go +++ b/server.go @@ -652,11 +652,29 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, // // TODO(joostjager): When we are further in the process of moving to sub // servers, the mission control instance itself can be moved there too. + routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) + s.missionControl = routing.NewMissionControl( - chanGraph, selfNode, queryBandwidth, - routerrpc.GetMissionControlConfig(cfg.SubRPCServers.RouterRPC), + &routing.MissionControlConfig{ + AprioriHopProbability: routingConfig.AprioriHopProbability, + PenaltyHalfLife: routingConfig.PenaltyHalfLife, + }, ) + srvrLog.Debugf("Instantiating payment session source with config: "+ + "PaymentAttemptPenalty=%v, MinRouteProbability=%v", + int64(routingConfig.PaymentAttemptPenalty.ToSatoshis()), + routingConfig.MinRouteProbability) + + paymentSessionSource := &routing.SessionSource{ + Graph: chanGraph, + MissionControl: s.missionControl, + QueryBandwidth: queryBandwidth, + SelfNode: selfNode, + PaymentAttemptPenalty: routingConfig.PaymentAttemptPenalty, + MinRouteProbability: routingConfig.MinRouteProbability, + } + paymentControl := channeldb.NewPaymentControl(chanDB) s.controlTower = routing.NewControlTower(paymentControl) @@ -668,6 +686,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, Payer: s.htlcSwitch, Control: s.controlTower, MissionControl: s.missionControl, + SessionSource: paymentSessionSource, ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), QueryBandwidth: queryBandwidth,