Merge pull request #3298 from joostjager/globalize-mc

routing: globalize mc
This commit is contained in:
Olaoluwa Osuntokun 2019-07-16 17:39:36 -07:00 committed by GitHub
commit 92e14af72a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 603 additions and 425 deletions

28
lnrpc/routerrpc/config.go Normal file

@ -0,0 +1,28 @@
package routerrpc
import (
"time"
"github.com/lightningnetwork/lnd/lnwire"
)
// RoutingConfig contains the configurable parameters that control routing.
type RoutingConfig struct {
// PenaltyHalfLife defines after how much time a penalized node or
// channel is back at 50% probability.
PenaltyHalfLife time.Duration
// PaymentAttemptPenalty is the virtual cost in path finding weight
// units of executing a payment attempt that fails. It is used to trade
// off potentially better routes against their probability of
// succeeding.
PaymentAttemptPenalty lnwire.MilliSatoshi
// MinProbability defines the minimum success probability of the
// returned route.
MinRouteProbability float64
// AprioriHopProbability is the assumed success probability of a hop in
// a route when no other information is available.
AprioriHopProbability float64
}

@ -72,10 +72,9 @@ func DefaultConfig() *Config {
} }
} }
// GetMissionControlConfig returns the mission control config based on this sub // GetRoutingConfig returns the routing config based on this sub server config.
// server config. func GetRoutingConfig(cfg *Config) *RoutingConfig {
func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { return &RoutingConfig{
return &routing.MissionControlConfig{
AprioriHopProbability: cfg.AprioriHopProbability, AprioriHopProbability: cfg.AprioriHopProbability,
MinRouteProbability: cfg.MinRouteProbability, MinRouteProbability: cfg.MinRouteProbability,
PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis( PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis(

@ -14,10 +14,9 @@ func DefaultConfig() *Config {
return &Config{} return &Config{}
} }
// GetMissionControlConfig returns the mission control config based on this sub // GetRoutingConfig returns the routing config based on this sub server config.
// server config. func GetRoutingConfig(cfg *Config) *RoutingConfig {
func GetMissionControlConfig(cfg *Config) *routing.MissionControlConfig { return &RoutingConfig{
return &routing.MissionControlConfig{
AprioriHopProbability: routing.DefaultAprioriHopProbability, AprioriHopProbability: routing.DefaultAprioriHopProbability,
MinRouteProbability: routing.DefaultMinRouteProbability, MinRouteProbability: routing.DefaultMinRouteProbability,
PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty, PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty,

@ -5,12 +5,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/btcsuite/btcd/btcec"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/zpay32"
) )
const ( const (
@ -18,6 +14,30 @@ const (
// half-life duration defines after how much time a penalized node or // half-life duration defines after how much time a penalized node or
// channel is back at 50% probability. // channel is back at 50% probability.
DefaultPenaltyHalfLife = time.Hour DefaultPenaltyHalfLife = time.Hour
// minSecondChanceInterval is the minimum time required between
// second-chance failures.
//
// If nodes return a channel policy related failure, they may get a
// second chance to forward the payment. It could be that the channel
// policy that we are aware of is not up to date. This is especially
// important in case of mobile apps that are mostly offline.
//
// However, we don't want to give nodes the option to endlessly return
// new channel updates so that we are kept busy trying to route through
// that node until the payment loop times out.
//
// Therefore we only grant a second chance to a node if the previous
// second chance is sufficiently long ago. This is what
// minSecondChanceInterval defines. If a second policy failure comes in
// within that interval, we will apply a penalty.
//
// Second chances granted are tracked on the level of node pairs. This
// means that if a node has multiple channels to the same peer, they
// will only get a single second chance to route to that peer again.
// Nodes forward non-strict, so it isn't necessary to apply a less
// restrictive channel level tracking scheme here.
minSecondChanceInterval = time.Minute
) )
// MissionControl contains state which summarizes the past attempts of HTLC // MissionControl contains state which summarizes the past attempts of HTLC
@ -32,11 +52,9 @@ const (
type MissionControl struct { type MissionControl struct {
history map[route.Vertex]*nodeHistory history map[route.Vertex]*nodeHistory
graph *channeldb.ChannelGraph // lastSecondChance tracks the last time a second chance was granted for
// a directed node pair.
selfNode *channeldb.LightningNode lastSecondChance map[DirectedNodePair]time.Time
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
// now is expected to return the current time. It is supplied as an // now is expected to return the current time. It is supplied as an
// external function to enable deterministic unit tests. // external function to enable deterministic unit tests.
@ -52,10 +70,6 @@ type MissionControl struct {
// TODO(roasbeef): also add favorable metrics for nodes // TODO(roasbeef): also add favorable metrics for nodes
} }
// A compile time assertion to ensure MissionControl meets the
// PaymentSessionSource interface.
var _ PaymentSessionSource = (*MissionControl)(nil)
// MissionControlConfig defines parameters that control mission control // MissionControlConfig defines parameters that control mission control
// behaviour. // behaviour.
type MissionControlConfig struct { type MissionControlConfig struct {
@ -63,16 +77,6 @@ type MissionControlConfig struct {
// channel is back at 50% probability. // channel is back at 50% probability.
PenaltyHalfLife time.Duration PenaltyHalfLife time.Duration
// PaymentAttemptPenalty is the virtual cost in path finding weight
// units of executing a payment attempt that fails. It is used to trade
// off potentially better routes against their probability of
// succeeding.
PaymentAttemptPenalty lnwire.MilliSatoshi
// MinProbability defines the minimum success probability of the
// returned route.
MinRouteProbability float64
// AprioriHopProbability is the assumed success probability of a hop in // AprioriHopProbability is the assumed success probability of a hop in
// a route when no other information is available. // a route when no other information is available.
AprioriHopProbability float64 AprioriHopProbability float64
@ -143,164 +147,19 @@ type MissionControlChannelSnapshot struct {
} }
// NewMissionControl returns a new instance of missionControl. // NewMissionControl returns a new instance of missionControl.
// func NewMissionControl(cfg *MissionControlConfig) *MissionControl {
// TODO(roasbeef): persist memory
func NewMissionControl(g *channeldb.ChannelGraph, selfNode *channeldb.LightningNode,
qb func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi,
cfg *MissionControlConfig) *MissionControl {
log.Debugf("Instantiating mission control with config: "+ log.Debugf("Instantiating mission control with config: "+
"PenaltyHalfLife=%v, PaymentAttemptPenalty=%v, "+ "PenaltyHalfLife=%v, AprioriHopProbability=%v",
"MinRouteProbability=%v, AprioriHopProbability=%v", cfg.PenaltyHalfLife, cfg.AprioriHopProbability)
cfg.PenaltyHalfLife,
int64(cfg.PaymentAttemptPenalty.ToSatoshis()),
cfg.MinRouteProbability, cfg.AprioriHopProbability)
return &MissionControl{ return &MissionControl{
history: make(map[route.Vertex]*nodeHistory), history: make(map[route.Vertex]*nodeHistory),
selfNode: selfNode, lastSecondChance: make(map[DirectedNodePair]time.Time),
queryBandwidth: qb, now: time.Now,
graph: g, cfg: cfg,
now: time.Now,
cfg: cfg,
} }
} }
// NewPaymentSession creates a new payment session backed by the latest prune
// view from Mission Control. An optional set of routing hints can be provided
// in order to populate additional edges to explore when finding a path to the
// payment's destination.
func (m *MissionControl) NewPaymentSession(routeHints [][]zpay32.HopHint,
target route.Vertex) (PaymentSession, error) {
edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy)
// Traverse through all of the available hop hints and include them in
// our edges map, indexed by the public key of the channel's starting
// node.
for _, routeHint := range routeHints {
// If multiple hop hints are provided within a single route
// hint, we'll assume they must be chained together and sorted
// in forward order in order to reach the target successfully.
for i, hopHint := range routeHint {
// In order to determine the end node of this hint,
// we'll need to look at the next hint's start node. If
// we've reached the end of the hints list, we can
// assume we've reached the destination.
endNode := &channeldb.LightningNode{}
if i != len(routeHint)-1 {
endNode.AddPubKey(routeHint[i+1].NodeID)
} else {
targetPubKey, err := btcec.ParsePubKey(
target[:], btcec.S256(),
)
if err != nil {
return nil, err
}
endNode.AddPubKey(targetPubKey)
}
// Finally, create the channel edge from the hop hint
// and add it to list of edges corresponding to the node
// at the start of the channel.
edge := &channeldb.ChannelEdgePolicy{
Node: endNode,
ChannelID: hopHint.ChannelID,
FeeBaseMSat: lnwire.MilliSatoshi(
hopHint.FeeBaseMSat,
),
FeeProportionalMillionths: lnwire.MilliSatoshi(
hopHint.FeeProportionalMillionths,
),
TimeLockDelta: hopHint.CLTVExpiryDelta,
}
v := route.NewVertex(hopHint.NodeID)
edges[v] = append(edges[v], edge)
}
}
// We'll also obtain a set of bandwidthHints from the lower layer for
// each of our outbound channels. This will allow the path finding to
// skip any links that aren't active or just don't have enough
// bandwidth to carry the payment.
sourceNode, err := m.graph.SourceNode()
if err != nil {
return nil, err
}
bandwidthHints, err := generateBandwidthHints(
sourceNode, m.queryBandwidth,
)
if err != nil {
return nil, err
}
return &paymentSession{
additionalEdges: edges,
bandwidthHints: bandwidthHints,
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
pathFinder: findPath,
}, nil
}
// NewPaymentSessionForRoute creates a new paymentSession instance that is just
// used for failure reporting to missioncontrol.
func (m *MissionControl) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession {
return &paymentSession{
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
preBuiltRoute: preBuiltRoute,
}
}
// NewPaymentSessionEmpty creates a new paymentSession instance that is empty,
// and will be exhausted immediately. Used for failure reporting to
// missioncontrol for resumed payment we don't want to make more attempts for.
func (m *MissionControl) NewPaymentSessionEmpty() PaymentSession {
return &paymentSession{
errFailedPolicyChans: make(map[nodeChannel]struct{}),
mc: m,
preBuiltRoute: &route.Route{},
preBuiltRouteTried: true,
}
}
// generateBandwidthHints is a helper function that's utilized the main
// findPath function in order to obtain hints from the lower layer w.r.t to the
// available bandwidth of edges on the network. Currently, we'll only obtain
// bandwidth hints for the edges we directly have open ourselves. Obtaining
// these hints allows us to reduce the number of extraneous attempts as we can
// skip channels that are inactive, or just don't have enough bandwidth to
// carry the payment.
func generateBandwidthHints(sourceNode *channeldb.LightningNode,
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) {
// First, we'll collect the set of outbound edges from the target
// source node.
var localChans []*channeldb.ChannelEdgeInfo
err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx,
edgeInfo *channeldb.ChannelEdgeInfo,
_, _ *channeldb.ChannelEdgePolicy) error {
localChans = append(localChans, edgeInfo)
return nil
})
if err != nil {
return nil, err
}
// Now that we have all of our outbound edges, we'll populate the set
// of bandwidth hints, querying the lower switch layer for the most up
// to date values.
bandwidthHints := make(map[uint64]lnwire.MilliSatoshi)
for _, localChan := range localChans {
bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan)
}
return bandwidthHints, nil
}
// ResetHistory resets the history of MissionControl returning it to a state as // ResetHistory resets the history of MissionControl returning it to a state as
// if no payment attempts have been made. // if no payment attempts have been made.
func (m *MissionControl) ResetHistory() { func (m *MissionControl) ResetHistory() {
@ -308,13 +167,14 @@ func (m *MissionControl) ResetHistory() {
defer m.Unlock() defer m.Unlock()
m.history = make(map[route.Vertex]*nodeHistory) m.history = make(map[route.Vertex]*nodeHistory)
m.lastSecondChance = make(map[DirectedNodePair]time.Time)
log.Debugf("Mission control history cleared") log.Debugf("Mission control history cleared")
} }
// getEdgeProbability is expected to return the success probability of a payment // GetEdgeProbability is expected to return the success probability of a payment
// from fromNode along edge. // from fromNode along edge.
func (m *MissionControl) getEdgeProbability(fromNode route.Vertex, func (m *MissionControl) GetEdgeProbability(fromNode route.Vertex,
edge EdgeLocator, amt lnwire.MilliSatoshi) float64 { edge EdgeLocator, amt lnwire.MilliSatoshi) float64 {
m.Lock() m.Lock()
@ -376,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory,
return probability return probability
} }
// requestSecondChance checks whether the node fromNode can have a second chance
// at providing a channel update for its channel with toNode.
func (m *MissionControl) requestSecondChance(timestamp time.Time,
fromNode, toNode route.Vertex) bool {
// Look up previous second chance time.
pair := DirectedNodePair{
From: fromNode,
To: toNode,
}
lastSecondChance, ok := m.lastSecondChance[pair]
// If the channel hasn't already be given a second chance or its last
// second chance was long ago, we give it another chance.
if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval {
m.lastSecondChance[pair] = timestamp
log.Debugf("Second chance granted for %v->%v", fromNode, toNode)
return true
}
// Otherwise penalize the channel, because we don't allow channel
// updates that are that frequent. This is to prevent nodes from keeping
// us busy by continuously sending new channel updates.
log.Debugf("Second chance denied for %v->%v, remaining interval: %v",
fromNode, toNode, timestamp.Sub(lastSecondChance))
return false
}
// createHistoryIfNotExists returns the history for the given node. If the node // createHistoryIfNotExists returns the history for the given node. If the node
// is yet unknown, it will create an empty history structure. // is yet unknown, it will create an empty history structure.
func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory {
@ -391,8 +282,8 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist
return node return node
} }
// reportVertexFailure reports a node level failure. // ReportVertexFailure reports a node level failure.
func (m *MissionControl) reportVertexFailure(v route.Vertex) { func (m *MissionControl) ReportVertexFailure(v route.Vertex) {
log.Debugf("Reporting vertex %v failure to Mission Control", v) log.Debugf("Reporting vertex %v failure to Mission Control", v)
now := m.now() now := m.now()
@ -404,10 +295,30 @@ func (m *MissionControl) reportVertexFailure(v route.Vertex) {
history.lastFail = &now history.lastFail = &now
} }
// reportEdgeFailure reports a channel level failure. // ReportEdgePolicyFailure reports a policy related failure.
func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) {
now := m.now()
m.Lock()
defer m.Unlock()
// We may have an out of date graph. Therefore we don't always penalize
// immediately. If some time has passed since the last policy failure,
// we grant the node a second chance at forwarding the payment.
if m.requestSecondChance(
now, failedEdge.from, failedEdge.to,
) {
return
}
history := m.createHistoryIfNotExists(failedEdge.from)
history.lastFail = &now
}
// ReportEdgeFailure reports a channel level failure.
// //
// TODO(roasbeef): also add value attempted to send and capacity of channel // TODO(roasbeef): also add value attempted to send and capacity of channel
func (m *MissionControl) reportEdgeFailure(failedEdge edge, func (m *MissionControl) ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) { minPenalizeAmt lnwire.MilliSatoshi) {
log.Debugf("Reporting channel %v failure to Mission Control", log.Debugf("Reporting channel %v failure to Mission Control",

@ -8,17 +8,56 @@ import (
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
// TestMissionControl tests mission control probability estimation. var (
func TestMissionControl(t *testing.T) { mcTestNode = route.Vertex{}
now := testTime mcTestEdge = EdgeLocator{
ChannelID: 123,
}
mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
)
type mcTestContext struct {
t *testing.T
mc *MissionControl
now time.Time
}
func createMcTestContext(t *testing.T) *mcTestContext {
ctx := &mcTestContext{
t: t,
now: mcTestTime,
}
mc := NewMissionControl( mc := NewMissionControl(
nil, nil, nil, &MissionControlConfig{ &MissionControlConfig{
PenaltyHalfLife: 30 * time.Minute, PenaltyHalfLife: 30 * time.Minute,
AprioriHopProbability: 0.8, AprioriHopProbability: 0.8,
}, },
) )
mc.now = func() time.Time { return now }
mc.now = func() time.Time { return ctx.now }
ctx.mc = mc
return ctx
}
// Assert that mission control returns a probability for an edge.
func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi,
expected float64) {
ctx.t.Helper()
p := ctx.mc.GetEdgeProbability(mcTestNode, mcTestEdge, amt)
if p != expected {
ctx.t.Fatalf("unexpected probability %v", p)
}
}
// TestMissionControl tests mission control probability estimation.
func TestMissionControl(t *testing.T) {
ctx := createMcTestContext(t)
ctx.now = testTime
testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
@ -27,50 +66,38 @@ func TestMissionControl(t *testing.T) {
channel: 123, channel: 123,
} }
expectP := func(amt lnwire.MilliSatoshi, expected float64) {
t.Helper()
p := mc.getEdgeProbability(
testNode, EdgeLocator{ChannelID: testEdge.channel},
amt,
)
if p != expected {
t.Fatalf("unexpected probability %v", p)
}
}
// Initial probability is expected to be 1. // Initial probability is expected to be 1.
expectP(1000, 0.8) ctx.expectP(1000, 0.8)
// Expect probability to be zero after reporting the edge as failed. // Expect probability to be zero after reporting the edge as failed.
mc.reportEdgeFailure(testEdge, 1000) ctx.mc.ReportEdgeFailure(testEdge, 1000)
expectP(1000, 0) ctx.expectP(1000, 0)
// As we reported with a min penalization amt, a lower amt than reported // As we reported with a min penalization amt, a lower amt than reported
// should be unaffected. // should be unaffected.
expectP(500, 0.8) ctx.expectP(500, 0.8)
// Edge decay started. // Edge decay started.
now = testTime.Add(30 * time.Minute) ctx.now = testTime.Add(30 * time.Minute)
expectP(1000, 0.4) ctx.expectP(1000, 0.4)
// Edge fails again, this time without a min penalization amt. The edge // Edge fails again, this time without a min penalization amt. The edge
// should be penalized regardless of amount. // should be penalized regardless of amount.
mc.reportEdgeFailure(testEdge, 0) ctx.mc.ReportEdgeFailure(testEdge, 0)
expectP(1000, 0) ctx.expectP(1000, 0)
expectP(500, 0) ctx.expectP(500, 0)
// Edge decay started. // Edge decay started.
now = testTime.Add(60 * time.Minute) ctx.now = testTime.Add(60 * time.Minute)
expectP(1000, 0.4) ctx.expectP(1000, 0.4)
// A node level failure should bring probability of every channel back // A node level failure should bring probability of every channel back
// to zero. // to zero.
mc.reportVertexFailure(testNode) ctx.mc.ReportVertexFailure(testNode)
expectP(1000, 0) ctx.expectP(1000, 0)
// Check whether history snapshot looks sane. // Check whether history snapshot looks sane.
history := mc.GetHistorySnapshot() history := ctx.mc.GetHistorySnapshot()
if len(history.Nodes) != 1 { if len(history.Nodes) != 1 {
t.Fatal("unexpected number of nodes") t.Fatal("unexpected number of nodes")
} }
@ -79,3 +106,25 @@ func TestMissionControl(t *testing.T) {
t.Fatal("unexpected number of channels") t.Fatal("unexpected number of channels")
} }
} }
// TestMissionControlChannelUpdate tests that the first channel update is not
// penalizing the channel yet.
func TestMissionControlChannelUpdate(t *testing.T) {
ctx := createMcTestContext(t)
testEdge := edge{
channel: 123,
}
// Report a policy related failure. Because it is the first, we don't
// expect a penalty.
ctx.mc.ReportEdgePolicyFailure(testEdge)
ctx.expectP(0, 0.8)
// Report another failure for the same channel. We expect it to be
// pruned.
ctx.mc.ReportEdgePolicyFailure(testEdge)
ctx.expectP(0, 0)
}

@ -93,6 +93,25 @@ func (m *mockPaymentSessionSource) NewPaymentSessionEmpty() PaymentSession {
return &mockPaymentSession{} return &mockPaymentSession{}
} }
type mockMissionControl struct {
}
var _ MissionController = (*mockMissionControl)(nil)
func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) {
}
func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {}
func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {}
func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator,
amt lnwire.MilliSatoshi) float64 {
return 0
}
type mockPaymentSession struct { type mockPaymentSession struct {
routes []*route.Route routes []*route.Route
} }

10
routing/nodepair.go Normal file

@ -0,0 +1,10 @@
package routing
import (
"github.com/lightningnetwork/lnd/routing/route"
)
// DirectedNodePair stores a directed pair of nodes.
type DirectedNodePair struct {
From, To route.Vertex
}

@ -343,7 +343,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID,
func (p *paymentLifecycle) handleSendError(sendErr error) error { func (p *paymentLifecycle) handleSendError(sendErr error) error {
final, reason := p.router.processSendError( final, reason := p.router.processSendError(
p.paySession, &p.attempt.Route, sendErr, &p.attempt.Route, sendErr,
) )
if !final { if !final {
// Save the forwarding error so it can be returned if // Save the forwarding error so it can be returned if

@ -16,27 +16,6 @@ type PaymentSession interface {
// specified HTLC payment to the target node. // specified HTLC payment to the target node.
RequestRoute(payment *LightningPayment, RequestRoute(payment *LightningPayment,
height uint32, finalCltvDelta uint16) (*route.Route, error) height uint32, finalCltvDelta uint16) (*route.Route, error)
// ReportVertexFailure reports to the PaymentSession that the passsed
// vertex failed to route the previous payment attempt. The
// PaymentSession will use this information to produce a better next
// route.
ReportVertexFailure(v route.Vertex)
// ReportEdgeFailure reports to the PaymentSession that the passed edge
// failed to route the previous payment attempt. A minimum penalization
// amount is included to attenuate the failure. This is set to a
// non-zero value for channel balance failures. The PaymentSession will
// use this information to produce a better next route.
ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi)
// ReportEdgePolicyFailure reports to the PaymentSession that we
// received a failure message that relates to a channel policy. For
// these types of failures, the PaymentSession can decide whether to to
// keep the edge included in the next attempted route. The
// PaymentSession will use this information to produce a better next
// route.
ReportEdgePolicyFailure(failedEdge edge)
} }
// paymentSession is used during an HTLC routings session to prune the local // paymentSession is used during an HTLC routings session to prune the local
@ -52,13 +31,7 @@ type paymentSession struct {
bandwidthHints map[uint64]lnwire.MilliSatoshi bandwidthHints map[uint64]lnwire.MilliSatoshi
// errFailedFeeChans is a map of the short channel IDs that were the sessionSource *SessionSource
// source of policy related routing failures during this payment attempt.
// We'll use this map to prune out channels when the first error may not
// require pruning, but any subsequent ones do.
errFailedPolicyChans map[nodeChannel]struct{}
mc *MissionControl
preBuiltRoute *route.Route preBuiltRoute *route.Route
preBuiltRouteTried bool preBuiltRouteTried bool
@ -66,68 +39,6 @@ type paymentSession struct {
pathFinder pathFinder pathFinder pathFinder
} }
// A compile time assertion to ensure paymentSession meets the PaymentSession
// interface.
var _ PaymentSession = (*paymentSession)(nil)
// ReportVertexFailure adds a vertex to the graph prune view after a client
// reports a routing failure localized to the vertex. The time the vertex was
// added is noted, as it'll be pruned from the shared view after a period of
// vertexDecay. However, the vertex will remain pruned for the *local* session.
// This ensures we don't retry this vertex during the payment attempt.
//
// NOTE: Part of the PaymentSession interface.
func (p *paymentSession) ReportVertexFailure(v route.Vertex) {
p.mc.reportVertexFailure(v)
}
// ReportEdgeFailure adds a channel to the graph prune view. The time the
// channel was added is noted, as it'll be pruned from the global view after a
// period of edgeDecay. However, the edge will remain pruned for the duration
// of the *local* session. This ensures that we don't flap by continually
// retrying an edge after its pruning has expired.
//
// TODO(roasbeef): also add value attempted to send and capacity of channel
//
// NOTE: Part of the PaymentSession interface.
func (p *paymentSession) ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) {
p.mc.reportEdgeFailure(failedEdge, minPenalizeAmt)
}
// ReportEdgePolicyFailure handles a failure message that relates to a
// channel policy. For these types of failures, the policy is updated and we
// want to keep it included during path finding. This function does mark the
// edge as 'policy failed once'. The next time it fails, the whole node will be
// pruned. This is to prevent nodes from keeping us busy by continuously sending
// new channel updates.
//
// NOTE: Part of the PaymentSession interface.
//
// TODO(joostjager): Move this logic into global mission control.
func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) {
key := nodeChannel{
node: failedEdge.from,
channel: failedEdge.channel,
}
// Check to see if we've already reported a policy related failure for
// this channel. If so, then we'll prune out the vertex.
_, ok := p.errFailedPolicyChans[key]
if ok {
// TODO(joostjager): is this aggressive pruning still necessary?
// Just pruning edges may also work unless there is a huge
// number of failing channels from that node?
p.ReportVertexFailure(key.node)
return
}
// Finally, we'll record a policy failure from this node and move on.
p.errFailedPolicyChans[key] = struct{}{}
}
// RequestRoute returns a route which is likely to be capable for successfully // RequestRoute returns a route which is likely to be capable for successfully
// routing the specified HTLC payment to the target node. Initially the first // routing the specified HTLC payment to the target node. Initially the first
// set of paths returned from this method may encounter routing failure along // set of paths returned from this method may encounter routing failure along
@ -169,21 +80,24 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
// Taking into account this prune view, we'll attempt to locate a path // Taking into account this prune view, we'll attempt to locate a path
// to our destination, respecting the recommendations from // to our destination, respecting the recommendations from
// MissionControl. // MissionControl.
ss := p.sessionSource
restrictions := &RestrictParams{
ProbabilitySource: ss.MissionControl.GetEdgeProbability,
FeeLimit: payment.FeeLimit,
OutgoingChannelID: payment.OutgoingChannelID,
CltvLimit: cltvLimit,
PaymentAttemptPenalty: ss.PaymentAttemptPenalty,
MinProbability: ss.MinRouteProbability,
}
path, err := p.pathFinder( path, err := p.pathFinder(
&graphParams{ &graphParams{
graph: p.mc.graph, graph: ss.Graph,
additionalEdges: p.additionalEdges, additionalEdges: p.additionalEdges,
bandwidthHints: p.bandwidthHints, bandwidthHints: p.bandwidthHints,
}, },
&RestrictParams{ restrictions, ss.SelfNode.PubKeyBytes, payment.Target,
ProbabilitySource: p.mc.getEdgeProbability,
FeeLimit: payment.FeeLimit,
OutgoingChannelID: payment.OutgoingChannelID,
CltvLimit: cltvLimit,
PaymentAttemptPenalty: p.mc.cfg.PaymentAttemptPenalty,
MinProbability: p.mc.cfg.MinRouteProbability,
},
p.mc.selfNode.PubKeyBytes, payment.Target,
payment.Amount, payment.Amount,
) )
if err != nil { if err != nil {
@ -192,7 +106,7 @@ func (p *paymentSession) RequestRoute(payment *LightningPayment,
// With the next candidate path found, we'll attempt to turn this into // With the next candidate path found, we'll attempt to turn this into
// a route by applying the time-lock and fee requirements. // a route by applying the time-lock and fee requirements.
sourceVertex := route.Vertex(p.mc.selfNode.PubKeyBytes) sourceVertex := route.Vertex(ss.SelfNode.PubKeyBytes)
route, err := newRoute( route, err := newRoute(
payment.Amount, sourceVertex, path, height, finalCltvDelta, payment.Amount, sourceVertex, path, height, finalCltvDelta,
) )

@ -0,0 +1,147 @@
package routing
import (
"github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/zpay32"
)
// A compile time assertion to ensure MissionControl meets the
// PaymentSessionSource interface.
var _ PaymentSessionSource = (*SessionSource)(nil)
// SessionSource defines a source for the router to retrieve new payment
// sessions.
type SessionSource struct {
// Graph is the channel graph that will be used to gather metrics from
// and also to carry out path finding queries.
Graph *channeldb.ChannelGraph
// QueryBandwidth is a method that allows querying the lower link layer
// to determine the up to date available bandwidth at a prospective link
// to be traversed. If the link isn't available, then a value of zero
// should be returned. Otherwise, the current up to date knowledge of
// the available bandwidth of the link should be returned.
QueryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi
// SelfNode is our own node.
SelfNode *channeldb.LightningNode
// MissionControl is a shared memory of sorts that executions of payment
// path finding use in order to remember which vertexes/edges were
// pruned from prior attempts. During payment execution, errors sent by
// nodes are mapped into a vertex or edge to be pruned. Each run will
// then take into account this set of pruned vertexes/edges to reduce
// route failure and pass on graph information gained to the next
// execution.
MissionControl MissionController
// PaymentAttemptPenalty is the virtual cost in path finding weight
// units of executing a payment attempt that fails. It is used to trade
// off potentially better routes against their probability of
// succeeding.
PaymentAttemptPenalty lnwire.MilliSatoshi
// MinProbability defines the minimum success probability of the
// returned route.
MinRouteProbability float64
}
// NewPaymentSession creates a new payment session backed by the latest prune
// view from Mission Control. An optional set of routing hints can be provided
// in order to populate additional edges to explore when finding a path to the
// payment's destination.
func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint,
target route.Vertex) (PaymentSession, error) {
edges := make(map[route.Vertex][]*channeldb.ChannelEdgePolicy)
// Traverse through all of the available hop hints and include them in
// our edges map, indexed by the public key of the channel's starting
// node.
for _, routeHint := range routeHints {
// If multiple hop hints are provided within a single route
// hint, we'll assume they must be chained together and sorted
// in forward order in order to reach the target successfully.
for i, hopHint := range routeHint {
// In order to determine the end node of this hint,
// we'll need to look at the next hint's start node. If
// we've reached the end of the hints list, we can
// assume we've reached the destination.
endNode := &channeldb.LightningNode{}
if i != len(routeHint)-1 {
endNode.AddPubKey(routeHint[i+1].NodeID)
} else {
targetPubKey, err := btcec.ParsePubKey(
target[:], btcec.S256(),
)
if err != nil {
return nil, err
}
endNode.AddPubKey(targetPubKey)
}
// Finally, create the channel edge from the hop hint
// and add it to list of edges corresponding to the node
// at the start of the channel.
edge := &channeldb.ChannelEdgePolicy{
Node: endNode,
ChannelID: hopHint.ChannelID,
FeeBaseMSat: lnwire.MilliSatoshi(
hopHint.FeeBaseMSat,
),
FeeProportionalMillionths: lnwire.MilliSatoshi(
hopHint.FeeProportionalMillionths,
),
TimeLockDelta: hopHint.CLTVExpiryDelta,
}
v := route.NewVertex(hopHint.NodeID)
edges[v] = append(edges[v], edge)
}
}
// We'll also obtain a set of bandwidthHints from the lower layer for
// each of our outbound channels. This will allow the path finding to
// skip any links that aren't active or just don't have enough
// bandwidth to carry the payment.
sourceNode, err := m.Graph.SourceNode()
if err != nil {
return nil, err
}
bandwidthHints, err := generateBandwidthHints(
sourceNode, m.QueryBandwidth,
)
if err != nil {
return nil, err
}
return &paymentSession{
additionalEdges: edges,
bandwidthHints: bandwidthHints,
sessionSource: m,
pathFinder: findPath,
}, nil
}
// NewPaymentSessionForRoute creates a new paymentSession instance that is just
// used for failure reporting to missioncontrol.
func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession {
return &paymentSession{
sessionSource: m,
preBuiltRoute: preBuiltRoute,
}
}
// NewPaymentSessionEmpty creates a new paymentSession instance that is empty,
// and will be exhausted immediately. Used for failure reporting to
// missioncontrol for resumed payment we don't want to make more attempts for.
func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession {
return &paymentSession{
sessionSource: m,
preBuiltRoute: &route.Route{},
preBuiltRouteTried: true,
}
}

@ -32,12 +32,16 @@ func TestRequestRoute(t *testing.T) {
return path, nil return path, nil
} }
session := &paymentSession{ sessionSource := &SessionSource{
mc: &MissionControl{ SelfNode: &channeldb.LightningNode{},
selfNode: &channeldb.LightningNode{}, MissionControl: &MissionControl{
cfg: &MissionControlConfig{}, cfg: &MissionControlConfig{},
}, },
pathFinder: findPath, }
session := &paymentSession{
sessionSource: sessionSource,
pathFinder: findPath,
} }
cltvLimit := uint32(30) cltvLimit := uint32(30)

@ -171,6 +171,25 @@ type PaymentSessionSource interface {
NewPaymentSessionEmpty() PaymentSession NewPaymentSessionEmpty() PaymentSession
} }
// MissionController is an interface that exposes failure reporting and
// probability estimation.
type MissionController interface {
// ReportEdgeFailure reports a channel level failure.
ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi)
// ReportEdgePolicyFailure reports a policy related failure.
ReportEdgePolicyFailure(failedEdge edge)
// ReportVertexFailure reports a node level failure.
ReportVertexFailure(v route.Vertex)
// GetEdgeProbability is expected to return the success probability of a
// payment from fromNode along edge.
GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator,
amt lnwire.MilliSatoshi) float64
}
// FeeSchema is the set fee configuration for a Lightning Node on the network. // FeeSchema is the set fee configuration for a Lightning Node on the network.
// Using the coefficients described within the schema, the required fee to // Using the coefficients described within the schema, the required fee to
// forward outgoing payments can be derived. // forward outgoing payments can be derived.
@ -234,7 +253,11 @@ type Config struct {
// Each run will then take into account this set of pruned // Each run will then take into account this set of pruned
// vertexes/edges to reduce route failure and pass on graph information // vertexes/edges to reduce route failure and pass on graph information
// gained to the next execution. // gained to the next execution.
MissionControl PaymentSessionSource MissionControl MissionController
// SessionSource defines a source for the router to retrieve new payment
// sessions.
SessionSource PaymentSessionSource
// ChannelPruneExpiry is the duration used to determine if a channel // ChannelPruneExpiry is the duration used to determine if a channel
// should be pruned or not. If the delta between now and when the // should be pruned or not. If the delta between now and when the
@ -544,7 +567,7 @@ func (r *ChannelRouter) Start() error {
// //
// PayAttemptTime doesn't need to be set, as there is // PayAttemptTime doesn't need to be set, as there is
// only a single attempt. // only a single attempt.
paySession := r.cfg.MissionControl.NewPaymentSessionEmpty() paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
lPayment := &LightningPayment{ lPayment := &LightningPayment{
PaymentHash: payment.Info.PaymentHash, PaymentHash: payment.Info.PaymentHash,
@ -1651,7 +1674,7 @@ func (r *ChannelRouter) preparePayment(payment *LightningPayment) (
// Before starting the HTLC routing attempt, we'll create a fresh // Before starting the HTLC routing attempt, we'll create a fresh
// payment session which will report our errors back to mission // payment session which will report our errors back to mission
// control. // control.
paySession, err := r.cfg.MissionControl.NewPaymentSession( paySession, err := r.cfg.SessionSource.NewPaymentSession(
payment.RouteHints, payment.Target, payment.RouteHints, payment.Target,
) )
if err != nil { if err != nil {
@ -1682,7 +1705,7 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
lntypes.Preimage, error) { lntypes.Preimage, error) {
// Create a payment session for just this route. // Create a payment session for just this route.
paySession := r.cfg.MissionControl.NewPaymentSessionForRoute(route) paySession := r.cfg.SessionSource.NewPaymentSessionForRoute(route)
// Calculate amount paid to receiver. // Calculate amount paid to receiver.
amt := route.TotalAmount - route.TotalFees() amt := route.TotalAmount - route.TotalFees()
@ -1806,13 +1829,54 @@ func (r *ChannelRouter) sendPayment(
} }
// tryApplyChannelUpdate tries to apply a channel update present in the failure
// message if any.
func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
errorSourceIdx int, failure lnwire.FailureMessage) error {
// It makes no sense to apply our own channel updates.
if errorSourceIdx == 0 {
log.Errorf("Channel update of ourselves received")
return nil
}
// Extract channel update if the error contains one.
update := r.extractChannelUpdate(failure)
if update == nil {
return nil
}
// Parse pubkey to allow validation of the channel update. This should
// always succeed, otherwise there is something wrong in our
// implementation. Therefore return an error.
errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
errSource, err := btcec.ParsePubKey(
errVertex[:], btcec.S256(),
)
if err != nil {
log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
errorSourceIdx, errVertex)
return err
}
// Apply channel update.
if !r.applyChannelUpdate(update, errSource) {
log.Debugf("Invalid channel update received: node=%x",
errVertex)
}
return nil
}
// processSendError analyzes the error for the payment attempt received from the // processSendError analyzes the error for the payment attempt received from the
// switch and updates mission control and/or channel policies. Depending on the // switch and updates mission control and/or channel policies. Depending on the
// error type, this error is either the final outcome of the payment or we need // error type, this error is either the final outcome of the payment or we need
// to continue with an alternative route. This is indicated by the boolean // to continue with an alternative route. This is indicated by the boolean
// return value. // return value.
func (r *ChannelRouter) processSendError(paySession PaymentSession, func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (
rt *route.Route, sendErr error) (bool, channeldb.FailureReason) { bool, channeldb.FailureReason) {
// If the failure message could not be decrypted, attribute the failure // If the failure message could not be decrypted, attribute the failure
// to our own outgoing channel. // to our own outgoing channel.
@ -1831,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
return true, channeldb.FailureReasonError return true, channeldb.FailureReasonError
} }
var ( failureMessage := fErr.FailureMessage
failureSourceIdx = fErr.FailureSourceIdx failureSourceIdx := fErr.FailureSourceIdx
failureVertex route.Vertex // Apply channel update if the error contains one. For unknown
failureSource *btcec.PublicKey // failures, failureMessage is nil.
err error if failureMessage != nil {
) err := r.tryApplyChannelUpdate(
rt, failureSourceIdx, failureMessage,
)
if err != nil {
return true, channeldb.FailureReasonError
}
}
var failureVertex route.Vertex
// For any non-self failure, look up the source pub key in the hops // For any non-self failure, look up the source pub key in the hops
// slice. Otherwise return the self node pubkey. // slice. Otherwise return the self node pubkey.
if failureSourceIdx > 0 { if failureSourceIdx > 0 {
failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes
failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256())
if err != nil {
log.Errorf("Cannot parse pubkey %v: %v",
failureVertex, err)
return true, channeldb.FailureReasonError
}
} else { } else {
failureVertex = r.selfNode.PubKeyBytes failureVertex = r.selfNode.PubKeyBytes
failureSource, err = r.selfNode.PubKey()
if err != nil {
log.Errorf("Cannot parse self pubkey: %v", err)
return true, channeldb.FailureReasonError
}
} }
log.Tracef("Node %x (index %v) reported failure when sending htlc", log.Tracef("Node %x (index %v) reported failure when sending htlc",
failureVertex, failureSourceIdx) failureVertex, failureSourceIdx)
@ -1865,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// update with id may not be available. // update with id may not be available.
failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx)
// processChannelUpdateAndRetry is a closure that switch fErr.FailureMessage.(type) {
// handles a failure message containing a channel
// update. This function always tries to apply the
// channel update and passes on the result to the
// payment session to adjust its view on the reliability
// of the network.
//
// As channel id, the locally determined channel id is
// used. It does not rely on the channel id that is part
// of the channel update message, because the remote
// node may lie to us or the update may be corrupt.
processChannelUpdateAndRetry := func(
update *lnwire.ChannelUpdate,
pubKey *btcec.PublicKey) {
// Try to apply the channel update.
updateOk := r.applyChannelUpdate(update, pubKey)
// If the update could not be applied, prune the
// edge. There is no reason to continue trying
// this channel.
//
// TODO: Could even prune the node completely?
// Or is there a valid reason for the channel
// update to fail?
if !updateOk {
paySession.ReportEdgeFailure(
failedEdge, 0,
)
}
paySession.ReportEdgePolicyFailure(failedEdge)
}
switch onionErr := fErr.FailureMessage.(type) {
// If the end destination didn't know the payment // If the end destination didn't know the payment
// hash or we sent the wrong payment amount to the // hash or we sent the wrong payment amount to the
@ -1955,8 +1981,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// that sent us this error, as it doesn't now what the // that sent us this error, as it doesn't now what the
// correct block height is. // correct block height is.
case *lnwire.FailExpiryTooSoon: case *lnwire.FailExpiryTooSoon:
r.applyChannelUpdate(&onionErr.Update, failureSource) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
paySession.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
// If we hit an instance of onion payload corruption or an invalid // If we hit an instance of onion payload corruption or an invalid
@ -1976,57 +2001,49 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// amount, we'll apply the new minimum amount and retry // amount, we'll apply the new minimum amount and retry
// routing. // routing.
case *lnwire.FailAmountBelowMinimum: case *lnwire.FailAmountBelowMinimum:
processChannelUpdateAndRetry( r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// If we get a failure due to a fee, we'll apply the // If we get a failure due to a fee, we'll apply the
// new fee update, and retry our attempt using the // new fee update, and retry our attempt using the
// newly updated fees. // newly updated fees.
case *lnwire.FailFeeInsufficient: case *lnwire.FailFeeInsufficient:
processChannelUpdateAndRetry( r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// If we get the failure for an intermediate node that // If we get the failure for an intermediate node that
// disagrees with our time lock values, then we'll // disagrees with our time lock values, then we'll
// apply the new delta value and try it once more. // apply the new delta value and try it once more.
case *lnwire.FailIncorrectCltvExpiry: case *lnwire.FailIncorrectCltvExpiry:
processChannelUpdateAndRetry( r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// The outgoing channel that this node was meant to // The outgoing channel that this node was meant to
// forward one is currently disabled, so we'll apply // forward one is currently disabled, so we'll apply
// the update and continue. // the update and continue.
case *lnwire.FailChannelDisabled: case *lnwire.FailChannelDisabled:
r.applyChannelUpdate(&onionErr.Update, failureSource) r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
paySession.ReportEdgeFailure(failedEdge, 0)
return false, 0 return false, 0
// It's likely that the outgoing channel didn't have // It's likely that the outgoing channel didn't have
// sufficient capacity, so we'll prune this edge for // sufficient capacity, so we'll prune this edge for
// now, and continue onwards with our path finding. // now, and continue onwards with our path finding.
case *lnwire.FailTemporaryChannelFailure: case *lnwire.FailTemporaryChannelFailure:
r.applyChannelUpdate(onionErr.Update, failureSource) r.cfg.MissionControl.ReportEdgeFailure(failedEdge, failedAmt)
paySession.ReportEdgeFailure(failedEdge, failedAmt)
return false, 0 return false, 0
// If the send fail due to a node not having the // If the send fail due to a node not having the
// required features, then we'll note this error and // required features, then we'll note this error and
// continue. // continue.
case *lnwire.FailRequiredNodeFeatureMissing: case *lnwire.FailRequiredNodeFeatureMissing:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
// If the send fail due to a node not having the // If the send fail due to a node not having the
// required features, then we'll note this error and // required features, then we'll note this error and
// continue. // continue.
case *lnwire.FailRequiredChannelFeatureMissing: case *lnwire.FailRequiredChannelFeatureMissing:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
// If the next hop in the route wasn't known or // If the next hop in the route wasn't known or
@ -2037,18 +2054,18 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// returning errors in order to attempt to black list // returning errors in order to attempt to black list
// another node. // another node.
case *lnwire.FailUnknownNextPeer: case *lnwire.FailUnknownNextPeer:
paySession.ReportEdgeFailure(failedEdge, 0) r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
return false, 0 return false, 0
// If the node wasn't able to forward for which ever // If the node wasn't able to forward for which ever
// reason, then we'll note this and continue with the // reason, then we'll note this and continue with the
// routes. // routes.
case *lnwire.FailTemporaryNodeFailure: case *lnwire.FailTemporaryNodeFailure:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
case *lnwire.FailPermanentNodeFailure: case *lnwire.FailPermanentNodeFailure:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
// If we crafted a route that contains a too long time // If we crafted a route that contains a too long time
@ -2061,15 +2078,15 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// that as a hint during future path finding through // that as a hint during future path finding through
// that node. // that node.
case *lnwire.FailExpiryTooFar: case *lnwire.FailExpiryTooFar:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
// If we get a permanent channel or node failure, then // If we get a permanent channel or node failure, then
// we'll prune the channel in both directions and // we'll prune the channel in both directions and
// continue with the rest of the routes. // continue with the rest of the routes.
case *lnwire.FailPermanentChannelFailure: case *lnwire.FailPermanentChannelFailure:
paySession.ReportEdgeFailure(failedEdge, 0) r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0)
paySession.ReportEdgeFailure(edge{ r.cfg.MissionControl.ReportEdgeFailure(edge{
from: failedEdge.to, from: failedEdge.to,
to: failedEdge.from, to: failedEdge.from,
channel: failedEdge.channel, channel: failedEdge.channel,
@ -2078,11 +2095,34 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// Any other failure or an empty failure will get the node pruned. // Any other failure or an empty failure will get the node pruned.
default: default:
paySession.ReportVertexFailure(failureVertex) r.cfg.MissionControl.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
} }
} }
// extractChannelUpdate examines the error and extracts the channel update.
func (r *ChannelRouter) extractChannelUpdate(
failure lnwire.FailureMessage) *lnwire.ChannelUpdate {
var update *lnwire.ChannelUpdate
switch onionErr := failure.(type) {
case *lnwire.FailExpiryTooSoon:
update = &onionErr.Update
case *lnwire.FailAmountBelowMinimum:
update = &onionErr.Update
case *lnwire.FailFeeInsufficient:
update = &onionErr.Update
case *lnwire.FailIncorrectCltvExpiry:
update = &onionErr.Update
case *lnwire.FailChannelDisabled:
update = &onionErr.Update
case *lnwire.FailTemporaryChannelFailure:
update = onionErr.Update
}
return update
}
// getFailedEdge tries to locate the failing channel given a route and the // getFailedEdge tries to locate the failing channel given a route and the
// pubkey of the node that sent the failure. It will assume that the failure is // pubkey of the node that sent the failure. It will assume that the failure is
// associated with the outgoing channel of the failing node. As a second result, // associated with the outgoing channel of the failing node. As a second result,
@ -2127,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge,
// database. It returns a bool indicating whether the updates was successful. // database. It returns a bool indicating whether the updates was successful.
func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
pubKey *btcec.PublicKey) bool { pubKey *btcec.PublicKey) bool {
// If we get passed a nil channel update (as it's optional with some
// onion errors), then we'll exit early with a success result.
if msg == nil {
return true
}
ch, _, _, err := r.GetChannelByID(msg.ShortChannelID) ch, _, _, err := r.GetChannelByID(msg.ShortChannelID)
if err != nil { if err != nil {
@ -2410,3 +2445,38 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error { func (r *ChannelRouter) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64()) return r.cfg.Graph.MarkEdgeLive(chanID.ToUint64())
} }
// generateBandwidthHints is a helper function that's utilized the main
// findPath function in order to obtain hints from the lower layer w.r.t to the
// available bandwidth of edges on the network. Currently, we'll only obtain
// bandwidth hints for the edges we directly have open ourselves. Obtaining
// these hints allows us to reduce the number of extraneous attempts as we can
// skip channels that are inactive, or just don't have enough bandwidth to
// carry the payment.
func generateBandwidthHints(sourceNode *channeldb.LightningNode,
queryBandwidth func(*channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi) (map[uint64]lnwire.MilliSatoshi, error) {
// First, we'll collect the set of outbound edges from the target
// source node.
var localChans []*channeldb.ChannelEdgeInfo
err := sourceNode.ForEachChannel(nil, func(tx *bbolt.Tx,
edgeInfo *channeldb.ChannelEdgeInfo,
_, _ *channeldb.ChannelEdgePolicy) error {
localChans = append(localChans, edgeInfo)
return nil
})
if err != nil {
return nil, err
}
// Now that we have all of our outbound edges, we'll populate the set
// of bandwidth hints, querying the lower switch layer for the most up
// to date values.
bandwidthHints := make(map[uint64]lnwire.MilliSatoshi)
for _, localChan := range localChans {
bandwidthHints[localChan.ChannelID] = queryBandwidth(localChan)
}
return bandwidthHints, nil
}

@ -91,17 +91,23 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
} }
mc := NewMissionControl( mc := NewMissionControl(
graphInstance.graph, selfNode,
func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
return lnwire.NewMSatFromSatoshis(e.Capacity)
},
&MissionControlConfig{ &MissionControlConfig{
MinRouteProbability: 0.01,
PaymentAttemptPenalty: 100,
PenaltyHalfLife: time.Hour, PenaltyHalfLife: time.Hour,
AprioriHopProbability: 0.9, AprioriHopProbability: 0.9,
}, },
) )
sessionSource := &SessionSource{
Graph: graphInstance.graph,
SelfNode: selfNode,
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
return lnwire.NewMSatFromSatoshis(e.Capacity)
},
MinRouteProbability: 0.01,
PaymentAttemptPenalty: 100,
MissionControl: mc,
}
router, err := New(Config{ router, err := New(Config{
Graph: graphInstance.graph, Graph: graphInstance.graph,
Chain: chain, Chain: chain,
@ -109,6 +115,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
Payer: &mockPaymentAttemptDispatcher{}, Payer: &mockPaymentAttemptDispatcher{},
Control: makeMockControlTower(), Control: makeMockControlTower(),
MissionControl: mc, MissionControl: mc,
SessionSource: sessionSource,
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
@ -2940,7 +2947,7 @@ func TestRouterPaymentStateMachine(t *testing.T) {
Chain: chain, Chain: chain,
ChainView: chainView, ChainView: chainView,
Control: control, Control: control,
MissionControl: &mockPaymentSessionSource{}, SessionSource: &mockPaymentSessionSource{},
Payer: payer, Payer: payer,
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
@ -3004,10 +3011,12 @@ func TestRouterPaymentStateMachine(t *testing.T) {
copy(preImage[:], bytes.Repeat([]byte{9}, 32)) copy(preImage[:], bytes.Repeat([]byte{9}, 32))
router.cfg.MissionControl = &mockPaymentSessionSource{ router.cfg.SessionSource = &mockPaymentSessionSource{
routes: test.routes, routes: test.routes,
} }
router.cfg.MissionControl = &mockMissionControl{}
// Send the payment. Since this is new payment hash, the // Send the payment. Since this is new payment hash, the
// information should be registered with the ControlTower. // information should be registered with the ControlTower.
paymentResult := make(chan error) paymentResult := make(chan error)

@ -652,11 +652,29 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
// //
// TODO(joostjager): When we are further in the process of moving to sub // TODO(joostjager): When we are further in the process of moving to sub
// servers, the mission control instance itself can be moved there too. // servers, the mission control instance itself can be moved there too.
routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC)
s.missionControl = routing.NewMissionControl( s.missionControl = routing.NewMissionControl(
chanGraph, selfNode, queryBandwidth, &routing.MissionControlConfig{
routerrpc.GetMissionControlConfig(cfg.SubRPCServers.RouterRPC), AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife,
},
) )
srvrLog.Debugf("Instantiating payment session source with config: "+
"PaymentAttemptPenalty=%v, MinRouteProbability=%v",
int64(routingConfig.PaymentAttemptPenalty.ToSatoshis()),
routingConfig.MinRouteProbability)
paymentSessionSource := &routing.SessionSource{
Graph: chanGraph,
MissionControl: s.missionControl,
QueryBandwidth: queryBandwidth,
SelfNode: selfNode,
PaymentAttemptPenalty: routingConfig.PaymentAttemptPenalty,
MinRouteProbability: routingConfig.MinRouteProbability,
}
paymentControl := channeldb.NewPaymentControl(chanDB) paymentControl := channeldb.NewPaymentControl(chanDB)
s.controlTower = routing.NewControlTower(paymentControl) s.controlTower = routing.NewControlTower(paymentControl)
@ -668,6 +686,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
Payer: s.htlcSwitch, Payer: s.htlcSwitch,
Control: s.controlTower, Control: s.controlTower,
MissionControl: s.missionControl, MissionControl: s.missionControl,
SessionSource: paymentSessionSource,
ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour), GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: queryBandwidth, QueryBandwidth: queryBandwidth,