Merge pull request #4113 from joostjager/mc-failure-overwrite

routing: minimum failure relaxation interval
This commit is contained in:
Joost Jager 2020-03-28 11:10:26 +01:00 committed by GitHub
commit a2336005e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 290 additions and 162 deletions

@ -50,6 +50,11 @@ const (
// DefaultAprioriWeight is the default a priori weight. See // DefaultAprioriWeight is the default a priori weight. See
// MissionControlConfig for further explanation. // MissionControlConfig for further explanation.
DefaultAprioriWeight = 0.5 DefaultAprioriWeight = 0.5
// DefaultMinFailureRelaxInterval is the default minimum time that must
// have passed since the previously recorded failure before the failure
// amount may be raised.
DefaultMinFailureRelaxInterval = time.Minute
) )
// NodeResults contains previous results from a node to its peers. // NodeResults contains previous results from a node to its peers.
@ -65,15 +70,9 @@ type NodeResults map[route.Vertex]TimedPairResult
// since the last failure is used to estimate a success probability that is fed // since the last failure is used to estimate a success probability that is fed
// into the path finding process for subsequent payment attempts. // into the path finding process for subsequent payment attempts.
type MissionControl struct { type MissionControl struct {
// lastPairResult tracks the last payment result (on a pair basis) for // state is the internal mission control state that is input for
// each transited node. This is a multi-layer map that allows us to look // probability estimation.
// up the failure history of all connected channels (node pairs) for a state *missionControlState
// particular node.
lastPairResult map[route.Vertex]NodeResults
// lastSecondChance tracks the last time a second chance was granted for
// a directed node pair.
lastSecondChance map[DirectedNodePair]time.Time
// now is expected to return the current time. It is supplied as an // now is expected to return the current time. It is supplied as an
// external function to enable deterministic unit tests. // external function to enable deterministic unit tests.
@ -119,6 +118,11 @@ type MissionControlConfig struct {
// results, unless there are none available. // results, unless there are none available.
AprioriWeight float64 AprioriWeight float64
// MinFailureRelaxInterval is the minimum time that must have passed
// since the previously recorded failure before the failure amount may
// be raised.
MinFailureRelaxInterval time.Duration
// SelfNode is our own pubkey. // SelfNode is our own pubkey.
SelfNode route.Vertex SelfNode route.Vertex
} }
@ -194,12 +198,11 @@ func NewMissionControl(db kvdb.Backend, cfg *MissionControlConfig) (
} }
mc := &MissionControl{ mc := &MissionControl{
lastPairResult: make(map[route.Vertex]NodeResults), state: newMissionControlState(cfg.MinFailureRelaxInterval),
lastSecondChance: make(map[DirectedNodePair]time.Time), now: time.Now,
now: time.Now, cfg: cfg,
cfg: cfg, store: store,
store: store, estimator: estimator,
estimator: estimator,
} }
if err := mc.init(); err != nil { if err := mc.init(); err != nil {
@ -240,8 +243,7 @@ func (m *MissionControl) ResetHistory() error {
return err return err
} }
m.lastPairResult = make(map[route.Vertex]NodeResults) m.state.resetHistory()
m.lastSecondChance = make(map[DirectedNodePair]time.Time)
log.Debugf("Mission control history cleared") log.Debugf("Mission control history cleared")
@ -257,7 +259,7 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
defer m.Unlock() defer m.Unlock()
now := m.now() now := m.now()
results := m.lastPairResult[fromNode] results, _ := m.state.getLastPairResult(fromNode)
// Use a distinct probability estimation function for local channels. // Use a distinct probability estimation function for local channels.
if fromNode == m.cfg.SelfNode { if fromNode == m.cfg.SelfNode {
@ -267,148 +269,15 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex,
return m.estimator.getPairProbability(now, results, toNode, amt) return m.estimator.getPairProbability(now, results, toNode, amt)
} }
// setLastPairResult stores a result for a node pair.
func (m *MissionControl) setLastPairResult(fromNode, toNode route.Vertex,
timestamp time.Time, result *pairResult) {
nodePairs, ok := m.lastPairResult[fromNode]
if !ok {
nodePairs = make(NodeResults)
m.lastPairResult[fromNode] = nodePairs
}
current := nodePairs[toNode]
// Apply the new result to the existing data for this pair. If there is
// no existing data, apply it to the default values for TimedPairResult.
if result.success {
successAmt := result.amt
current.SuccessTime = timestamp
// Only update the success amount if this amount is higher. This
// prevents the success range from shrinking when there is no
// reason to do so. For example: small amount probes shouldn't
// affect a previous success for a much larger amount.
if successAmt > current.SuccessAmt {
current.SuccessAmt = successAmt
}
// If the success amount goes into the failure range, move the
// failure range up. Future attempts up to the success amount
// are likely to succeed. We don't want to clear the failure
// completely, because we haven't learnt much for amounts above
// the current success amount.
if !current.FailTime.IsZero() && successAmt >= current.FailAmt {
current.FailAmt = successAmt + 1
}
} else {
// For failures we always want to update both the amount and the
// time. Those need to relate to the same result, because the
// time is used to gradually diminish the penalty for that
// specific result. Updating the timestamp but not the amount
// could cause a failure for a lower amount (a more severe
// condition) to be revived as if it just happened.
failAmt := result.amt
current.FailTime = timestamp
current.FailAmt = failAmt
switch {
// The failure amount is set to zero when the failure is
// amount-independent, meaning that the attempt would have
// failed regardless of the amount. This should also reset the
// success amount to zero.
case failAmt == 0:
current.SuccessAmt = 0
// If the failure range goes into the success range, move the
// success range down.
case failAmt <= current.SuccessAmt:
current.SuccessAmt = failAmt - 1
}
}
log.Debugf("Setting %v->%v range to [%v-%v]",
fromNode, toNode, current.SuccessAmt, current.FailAmt)
nodePairs[toNode] = current
}
// setAllFail stores a fail result for all known connections to and from the
// given node.
func (m *MissionControl) setAllFail(node route.Vertex,
timestamp time.Time) {
for fromNode, nodePairs := range m.lastPairResult {
for toNode := range nodePairs {
if fromNode == node || toNode == node {
nodePairs[toNode] = TimedPairResult{
FailTime: timestamp,
}
}
}
}
}
// requestSecondChance checks whether the node fromNode can have a second chance
// at providing a channel update for its channel with toNode.
func (m *MissionControl) requestSecondChance(timestamp time.Time,
fromNode, toNode route.Vertex) bool {
// Look up previous second chance time.
pair := DirectedNodePair{
From: fromNode,
To: toNode,
}
lastSecondChance, ok := m.lastSecondChance[pair]
// If the channel hasn't already been given a second chance or its last
// second chance was long ago, we give it another chance.
if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval {
m.lastSecondChance[pair] = timestamp
log.Debugf("Second chance granted for %v->%v", fromNode, toNode)
return true
}
// Otherwise penalize the channel, because we don't allow channel
// updates that are that frequent. This is to prevent nodes from keeping
// us busy by continuously sending new channel updates.
log.Debugf("Second chance denied for %v->%v, remaining interval: %v",
fromNode, toNode, timestamp.Sub(lastSecondChance))
return false
}
// GetHistorySnapshot takes a snapshot from the current mission control state // GetHistorySnapshot takes a snapshot from the current mission control state
// and actual probability estimates. // and actual probability estimates.
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
log.Debugf("Requesting history snapshot from mission control: "+ log.Debugf("Requesting history snapshot from mission control")
"pair_result_count=%v", len(m.lastPairResult))
pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairResult)) return m.state.getSnapshot()
for fromNode, fromPairs := range m.lastPairResult {
for toNode, result := range fromPairs {
pair := NewDirectedNodePair(fromNode, toNode)
pairSnapshot := MissionControlPairSnapshot{
Pair: pair,
TimedPairResult: result,
}
pairs = append(pairs, pairSnapshot)
}
}
snapshot := MissionControlSnapshot{
Pairs: pairs,
}
return &snapshot
} }
// GetPairHistorySnapshot returns the stored history for a given node pair. // GetPairHistorySnapshot returns the stored history for a given node pair.
@ -418,7 +287,7 @@ func (m *MissionControl) GetPairHistorySnapshot(
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
results, ok := m.lastPairResult[fromNode] results, ok := m.state.getLastPairResult(fromNode)
if !ok { if !ok {
return TimedPairResult{} return TimedPairResult{}
} }
@ -507,7 +376,7 @@ func (m *MissionControl) applyPaymentResult(
defer m.Unlock() defer m.Unlock()
if i.policyFailure != nil { if i.policyFailure != nil {
if m.requestSecondChance( if m.state.requestSecondChance(
result.timeReply, result.timeReply,
i.policyFailure.From, i.policyFailure.To, i.policyFailure.From, i.policyFailure.To,
) { ) {
@ -536,7 +405,7 @@ func (m *MissionControl) applyPaymentResult(
log.Debugf("Reporting node failure to Mission Control: "+ log.Debugf("Reporting node failure to Mission Control: "+
"node=%v", *i.nodeFailure) "node=%v", *i.nodeFailure)
m.setAllFail(*i.nodeFailure, result.timeReply) m.state.setAllFail(*i.nodeFailure, result.timeReply)
} }
for pair, pairResult := range i.pairResults { for pair, pairResult := range i.pairResults {
@ -552,7 +421,7 @@ func (m *MissionControl) applyPaymentResult(
pair, pairResult.amt) pair, pairResult.amt)
} }
m.setLastPairResult( m.state.setLastPairResult(
pair.From, pair.To, result.timeReply, &pairResult, pair.From, pair.To, result.timeReply, &pairResult,
) )
} }

@ -0,0 +1,211 @@
package routing
import (
"time"
"github.com/lightningnetwork/lnd/routing/route"
)
// missionControlState is an object that manages the internal mission control
// state. Note that it isn't thread safe and synchronization needs to be
// enforced externally.
type missionControlState struct {
	// lastPairResult tracks the last payment result (on a pair basis) for
	// each transited node. This is a multi-layer map that allows us to look
	// up the failure history of all connected channels (node pairs) for a
	// particular node.
	lastPairResult map[route.Vertex]NodeResults

	// lastSecondChance tracks the last time a second chance was granted
	// for a directed node pair. Used to rate-limit how often a channel
	// update from a node may reset its penalty.
	lastSecondChance map[DirectedNodePair]time.Time

	// minFailureRelaxInterval is the minimum time that must have passed
	// since the previously recorded failure before the failure amount may
	// be raised. Protects against out-of-order htlc results weakening a
	// recorded failure.
	minFailureRelaxInterval time.Duration
}
// newMissionControlState instantiates an empty mission control state object
// with the given minimum failure relaxation interval.
func newMissionControlState(
	minFailureRelaxInterval time.Duration) *missionControlState {

	pairResults := make(map[route.Vertex]NodeResults)
	secondChances := make(map[DirectedNodePair]time.Time)

	return &missionControlState{
		lastPairResult:          pairResults,
		lastSecondChance:        secondChances,
		minFailureRelaxInterval: minFailureRelaxInterval,
	}
}
// getLastPairResult returns the current state for connections to the given
// node, along with whether any results are recorded for that node.
func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults,
	bool) {

	results, known := m.lastPairResult[node]

	return results, known
}
// resetHistory resets the history of the mission control state, returning it
// to a state as if no payment attempts have been made.
func (m *missionControlState) resetHistory() {
	m.lastPairResult = make(map[route.Vertex]NodeResults)
	m.lastSecondChance = make(map[DirectedNodePair]time.Time)
}
// setLastPairResult stores a result for a node pair, merging it with the
// previously recorded result for that pair according to the success/failure
// range rules below.
func (m *missionControlState) setLastPairResult(fromNode, toNode route.Vertex,
	timestamp time.Time, result *pairResult) {

	nodePairs, ok := m.lastPairResult[fromNode]
	if !ok {
		// First result for this from-node: lazily create its map.
		nodePairs = make(NodeResults)
		m.lastPairResult[fromNode] = nodePairs
	}

	current := nodePairs[toNode]

	// Apply the new result to the existing data for this pair. If there is
	// no existing data, apply it to the default values for TimedPairResult.
	if result.success {
		successAmt := result.amt
		current.SuccessTime = timestamp

		// Only update the success amount if this amount is higher. This
		// prevents the success range from shrinking when there is no
		// reason to do so. For example: small amount probes shouldn't
		// affect a previous success for a much larger amount.
		if successAmt > current.SuccessAmt {
			current.SuccessAmt = successAmt
		}

		// If the success amount goes into the failure range, move the
		// failure range up. Future attempts up to the success amount
		// are likely to succeed. We don't want to clear the failure
		// completely, because we haven't learnt much for amounts above
		// the current success amount.
		if !current.FailTime.IsZero() && successAmt >= current.FailAmt {
			current.FailAmt = successAmt + 1
		}
	} else {
		// For failures we always want to update both the amount and the
		// time. Those need to relate to the same result, because the
		// time is used to gradually diminish the penalty for that
		// specific result. Updating the timestamp but not the amount
		// could cause a failure for a lower amount (a more severe
		// condition) to be revived as if it just happened.
		failAmt := result.amt

		// Drop the result if it would increase the failure amount too
		// soon after a previous failure. This can happen if htlc
		// results come in out of order. This check makes it easier for
		// payment processes to converge to a final state.
		failInterval := timestamp.Sub(current.FailTime)
		if failAmt > current.FailAmt &&
			failInterval < m.minFailureRelaxInterval {

			log.Debugf("Ignoring higher amount failure within min "+
				"failure relaxation interval: prev_fail_amt=%v, "+
				"fail_amt=%v, interval=%v",
				current.FailAmt, failAmt, failInterval)

			return
		}

		current.FailTime = timestamp
		current.FailAmt = failAmt

		switch {
		// The failure amount is set to zero when the failure is
		// amount-independent, meaning that the attempt would have
		// failed regardless of the amount. This should also reset the
		// success amount to zero.
		case failAmt == 0:
			current.SuccessAmt = 0

		// If the failure range goes into the success range, move the
		// success range down.
		case failAmt <= current.SuccessAmt:
			current.SuccessAmt = failAmt - 1
		}
	}

	log.Debugf("Setting %v->%v range to [%v-%v]",
		fromNode, toNode, current.SuccessAmt, current.FailAmt)

	nodePairs[toNode] = current
}
// setAllFail stores a fail result for all known connections to and from the
// given node.
func (m *missionControlState) setAllFail(node route.Vertex,
	timestamp time.Time) {

	// Every affected pair receives the same amount-independent failure.
	failure := TimedPairResult{
		FailTime: timestamp,
	}

	for fromNode, nodePairs := range m.lastPairResult {
		for toNode := range nodePairs {
			// Skip pairs that don't touch the failed node.
			if fromNode != node && toNode != node {
				continue
			}

			nodePairs[toNode] = failure
		}
	}
}
// requestSecondChance checks whether the node fromNode can have a second
// chance at providing a channel update for its channel with toNode.
func (m *missionControlState) requestSecondChance(timestamp time.Time,
	fromNode, toNode route.Vertex) bool {

	// Look up when this directed pair was last granted a second chance.
	pair := DirectedNodePair{
		From: fromNode,
		To:   toNode,
	}
	lastSecondChance, ok := m.lastSecondChance[pair]

	// Penalize the channel if a second chance was granted recently. We
	// don't allow channel updates that are that frequent, to prevent
	// nodes from keeping us busy by continuously sending new channel
	// updates.
	if ok && timestamp.Sub(lastSecondChance) <= minSecondChanceInterval {
		log.Debugf("Second chance denied for %v->%v, remaining interval: %v",
			fromNode, toNode, timestamp.Sub(lastSecondChance))

		return false
	}

	// The channel either hasn't already been given a second chance or its
	// last second chance was long ago, so we give it another chance.
	m.lastSecondChance[pair] = timestamp

	log.Debugf("Second chance granted for %v->%v", fromNode, toNode)

	return true
}
// getSnapshot takes a snapshot of the current mission control state,
// returning one entry per recorded node pair result.
func (m *missionControlState) getSnapshot() *MissionControlSnapshot {
	log.Debugf("Requesting history snapshot from mission control: "+
		"pair_result_count=%v", len(m.lastPairResult))

	pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairResult))

	// Flatten the two-level pair map into a list of snapshots.
	for fromNode, fromPairs := range m.lastPairResult {
		for toNode, result := range fromPairs {
			pairs = append(pairs, MissionControlPairSnapshot{
				Pair:            NewDirectedNodePair(fromNode, toNode),
				TimedPairResult: result,
			})
		}
	}

	return &MissionControlSnapshot{
		Pairs: pairs,
	}
}

