routing: move second chance logic into mission control

If nodes return a channel policy related failure, they may get a second
chance. Our graph may not be up to date. Previously this logic was
contained in the payment session.

This commit moves that into global mission control and thereby removes
the last mission control state that was kept on the payment level.

Because mission control is not aware of the relation between payment
attempts and payments, the second chance logic is no longer based
tracking second chances given per payment.

Instead a time based approach is used. If a node reports a policy
failure that prevents forwarding to its peer, it will get a second
chance. But it will get it only if the previous second chance was
long enough ago.

Also those second chances are no longer dependent on whether an
associated channel update is valid. It will get the second chance
regardless, to prevent creating a dependency between mission control and
the graph. This would interfer with (future) replay of history, because
the graph may not be the same anymore at that point.
This commit is contained in:
Joost Jager 2019-06-26 08:39:34 +02:00
parent d31efddf1b
commit dc13da5abb
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
7 changed files with 213 additions and 113 deletions

@ -5,8 +5,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -16,6 +14,30 @@ const (
// half-life duration defines after how much time a penalized node or // half-life duration defines after how much time a penalized node or
// channel is back at 50% probability. // channel is back at 50% probability.
DefaultPenaltyHalfLife = time.Hour DefaultPenaltyHalfLife = time.Hour
// minSecondChanceInterval is the minimum time required between
// second-chance failures.
//
// If nodes return a channel policy related failure, they may get a
// second chance to forward the payment. It could be that the channel
// policy that we are aware of is not up to date. This is especially
// important in case of mobile apps that are mostly offline.
//
// However, we don't want to give nodes the option to endlessly return
// new channel updates so that we are kept busy trying to route through
// that node until the payment loop times out.
//
// Therefore we only grant a second chance to a node if the previous
// second chance is sufficiently long ago. This is what
// minSecondChanceInterval defines. If a second policy failure comes in
// within that interval, we will apply a penalty.
//
// Second chances granted are tracked on the level of node pairs. This
// means that if a node has multiple channels to the same peer, they
// will only get a single second chance to route to that peer again.
// Nodes forward non-strict, so it isn't necessary to apply a less
// restrictive channel level tracking scheme here.
minSecondChanceInterval = time.Minute
) )
// MissionControl contains state which summarizes the past attempts of HTLC // MissionControl contains state which summarizes the past attempts of HTLC
@ -30,6 +52,10 @@ const (
type MissionControl struct { type MissionControl struct {
history map[route.Vertex]*nodeHistory history map[route.Vertex]*nodeHistory
// lastSecondChance tracks the last time a second chance was granted for
// a directed node pair.
lastSecondChance map[DirectedNodePair]time.Time
// now is expected to return the current time. It is supplied as an // now is expected to return the current time. It is supplied as an
// external function to enable deterministic unit tests. // external function to enable deterministic unit tests.
now func() time.Time now func() time.Time
@ -128,12 +154,12 @@ func NewMissionControl(cfg *MissionControlConfig) *MissionControl {
return &MissionControl{ return &MissionControl{
history: make(map[route.Vertex]*nodeHistory), history: make(map[route.Vertex]*nodeHistory),
lastSecondChance: make(map[DirectedNodePair]time.Time),
now: time.Now, now: time.Now,
cfg: cfg, cfg: cfg,
} }
} }
// ResetHistory resets the history of MissionControl returning it to a state as // ResetHistory resets the history of MissionControl returning it to a state as
// if no payment attempts have been made. // if no payment attempts have been made.
func (m *MissionControl) ResetHistory() { func (m *MissionControl) ResetHistory() {
@ -141,6 +167,7 @@ func (m *MissionControl) ResetHistory() {
defer m.Unlock() defer m.Unlock()
m.history = make(map[route.Vertex]*nodeHistory) m.history = make(map[route.Vertex]*nodeHistory)
m.lastSecondChance = make(map[DirectedNodePair]time.Time)
log.Debugf("Mission control history cleared") log.Debugf("Mission control history cleared")
} }
@ -209,6 +236,37 @@ func (m *MissionControl) getEdgeProbabilityForNode(nodeHistory *nodeHistory,
return probability return probability
} }
// requestSecondChance checks whether the node fromNode can have a second chance
// at providing a channel update for its channel with toNode.
func (m *MissionControl) requestSecondChance(timestamp time.Time,
fromNode, toNode route.Vertex) bool {
// Look up previous second chance time.
pair := DirectedNodePair{
From: fromNode,
To: toNode,
}
lastSecondChance, ok := m.lastSecondChance[pair]
// If the channel hasn't already be given a second chance or its last
// second chance was long ago, we give it another chance.
if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval {
m.lastSecondChance[pair] = timestamp
log.Debugf("Second chance granted for %v->%v", fromNode, toNode)
return true
}
// Otherwise penalize the channel, because we don't allow channel
// updates that are that frequent. This is to prevent nodes from keeping
// us busy by continuously sending new channel updates.
log.Debugf("Second chance denied for %v->%v, remaining interval: %v",
fromNode, toNode, timestamp.Sub(lastSecondChance))
return false
}
// createHistoryIfNotExists returns the history for the given node. If the node // createHistoryIfNotExists returns the history for the given node. If the node
// is yet unknown, it will create an empty history structure. // is yet unknown, it will create an empty history structure.
func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory { func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHistory {
@ -237,6 +295,26 @@ func (m *MissionControl) ReportVertexFailure(v route.Vertex) {
history.lastFail = &now history.lastFail = &now
} }
// ReportEdgePolicyFailure reports a policy related failure.
func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) {
now := m.now()
m.Lock()
defer m.Unlock()
// We may have an out of date graph. Therefore we don't always penalize
// immediately. If some time has passed since the last policy failure,
// we grant the node a second chance at forwarding the payment.
if m.requestSecondChance(
now, failedEdge.from, failedEdge.to,
) {
return
}
history := m.createHistoryIfNotExists(failedEdge.from)
history.lastFail = &now
}
// ReportEdgeFailure reports a channel level failure. // ReportEdgeFailure reports a channel level failure.
// //
// TODO(roasbeef): also add value attempted to send and capacity of channel // TODO(roasbeef): also add value attempted to send and capacity of channel

@ -106,3 +106,25 @@ func TestMissionControl(t *testing.T) {
t.Fatal("unexpected number of channels") t.Fatal("unexpected number of channels")
} }
} }
// TestMissionControlChannelUpdate tests that the first channel update is not
// penalizing the channel yet.
func TestMissionControlChannelUpdate(t *testing.T) {
ctx := createMcTestContext(t)
testEdge := edge{
channel: 123,
}
// Report a policy related failure. Because it is the first, we don't
// expect a penalty.
ctx.mc.ReportEdgePolicyFailure(testEdge)
ctx.expectP(0, 0.8)
// Report another failure for the same channel. We expect it to be
// pruned.
ctx.mc.ReportEdgePolicyFailure(testEdge)
ctx.expectP(0, 0)
}

