From 934ea8e78dceb45f7990c5c3c29dae935bbe53f3 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 09:49:16 +0200 Subject: [PATCH 1/9] routing: move failure interpretation into mission control --- routing/missioncontrol.go | 248 ++++++++++++++++++++++++++++++++- routing/missioncontrol_test.go | 73 +++++++--- routing/mock_test.go | 7 + routing/router.go | 248 ++------------------------------- 4 files changed, 312 insertions(+), 264 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 627b1789..f3a79e0d 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -282,8 +283,8 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist return node } -// ReportVertexFailure reports a node level failure. -func (m *MissionControl) ReportVertexFailure(v route.Vertex) { +// reportVertexFailure reports a node level failure. +func (m *MissionControl) reportVertexFailure(v route.Vertex) { log.Debugf("Reporting vertex %v failure to Mission Control", v) now := m.now() @@ -295,8 +296,8 @@ func (m *MissionControl) ReportVertexFailure(v route.Vertex) { history.lastFail = &now } -// ReportEdgePolicyFailure reports a policy related failure. -func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) { +// reportEdgePolicyFailure reports a policy related failure. +func (m *MissionControl) reportEdgePolicyFailure(failedEdge edge) { now := m.now() m.Lock() @@ -315,10 +316,10 @@ func (m *MissionControl) ReportEdgePolicyFailure(failedEdge edge) { history.lastFail = &now } -// ReportEdgeFailure reports a channel level failure. +// reportEdgeFailure reports a channel level failure. 
// // TODO(roasbeef): also add value attempted to send and capacity of channel -func (m *MissionControl) ReportEdgeFailure(failedEdge edge, +func (m *MissionControl) reportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { log.Debugf("Reporting channel %v failure to Mission Control", @@ -387,3 +388,238 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { return &snapshot } + +// ReportPaymentFail reports a failed payment to mission control as input for +// future probability estimates. It returns a bool indicating whether this error +// is a final error and no further payment attempts need to be made. +func (m *MissionControl) ReportPaymentFail(rt *route.Route, + failureSourceIdx int, failure lnwire.FailureMessage) (bool, + channeldb.FailureReason) { + + var ( + failureVertex route.Vertex + ) + + // For any non-self failure, look up the source pub key in the hops + // slice. Otherwise return the self node pubkey. + if failureSourceIdx > 0 { + failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes + } else { + failureVertex = rt.SourcePubKey + } + log.Tracef("Node %x (index %v) reported failure when sending htlc", + failureVertex, failureSourceIdx) + + // Always determine chan id ourselves, because a channel + // update with id may not be available. + failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) + + switch failure.(type) { + + // If the end destination didn't know the payment + // hash or we sent the wrong payment amount to the + // destination, then we'll terminate immediately. + case *lnwire.FailUnknownPaymentHash: + // TODO(joostjager): Check onionErr.Amount() whether it matches + // what we expect. (Will it ever not match, because if not + // final_incorrect_htlc_amount would be returned?) + + return true, channeldb.FailureReasonIncorrectPaymentDetails + + // If we sent the wrong amount to the destination, then + // we'll exit early. 
+ case *lnwire.FailIncorrectPaymentAmount: + return true, channeldb.FailureReasonIncorrectPaymentDetails + + // If the time-lock that was extended to the final node + // was incorrect, then we can't proceed. + case *lnwire.FailFinalIncorrectCltvExpiry: + // TODO(joostjager): Take into account that second last hop may + // have deliberately handed out an htlc that expires too soon. + // In that case we should continue routing. + return true, channeldb.FailureReasonError + + // If we crafted an invalid onion payload for the final + // node, then we'll exit early. + case *lnwire.FailFinalIncorrectHtlcAmount: + // TODO(joostjager): Take into account that second last hop may + // have deliberately handed out an htlc with a too low value. In + // that case we should continue routing. + + return true, channeldb.FailureReasonError + + // Similarly, if the HTLC expiry that we extended to + // the final hop expires too soon, then will fail the + // payment. + // + // TODO(roasbeef): can happen to to race condition, try + // again with recent block height + case *lnwire.FailFinalExpiryTooSoon: + // TODO(joostjager): Take into account that any hop may have + // delayed. Ideally we should continue routing. Knowing the + // delaying node at this point would help. + return true, channeldb.FailureReasonIncorrectPaymentDetails + + // If we erroneously attempted to cross a chain border, + // then we'll cancel the payment. + case *lnwire.FailInvalidRealm: + return true, channeldb.FailureReasonError + + // If we get a notice that the expiry was too soon for + // an intermediate node, then we'll prune out the node + // that sent us this error, as it doesn't now what the + // correct block height is. + case *lnwire.FailExpiryTooSoon: + m.reportVertexFailure(failureVertex) + return false, 0 + + // If we hit an instance of onion payload corruption or an invalid + // version, then we'll exit early as this shouldn't happen in the + // typical case. 
+ // + // TODO(joostjager): Take into account that the previous hop may have + // tampered with the onion. Routing should continue using other paths. + case *lnwire.FailInvalidOnionVersion: + return true, channeldb.FailureReasonError + case *lnwire.FailInvalidOnionHmac: + return true, channeldb.FailureReasonError + case *lnwire.FailInvalidOnionKey: + return true, channeldb.FailureReasonError + + // If we get a failure due to violating the minimum + // amount, we'll apply the new minimum amount and retry + // routing. + case *lnwire.FailAmountBelowMinimum: + m.reportEdgePolicyFailure(failedEdge) + return false, 0 + + // If we get a failure due to a fee, we'll apply the + // new fee update, and retry our attempt using the + // newly updated fees. + case *lnwire.FailFeeInsufficient: + m.reportEdgePolicyFailure(failedEdge) + return false, 0 + + // If we get the failure for an intermediate node that + // disagrees with our time lock values, then we'll + // apply the new delta value and try it once more. + case *lnwire.FailIncorrectCltvExpiry: + m.reportEdgePolicyFailure(failedEdge) + return false, 0 + + // The outgoing channel that this node was meant to + // forward one is currently disabled, so we'll apply + // the update and continue. + case *lnwire.FailChannelDisabled: + m.reportEdgeFailure(failedEdge, 0) + return false, 0 + + // It's likely that the outgoing channel didn't have + // sufficient capacity, so we'll prune this edge for + // now, and continue onwards with our path finding. + case *lnwire.FailTemporaryChannelFailure: + m.reportEdgeFailure(failedEdge, failedAmt) + return false, 0 + + // If the send fail due to a node not having the + // required features, then we'll note this error and + // continue. + case *lnwire.FailRequiredNodeFeatureMissing: + m.reportVertexFailure(failureVertex) + return false, 0 + + // If the send fail due to a node not having the + // required features, then we'll note this error and + // continue. 
+ case *lnwire.FailRequiredChannelFeatureMissing: + m.reportVertexFailure(failureVertex) + return false, 0 + + // If the next hop in the route wasn't known or + // offline, we'll only the channel which we attempted + // to route over. This is conservative, and it can + // handle faulty channels between nodes properly. + // Additionally, this guards against routing nodes + // returning errors in order to attempt to black list + // another node. + case *lnwire.FailUnknownNextPeer: + m.reportEdgeFailure(failedEdge, 0) + return false, 0 + + // If the node wasn't able to forward for which ever + // reason, then we'll note this and continue with the + // routes. + case *lnwire.FailTemporaryNodeFailure: + m.reportVertexFailure(failureVertex) + return false, 0 + + case *lnwire.FailPermanentNodeFailure: + m.reportVertexFailure(failureVertex) + return false, 0 + + // If we crafted a route that contains a too long time + // lock for an intermediate node, we'll prune the node. + // As there currently is no way of knowing that node's + // maximum acceptable cltv, we cannot take this + // constraint into account during routing. + // + // TODO(joostjager): Record the rejected cltv and use + // that as a hint during future path finding through + // that node. + case *lnwire.FailExpiryTooFar: + m.reportVertexFailure(failureVertex) + return false, 0 + + // If we get a permanent channel or node failure, then + // we'll prune the channel in both directions and + // continue with the rest of the routes. + case *lnwire.FailPermanentChannelFailure: + m.reportEdgeFailure(failedEdge, 0) + m.reportEdgeFailure(edge{ + from: failedEdge.to, + to: failedEdge.from, + channel: failedEdge.channel, + }, 0) + return false, 0 + + // Any other failure or an empty failure will get the node pruned. + default: + m.reportVertexFailure(failureVertex) + return false, 0 + } +} + +// getFailedEdge tries to locate the failing channel given a route and the +// pubkey of the node that sent the failure. 
It will assume that the failure is +// associated with the outgoing channel of the failing node. As a second result, +// it returns the amount sent over the edge. +func getFailedEdge(route *route.Route, failureSource int) (edge, + lnwire.MilliSatoshi) { + + // Determine if we have a failure from the final hop. If it is, we + // assume that the failing channel is the incoming channel. + // + // TODO(joostjager): In this case, certain types of failures are not + // expected. For example FailUnknownNextPeer. This could be a reason to + // prune the node? + if failureSource == len(route.Hops) { + failureSource-- + } + + // As this failure indicates that the target channel was unable to carry + // this HTLC (for w/e reason), we'll return the _outgoing_ channel that + // the source of the failure was meant to pass the HTLC along to. + if failureSource == 0 { + return edge{ + from: route.SourcePubKey, + to: route.Hops[0].PubKeyBytes, + channel: route.Hops[0].ChannelID, + }, route.TotalAmount + } + + return edge{ + from: route.Hops[failureSource-1].PubKeyBytes, + to: route.Hops[failureSource].PubKeyBytes, + channel: route.Hops[failureSource].ChannelID, + }, route.Hops[failureSource-1].AmtToForward +} diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 237f81df..5eaf9980 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -9,11 +9,28 @@ import ( ) var ( - mcTestNode = route.Vertex{} mcTestEdge = EdgeLocator{ - ChannelID: 123, + ChannelID: 2, } - mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) + + mcTestRoute = &route.Route{ + SourcePubKey: route.Vertex{10}, + Hops: []*route.Hop{ + { + ChannelID: 1, + PubKeyBytes: route.Vertex{11}, + AmtToForward: 1000, + }, + { + ChannelID: 2, + PubKeyBytes: route.Vertex{12}, + }, + }, + } + + mcTestTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) + mcTestNode1 = mcTestRoute.Hops[0].PubKeyBytes + mcTestNode2 = 
mcTestRoute.Hops[1].PubKeyBytes ) type mcTestContext struct { @@ -47,12 +64,24 @@ func (ctx *mcTestContext) expectP(amt lnwire.MilliSatoshi, ctx.t.Helper() - p := ctx.mc.GetEdgeProbability(mcTestNode, mcTestEdge, amt) + p := ctx.mc.GetEdgeProbability(mcTestNode1, mcTestEdge, amt) if p != expected { ctx.t.Fatalf("unexpected probability %v", p) } } +// reportFailure reports a failure by using a test route. +func (ctx *mcTestContext) reportFailure(t time.Time, + amt lnwire.MilliSatoshi, failure lnwire.FailureMessage) { + + mcTestRoute.Hops[0].AmtToForward = amt + + errorSourceIdx := 1 + ctx.mc.ReportPaymentFail( + mcTestRoute, errorSourceIdx, failure, + ) +} + // TestMissionControl tests mission control probability estimation. func TestMissionControl(t *testing.T) { ctx := createMcTestContext(t) @@ -61,16 +90,14 @@ func TestMissionControl(t *testing.T) { testTime := time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) - testNode := route.Vertex{} - testEdge := edge{ - channel: 123, - } - // Initial probability is expected to be 1. ctx.expectP(1000, 0.8) // Expect probability to be zero after reporting the edge as failed. - ctx.mc.ReportEdgeFailure(testEdge, 1000) + ctx.reportFailure( + testTime, 1000, + lnwire.NewTemporaryChannelFailure(nil), + ) ctx.expectP(1000, 0) // As we reported with a min penalization amt, a lower amt than reported @@ -83,7 +110,10 @@ func TestMissionControl(t *testing.T) { // Edge fails again, this time without a min penalization amt. The edge // should be penalized regardless of amount. - ctx.mc.ReportEdgeFailure(testEdge, 0) + ctx.reportFailure( + ctx.now, 0, + lnwire.NewTemporaryChannelFailure(nil), + ) ctx.expectP(1000, 0) ctx.expectP(500, 0) @@ -93,7 +123,10 @@ func TestMissionControl(t *testing.T) { // A node level failure should bring probability of every channel back // to zero. 
- ctx.mc.ReportVertexFailure(testNode) + ctx.reportFailure( + ctx.now, 0, + lnwire.NewExpiryTooSoon(lnwire.ChannelUpdate{}), + ) ctx.expectP(1000, 0) // Check whether history snapshot looks sane. @@ -112,19 +145,19 @@ func TestMissionControl(t *testing.T) { func TestMissionControlChannelUpdate(t *testing.T) { ctx := createMcTestContext(t) - testEdge := edge{ - channel: 123, - } - // Report a policy related failure. Because it is the first, we don't // expect a penalty. - ctx.mc.ReportEdgePolicyFailure(testEdge) - + ctx.reportFailure( + ctx.now, 0, + lnwire.NewFeeInsufficient(0, lnwire.ChannelUpdate{}), + ) ctx.expectP(0, 0.8) // Report another failure for the same channel. We expect it to be // pruned. - ctx.mc.ReportEdgePolicyFailure(testEdge) - + ctx.reportFailure( + ctx.now, 0, + lnwire.NewFeeInsufficient(0, lnwire.ChannelUpdate{}), + ) ctx.expectP(0, 0) } diff --git a/routing/mock_test.go b/routing/mock_test.go index 4c4f97eb..23667b5a 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -98,6 +98,13 @@ type mockMissionControl struct { var _ MissionController = (*mockMissionControl)(nil) +func (m *mockMissionControl) ReportPaymentFail(rt *route.Route, + failureSourceIdx int, failure lnwire.FailureMessage) (bool, + channeldb.FailureReason) { + + return false, 0 +} + func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { } diff --git a/routing/router.go b/routing/router.go index ec51a57f..afe7ed0d 100644 --- a/routing/router.go +++ b/routing/router.go @@ -174,15 +174,13 @@ type PaymentSessionSource interface { // MissionController is an interface that exposes failure reporting and // probability estimation. type MissionController interface { - // ReportEdgeFailure reports a channel level failure. - ReportEdgeFailure(failedEdge edge, - minPenalizeAmt lnwire.MilliSatoshi) - - // ReportEdgePolicyFailure reports a policy related failure. 
- ReportEdgePolicyFailure(failedEdge edge) - - // ReportVertexFailure reports a node level failure. - ReportVertexFailure(v route.Vertex) + // ReportPaymentFail reports a failed payment to mission control as + // input for future probability estimates. It returns a bool indicating + // whether this error is a final error and no further payment attempts + // need to be made. + ReportPaymentFail(rt *route.Route, + failureSourceIdx int, failure lnwire.FailureMessage) (bool, + channeldb.FailureReason) // GetEdgeProbability is expected to return the success probability of a // payment from fromNode along edge. @@ -1929,195 +1927,9 @@ func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) ( } } - var failureVertex route.Vertex - - // For any non-self failure, look up the source pub key in the hops - // slice. Otherwise return the self node pubkey. - if failureSourceIdx > 0 { - failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes - } else { - failureVertex = r.selfNode.PubKeyBytes - } - log.Tracef("Node %x (index %v) reported failure when sending htlc", - failureVertex, failureSourceIdx) - - // Always determine chan id ourselves, because a channel - // update with id may not be available. - failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) - - switch fErr.FailureMessage.(type) { - - // If the end destination didn't know the payment - // hash or we sent the wrong payment amount to the - // destination, then we'll terminate immediately. - case *lnwire.FailUnknownPaymentHash: - // TODO(joostjager): Check onionErr.Amount() whether it matches - // what we expect. (Will it ever not match, because if not - // final_incorrect_htlc_amount would be returned?) - - return true, channeldb.FailureReasonIncorrectPaymentDetails - - // If we sent the wrong amount to the destination, then - // we'll exit early. 
- case *lnwire.FailIncorrectPaymentAmount: - return true, channeldb.FailureReasonIncorrectPaymentDetails - - // If the time-lock that was extended to the final node - // was incorrect, then we can't proceed. - case *lnwire.FailFinalIncorrectCltvExpiry: - // TODO(joostjager): Take into account that second last hop may - // have deliberately handed out an htlc that expires too soon. - // In that case we should continue routing. - return true, channeldb.FailureReasonError - - // If we crafted an invalid onion payload for the final - // node, then we'll exit early. - case *lnwire.FailFinalIncorrectHtlcAmount: - // TODO(joostjager): Take into account that second last hop may - // have deliberately handed out an htlc with a too low value. In - // that case we should continue routing. - - return true, channeldb.FailureReasonError - - // Similarly, if the HTLC expiry that we extended to - // the final hop expires too soon, then will fail the - // payment. - // - // TODO(roasbeef): can happen to to race condition, try - // again with recent block height - case *lnwire.FailFinalExpiryTooSoon: - // TODO(joostjager): Take into account that any hop may have - // delayed. Ideally we should continue routing. Knowing the - // delaying node at this point would help. - return true, channeldb.FailureReasonIncorrectPaymentDetails - - // If we erroneously attempted to cross a chain border, - // then we'll cancel the payment. - case *lnwire.FailInvalidRealm: - return true, channeldb.FailureReasonError - - // If we get a notice that the expiry was too soon for - // an intermediate node, then we'll prune out the node - // that sent us this error, as it doesn't now what the - // correct block height is. - case *lnwire.FailExpiryTooSoon: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - // If we hit an instance of onion payload corruption or an invalid - // version, then we'll exit early as this shouldn't happen in the - // typical case. 
- // - // TODO(joostjager): Take into account that the previous hop may have - // tampered with the onion. Routing should continue using other paths. - case *lnwire.FailInvalidOnionVersion: - return true, channeldb.FailureReasonError - case *lnwire.FailInvalidOnionHmac: - return true, channeldb.FailureReasonError - case *lnwire.FailInvalidOnionKey: - return true, channeldb.FailureReasonError - - // If we get a failure due to violating the minimum - // amount, we'll apply the new minimum amount and retry - // routing. - case *lnwire.FailAmountBelowMinimum: - r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) - return false, 0 - - // If we get a failure due to a fee, we'll apply the - // new fee update, and retry our attempt using the - // newly updated fees. - case *lnwire.FailFeeInsufficient: - r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) - return false, 0 - - // If we get the failure for an intermediate node that - // disagrees with our time lock values, then we'll - // apply the new delta value and try it once more. - case *lnwire.FailIncorrectCltvExpiry: - r.cfg.MissionControl.ReportEdgePolicyFailure(failedEdge) - return false, 0 - - // The outgoing channel that this node was meant to - // forward one is currently disabled, so we'll apply - // the update and continue. - case *lnwire.FailChannelDisabled: - r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) - return false, 0 - - // It's likely that the outgoing channel didn't have - // sufficient capacity, so we'll prune this edge for - // now, and continue onwards with our path finding. - case *lnwire.FailTemporaryChannelFailure: - r.cfg.MissionControl.ReportEdgeFailure(failedEdge, failedAmt) - return false, 0 - - // If the send fail due to a node not having the - // required features, then we'll note this error and - // continue. 
- case *lnwire.FailRequiredNodeFeatureMissing: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - // If the send fail due to a node not having the - // required features, then we'll note this error and - // continue. - case *lnwire.FailRequiredChannelFeatureMissing: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - // If the next hop in the route wasn't known or - // offline, we'll only the channel which we attempted - // to route over. This is conservative, and it can - // handle faulty channels between nodes properly. - // Additionally, this guards against routing nodes - // returning errors in order to attempt to black list - // another node. - case *lnwire.FailUnknownNextPeer: - r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) - return false, 0 - - // If the node wasn't able to forward for which ever - // reason, then we'll note this and continue with the - // routes. - case *lnwire.FailTemporaryNodeFailure: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - case *lnwire.FailPermanentNodeFailure: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - // If we crafted a route that contains a too long time - // lock for an intermediate node, we'll prune the node. - // As there currently is no way of knowing that node's - // maximum acceptable cltv, we cannot take this - // constraint into account during routing. - // - // TODO(joostjager): Record the rejected cltv and use - // that as a hint during future path finding through - // that node. - case *lnwire.FailExpiryTooFar: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - - // If we get a permanent channel or node failure, then - // we'll prune the channel in both directions and - // continue with the rest of the routes. 
- case *lnwire.FailPermanentChannelFailure: - r.cfg.MissionControl.ReportEdgeFailure(failedEdge, 0) - r.cfg.MissionControl.ReportEdgeFailure(edge{ - from: failedEdge.to, - to: failedEdge.from, - channel: failedEdge.channel, - }, 0) - return false, 0 - - // Any other failure or an empty failure will get the node pruned. - default: - r.cfg.MissionControl.ReportVertexFailure(failureVertex) - return false, 0 - } + return r.cfg.MissionControl.ReportPaymentFail( + rt, failureSourceIdx, failureMessage, + ) } // extractChannelUpdate examines the error and extracts the channel update. @@ -2143,46 +1955,6 @@ func (r *ChannelRouter) extractChannelUpdate( return update } -// getFailedEdge tries to locate the failing channel given a route and the -// pubkey of the node that sent the failure. It will assume that the failure is -// associated with the outgoing channel of the failing node. As a second result, -// it returns the amount sent over the edge. -func getFailedEdge(route *route.Route, failureSource int) (edge, - lnwire.MilliSatoshi) { - - // Determine if we have a failure from the final hop. If it is, we - // assume that the failing channel is the incoming channel. In this - // function the outgoing channel of the hop indicated by failureSource - // is returned, where index zero is the self node. By decrementing - // failureSource by one, the outgoing channel of the penultimate hop is - // returned, which is the same as the incoming channel of the final - // node. - // - // TODO(joostjager): In this case, certain types of failures are not - // expected. For example FailUnknownNextPeer. This could be a reason to - // prune the node? - if failureSource == len(route.Hops) { - failureSource-- - } - - // As this failure indicates that the target channel was unable to carry - // this HTLC (for w/e reason), we'll return the _outgoing_ channel that - // the source of the failure was meant to pass the HTLC along to. 
- if failureSource == 0 { - return edge{ - from: route.SourcePubKey, - to: route.Hops[0].PubKeyBytes, - channel: route.Hops[0].ChannelID, - }, route.TotalAmount - } - - return edge{ - from: route.Hops[failureSource-1].PubKeyBytes, - to: route.Hops[failureSource].PubKeyBytes, - channel: route.Hops[failureSource].ChannelID, - }, route.Hops[failureSource-1].AmtToForward -} - // applyChannelUpdate validates a channel update and if valid, applies it to the // database. It returns a bool indicating whether the updates was successful. func (r *ChannelRouter) applyChannelUpdate(msg *lnwire.ChannelUpdate, From 334b6a3bfe958f01bfad4e430b74b53d874c9a15 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 11:34:25 +0200 Subject: [PATCH 2/9] routing: move unreadable failure handling into mission control --- routing/missioncontrol.go | 37 ++++++++++++++++++++++------------ routing/missioncontrol_test.go | 2 +- routing/mock_test.go | 2 +- routing/router.go | 26 +++++++++++------------- 4 files changed, 38 insertions(+), 29 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index f3a79e0d..c4f0157e 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -390,29 +390,40 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { } // ReportPaymentFail reports a failed payment to mission control as input for -// future probability estimates. It returns a bool indicating whether this error -// is a final error and no further payment attempts need to be made. +// future probability estimates. The failureSourceIdx argument indicates the +// failure source. If it is nil, the failure source is unknown. This function +// returns a bool indicating whether this error is a final error. If it is +// final, a failure reason is returned and no further payment attempts need to +// be made. 
func (m *MissionControl) ReportPaymentFail(rt *route.Route, - failureSourceIdx int, failure lnwire.FailureMessage) (bool, + failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) { - var ( - failureVertex route.Vertex - ) + var failureSourceIdxInt int + if failureSourceIdx == nil { + // If the failure message could not be decrypted, attribute the + // failure to our own outgoing channel. + // + // TODO(joostager): Penalize all channels in the route. + failureSourceIdxInt = 0 + failure = lnwire.NewTemporaryChannelFailure(nil) + } else { + failureSourceIdxInt = *failureSourceIdx + } - // For any non-self failure, look up the source pub key in the hops - // slice. Otherwise return the self node pubkey. - if failureSourceIdx > 0 { - failureVertex = rt.Hops[failureSourceIdx-1].PubKeyBytes + var failureVertex route.Vertex + + if failureSourceIdxInt > 0 { + failureVertex = rt.Hops[failureSourceIdxInt-1].PubKeyBytes } else { failureVertex = rt.SourcePubKey } log.Tracef("Node %x (index %v) reported failure when sending htlc", failureVertex, failureSourceIdx) - // Always determine chan id ourselves, because a channel - // update with id may not be available. - failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdx) + // Always determine chan id ourselves, because a channel update with id + // may not be available. 
+ failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdxInt) switch failure.(type) { diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 5eaf9980..21c835f1 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -78,7 +78,7 @@ func (ctx *mcTestContext) reportFailure(t time.Time, errorSourceIdx := 1 ctx.mc.ReportPaymentFail( - mcTestRoute, errorSourceIdx, failure, + mcTestRoute, &errorSourceIdx, failure, ) } diff --git a/routing/mock_test.go b/routing/mock_test.go index 23667b5a..eae42f6f 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -99,7 +99,7 @@ type mockMissionControl struct { var _ MissionController = (*mockMissionControl)(nil) func (m *mockMissionControl) ReportPaymentFail(rt *route.Route, - failureSourceIdx int, failure lnwire.FailureMessage) (bool, + failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) { return false, 0 diff --git a/routing/router.go b/routing/router.go index afe7ed0d..8b3143e9 100644 --- a/routing/router.go +++ b/routing/router.go @@ -179,7 +179,7 @@ type MissionController interface { // whether this error is a final error and no further payment attempts // need to be made. ReportPaymentFail(rt *route.Route, - failureSourceIdx int, failure lnwire.FailureMessage) (bool, + failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) // GetEdgeProbability is expected to return the success probability of a @@ -1893,21 +1893,16 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, // error type, this error is either the final outcome of the payment or we need // to continue with an alternative route. This is indicated by the boolean // return value. 
-func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) ( - bool, channeldb.FailureReason) { +func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (bool, + channeldb.FailureReason) { - // If the failure message could not be decrypted, attribute the failure - // to our own outgoing channel. - // - // TODO(joostager): Penalize all channels in the route. if sendErr == htlcswitch.ErrUnreadableFailureMessage { - sendErr = &htlcswitch.ForwardingError{ - FailureSourceIdx: 0, - FailureMessage: lnwire.NewTemporaryChannelFailure(nil), - } - } + log.Tracef("Unreadable failure when sending htlc") - // If an internal, non-forwarding error occurred, we can stop trying. + return r.cfg.MissionControl.ReportPaymentFail(rt, nil, nil) + } + // If an internal, non-forwarding error occurred, we can stop + // trying. fErr, ok := sendErr.(*htlcswitch.ForwardingError) if !ok { return true, channeldb.FailureReasonError @@ -1927,8 +1922,11 @@ func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) ( } } + log.Tracef("Node=%v reported failure when sending htlc", + failureSourceIdx) + return r.cfg.MissionControl.ReportPaymentFail( - rt, failureSourceIdx, failureMessage, + rt, &failureSourceIdx, failureMessage, ) } From b4a7665bae06e26cb5f90818ccd56b1ecd7ddcb7 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 11:48:59 +0200 Subject: [PATCH 3/9] routing: provide payment id to mission control --- routing/missioncontrol.go | 2 +- routing/missioncontrol_test.go | 3 ++- routing/mock_test.go | 6 +++--- routing/payment_lifecycle.go | 2 +- routing/router.go | 12 +++++++----- 5 files changed, 14 insertions(+), 11 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index c4f0157e..ee6b0c82 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -395,7 +395,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { // returns a bool indicating whether this error is a 
final error. If it is // final, a failure reason is returned and no further payment attempts need to // be made. -func (m *MissionControl) ReportPaymentFail(rt *route.Route, +func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) { diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 21c835f1..1675b044 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -37,6 +37,7 @@ type mcTestContext struct { t *testing.T mc *MissionControl now time.Time + pid uint64 } func createMcTestContext(t *testing.T) *mcTestContext { @@ -78,7 +79,7 @@ func (ctx *mcTestContext) reportFailure(t time.Time, errorSourceIdx := 1 ctx.mc.ReportPaymentFail( - mcTestRoute, &errorSourceIdx, failure, + ctx.pid, mcTestRoute, &errorSourceIdx, failure, ) } diff --git a/routing/mock_test.go b/routing/mock_test.go index eae42f6f..b0444546 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -98,9 +98,9 @@ type mockMissionControl struct { var _ MissionController = (*mockMissionControl)(nil) -func (m *mockMissionControl) ReportPaymentFail(rt *route.Route, - failureSourceIdx *int, failure lnwire.FailureMessage) (bool, - channeldb.FailureReason) { +func (m *mockMissionControl) ReportPaymentFail(paymentID uint64, + rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) ( + bool, channeldb.FailureReason) { return false, 0 } diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 4de85b5d..db9b9803 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -343,7 +343,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID, func (p *paymentLifecycle) handleSendError(sendErr error) error { final, reason := p.router.processSendError( - &p.attempt.Route, sendErr, + p.attempt.PaymentID, &p.attempt.Route, sendErr, ) if !final { // Save the forwarding 
error so it can be returned if diff --git a/routing/router.go b/routing/router.go index 8b3143e9..edc3aa3a 100644 --- a/routing/router.go +++ b/routing/router.go @@ -178,7 +178,7 @@ type MissionController interface { // input for future probability estimates. It returns a bool indicating // whether this error is a final error and no further payment attempts // need to be made. - ReportPaymentFail(rt *route.Route, + ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) @@ -1893,13 +1893,15 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, // error type, this error is either the final outcome of the payment or we need // to continue with an alternative route. This is indicated by the boolean // return value. -func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (bool, - channeldb.FailureReason) { +func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route, + sendErr error) (bool, channeldb.FailureReason) { if sendErr == htlcswitch.ErrUnreadableFailureMessage { log.Tracef("Unreadable failure when sending htlc") - return r.cfg.MissionControl.ReportPaymentFail(rt, nil, nil) + return r.cfg.MissionControl.ReportPaymentFail( + paymentID, rt, nil, nil, + ) } // If an internal, non-forwarding error occurred, we can stop // trying. @@ -1926,7 +1928,7 @@ func (r *ChannelRouter) processSendError(rt *route.Route, sendErr error) (bool, failureSourceIdx) return r.cfg.MissionControl.ReportPaymentFail( - rt, &failureSourceIdx, failureMessage, + paymentID, rt, &failureSourceIdx, failureMessage, ) } From 1507ff6590385913b5caa33539a779f1890e76f1 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 2 Jul 2019 11:29:44 +0200 Subject: [PATCH 4/9] routing: extract timestamp from applying payment result This commit prepares for timestamps being supplied from the database for processing of historical payment results. 
--- routing/missioncontrol.go | 78 ++++++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index ee6b0c82..f12a2dbf 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -284,21 +284,21 @@ func (m *MissionControl) createHistoryIfNotExists(vertex route.Vertex) *nodeHist } // reportVertexFailure reports a node level failure. -func (m *MissionControl) reportVertexFailure(v route.Vertex) { - log.Debugf("Reporting vertex %v failure to Mission Control", v) +func (m *MissionControl) reportVertexFailure(timestamp time.Time, + v route.Vertex) { - now := m.now() + log.Debugf("Reporting vertex %v failure to Mission Control", v) m.Lock() defer m.Unlock() history := m.createHistoryIfNotExists(v) - history.lastFail = &now + history.lastFail = ×tamp } // reportEdgePolicyFailure reports a policy related failure. -func (m *MissionControl) reportEdgePolicyFailure(failedEdge edge) { - now := m.now() +func (m *MissionControl) reportEdgePolicyFailure(timestamp time.Time, + failedEdge edge) { m.Lock() defer m.Unlock() @@ -307,32 +307,30 @@ func (m *MissionControl) reportEdgePolicyFailure(failedEdge edge) { // immediately. If some time has passed since the last policy failure, // we grant the node a second chance at forwarding the payment. if m.requestSecondChance( - now, failedEdge.from, failedEdge.to, + timestamp, failedEdge.from, failedEdge.to, ) { return } history := m.createHistoryIfNotExists(failedEdge.from) - history.lastFail = &now + history.lastFail = ×tamp } // reportEdgeFailure reports a channel level failure. 
// // TODO(roasbeef): also add value attempted to send and capacity of channel -func (m *MissionControl) reportEdgeFailure(failedEdge edge, +func (m *MissionControl) reportEdgeFailure(timestamp time.Time, failedEdge edge, minPenalizeAmt lnwire.MilliSatoshi) { log.Debugf("Reporting channel %v failure to Mission Control", failedEdge.channel) - now := m.now() - m.Lock() defer m.Unlock() history := m.createHistoryIfNotExists(failedEdge.from) history.channelLastFail[failedEdge.channel] = &channelHistory{ - lastFail: now, + lastFail: timestamp, minPenalizeAmt: minPenalizeAmt, } } @@ -399,7 +397,26 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, channeldb.FailureReason) { - var failureSourceIdxInt int + timestamp := m.now() + + // Apply result to update mission control state. + return m.applyPaymentResult( + timestamp, paymentID, rt, failureSourceIdx, failure, + ) +} + +// applyPaymentResult applies a payment result as input for future probability +// estimates. It returns a bool indicating whether this error is a final error +// and no further payment attempts need to be made. +func (m *MissionControl) applyPaymentResult(timeReply time.Time, + paymentID uint64, rt *route.Route, failureSourceIdx *int, + failureMessage lnwire.FailureMessage) (bool, channeldb.FailureReason) { + + var ( + failureSourceIdxInt int + failure lnwire.FailureMessage + ) + if failureSourceIdx == nil { // If the failure message could not be decrypted, attribute the // failure to our own outgoing channel. 
@@ -409,6 +426,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failure = lnwire.NewTemporaryChannelFailure(nil) } else { failureSourceIdxInt = *failureSourceIdx + failure = failureMessage } var failureVertex route.Vertex @@ -423,7 +441,9 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // Always determine chan id ourselves, because a channel update with id // may not be available. - failedEdge, failedAmt := getFailedEdge(rt, failureSourceIdxInt) + failedEdge, failedAmt := getFailedEdge( + rt, failureSourceIdxInt, + ) switch failure.(type) { @@ -481,7 +501,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // that sent us this error, as it doesn't now what the // correct block height is. case *lnwire.FailExpiryTooSoon: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 // If we hit an instance of onion payload corruption or an invalid @@ -501,49 +521,49 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // amount, we'll apply the new minimum amount and retry // routing. case *lnwire.FailAmountBelowMinimum: - m.reportEdgePolicyFailure(failedEdge) + m.reportEdgePolicyFailure(timeReply, failedEdge) return false, 0 // If we get a failure due to a fee, we'll apply the // new fee update, and retry our attempt using the // newly updated fees. case *lnwire.FailFeeInsufficient: - m.reportEdgePolicyFailure(failedEdge) + m.reportEdgePolicyFailure(timeReply, failedEdge) return false, 0 // If we get the failure for an intermediate node that // disagrees with our time lock values, then we'll // apply the new delta value and try it once more. 
case *lnwire.FailIncorrectCltvExpiry: - m.reportEdgePolicyFailure(failedEdge) + m.reportEdgePolicyFailure(timeReply, failedEdge) return false, 0 // The outgoing channel that this node was meant to // forward one is currently disabled, so we'll apply // the update and continue. case *lnwire.FailChannelDisabled: - m.reportEdgeFailure(failedEdge, 0) + m.reportEdgeFailure(timeReply, failedEdge, 0) return false, 0 // It's likely that the outgoing channel didn't have // sufficient capacity, so we'll prune this edge for // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: - m.reportEdgeFailure(failedEdge, failedAmt) + m.reportEdgeFailure(timeReply, failedEdge, failedAmt) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. case *lnwire.FailRequiredNodeFeatureMissing: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. case *lnwire.FailRequiredChannelFeatureMissing: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 // If the next hop in the route wasn't known or @@ -554,18 +574,18 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // returning errors in order to attempt to black list // another node. case *lnwire.FailUnknownNextPeer: - m.reportEdgeFailure(failedEdge, 0) + m.reportEdgeFailure(timeReply, failedEdge, 0) return false, 0 // If the node wasn't able to forward for which ever // reason, then we'll note this and continue with the // routes. 
case *lnwire.FailTemporaryNodeFailure: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 case *lnwire.FailPermanentNodeFailure: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 // If we crafted a route that contains a too long time @@ -578,15 +598,15 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // that as a hint during future path finding through // that node. case *lnwire.FailExpiryTooFar: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 // If we get a permanent channel or node failure, then // we'll prune the channel in both directions and // continue with the rest of the routes. case *lnwire.FailPermanentChannelFailure: - m.reportEdgeFailure(failedEdge, 0) - m.reportEdgeFailure(edge{ + m.reportEdgeFailure(timeReply, failedEdge, 0) + m.reportEdgeFailure(timeReply, edge{ from: failedEdge.to, to: failedEdge.from, channel: failedEdge.channel, @@ -595,7 +615,7 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, // Any other failure or an empty failure will get the node pruned. default: - m.reportVertexFailure(failureVertex) + m.reportVertexFailure(timeReply, failureVertex) return false, 0 } } From 9e26f0d0796ee563e80ee63f0fde21f3726597f0 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 12:25:23 +0200 Subject: [PATCH 5/9] routing: define payment result This commit groups together all payment result data. It is a preparation for historical payment results being retrieved from the database. 
--- routing/missioncontrol.go | 75 ++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 28 deletions(-) diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index f12a2dbf..350f35b6 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -147,6 +147,17 @@ type MissionControlChannelSnapshot struct { SuccessProb float64 } +// paymentResult is the information that becomes available when a payment +// attempt completes. +type paymentResult struct { + id uint64 + timeFwd, timeReply time.Time + route *route.Route + success bool + failureSourceIdx *int + failure lnwire.FailureMessage +} + // NewMissionControl returns a new instance of missionControl. func NewMissionControl(cfg *MissionControlConfig) *MissionControl { log.Debugf("Instantiating mission control with config: "+ @@ -399,25 +410,33 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, timestamp := m.now() + // TODO(joostjager): Use actual payment initiation time for timeFwd. + result := &paymentResult{ + success: false, + timeFwd: timestamp, + timeReply: timestamp, + id: paymentID, + failureSourceIdx: failureSourceIdx, + failure: failure, + route: rt, + } + // Apply result to update mission control state. - return m.applyPaymentResult( - timestamp, paymentID, rt, failureSourceIdx, failure, - ) + return m.applyPaymentResult(result) } // applyPaymentResult applies a payment result as input for future probability // estimates. It returns a bool indicating whether this error is a final error // and no further payment attempts need to be made. 
-func (m *MissionControl) applyPaymentResult(timeReply time.Time, - paymentID uint64, rt *route.Route, failureSourceIdx *int, - failureMessage lnwire.FailureMessage) (bool, channeldb.FailureReason) { +func (m *MissionControl) applyPaymentResult(result *paymentResult) ( + bool, channeldb.FailureReason) { var ( failureSourceIdxInt int failure lnwire.FailureMessage ) - if failureSourceIdx == nil { + if result.failureSourceIdx == nil { // If the failure message could not be decrypted, attribute the // failure to our own outgoing channel. // @@ -425,24 +444,24 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, failureSourceIdxInt = 0 failure = lnwire.NewTemporaryChannelFailure(nil) } else { - failureSourceIdxInt = *failureSourceIdx - failure = failureMessage + failureSourceIdxInt = *result.failureSourceIdx + failure = result.failure } var failureVertex route.Vertex if failureSourceIdxInt > 0 { - failureVertex = rt.Hops[failureSourceIdxInt-1].PubKeyBytes + failureVertex = result.route.Hops[failureSourceIdxInt-1].PubKeyBytes } else { - failureVertex = rt.SourcePubKey + failureVertex = result.route.SourcePubKey } log.Tracef("Node %x (index %v) reported failure when sending htlc", - failureVertex, failureSourceIdx) + failureVertex, result.failureSourceIdx) // Always determine chan id ourselves, because a channel update with id // may not be available. failedEdge, failedAmt := getFailedEdge( - rt, failureSourceIdxInt, + result.route, failureSourceIdxInt, ) switch failure.(type) { @@ -501,7 +520,7 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, // that sent us this error, as it doesn't now what the // correct block height is. 
case *lnwire.FailExpiryTooSoon: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 // If we hit an instance of onion payload corruption or an invalid @@ -521,49 +540,49 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, // amount, we'll apply the new minimum amount and retry // routing. case *lnwire.FailAmountBelowMinimum: - m.reportEdgePolicyFailure(timeReply, failedEdge) + m.reportEdgePolicyFailure(result.timeReply, failedEdge) return false, 0 // If we get a failure due to a fee, we'll apply the // new fee update, and retry our attempt using the // newly updated fees. case *lnwire.FailFeeInsufficient: - m.reportEdgePolicyFailure(timeReply, failedEdge) + m.reportEdgePolicyFailure(result.timeReply, failedEdge) return false, 0 // If we get the failure for an intermediate node that // disagrees with our time lock values, then we'll // apply the new delta value and try it once more. case *lnwire.FailIncorrectCltvExpiry: - m.reportEdgePolicyFailure(timeReply, failedEdge) + m.reportEdgePolicyFailure(result.timeReply, failedEdge) return false, 0 // The outgoing channel that this node was meant to // forward one is currently disabled, so we'll apply // the update and continue. case *lnwire.FailChannelDisabled: - m.reportEdgeFailure(timeReply, failedEdge, 0) + m.reportEdgeFailure(result.timeReply, failedEdge, 0) return false, 0 // It's likely that the outgoing channel didn't have // sufficient capacity, so we'll prune this edge for // now, and continue onwards with our path finding. case *lnwire.FailTemporaryChannelFailure: - m.reportEdgeFailure(timeReply, failedEdge, failedAmt) + m.reportEdgeFailure(result.timeReply, failedEdge, failedAmt) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. 
case *lnwire.FailRequiredNodeFeatureMissing: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 // If the send fail due to a node not having the // required features, then we'll note this error and // continue. case *lnwire.FailRequiredChannelFeatureMissing: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 // If the next hop in the route wasn't known or @@ -574,18 +593,18 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, // returning errors in order to attempt to black list // another node. case *lnwire.FailUnknownNextPeer: - m.reportEdgeFailure(timeReply, failedEdge, 0) + m.reportEdgeFailure(result.timeReply, failedEdge, 0) return false, 0 // If the node wasn't able to forward for which ever // reason, then we'll note this and continue with the // routes. case *lnwire.FailTemporaryNodeFailure: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 case *lnwire.FailPermanentNodeFailure: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 // If we crafted a route that contains a too long time @@ -598,15 +617,15 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, // that as a hint during future path finding through // that node. case *lnwire.FailExpiryTooFar: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 // If we get a permanent channel or node failure, then // we'll prune the channel in both directions and // continue with the rest of the routes. 
case *lnwire.FailPermanentChannelFailure: - m.reportEdgeFailure(timeReply, failedEdge, 0) - m.reportEdgeFailure(timeReply, edge{ + m.reportEdgeFailure(result.timeReply, failedEdge, 0) + m.reportEdgeFailure(result.timeReply, edge{ from: failedEdge.to, to: failedEdge.from, channel: failedEdge.channel, @@ -615,7 +634,7 @@ func (m *MissionControl) applyPaymentResult(timeReply time.Time, // Any other failure or an empty failure will get the node pruned. default: - m.reportVertexFailure(timeReply, failureVertex) + m.reportVertexFailure(result.timeReply, failureVertex) return false, 0 } } From 67ddb72eabc11e887ff781d5f363e327c3361394 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 2 Jul 2019 16:16:17 +0200 Subject: [PATCH 6/9] lnwire: create failure message encode/decode functions --- lnwire/onion_error.go | 56 +++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/lnwire/onion_error.go b/lnwire/onion_error.go index 6ea01d52..c363aa49 100644 --- a/lnwire/onion_error.go +++ b/lnwire/onion_error.go @@ -1113,10 +1113,16 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) { dataReader := bytes.NewReader(failureData) + return DecodeFailureMessage(dataReader, pver) +} + +// DecodeFailureMessage decodes just the failure message, ignoring any padding +// that may be present at the end. +func DecodeFailureMessage(r io.Reader, pver uint32) (FailureMessage, error) { // Once we have the failure data, we can obtain the failure code from // the first two bytes of the buffer. var codeBytes [2]byte - if _, err := io.ReadFull(dataReader, codeBytes[:]); err != nil { + if _, err := io.ReadFull(r, codeBytes[:]); err != nil { return nil, fmt.Errorf("unable to read failure code: %v", err) } failCode := FailCode(binary.BigEndian.Uint16(codeBytes[:])) @@ -1132,10 +1138,9 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) { // well. 
switch f := failure.(type) { case Serializable: - if err := f.Decode(dataReader, pver); err != nil { + if err := f.Decode(r, pver); err != nil { return nil, fmt.Errorf("unable to decode error "+ - "update (type=%T, len_bytes=%v, bytes=%x): %v", - failure, failureLength, failureData[:], err) + "update (type=%T): %v", failure, err) } } @@ -1147,26 +1152,11 @@ func DecodeFailure(r io.Reader, pver uint32) (FailureMessage, error) { func EncodeFailure(w io.Writer, failure FailureMessage, pver uint32) error { var failureMessageBuffer bytes.Buffer - // First, we'll write out the error code itself into the failure - // buffer. - var codeBytes [2]byte - code := uint16(failure.Code()) - binary.BigEndian.PutUint16(codeBytes[:], code) - _, err := failureMessageBuffer.Write(codeBytes[:]) + err := EncodeFailureMessage(&failureMessageBuffer, failure, pver) if err != nil { return err } - // Next, some message have an additional message payload, if this is - // one of those types, then we'll also encode the error payload as - // well. - switch failure := failure.(type) { - case Serializable: - if err := failure.Encode(&failureMessageBuffer, pver); err != nil { - return err - } - } - // The combined size of this message must be below the max allowed // failure message length. failureMessage := failureMessageBuffer.Bytes() @@ -1187,6 +1177,32 @@ func EncodeFailure(w io.Writer, failure FailureMessage, pver uint32) error { ) } +// EncodeFailureMessage encodes just the failure message without adding a length +// and padding the message for the onion protocol. +func EncodeFailureMessage(w io.Writer, failure FailureMessage, pver uint32) error { + // First, we'll write out the error code itself into the failure + // buffer. 
+ var codeBytes [2]byte + code := uint16(failure.Code()) + binary.BigEndian.PutUint16(codeBytes[:], code) + _, err := w.Write(codeBytes[:]) + if err != nil { + return err + } + + // Next, some message have an additional message payload, if this is + // one of those types, then we'll also encode the error payload as + // well. + switch failure := failure.(type) { + case Serializable: + if err := failure.Encode(w, pver); err != nil { + return err + } + } + + return nil +} + // makeEmptyOnionError creates a new empty onion error of the proper concrete // type based on the passed failure code. func makeEmptyOnionError(code FailCode) (FailureMessage, error) { From eb4e65e54f3aef054858c8c5a4fe8ccf03ea6389 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 14 Jun 2019 15:01:48 +0200 Subject: [PATCH 7/9] channeldb: export route serialization --- channeldb/payments.go | 10 ++++++---- channeldb/payments_test.go | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/channeldb/payments.go b/channeldb/payments.go index f97dc393..8a0f04a0 100644 --- a/channeldb/payments.go +++ b/channeldb/payments.go @@ -484,7 +484,7 @@ func serializePaymentAttemptInfo(w io.Writer, a *PaymentAttemptInfo) error { return err } - if err := serializeRoute(w, a.Route); err != nil { + if err := SerializeRoute(w, a.Route); err != nil { return err } @@ -497,7 +497,7 @@ func deserializePaymentAttemptInfo(r io.Reader) (*PaymentAttemptInfo, error) { if err != nil { return nil, err } - a.Route, err = deserializeRoute(r) + a.Route, err = DeserializeRoute(r) if err != nil { return nil, err } @@ -533,7 +533,8 @@ func deserializeHop(r io.Reader) (*route.Hop, error) { return h, nil } -func serializeRoute(w io.Writer, r route.Route) error { +// SerializeRoute serializes a route. 
+func SerializeRoute(w io.Writer, r route.Route) error { if err := WriteElements(w, r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:], ); err != nil { @@ -553,7 +554,8 @@ func serializeRoute(w io.Writer, r route.Route) error { return nil } -func deserializeRoute(r io.Reader) (route.Route, error) { +// DeserializeRoute deserializes a route. +func DeserializeRoute(r io.Reader) (route.Route, error) { rt := route.Route{} if err := ReadElements(r, &rt.TotalTimeLock, &rt.TotalAmount, diff --git a/channeldb/payments_test.go b/channeldb/payments_test.go index 2be1f38b..a12cf65e 100644 --- a/channeldb/payments_test.go +++ b/channeldb/payments_test.go @@ -203,12 +203,12 @@ func TestRouteSerialization(t *testing.T) { t.Parallel() var b bytes.Buffer - if err := serializeRoute(&b, testRoute); err != nil { + if err := SerializeRoute(&b, testRoute); err != nil { t.Fatal(err) } r := bytes.NewReader(b.Bytes()) - route2, err := deserializeRoute(r) + route2, err := DeserializeRoute(r) if err != nil { t.Fatal(err) } From 3dc83d1b6bf22e6535727de49c982c73c11dbcdc Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Sat, 13 Jul 2019 23:26:26 +0200 Subject: [PATCH 8/9] routerrpc: embed routing config --- lnrpc/routerrpc/config.go | 29 +++++++++++----------- lnrpc/routerrpc/config_active.go | 40 +++++++++---------------------- lnrpc/routerrpc/config_default.go | 5 ++-- server.go | 8 ++++--- 4 files changed, 33 insertions(+), 49 deletions(-) diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go index 15dd6c40..2f82c5ae 100644 --- a/lnrpc/routerrpc/config.go +++ b/lnrpc/routerrpc/config.go @@ -3,26 +3,25 @@ package routerrpc import ( "time" - "github.com/lightningnetwork/lnd/lnwire" + "github.com/btcsuite/btcutil" ) // RoutingConfig contains the configurable parameters that control routing. type RoutingConfig struct { - // PenaltyHalfLife defines after how much time a penalized node or - // channel is back at 50% probability. 
- PenaltyHalfLife time.Duration - - // PaymentAttemptPenalty is the virtual cost in path finding weight - // units of executing a payment attempt that fails. It is used to trade - // off potentially better routes against their probability of - // succeeding. - PaymentAttemptPenalty lnwire.MilliSatoshi - - // MinProbability defines the minimum success probability of the - // returned route. - MinRouteProbability float64 + // MinRouteProbability is the minimum required route success probability + // to attempt the payment. + MinRouteProbability float64 `long:"minrtprob" description:"Minimum required route success probability to attempt the payment"` // AprioriHopProbability is the assumed success probability of a hop in // a route when no other information is available. - AprioriHopProbability float64 + AprioriHopProbability float64 `long:"apriorihopprob" description:"Assumed success probability of a hop in a route when no other information is available."` + + // PenaltyHalfLife defines after how much time a penalized node or + // channel is back at 50% probability. + PenaltyHalfLife time.Duration `long:"penaltyhalflife" description:"Defines the duration after which a penalized node or channel is back at 50% probability"` + + // AttemptCost is the virtual cost in path finding weight units of + // executing a payment attempt that fails. It is used to trade off + // potentially better routes against their probability of succeeding. 
+ AttemptCost btcutil.Amount `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"` } diff --git a/lnrpc/routerrpc/config_active.go b/lnrpc/routerrpc/config_active.go index b919364e..832c86d7 100644 --- a/lnrpc/routerrpc/config_active.go +++ b/lnrpc/routerrpc/config_active.go @@ -3,10 +3,6 @@ package routerrpc import ( - "time" - - "github.com/btcsuite/btcutil" - "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/routing" ) @@ -17,28 +13,13 @@ import ( // options, while if able to be populated, the latter fields MUST also be // specified. type Config struct { + RoutingConfig + // RouterMacPath is the path for the router macaroon. If unspecified // then we assume that the macaroon will be found under the network // directory, named DefaultRouterMacFilename. RouterMacPath string `long:"routermacaroonpath" description:"Path to the router macaroon"` - // MinProbability is the minimum required route success probability to - // attempt the payment. - MinRouteProbability float64 `long:"minrtprob" description:"Minimum required route success probability to attempt the payment"` - - // AprioriHopProbability is the assumed success probability of a hop in - // a route when no other information is available. - AprioriHopProbability float64 `long:"apriorihopprob" description:"Assumed success probability of a hop in a route when no other information is available."` - - // PenaltyHalfLife defines after how much time a penalized node or - // channel is back at 50% probability. - PenaltyHalfLife time.Duration `long:"penaltyhalflife" description:"Defines the duration after which a penalized node or channel is back at 50% probability"` - - // AttemptCost is the virtual cost in path finding weight units of - // executing a payment attempt that fails. It is used to trade off - // potentially better routes against their probability of succeeding. 
- AttemptCost int64 `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"` - // NetworkDir is the main network directory wherein the router rpc // server will find the macaroon named DefaultRouterMacFilename. NetworkDir string @@ -62,13 +43,16 @@ type Config struct { // DefaultConfig defines the config defaults. func DefaultConfig() *Config { - return &Config{ + defaultRoutingConfig := RoutingConfig{ AprioriHopProbability: routing.DefaultAprioriHopProbability, MinRouteProbability: routing.DefaultMinRouteProbability, PenaltyHalfLife: routing.DefaultPenaltyHalfLife, - AttemptCost: int64( - routing.DefaultPaymentAttemptPenalty.ToSatoshis(), - ), + AttemptCost: routing.DefaultPaymentAttemptPenalty. + ToSatoshis(), + } + + return &Config{ + RoutingConfig: defaultRoutingConfig, } } @@ -77,9 +61,7 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { return &RoutingConfig{ AprioriHopProbability: cfg.AprioriHopProbability, MinRouteProbability: cfg.MinRouteProbability, - PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis( - btcutil.Amount(cfg.AttemptCost), - ), - PenaltyHalfLife: cfg.PenaltyHalfLife, + AttemptCost: cfg.AttemptCost, + PenaltyHalfLife: cfg.PenaltyHalfLife, } } diff --git a/lnrpc/routerrpc/config_default.go b/lnrpc/routerrpc/config_default.go index a3a95021..6308ca54 100644 --- a/lnrpc/routerrpc/config_default.go +++ b/lnrpc/routerrpc/config_default.go @@ -19,7 +19,8 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { return &RoutingConfig{ AprioriHopProbability: routing.DefaultAprioriHopProbability, MinRouteProbability: routing.DefaultMinRouteProbability, - PaymentAttemptPenalty: routing.DefaultPaymentAttemptPenalty, - PenaltyHalfLife: routing.DefaultPenaltyHalfLife, + AttemptCost: routing.DefaultPaymentAttemptPenalty. 
+ ToSatoshis(), + PenaltyHalfLife: routing.DefaultPenaltyHalfLife, } } diff --git a/server.go b/server.go index 11f32cb8..658b47f9 100644 --- a/server.go +++ b/server.go @@ -662,12 +662,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, srvrLog.Debugf("Instantiating payment session source with config: "+ "PaymentAttemptPenalty=%v, MinRouteProbability=%v", - int64(routingConfig.PaymentAttemptPenalty.ToSatoshis()), + int64(routingConfig.AttemptCost), routingConfig.MinRouteProbability) pathFindingConfig := routing.PathFindingConfig{ - PaymentAttemptPenalty: routingConfig.PaymentAttemptPenalty, - MinProbability: routingConfig.MinRouteProbability, + PaymentAttemptPenalty: lnwire.NewMSatFromSatoshis( + routingConfig.AttemptCost, + ), + MinProbability: routingConfig.MinRouteProbability, } paymentSessionSource := &routing.SessionSource{ From 7e7b62035598bd66127add8cc0bff1b3611a66de Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 26 Jun 2019 13:00:35 +0200 Subject: [PATCH 9/9] routing: persist mission control data --- lnrpc/routerrpc/config.go | 4 + lnrpc/routerrpc/config_active.go | 2 + lnrpc/routerrpc/config_default.go | 1 + lnrpc/routerrpc/router_backend.go | 2 +- lnrpc/routerrpc/router_backend_test.go | 4 +- lnrpc/routerrpc/router_server.go | 5 +- routing/missioncontrol.go | 68 ++++++- routing/missioncontrol_store.go | 269 +++++++++++++++++++++++++ routing/missioncontrol_store_test.go | 140 +++++++++++++ routing/missioncontrol_test.go | 44 +++- routing/mock_test.go | 4 +- routing/router.go | 27 ++- routing/router_test.go | 17 +- server.go | 7 +- 14 files changed, 566 insertions(+), 28 deletions(-) create mode 100644 routing/missioncontrol_store.go create mode 100644 routing/missioncontrol_store_test.go diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go index 2f82c5ae..ba094c8f 100644 --- a/lnrpc/routerrpc/config.go +++ b/lnrpc/routerrpc/config.go @@ -24,4 +24,8 @@ type RoutingConfig struct { // executing a payment attempt that 
fails. It is used to trade off // potentially better routes against their probability of succeeding. AttemptCost btcutil.Amount `long:"attemptcost" description:"The (virtual) cost in sats of a failed payment attempt"` + + // MaxMcHistory defines the maximum number of payment results that + // are held on disk by mission control. + MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"` } diff --git a/lnrpc/routerrpc/config_active.go b/lnrpc/routerrpc/config_active.go index 832c86d7..0479211c 100644 --- a/lnrpc/routerrpc/config_active.go +++ b/lnrpc/routerrpc/config_active.go @@ -49,6 +49,7 @@ func DefaultConfig() *Config { PenaltyHalfLife: routing.DefaultPenaltyHalfLife, AttemptCost: routing.DefaultPaymentAttemptPenalty. ToSatoshis(), + MaxMcHistory: routing.DefaultMaxMcHistory, } return &Config{ @@ -63,5 +64,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { MinRouteProbability: cfg.MinRouteProbability, AttemptCost: cfg.AttemptCost, PenaltyHalfLife: cfg.PenaltyHalfLife, + MaxMcHistory: cfg.MaxMcHistory, } } diff --git a/lnrpc/routerrpc/config_default.go b/lnrpc/routerrpc/config_default.go index 6308ca54..f8af98f3 100644 --- a/lnrpc/routerrpc/config_default.go +++ b/lnrpc/routerrpc/config_default.go @@ -22,5 +22,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig { AttemptCost: routing.DefaultPaymentAttemptPenalty. ToSatoshis(), PenaltyHalfLife: routing.DefaultPenaltyHalfLife, + MaxMcHistory: routing.DefaultMaxMcHistory, } } diff --git a/lnrpc/routerrpc/router_backend.go b/lnrpc/routerrpc/router_backend.go index 139d4963..d4d1d8e2 100644 --- a/lnrpc/routerrpc/router_backend.go +++ b/lnrpc/routerrpc/router_backend.go @@ -64,7 +64,7 @@ type MissionControl interface { // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. 
- ResetHistory() + ResetHistory() error // GetHistorySnapshot takes a snapshot from the current mission control state // and actual probability estimates. diff --git a/lnrpc/routerrpc/router_backend_test.go b/lnrpc/routerrpc/router_backend_test.go index e36baa7e..1a0ce3ee 100644 --- a/lnrpc/routerrpc/router_backend_test.go +++ b/lnrpc/routerrpc/router_backend_test.go @@ -147,7 +147,9 @@ func (m *mockMissionControl) GetEdgeProbability(fromNode route.Vertex, return testMissionControlProb } -func (m *mockMissionControl) ResetHistory() {} +func (m *mockMissionControl) ResetHistory() error { + return nil +} func (m *mockMissionControl) GetHistorySnapshot() *routing.MissionControlSnapshot { return nil diff --git a/lnrpc/routerrpc/router_server.go b/lnrpc/routerrpc/router_server.go index fc577969..64b88f62 100644 --- a/lnrpc/routerrpc/router_server.go +++ b/lnrpc/routerrpc/router_server.go @@ -436,7 +436,10 @@ func marshallChannelUpdate(update *lnwire.ChannelUpdate) *ChannelUpdate { func (s *Server) ResetMissionControl(ctx context.Context, req *ResetMissionControlRequest) (*ResetMissionControlResponse, error) { - s.cfg.RouterBackend.MissionControl.ResetHistory() + err := s.cfg.RouterBackend.MissionControl.ResetHistory() + if err != nil { + return nil, err + } return &ResetMissionControlResponse{}, nil } diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 350f35b6..e85fb19a 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" @@ -39,6 +40,9 @@ const ( // Nodes forward non-strict, so it isn't necessary to apply a less // restrictive channel level tracking scheme here. minSecondChanceInterval = time.Minute + + // DefaultMaxMcHistory is the default maximum history size. 
+ DefaultMaxMcHistory = 1000 ) // MissionControl contains state which summarizes the past attempts of HTLC @@ -63,6 +67,8 @@ type MissionControl struct { cfg *MissionControlConfig + store *missionControlStore + sync.Mutex // TODO(roasbeef): further counters, if vertex continually unavailable, @@ -81,6 +87,10 @@ type MissionControlConfig struct { // AprioriHopProbability is the assumed success probability of a hop in // a route when no other information is available. AprioriHopProbability float64 + + // MaxMcHistory defines the maximum number of payment results that are + // held on disk. + MaxMcHistory int } // nodeHistory contains a summary of payment attempt outcomes involving a @@ -159,29 +169,70 @@ type paymentResult struct { } // NewMissionControl returns a new instance of missionControl. -func NewMissionControl(cfg *MissionControlConfig) *MissionControl { +func NewMissionControl(db *bbolt.DB, cfg *MissionControlConfig) ( + *MissionControl, error) { + log.Debugf("Instantiating mission control with config: "+ "PenaltyHalfLife=%v, AprioriHopProbability=%v", cfg.PenaltyHalfLife, cfg.AprioriHopProbability) - return &MissionControl{ + store, err := newMissionControlStore(db, cfg.MaxMcHistory) + if err != nil { + return nil, err + } + + mc := &MissionControl{ history: make(map[route.Vertex]*nodeHistory), lastSecondChance: make(map[DirectedNodePair]time.Time), now: time.Now, cfg: cfg, + store: store, } + + if err := mc.init(); err != nil { + return nil, err + } + + return mc, nil +} + +// init initializes mission control with historical data. 
+func (m *MissionControl) init() error { + log.Debugf("Mission control state reconstruction started") + + start := time.Now() + + results, err := m.store.fetchAll() + if err != nil { + return err + } + + for _, result := range results { + m.applyPaymentResult(result) + } + + log.Debugf("Mission control state reconstruction finished: "+ + "n=%v, time=%v", len(results), time.Now().Sub(start)) + + return nil } // ResetHistory resets the history of MissionControl returning it to a state as // if no payment attempts have been made. -func (m *MissionControl) ResetHistory() { +func (m *MissionControl) ResetHistory() error { m.Lock() defer m.Unlock() + if err := m.store.clear(); err != nil { + return err + } + m.history = make(map[route.Vertex]*nodeHistory) m.lastSecondChance = make(map[DirectedNodePair]time.Time) log.Debugf("Mission control history cleared") + + return nil } // GetEdgeProbability is expected to return the success probability of a payment @@ -406,7 +457,7 @@ func (m *MissionControl) GetHistorySnapshot() *MissionControlSnapshot { // be made. func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, - channeldb.FailureReason) { + channeldb.FailureReason, error) { timestamp := m.now() @@ -421,8 +472,15 @@ func (m *MissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, route: rt, } + // Store complete result in database. + if err := m.store.AddResult(result); err != nil { + return false, 0, err + } + // Apply result to update mission control state. 
- return m.applyPaymentResult(result) + final, reason := m.applyPaymentResult(result) + + return final, reason, nil } // applyPaymentResult applies a payment result as input for future probability diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go new file mode 100644 index 00000000..329d819f --- /dev/null +++ b/routing/missioncontrol_store.go @@ -0,0 +1,269 @@ +package routing + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/btcsuite/btcd/wire" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // resultsKey is the fixed key under which the attempt results are + // stored. + resultsKey = []byte("missioncontrol-results") + + // Big endian is the preferred byte order, due to cursor scans over + // integer keys iterating in order. + byteOrder = binary.BigEndian +) + +const ( + // unknownFailureSourceIdx is the database encoding of an unknown error + // source. + unknownFailureSourceIdx = -1 +) + +// missionControlStore is a bolt db based implementation of a mission control +// store. It stores the raw payment attempt data from which the internal mission +// controls state can be rederived on startup. This allows the mission control +// internal data structure to be changed without requiring a database migration. +// Also changes to mission control parameters can be applied to historical data. +// Finally, it enables importing raw data from an external source. +type missionControlStore struct { + db *bbolt.DB + maxRecords int + numRecords int +} + +func newMissionControlStore(db *bbolt.DB, maxRecords int) (*missionControlStore, error) { + store := &missionControlStore{ + db: db, + maxRecords: maxRecords, + } + + // Create buckets if not yet existing. 
+ err := db.Update(func(tx *bbolt.Tx) error { + resultsBucket, err := tx.CreateBucketIfNotExists(resultsKey) + if err != nil { + return fmt.Errorf("cannot create results bucket: %v", + err) + } + + // Count initial number of results and track this number in + // memory to avoid calling Stats().KeyN. The reliability of + // Stats() is doubtful and seemed to have caused crashes in the + // past (see #1874). + c := resultsBucket.Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + store.numRecords++ + } + + return nil + }) + if err != nil { + return nil, err + } + + return store, nil +} + +// clear removes all results from the db. +func (b *missionControlStore) clear() error { + return b.db.Update(func(tx *bbolt.Tx) error { + if err := tx.DeleteBucket(resultsKey); err != nil { + return err + } + + _, err := tx.CreateBucket(resultsKey) + return err + }) +} + +// fetchAll returns all results currently stored in the database. +func (b *missionControlStore) fetchAll() ([]*paymentResult, error) { + var results []*paymentResult + + err := b.db.View(func(tx *bbolt.Tx) error { + resultBucket := tx.Bucket(resultsKey) + results = make([]*paymentResult, 0) + + return resultBucket.ForEach(func(k, v []byte) error { + result, err := deserializeResult(k, v) + if err != nil { + return err + } + + results = append(results, result) + + return nil + }) + + }) + if err != nil { + return nil, err + } + + return results, nil +} + +// serializeResult serializes a payment result and returns a key and value byte +// slice to insert into the bucket. +func serializeResult(rp *paymentResult) ([]byte, []byte, error) { + // Write timestamps, success status, failure source index and route. 
+ var b bytes.Buffer + + var dbFailureSourceIdx int32 + if rp.failureSourceIdx == nil { + dbFailureSourceIdx = unknownFailureSourceIdx + } else { + dbFailureSourceIdx = int32(*rp.failureSourceIdx) + } + + err := channeldb.WriteElements( + &b, + uint64(rp.timeFwd.UnixNano()), + uint64(rp.timeReply.UnixNano()), + rp.success, dbFailureSourceIdx, + ) + if err != nil { + return nil, nil, err + } + + if err := channeldb.SerializeRoute(&b, *rp.route); err != nil { + return nil, nil, err + } + + // Write failure. If there is no failure message, write an empty + // byte slice. + var failureBytes bytes.Buffer + if rp.failure != nil { + err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0) + if err != nil { + return nil, nil, err + } + } + err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes()) + if err != nil { + return nil, nil, err + } + + // Compose key that identifies this result. + key := getResultKey(rp) + + return key, b.Bytes(), nil +} + +// deserializeResult deserializes a payment result. +func deserializeResult(k, v []byte) (*paymentResult, error) { + // Parse payment id. + result := paymentResult{ + id: byteOrder.Uint64(k[8:]), + } + + r := bytes.NewReader(v) + + // Read timestamps, success status and failure source index. + var ( + timeFwd, timeReply uint64 + dbFailureSourceIdx int32 + ) + + err := channeldb.ReadElements( + r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx, + ) + if err != nil { + return nil, err + } + + // Convert time stamps to local time zone for consistent logging. + result.timeFwd = time.Unix(0, int64(timeFwd)).Local() + result.timeReply = time.Unix(0, int64(timeReply)).Local() + + // Convert from unknown index magic number to nil value. + if dbFailureSourceIdx != unknownFailureSourceIdx { + failureSourceIdx := int(dbFailureSourceIdx) + result.failureSourceIdx = &failureSourceIdx + } + + // Read route. 
+ route, err := channeldb.DeserializeRoute(r) + if err != nil { + return nil, err + } + result.route = &route + + // Read failure. + failureBytes, err := wire.ReadVarBytes( + r, 0, lnwire.FailureMessageLength, "failure", + ) + if err != nil { + return nil, err + } + if len(failureBytes) > 0 { + result.failure, err = lnwire.DecodeFailureMessage( + bytes.NewReader(failureBytes), 0, + ) + if err != nil { + return nil, err + } + } + + return &result, nil +} + +// AddResult adds a new result to the db. +func (b *missionControlStore) AddResult(rp *paymentResult) error { + return b.db.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(resultsKey) + + // Prune oldest entries. + if b.maxRecords > 0 { + for b.numRecords >= b.maxRecords { + cursor := bucket.Cursor() + cursor.First() + if err := cursor.Delete(); err != nil { + return err + } + + b.numRecords-- + } + } + + // Serialize result into key and value byte slices. + k, v, err := serializeResult(rp) + if err != nil { + return err + } + + // The store is assumed to be idempotent. It could be that the + // same result is added twice and in that case the counter + // shouldn't be increased. + if bucket.Get(k) == nil { + b.numRecords++ + } + + // Put into results bucket. + return bucket.Put(k, v) + }) +} + +// getResultKey returns a byte slice representing a unique key for this payment +// result. +func getResultKey(rp *paymentResult) []byte { + var keyBytes [8 + 8 + 33]byte + + // Identify records by a combination of time, payment id and sender pub + // key. This allows importing mission control data from an external + // source without key collisions and keeps the records sorted + // chronologically. 
+	byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
+	byteOrder.PutUint64(keyBytes[8:], rp.id)
+	copy(keyBytes[16:], rp.route.SourcePubKey[:])
+
+	return keyBytes[:]
+}
diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go
new file mode 100644
index 00000000..43d150fa
--- /dev/null
+++ b/routing/missioncontrol_store_test.go
@@ -0,0 +1,140 @@
+package routing
+
+import (
+	"io/ioutil"
+	"os"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/lightningnetwork/lnd/lnwire"
+
+	"github.com/coreos/bbolt"
+	"github.com/lightningnetwork/lnd/routing/route"
+)
+
+const testMaxRecords = 2
+
+func TestMissionControlStore(t *testing.T) {
+	// Set time zone explicitly to keep test deterministic.
+	time.Local = time.UTC
+
+	file, err := ioutil.TempFile("", "*.db")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	dbPath := file.Name()
+
+	db, err := bbolt.Open(dbPath, 0600, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer db.Close()
+	defer os.Remove(dbPath)
+
+	store, err := newMissionControlStore(db, testMaxRecords)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	results, err := store.fetchAll()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(results) != 0 {
+		t.Fatal("expected no results")
+	}
+
+	testRoute := route.Route{
+		SourcePubKey: route.Vertex{1},
+		Hops: []*route.Hop{
+			{
+				PubKeyBytes: route.Vertex{2},
+			},
+		},
+	}
+
+	failureSourceIdx := 1
+
+	result1 := paymentResult{
+		route:            &testRoute,
+		failure:          lnwire.NewFailUnknownPaymentHash(100),
+		failureSourceIdx: &failureSourceIdx,
+		id:               99,
+		timeReply:        testTime,
+		timeFwd:          testTime.Add(-time.Minute),
+	}
+
+	result2 := result1
+	result2.timeReply = result1.timeReply.Add(time.Hour)
+	result2.timeFwd = result1.timeReply.Add(time.Hour)
+	result2.id = 2
+
+	// Store result.
+	err = store.AddResult(&result2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Store again to test idempotency.
+ err = store.AddResult(&result2) + if err != nil { + t.Fatal(err) + } + + // Store second result which has an earlier timestamp. + err = store.AddResult(&result1) + if err != nil { + t.Fatal(err) + } + + results, err = store.fetchAll() + if err != nil { + t.Fatal(err) + } + if len(results) != 2 { + t.Fatal("expected two results") + } + + // Check that results are stored in chronological order. + if !reflect.DeepEqual(&result1, results[0]) { + t.Fatal() + } + if !reflect.DeepEqual(&result2, results[1]) { + t.Fatal() + } + + // Recreate store to test pruning. + store, err = newMissionControlStore(db, testMaxRecords) + if err != nil { + t.Fatal(err) + } + + // Add a newer result. + result3 := result1 + result3.timeReply = result1.timeReply.Add(2 * time.Hour) + result3.timeFwd = result1.timeReply.Add(2 * time.Hour) + result3.id = 3 + + err = store.AddResult(&result3) + if err != nil { + t.Fatal(err) + } + + // Check that results are pruned. + results, err = store.fetchAll() + if err != nil { + t.Fatal(err) + } + if len(results) != 2 { + t.Fatal("expected two results") + } + + if !reflect.DeepEqual(&result2, results[0]) { + t.Fatal() + } + if !reflect.DeepEqual(&result3, results[1]) { + t.Fatal() + } +} diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go index 1675b044..798161c9 100644 --- a/routing/missioncontrol_test.go +++ b/routing/missioncontrol_test.go @@ -1,9 +1,12 @@ package routing import ( + "io/ioutil" + "os" "testing" "time" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -37,6 +40,10 @@ type mcTestContext struct { t *testing.T mc *MissionControl now time.Time + + db *bbolt.DB + dbPath string + pid uint64 } @@ -46,17 +53,44 @@ func createMcTestContext(t *testing.T) *mcTestContext { now: mcTestTime, } - mc := NewMissionControl( + file, err := ioutil.TempFile("", "*.db") + if err != nil { + t.Fatal(err) + } + + ctx.dbPath = file.Name() + + ctx.db, err = 
bbolt.Open(ctx.dbPath, 0600, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx.restartMc()
+
+	return ctx
+}
+
+// restartMc creates a new instance of mission control on the same database.
+func (ctx *mcTestContext) restartMc() {
+	mc, err := NewMissionControl(
+		ctx.db,
 		&MissionControlConfig{
 			PenaltyHalfLife:       30 * time.Minute,
 			AprioriHopProbability: 0.8,
 		},
 	)
+	if err != nil {
+		ctx.t.Fatal(err)
+	}
 
 	mc.now = func() time.Time { return ctx.now }
 	ctx.mc = mc
+}
 
-	return ctx
+// cleanup closes the database and removes the temp file.
+func (ctx *mcTestContext) cleanup() {
+	ctx.db.Close()
+	os.Remove(ctx.dbPath)
 }
 
 // Assert that mission control returns a probability for an edge.
@@ -86,6 +120,7 @@ func (ctx *mcTestContext) reportFailure(t time.Time,
 // TestMissionControl tests mission control probability estimation.
 func TestMissionControl(t *testing.T) {
 	ctx := createMcTestContext(t)
+	defer ctx.cleanup()
 
 	ctx.now = testTime
 
@@ -122,6 +157,10 @@ func TestMissionControl(t *testing.T) {
 	ctx.now = testTime.Add(60 * time.Minute)
 	ctx.expectP(1000, 0.4)
 
+	// Restart mission control to test persistence.
+	ctx.restartMc()
+	ctx.expectP(1000, 0.4)
+
 	// A node level failure should bring probability of every channel back
 	// to zero.
 	ctx.reportFailure(
@@ -145,6 +184,7 @@ func TestMissionControl(t *testing.T) {
 // penalizing the channel yet.
 func TestMissionControlChannelUpdate(t *testing.T) {
 	ctx := createMcTestContext(t)
+	defer ctx.cleanup()
 
 	// Report a policy related failure. Because it is the first, we don't
 	// expect a penalty.
diff --git a/routing/mock_test.go b/routing/mock_test.go index b0444546..54a8011d 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -100,9 +100,9 @@ var _ MissionController = (*mockMissionControl)(nil) func (m *mockMissionControl) ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) ( - bool, channeldb.FailureReason) { + bool, channeldb.FailureReason, error) { - return false, 0 + return false, 0, nil } func (m *mockMissionControl) ReportEdgeFailure(failedEdge edge, diff --git a/routing/router.go b/routing/router.go index edc3aa3a..146e06ca 100644 --- a/routing/router.go +++ b/routing/router.go @@ -180,7 +180,7 @@ type MissionController interface { // need to be made. ReportPaymentFail(paymentID uint64, rt *route.Route, failureSourceIdx *int, failure lnwire.FailureMessage) (bool, - channeldb.FailureReason) + channeldb.FailureReason, error) // GetEdgeProbability is expected to return the success probability of a // payment from fromNode along edge. @@ -1896,12 +1896,27 @@ func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route, func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route, sendErr error) (bool, channeldb.FailureReason) { + reportFail := func(srcIdx *int, msg lnwire.FailureMessage) (bool, + channeldb.FailureReason) { + + // Report outcome to mission control. + final, reason, err := r.cfg.MissionControl.ReportPaymentFail( + paymentID, rt, srcIdx, msg, + ) + if err != nil { + log.Errorf("Error reporting payment result to mc: %v", + err) + + return true, channeldb.FailureReasonError + } + + return final, reason + } + if sendErr == htlcswitch.ErrUnreadableFailureMessage { log.Tracef("Unreadable failure when sending htlc") - return r.cfg.MissionControl.ReportPaymentFail( - paymentID, rt, nil, nil, - ) + return reportFail(nil, nil) } // If an internal, non-forwarding error occurred, we can stop // trying. 
@@ -1927,9 +1942,7 @@ func (r *ChannelRouter) processSendError(paymentID uint64, rt *route.Route, log.Tracef("Node=%v reported failure when sending htlc", failureSourceIdx) - return r.cfg.MissionControl.ReportPaymentFail( - paymentID, rt, &failureSourceIdx, failureMessage, - ) + return reportFail(&failureSourceIdx, failureMessage) } // extractChannelUpdate examines the error and extracts the channel update. diff --git a/routing/router_test.go b/routing/router_test.go index d0383c09..28ab1692 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -90,7 +90,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr return nil, nil, err } - pathFindingConfig := &PathFindingConfig{ + pathFindingConfig := PathFindingConfig{ MinProbability: 0.01, PaymentAttemptPenalty: 100, } @@ -100,9 +100,13 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr AprioriHopProbability: 0.9, } - mc := NewMissionControl( + mc, err := NewMissionControl( + graphInstance.graph.Database().DB, mcConfig, ) + if err != nil { + return nil, nil, err + } sessionSource := &SessionSource{ Graph: graphInstance.graph, @@ -110,11 +114,8 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { return lnwire.NewMSatFromSatoshis(e.Capacity) }, - PathFindingConfig: PathFindingConfig{ - MinProbability: 0.01, - PaymentAttemptPenalty: 100, - }, - MissionControl: mc, + PathFindingConfig: pathFindingConfig, + MissionControl: mc, } router, err := New(Config{ @@ -134,7 +135,7 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr next := atomic.AddUint64(&uniquePaymentID, 1) return next, nil }, - PathFindingConfig: *pathFindingConfig, + PathFindingConfig: pathFindingConfig, }) if err != nil { return nil, nil, fmt.Errorf("unable to create router %v", err) diff --git a/server.go b/server.go index 658b47f9..da2c51cb 100644 
--- a/server.go +++ b/server.go @@ -653,12 +653,17 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, // servers, the mission control instance itself can be moved there too. routingConfig := routerrpc.GetRoutingConfig(cfg.SubRPCServers.RouterRPC) - s.missionControl = routing.NewMissionControl( + s.missionControl, err = routing.NewMissionControl( + chanDB.DB, &routing.MissionControlConfig{ AprioriHopProbability: routingConfig.AprioriHopProbability, PenaltyHalfLife: routingConfig.PenaltyHalfLife, + MaxMcHistory: routingConfig.MaxMcHistory, }, ) + if err != nil { + return nil, fmt.Errorf("can't create mission control: %v", err) + } srvrLog.Debugf("Instantiating payment session source with config: "+ "PaymentAttemptPenalty=%v, MinRouteProbability=%v",