@ -0,0 +1,47 @@
package routing
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/routing/route"
)
// TestMissionControlStateFailureResult tests setting failure results on the
// mission control state.
func TestMissionControlStateFailureResult(t *testing.T) {
	const minFailureRelaxInterval = time.Minute
	state := newMissionControlState(minFailureRelaxInterval)

	var (
		from      = route.Vertex{1}
		to        = route.Vertex{2}
		timestamp = testTime
	)

	// Stage 1: report a 1000 sat failure and expect it to be recorded.
	state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1000})

	pairResults, _ := state.getLastPairResult(from)
	if amt := pairResults[to].FailAmt; amt != 1000 {
		t.Fatalf("unexpected fail amount %v", amt)
	}

	// Stage 2: report an 1100 sat failure one hour later. It is expected
	// to overwrite the previous failure.
	timestamp = timestamp.Add(time.Hour)
	state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1100})

	pairResults, _ = state.getLastPairResult(from)
	if amt := pairResults[to].FailAmt; amt != 1100 {
		t.Fatalf("unexpected fail amount %v", amt)
	}

	// Stage 3: report a 1200 sat failure only one second later. Because
	// this increase of the failure amount comes too soon after the
	// previous failure, the result is not applied.
	timestamp = timestamp.Add(time.Second)
	state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1200})

	pairResults, _ = state.getLastPairResult(from)
	if amt := pairResults[to].FailAmt; amt != 1100 {
		t.Fatalf("unexpected fail amount %v", amt)
	}
}

@ -710,11 +710,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
s.missionControl, err = routing.NewMissionControl( s.missionControl, err = routing.NewMissionControl(
chanDB, chanDB,
&routing.MissionControlConfig{ &routing.MissionControlConfig{
AprioriHopProbability: routingConfig.AprioriHopProbability, AprioriHopProbability: routingConfig.AprioriHopProbability,
PenaltyHalfLife: routingConfig.PenaltyHalfLife, PenaltyHalfLife: routingConfig.PenaltyHalfLife,
MaxMcHistory: routingConfig.MaxMcHistory, MaxMcHistory: routingConfig.MaxMcHistory,
AprioriWeight: routingConfig.AprioriWeight, AprioriWeight: routingConfig.AprioriWeight,
SelfNode: selfNode.PubKeyBytes, SelfNode: selfNode.PubKeyBytes,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
}, },
) )
if err != nil { if err != nil {