From 6a36ed44f856ebd5a3b0731f0eea10f793a43c7f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 24 Mar 2020 16:26:48 +0100 Subject: [PATCH 1/2] routing: extract mission control state Preparation for unit testing just the state logic. --- routing/missioncontrol.go | 173 +++-------------------------- routing/missioncontrol_state.go | 187 ++++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+), 157 deletions(-) create mode 100644 routing/missioncontrol_state.go diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index a590f20a..963ea886 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -65,15 +65,9 @@ type NodeResults map[route.Vertex]TimedPairResult // since the last failure is used to estimate a success probability that is fed // into the path finding process for subsequent payment attempts. type MissionControl struct { - // lastPairResult tracks the last payment result (on a pair basis) for - // each transited node. This is a multi-layer map that allows us to look - // up the failure history of all connected channels (node pairs) for a - // particular node. - lastPairResult map[route.Vertex]NodeResults - - // lastSecondChance tracks the last time a second chance was granted for - // a directed node pair. - lastSecondChance map[DirectedNodePair]time.Time + // state is the internal mission control state that is input for + // probability estimation. + state *missionControlState // now is expected to return the current time. It is supplied as an // external function to enable deterministic unit tests. @@ -194,12 +188,11 @@ func NewMissionControl(db kvdb.Backend, cfg *MissionControlConfig) ( } mc := &MissionControl{ - lastPairResult: make(map[route.Vertex]NodeResults), - lastSecondChance: make(map[DirectedNodePair]time.Time), - now: time.Now, - cfg: cfg, - store: store, - estimator: estimator, + state: newMissionControlState(), + now: time.Now, + cfg: cfg, + store: store, + estimator: estimator, } if err := mc.init(); err != nil { @@ -240,8 +233,7 @@ func (m *MissionControl) ResetHistory() error { return err } - m.lastPairResult = make(map[route.Vertex]NodeResults) - m.lastSecondChance = make(map[DirectedNodePair]time.Time) + m.state.resetHistory() log.Debugf("Mission control history cleared") @@ -257,7 +249,7 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, defer m.Unlock() now := m.now() - results := m.lastPairResult[fromNode] + results, _ := m.state.getLastPairResult(fromNode) // Use a distinct probability estimation function for local channels. if fromNode == m.cfg.SelfNode { @@ -267,148 +259,15 @@ func (m *MissionControl) GetProbability(fromNode, toNode route.Vertex, return m.estimator.getPairProbability(now, results, toNode, amt) } -// setLastPairResult stores a result for a node pair. -func (m *MissionControl) setLastPairResult(fromNode, toNode route.Vertex, - timestamp time.Time, result *pairResult) { - - nodePairs, ok := m.lastPairResult[fromNode] - if !ok { - nodePairs = make(NodeResults) - m.lastPairResult[fromNode] = nodePairs - } - - current := nodePairs[toNode] - - // Apply the new result to the existing data for this pair. If there is - // no existing data, apply it to the default values for TimedPairResult. - if result.success { - successAmt := result.amt - current.SuccessTime = timestamp - - // Only update the success amount if this amount is higher. This - // prevents the success range from shrinking when there is no - // reason to do so. 
For example: small amount probes shouldn't - // affect a previous success for a much larger amount. - if successAmt > current.SuccessAmt { - current.SuccessAmt = successAmt - } - - // If the success amount goes into the failure range, move the - // failure range up. Future attempts up to the success amount - // are likely to succeed. We don't want to clear the failure - // completely, because we haven't learnt much for amounts above - // the current success amount. - if !current.FailTime.IsZero() && successAmt >= current.FailAmt { - current.FailAmt = successAmt + 1 - } - } else { - // For failures we always want to update both the amount and the - // time. Those need to relate to the same result, because the - // time is used to gradually diminish the penality for that - // specific result. Updating the timestamp but not the amount - // could cause a failure for a lower amount (a more severe - // condition) to be revived as if it just happened. - failAmt := result.amt - current.FailTime = timestamp - current.FailAmt = failAmt - - switch { - // The failure amount is set to zero when the failure is - // amount-independent, meaning that the attempt would have - // failed regardless of the amount. This should also reset the - // success amount to zero. - case failAmt == 0: - current.SuccessAmt = 0 - - // If the failure range goes into the success range, move the - // success range down. - case failAmt <= current.SuccessAmt: - current.SuccessAmt = failAmt - 1 - } - } - - log.Debugf("Setting %v->%v range to [%v-%v]", - fromNode, toNode, current.SuccessAmt, current.FailAmt) - - nodePairs[toNode] = current -} - -// setAllFail stores a fail result for all known connections to and from the -// given node. -func (m *MissionControl) setAllFail(node route.Vertex, - timestamp time.Time) { - - for fromNode, nodePairs := range m.lastPairResult { - for toNode := range nodePairs { - if fromNode == node || toNode == node { - nodePairs[toNode] = TimedPairResult{ - FailTime: timestamp, - } - } - } - } -} - -// requestSecondChance checks whether the node fromNode can have a second chance -// at providing a channel update for its channel with toNode. -func (m *MissionControl) requestSecondChance(timestamp time.Time, - fromNode, toNode route.Vertex) bool { - - // Look up previous second chance time. - pair := DirectedNodePair{ - From: fromNode, - To: toNode, - } - lastSecondChance, ok := m.lastSecondChance[pair] - - // If the channel hasn't already be given a second chance or its last - // second chance was long ago, we give it another chance. - if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval { - m.lastSecondChance[pair] = timestamp - - log.Debugf("Second chance granted for %v->%v", fromNode, toNode) - - return true - } - - // Otherwise penalize the channel, because we don't allow channel - // updates that are that frequent. This is to prevent nodes from keeping - // us busy by continuously sending new channel updates. - log.Debugf("Second chance denied for %v->%v, remaining interval: %v", - fromNode, toNode, timestamp.Sub(lastSecondChance)) - - return false -} - // GetHistorySnapshot takes a snapshot from the current mission control state // and actual probability estimates. 
func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { m.Lock() defer m.Unlock() - log.Debugf("Requesting history snapshot from mission control: "+ - "pair_result_count=%v", len(m.lastPairResult)) + log.Debugf("Requesting history snapshot from mission control") - pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairResult)) - - for fromNode, fromPairs := range m.lastPairResult { - for toNode, result := range fromPairs { - pair := NewDirectedNodePair(fromNode, toNode) - - pairSnapshot := MissionControlPairSnapshot{ - Pair: pair, - TimedPairResult: result, - } - - pairs = append(pairs, pairSnapshot) - } - } - - snapshot := MissionControlSnapshot{ - Pairs: pairs, - } - - return &snapshot + return m.state.getSnapshot() } // GetPairHistorySnapshot returns the stored history for a given node pair. @@ -418,7 +277,7 @@ func (m *MissionControl) GetPairHistorySnapshot( m.Lock() defer m.Unlock() - results, ok := m.lastPairResult[fromNode] + results, ok := m.state.getLastPairResult(fromNode) if !ok { return TimedPairResult{} } @@ -507,7 +366,7 @@ func (m *MissionControl) applyPaymentResult( defer m.Unlock() if i.policyFailure != nil { - if m.requestSecondChance( + if m.state.requestSecondChance( result.timeReply, i.policyFailure.From, i.policyFailure.To, ) { @@ -536,7 +395,7 @@ func (m *MissionControl) applyPaymentResult( log.Debugf("Reporting node failure to Mission Control: "+ "node=%v", *i.nodeFailure) - m.setAllFail(*i.nodeFailure, result.timeReply) + m.state.setAllFail(*i.nodeFailure, result.timeReply) } for pair, pairResult := range i.pairResults { @@ -551,7 +410,7 @@ func (m *MissionControl) applyPaymentResult( pair, pairResult.amt) } - m.setLastPairResult( + m.state.setLastPairResult( pair.From, pair.To, result.timeReply, &pairResult, ) } diff --git a/routing/missioncontrol_state.go b/routing/missioncontrol_state.go new file mode 100644 index 00000000..9b342f6d --- /dev/null +++ b/routing/missioncontrol_state.go @@ -0,0 +1,187 @@ +package routing + +import ( + "time" + + "github.com/lightningnetwork/lnd/routing/route" +) + +// missionControlState is an object that manages the internal mission control +// state. Note that it isn't thread safe and synchronization needs to be +// enforced externally. +type missionControlState struct { + // lastPairResult tracks the last payment result (on a pair basis) for + // each transited node. This is a multi-layer map that allows us to look + // up the failure history of all connected channels (node pairs) for a + // particular node. + lastPairResult map[route.Vertex]NodeResults + + // lastSecondChance tracks the last time a second chance was granted for + // a directed node pair. + lastSecondChance map[DirectedNodePair]time.Time +} + +// newMissionControlState instantiates a new mission control state object. +func newMissionControlState() *missionControlState { + return &missionControlState{ + lastPairResult: make(map[route.Vertex]NodeResults), + lastSecondChance: make(map[DirectedNodePair]time.Time), + } +} + +// getLastPairResult returns the current state for connections to the given node. +func (m *missionControlState) getLastPairResult(node route.Vertex) (NodeResults, + bool) { + + result, ok := m.lastPairResult[node] + return result, ok +} + +// ResetHistory resets the history of MissionControl returning it to a state as +// if no payment attempts have been made. 
+func (m *missionControlState) resetHistory() { + m.lastPairResult = make(map[route.Vertex]NodeResults) + m.lastSecondChance = make(map[DirectedNodePair]time.Time) +} + +// setLastPairResult stores a result for a node pair. +func (m *missionControlState) setLastPairResult(fromNode, toNode route.Vertex, + timestamp time.Time, result *pairResult) { + + nodePairs, ok := m.lastPairResult[fromNode] + if !ok { + nodePairs = make(NodeResults) + m.lastPairResult[fromNode] = nodePairs + } + + current := nodePairs[toNode] + + // Apply the new result to the existing data for this pair. If there is + // no existing data, apply it to the default values for TimedPairResult. + if result.success { + successAmt := result.amt + current.SuccessTime = timestamp + + // Only update the success amount if this amount is higher. This + // prevents the success range from shrinking when there is no + // reason to do so. For example: small amount probes shouldn't + // affect a previous success for a much larger amount. + if successAmt > current.SuccessAmt { + current.SuccessAmt = successAmt + } + + // If the success amount goes into the failure range, move the + // failure range up. Future attempts up to the success amount + // are likely to succeed. We don't want to clear the failure + // completely, because we haven't learnt much for amounts above + // the current success amount. + if !current.FailTime.IsZero() && successAmt >= current.FailAmt { + current.FailAmt = successAmt + 1 + } + } else { + // For failures we always want to update both the amount and the + // time. Those need to relate to the same result, because the + // time is used to gradually diminish the penality for that + // specific result. Updating the timestamp but not the amount + // could cause a failure for a lower amount (a more severe + // condition) to be revived as if it just happened. + failAmt := result.amt + + current.FailTime = timestamp + current.FailAmt = failAmt + + switch { + // The failure amount is set to zero when the failure is + // amount-independent, meaning that the attempt would have + // failed regardless of the amount. This should also reset the + // success amount to zero. + case failAmt == 0: + current.SuccessAmt = 0 + + // If the failure range goes into the success range, move the + // success range down. + case failAmt <= current.SuccessAmt: + current.SuccessAmt = failAmt - 1 + } + } + + log.Debugf("Setting %v->%v range to [%v-%v]", + fromNode, toNode, current.SuccessAmt, current.FailAmt) + + nodePairs[toNode] = current +} + +// setAllFail stores a fail result for all known connections to and from the +// given node. +func (m *missionControlState) setAllFail(node route.Vertex, + timestamp time.Time) { + + for fromNode, nodePairs := range m.lastPairResult { + for toNode := range nodePairs { + if fromNode == node || toNode == node { + nodePairs[toNode] = TimedPairResult{ + FailTime: timestamp, + } + } + } + } +} + +// requestSecondChance checks whether the node fromNode can have a second chance +// at providing a channel update for its channel with toNode. +func (m *missionControlState) requestSecondChance(timestamp time.Time, + fromNode, toNode route.Vertex) bool { + + // Look up previous second chance time. + pair := DirectedNodePair{ + From: fromNode, + To: toNode, + } + lastSecondChance, ok := m.lastSecondChance[pair] + + // If the channel hasn't already be given a second chance or its last + // second chance was long ago, we give it another chance. 
+ if !ok || timestamp.Sub(lastSecondChance) > minSecondChanceInterval { + m.lastSecondChance[pair] = timestamp + + log.Debugf("Second chance granted for %v->%v", fromNode, toNode) + + return true + } + + // Otherwise penalize the channel, because we don't allow channel + // updates that are that frequent. This is to prevent nodes from keeping + // us busy by continuously sending new channel updates. + log.Debugf("Second chance denied for %v->%v, remaining interval: %v", + fromNode, toNode, timestamp.Sub(lastSecondChance)) + + return false +} + +// GetHistorySnapshot takes a snapshot from the current mission control state +// and actual probability estimates. +func (m *missionControlState) getSnapshot() *MissionControlSnapshot { + log.Debugf("Requesting history snapshot from mission control: "+ + "pair_result_count=%v", len(m.lastPairResult)) + + pairs := make([]MissionControlPairSnapshot, 0, len(m.lastPairResult)) + + for fromNode, fromPairs := range m.lastPairResult { + for toNode, result := range fromPairs { + pair := NewDirectedNodePair(fromNode, toNode) + + pairSnapshot := MissionControlPairSnapshot{ + Pair: pair, + TimedPairResult: result, + } + + pairs = append(pairs, pairSnapshot) + } + } + + snapshot := MissionControlSnapshot{ + Pairs: pairs, + } + + return &snapshot +} From 1a6b28553adfbf5e929e273522c4513fbedaed38 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 23 Mar 2020 19:59:16 +0100 Subject: [PATCH 2/2] routing: stricter mission control state failure updates This commit puts a mechanism in place to prevent a failure for a low amount from being overwritten very soon after by a higher amount failure. --- routing/missioncontrol.go | 12 ++++++- routing/missioncontrol_state.go | 30 ++++++++++++++++-- routing/missioncontrol_state_test.go | 47 ++++++++++++++++++++++++++++ server.go | 11 ++++--- 4 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 routing/missioncontrol_state_test.go diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 963ea886..da3f38b6 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -50,6 +50,11 @@ const ( // DefaultAprioriWeight is the default a priori weight. See // MissionControlConfig for further explanation. DefaultAprioriWeight = 0.5 + + // DefaultMinFailureRelaxInterval is the default minimum time that must + // have passed since the previously recorded failure before the failure + // amount may be raised. + DefaultMinFailureRelaxInterval = time.Minute ) // NodeResults contains previous results from a node to its peers. @@ -113,6 +118,11 @@ type MissionControlConfig struct { // results, unless there are none available. AprioriWeight float64 + // MinFailureRelaxInterval is the minimum time that must have passed + // since the previously recorded failure before the failure amount may + // be raised. + MinFailureRelaxInterval time.Duration + // SelfNode is our own pubkey. SelfNode route.Vertex } @@ -188,7 +198,7 @@ func NewMissionControl(db kvdb.Backend, cfg *MissionControlConfig) ( } mc := &MissionControl{ - state: newMissionControlState(), + state: newMissionControlState(cfg.MinFailureRelaxInterval), now: time.Now, cfg: cfg, store: store, diff --git a/routing/missioncontrol_state.go b/routing/missioncontrol_state.go index 9b342f6d..06e2ef13 100644 --- a/routing/missioncontrol_state.go +++ b/routing/missioncontrol_state.go @@ -19,13 +19,21 @@ type missionControlState struct { // lastSecondChance tracks the last time a second chance was granted for // a directed node pair. 
lastSecondChance map[DirectedNodePair]time.Time + + // minFailureRelaxInterval is the minimum time that must have passed + // since the previously recorded failure before the failure amount may + // be raised. + minFailureRelaxInterval time.Duration } // newMissionControlState instantiates a new mission control state object. -func newMissionControlState() *missionControlState { +func newMissionControlState( + minFailureRelaxInterval time.Duration) *missionControlState { + return &missionControlState{ - lastPairResult: make(map[route.Vertex]NodeResults), - lastSecondChance: make(map[DirectedNodePair]time.Time), + lastPairResult: make(map[route.Vertex]NodeResults), + lastSecondChance: make(map[DirectedNodePair]time.Time), + minFailureRelaxInterval: minFailureRelaxInterval, } } @@ -87,6 +95,22 @@ func (m *missionControlState) setLastPairResult(fromNode, toNode route.Vertex, // condition) to be revived as if it just happened. failAmt := result.amt + // Drop result if it would increase the failure amount too soon + // after a previous failure. This can happen if htlc results + // come in out of order. This check makes it easier for payment + // processes to converge to a final state. + failInterval := timestamp.Sub(current.FailTime) + if failAmt > current.FailAmt && + failInterval < m.minFailureRelaxInterval { + + log.Debugf("Ignoring higher amount failure within min "+ + "failure relaxation interval: prev_fail_amt=%v, "+ + "fail_amt=%v, interval=%v", + current.FailAmt, failAmt, failInterval) + + return + } + current.FailTime = timestamp current.FailAmt = failAmt diff --git a/routing/missioncontrol_state_test.go b/routing/missioncontrol_state_test.go new file mode 100644 index 00000000..28635d43 --- /dev/null +++ b/routing/missioncontrol_state_test.go @@ -0,0 +1,47 @@ +package routing + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/routing/route" +) + +// TestMissionControlStateFailureResult tests setting failure results on the +// mission control state. +func TestMissionControlStateFailureResult(t *testing.T) { + const minFailureRelaxInterval = time.Minute + state := newMissionControlState(minFailureRelaxInterval) + + var ( + from = route.Vertex{1} + to = route.Vertex{2} + timestamp = testTime + ) + + // Report a 1000 sat failure. + state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1000}) + result, _ := state.getLastPairResult(from) + if result[to].FailAmt != 1000 { + t.Fatalf("unexpected fail amount %v", result[to].FailAmt) + } + + // Report an 1100 sat failure one hour later. It is expected to + // overwrite the previous failure. + timestamp = timestamp.Add(time.Hour) + state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1100}) + result, _ = state.getLastPairResult(from) + if result[to].FailAmt != 1100 { + t.Fatalf("unexpected fail amount %v", result[to].FailAmt) + } + + // Report a 1200 sat failure one second later. Because this increase of + // the failure amount is too soon after the previous failure, the result + // is not applied. 
+ timestamp = timestamp.Add(time.Second) + state.setLastPairResult(from, to, timestamp, &pairResult{amt: 1200}) + result, _ = state.getLastPairResult(from) + if result[to].FailAmt != 1100 { + t.Fatalf("unexpected fail amount %v", result[to].FailAmt) + } +} diff --git a/server.go b/server.go index b9a628e3..fe935f3c 100644 --- a/server.go +++ b/server.go @@ -710,11 +710,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, s.missionControl, err = routing.NewMissionControl( chanDB, &routing.MissionControlConfig{ - AprioriHopProbability: routingConfig.AprioriHopProbability, - PenaltyHalfLife: routingConfig.PenaltyHalfLife, - MaxMcHistory: routingConfig.MaxMcHistory, - AprioriWeight: routingConfig.AprioriWeight, - SelfNode: selfNode.PubKeyBytes, + AprioriHopProbability: routingConfig.AprioriHopProbability, + PenaltyHalfLife: routingConfig.PenaltyHalfLife, + MaxMcHistory: routingConfig.MaxMcHistory, + AprioriWeight: routingConfig.AprioriWeight, + SelfNode: selfNode.PubKeyBytes, + MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, }, ) if err != nil {
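
For reviewers who want to see the new failure-relaxation rule in isolation, below is a small standalone Go sketch of just that decision. The names used here (pairState, applyFailure) are invented for the sketch and are not part of lnd; it only mirrors the early return added to setLastPairResult above and the scenario exercised by TestMissionControlStateFailureResult, and it leaves out the success-range handling and the amount units used by the real code.

package main

import (
	"fmt"
	"time"
)

// pairState is a stripped-down stand-in for the FailAmt/FailTime part of
// TimedPairResult. It exists only for this illustration.
type pairState struct {
	failAmt  int64
	failTime time.Time
}

// minFailureRelaxInterval plays the role of DefaultMinFailureRelaxInterval:
// the minimum time that must pass before a recorded failure amount may be
// raised.
const minFailureRelaxInterval = time.Minute

// applyFailure applies a new failure result using the rule this patch adds:
// a failure for a higher amount that arrives within minFailureRelaxInterval
// of the previously recorded failure is dropped, so out-of-order htlc
// results cannot weaken an existing, stricter failure.
func applyFailure(cur pairState, amt int64, ts time.Time) pairState {
	if amt > cur.failAmt && ts.Sub(cur.failTime) < minFailureRelaxInterval {
		// Too soon to raise the failure amount; keep the lower,
		// stricter failure that is already recorded.
		return cur
	}

	return pairState{failAmt: amt, failTime: ts}
}

func main() {
	now := time.Now()

	// A failure for 1000 is recorded.
	s := applyFailure(pairState{}, 1000, now)

	// One second later a failure for 1200 arrives: it is ignored.
	s = applyFailure(s, 1200, now.Add(time.Second))
	fmt.Println(s.failAmt) // prints 1000

	// After the relaxation interval has passed, the higher failure
	// amount is accepted again.
	s = applyFailure(s, 1200, now.Add(time.Hour))
	fmt.Println(s.failAmt) // prints 1200
}

Without the interval check, the late 1200 failure would immediately overwrite the stricter 1000 failure; as the in-code comment notes, dropping it makes it easier for the payment process to converge to a final state when htlc results arrive out of order.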