@ -102,6 +102,8 @@ func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) { minPenalizeAmt lnwire.MilliSatoshi) {
} }
func (m *mockMissionControl) ReportEdgePolicyFailure(failedEdge edge) {}
func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {} func (m *mockMissionControl) ReportVertexFailure(v route.Vertex) {}
func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator, func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, edge EdgeLocator,

10
routing/nodepair.go Normal file

@ -0,0 +1,10 @@
package routing
import (
"github.com/lightningnetwork/lnd/routing/route"
)
// DirectedNodePair stores a directed pair of nodes.
type DirectedNodePair struct {
From, To route.Vertex
}

@ -52,12 +52,6 @@ type paymentSession struct {
bandwidthHints map[uint64]lnwire.MilliSatoshi bandwidthHints map[uint64]lnwire.MilliSatoshi
// errFailedFeeChans is a map of the short channel IDs that were the
// source of policy related routing failures during this payment attempt.
// We'll use this map to prune out channels when the first error may not
// require pruning, but any subsequent ones do.
errFailedPolicyChans map[nodeChannel]struct{}
sessionSource *SessionSource sessionSource *SessionSource
preBuiltRoute *route.Route preBuiltRoute *route.Route
@ -109,25 +103,7 @@ func (p *paymentSession) ReportEdgeFailure(failedEdge edge,
// //
// TODO(joostjager): Move this logic into global mission control. // TODO(joostjager): Move this logic into global mission control.
func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) { func (p *paymentSession) ReportEdgePolicyFailure(failedEdge edge) {
key := nodeChannel{ p.sessionSource.MissionControl.ReportEdgePolicyFailure(failedEdge)
node: failedEdge.from,
channel: failedEdge.channel,
}
// Check to see if we've already reported a policy related failure for
// this channel. If so, then we'll prune out the vertex.
_, ok := p.errFailedPolicyChans[key]
if ok {
// TODO(joostjager): is this aggressive pruning still necessary?
// Just pruning edges may also work unless there is a huge
// number of failing channels from that node?
p.ReportVertexFailure(key.node)
return
}
// Finally, we'll record a policy failure from this node and move on.
p.errFailedPolicyChans[key] = struct{}{}
} }
// RequestRoute returns a route which is likely to be capable for successfully // RequestRoute returns a route which is likely to be capable for successfully

@ -121,7 +121,6 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint,
return &paymentSession{ return &paymentSession{
additionalEdges: edges, additionalEdges: edges,
bandwidthHints: bandwidthHints, bandwidthHints: bandwidthHints,
errFailedPolicyChans: make(map[nodeChannel]struct{}),
sessionSource: m, sessionSource: m,
pathFinder: findPath, pathFinder: findPath,
}, nil }, nil
@ -131,7 +130,6 @@ func (m *SessionSource) NewPaymentSession(routeHints [][]zpay32.HopHint,
// used for failure reporting to missioncontrol. // used for failure reporting to missioncontrol.
func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession { func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession {
return &paymentSession{ return &paymentSession{
errFailedPolicyChans: make(map[nodeChannel]struct{}),
sessionSource: m, sessionSource: m,
preBuiltRoute: preBuiltRoute, preBuiltRoute: preBuiltRoute,
} }
@ -142,7 +140,6 @@ func (m *SessionSource) NewPaymentSessionForRoute(preBuiltRoute *route.Route) Pa
// missioncontrol for resumed payment we don't want to make more attempts for. // missioncontrol for resumed payment we don't want to make more attempts for.
func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession { func (m *SessionSource) NewPaymentSessionEmpty() PaymentSession {
return &paymentSession{ return &paymentSession{
errFailedPolicyChans: make(map[nodeChannel]struct{}),
sessionSource: m, sessionSource: m,
preBuiltRoute: &route.Route{}, preBuiltRoute: &route.Route{},
preBuiltRouteTried: true, preBuiltRouteTried: true,

@ -178,6 +178,9 @@ type MissionController interface {
ReportEdgeFailure(failedEdge edge, ReportEdgeFailure(failedEdge edge,
minPenalizeAmt lnwire.MilliSatoshi) minPenalizeAmt lnwire.MilliSatoshi)
// ReportEdgePolicyFailure reports a policy related failure.
ReportEdgePolicyFailure(failedEdge edge)
// ReportVertexFailure reports a node level failure. // ReportVertexFailure reports a node level failure.
ReportVertexFailure(v route.Vertex) ReportVertexFailure(v route.Vertex)
@ -1826,6 +1829,47 @@ func (r *ChannelRouter) sendPayment(
} }
// tryApplyChannelUpdate tries to apply a channel update present in the failure
// message if any.
func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
errorSourceIdx int, failure lnwire.FailureMessage) error {
// It makes no sense to apply our own channel updates.
if errorSourceIdx == 0 {
log.Errorf("Channel update of ourselves received")
return nil
}
// Extract channel update if the error contains one.
update := r.extractChannelUpdate(failure)
if update == nil {
return nil
}
// Parse pubkey to allow validation of the channel update. This should
// always succeed, otherwise there is something wrong in our
// implementation. Therefore return an error.
errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
errSource, err := btcec.ParsePubKey(
errVertex[:], btcec.S256(),
)
if err != nil {
log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
errorSourceIdx, errVertex)
return err
}
// Apply channel update.
if !r.applyChannelUpdate(update, errSource) {
log.Debugf("Invalid channel update received: node=%x",
errVertex)
}
return nil
}
// processSendError analyzes the error for the payment attempt received from the // processSendError analyzes the error for the payment attempt received from the
// switch and updates mission control and/or channel policies. Depending on the // switch and updates mission control and/or channel policies. Depending on the
// error type, this error is either the final outcome of the payment or we need // error type, this error is either the final outcome of the payment or we need
@ -1851,32 +1895,28 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
return true, channeldb.FailureReasonError return true, channeldb.FailureReasonError
} }
var ( failureMessage := fErr.FailureMessage
failureSourceIdx = fErr.FailureSourceIdx failureSourceIdx := fErr.FailureSourceIdx
failureVertex route.Vertex // Apply channel update if the error contains one. For unknown
failureSource *btcec.PublicKey // failures, failureMessage is nil.
err error if failureMessage != nil {
err := r.tryApplyChannelUpdate(
rt, failureSourceIdx, failureMessage,
) )
if err != nil {
return true, channeldb.FailureReasonError
}
}
var failureVertex route.Vertex
// For any non-self failure, look up the source pub key in the hops // For any non-self failure, look up the source pub key in the hops
// slice. Otherwise return the self node pubkey. // slice. Otherwise return the self node pubkey.
if failureSourceIdx > 0 { if failureSourceIdx > 0 {
failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes
failureSource, err = btcec.ParsePubKey(failureVertex[:], btcec.S256())
if err != nil {
log.Errorf("Cannot parse pubkey %v: %v",
failureVertex, err)
return true, channeldb.FailureReasonError
}
} else { } else {
failureVertex = r.selfNode.PubKeyBytes failureVertex = r.selfNode.PubKeyBytes
failureSource, err = r.selfNode.PubKey()
if err != nil {
log.Errorf("Cannot parse self pubkey: %v", err)
return true, channeldb.FailureReasonError
}
} }
log.Tracef("Node %x (index %v) reported failure when sending htlc", log.Tracef("Node %x (index %v) reported failure when sending htlc",
failureVertex, failureSourceIdx) failureVertex, failureSourceIdx)
@ -1885,41 +1925,7 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// update with id may not be available. // update with id may not be available.
failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx)
// processChannelUpdateAndRetry is a closure that switch fErr.FailureMessage.(type) {
// handles a failure message containing a channel
// update. This function always tries to apply the
// channel update and passes on the result to the
// payment session to adjust its view on the reliability
// of the network.
//
// As channel id, the locally determined channel id is
// used. It does not rely on the channel id that is part
// of the channel update message, because the remote
// node may lie to us or the update may be corrupt.
processChannelUpdateAndRetry := func(
update *lnwire.ChannelUpdate,
pubKey *btcec.PublicKey) {
// Try to apply the channel update.
updateOk := r.applyChannelUpdate(update, pubKey)
// If the update could not be applied, prune the
// edge. There is no reason to continue trying
// this channel.
//
// TODO: Could even prune the node completely?
// Or is there a valid reason for the channel
// update to fail?
if !updateOk {
paySession.ReportEdgeFailure(
failedEdge, 0,
)
}
paySession.ReportEdgePolicyFailure(failedEdge)
}
switch onionErr := fErr.FailureMessage.(type) {
// If the end destination didn't know the payment // If the end destination didn't know the payment
// hash or we sent the wrong payment amount to the // hash or we sent the wrong payment amount to the
@ -1975,7 +1981,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// that sent us this error, as it doesn't now what the // that sent us this error, as it doesn't now what the
// correct block height is. // correct block height is.
case *lnwire.FailExpiryTooSoon: case *lnwire.FailExpiryTooSoon:
r.applyChannelUpdate(&onionErr.Update, failureSource)
paySession.ReportVertexFailure(failureVertex) paySession.ReportVertexFailure(failureVertex)
return false, 0 return false, 0
@ -1996,34 +2001,27 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// amount, we'll apply the new minimum amount and retry // amount, we'll apply the new minimum amount and retry
// routing. // routing.
case *lnwire.FailAmountBelowMinimum: case *lnwire.FailAmountBelowMinimum:
processChannelUpdateAndRetry( paySession.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// If we get a failure due to a fee, we'll apply the // If we get a failure due to a fee, we'll apply the
// new fee update, and retry our attempt using the // new fee update, and retry our attempt using the
// newly updated fees. // newly updated fees.
case *lnwire.FailFeeInsufficient: case *lnwire.FailFeeInsufficient:
processChannelUpdateAndRetry( paySession.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// If we get the failure for an intermediate node that // If we get the failure for an intermediate node that
// disagrees with our time lock values, then we'll // disagrees with our time lock values, then we'll
// apply the new delta value and try it once more. // apply the new delta value and try it once more.
case *lnwire.FailIncorrectCltvExpiry: case *lnwire.FailIncorrectCltvExpiry:
processChannelUpdateAndRetry( paySession.ReportEdgePolicyFailure(failedEdge)
&onionErr.Update, failureSource,
)
return false, 0 return false, 0
// The outgoing channel that this node was meant to // The outgoing channel that this node was meant to
// forward one is currently disabled, so we'll apply // forward one is currently disabled, so we'll apply
// the update and continue. // the update and continue.
case *lnwire.FailChannelDisabled: case *lnwire.FailChannelDisabled:
r.applyChannelUpdate(&onionErr.Update, failureSource)
paySession.ReportEdgeFailure(failedEdge, 0) paySession.ReportEdgeFailure(failedEdge, 0)
return false, 0 return false, 0
@ -2031,7 +2029,6 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
// sufficient capacity, so we'll prune this edge for // sufficient capacity, so we'll prune this edge for
// now, and continue onwards with our path finding. // now, and continue onwards with our path finding.
case *lnwire.FailTemporaryChannelFailure: case *lnwire.FailTemporaryChannelFailure:
r.applyChannelUpdate(onionErr.Update, failureSource)
paySession.ReportEdgeFailure(failedEdge, failedAmt) paySession.ReportEdgeFailure(failedEdge, failedAmt)
return false, 0 return false, 0
@ -2103,6 +2100,29 @@ func (r *ChannelRouter) processSendError(paySession PaymentSession,
} }
} }
// extractChannelUpdate examines the error and extracts the channel update.
func (r *ChannelRouter) extractChannelUpdate(
failure lnwire.FailureMessage) *lnwire.ChannelUpdate {
var update *lnwire.ChannelUpdate
switch onionErr := failure.(type) {
case *lnwire.FailExpiryTooSoon:
update = &onionErr.Update
case *lnwire.FailAmountBelowMinimum:
update = &onionErr.Update
case *lnwire.FailFeeInsufficient:
update = &onionErr.Update
case *lnwire.FailIncorrectCltvExpiry:
update = &onionErr.Update
case *lnwire.FailChannelDisabled:
update = &onionErr.Update
case *lnwire.FailTemporaryChannelFailure:
update = onionErr.Update
}
return update
}
// getFailedEdge tries to locate the failing channel given a route and the // getFailedEdge tries to locate the failing channel given a route and the
// pubkey of the node that sent the failure. It will assume that the failure is // pubkey of the node that sent the failure. It will assume that the failure is
// associated with the outgoing channel of the failing node. As a second result, // associated with the outgoing channel of the failing node. As a second result,
@ -2147,11 +2167,6 @@ func getFailedEdge(route *route.Route, failureSource int) (edge,
// database. It returns a bool indicating whether the updates was successful. // database. It returns a bool indicating whether the updates was successful.
func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate,
pubKey *btcec.PublicKey) bool { pubKey *btcec.PublicKey) bool {
// If we get passed a nil channel update (as it's optional with some
// onion errors), then we'll exit early with a success result.
if msg == nil {
return true
}
ch, _, _, err := r.GetChannelByID(msg.ShortChannelID) ch, _, _, err := r.GetChannelByID(msg.ShortChannelID)
if err != nil { if err != nil {