diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go index 1d79e0fe..9cbaa911 100644 --- a/routing/missioncontrol.go +++ b/routing/missioncontrol.go @@ -70,6 +70,10 @@ type MissionControl struct { // TODO(roasbeef): also add favorable metrics for nodes } +// A compile time assertion to ensure MissionControl meets the +// PaymentSessionSource interface. +var _ PaymentSessionSource = (*MissionControl)(nil) + // NewMissionControl returns a new instance of MissionControl. // // TODO(roasbeef): persist memory diff --git a/routing/router.go b/routing/router.go index a7d70114..6e85a096 100644 --- a/routing/router.go +++ b/routing/router.go @@ -147,6 +147,28 @@ type PaymentAttemptDispatcher interface { <-chan *htlcswitch.PaymentResult, error) } +// PaymentSessionSource is an interface that defines a source for the router to +// retrive new payment sessions. +type PaymentSessionSource interface { + // NewPaymentSession creates a new payment session that will produce + // routes to the given target. An optional set of routing hints can be + // provided in order to populate additional edges to explore when + // finding a path to the payment's destination. + NewPaymentSession(routeHints [][]zpay32.HopHint, + target route.Vertex) (PaymentSession, error) + + // NewPaymentSessionForRoute creates a new paymentSession instance that + // is just used for failure reporting to missioncontrol, and will only + // attempt the given route. + NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession + + // NewPaymentSessionEmpty creates a new paymentSession instance that is + // empty, and will be exhausted immediately. Used for failure reporting + // to missioncontrol for resumed payment we don't want to make more + // attempts for. + NewPaymentSessionEmpty() PaymentSession +} + // FeeSchema is the set fee configuration for a Lightning Node on the network. // Using the coefficients described within the schema, the required fee to // forward outgoing payments can be derived. @@ -203,6 +225,15 @@ type Config struct { // can properly resume them across restarts. Control channeldb.ControlTower + // MissionControl is a shared memory of sorts that executions of + // payment path finding use in order to remember which vertexes/edges + // were pruned from prior attempts. During SendPayment execution, + // errors sent by nodes are mapped into a vertex or edge to be pruned. + // Each run will then take into account this set of pruned + // vertexes/edges to reduce route failure and pass on graph information + // gained to the next execution. + MissionControl PaymentSessionSource + // ChannelPruneExpiry is the duration used to determine if a channel // should be pruned or not. If the delta between now and when the // channel was last updated is greater than ChannelPruneExpiry, then @@ -342,15 +373,6 @@ type ChannelRouter struct { // existing client. ntfnClientUpdates chan *topologyClientUpdate - // missionControl is a shared memory of sorts that executions of - // payment path finding use in order to remember which vertexes/edges - // were pruned from prior attempts. During SendPayment execution, - // errors sent by nodes are mapped into a vertex or edge to be pruned. - // Each run will then take into account this set of pruned - // vertexes/edges to reduce route failure and pass on graph information - // gained to the next execution. - missionControl *MissionControl - // channelEdgeMtx is a mutex we use to make sure we process only one // ChannelEdgePolicy at a time for a given channelID, to ensure // consistency between the various database accesses. @@ -392,10 +414,6 @@ func New(cfg Config) (*ChannelRouter, error) { quit: make(chan struct{}), } - r.missionControl = NewMissionControl( - cfg.Graph, selfNode, cfg.QueryBandwidth, - ) - return r, nil } @@ -508,7 +526,7 @@ func (r *ChannelRouter) Start() error { // We create a dummy, empty payment session such that // we won't make another payment attempt when the // result for the in-flight attempt is received. - paySession := r.missionControl.NewPaymentSessionEmpty() + paySession := r.cfg.MissionControl.NewPaymentSessionEmpty() lPayment := &LightningPayment{ PaymentHash: payment.Info.PaymentHash, @@ -1582,7 +1600,7 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *route // Before starting the HTLC routing attempt, we'll create a fresh // payment session which will report our errors back to mission // control. - paySession, err := r.missionControl.NewPaymentSession( + paySession, err := r.cfg.MissionControl.NewPaymentSession( payment.RouteHints, payment.Target, ) if err != nil { @@ -1615,7 +1633,7 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) ( lntypes.Preimage, error) { // Create a payment session for just this route. - paySession := r.missionControl.NewPaymentSessionForRoute(route) + paySession := r.cfg.MissionControl.NewPaymentSessionForRoute(route) // Create a (mostly) dummy payment, as the created payment session is // not going to do path finding. diff --git a/routing/router_test.go b/routing/router_test.go index 206c4c30..0380475d 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -84,12 +84,25 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr // be populated. chain := newMockChain(startingHeight) chainView := newMockChainView(chain) + + selfNode, err := graphInstance.graph.SourceNode() + if err != nil { + return nil, nil, err + } + + mc := NewMissionControl( + graphInstance.graph, selfNode, + func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { + return lnwire.NewMSatFromSatoshis(e.Capacity) + }, + ) router, err := New(Config{ Graph: graphInstance.graph, Chain: chain, ChainView: chainView, Payer: &mockPaymentAttemptDispatcher{}, Control: makeMockControlTower(), + MissionControl: mc, ChannelPruneExpiry: time.Hour * 24, GraphPruneInterval: time.Hour * 2, QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { @@ -807,7 +820,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { return preImage, nil }) - ctx.router.missionControl.ResetHistory() + ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() // When we try to dispatch that payment, we should receive an error as // both attempts should fail and cause both routes to be pruned. @@ -822,7 +835,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { t.Fatalf("expected UnknownNextPeer instead got: %v", err) } - ctx.router.missionControl.ResetHistory() + ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() // Next, we'll modify the SendToSwitch method to indicate that luo ji // wasn't originally online. This should also halt the send all @@ -865,7 +878,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) { ctx.aliases)) } - ctx.router.missionControl.ResetHistory() + ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory() // Finally, we'll modify the SendToSwitch function to indicate that the // roasbeef -> luoji channel has insufficient capacity. This should diff --git a/server.go b/server.go index 921e0475..0d1193a4 100644 --- a/server.go +++ b/server.go @@ -616,43 +616,50 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, return nil, err } + queryBandwidth := func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { + // If we aren't on either side of this edge, then we'll + // just thread through the capacity of the edge as we + // know it. + if !bytes.Equal(edge.NodeKey1Bytes[:], selfNode.PubKeyBytes[:]) && + !bytes.Equal(edge.NodeKey2Bytes[:], selfNode.PubKeyBytes[:]) { + + return lnwire.NewMSatFromSatoshis(edge.Capacity) + } + + cid := lnwire.NewChanIDFromOutPoint(&edge.ChannelPoint) + link, err := s.htlcSwitch.GetLink(cid) + if err != nil { + // If the link isn't online, then we'll report + // that it has zero bandwidth to the router. + return 0 + } + + // If the link is found within the switch, but it isn't + // yet eligible to forward any HTLCs, then we'll treat + // it as if it isn't online in the first place. + if !link.EligibleToForward() { + return 0 + } + + // Otherwise, we'll return the current best estimate + // for the available bandwidth for the link. + return link.Bandwidth() + } + + missionControl := routing.NewMissionControl( + chanGraph, selfNode, queryBandwidth, + ) + s.chanRouter, err = routing.New(routing.Config{ Graph: chanGraph, Chain: cc.chainIO, ChainView: cc.chainView, Payer: s.htlcSwitch, Control: channeldb.NewPaymentControl(chanDB), + MissionControl: missionControl, ChannelPruneExpiry: routing.DefaultChannelPruneExpiry, GraphPruneInterval: time.Duration(time.Hour), - QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { - // If we aren't on either side of this edge, then we'll - // just thread through the capacity of the edge as we - // know it. - if !bytes.Equal(edge.NodeKey1Bytes[:], selfNode.PubKeyBytes[:]) && - !bytes.Equal(edge.NodeKey2Bytes[:], selfNode.PubKeyBytes[:]) { - - return lnwire.NewMSatFromSatoshis(edge.Capacity) - } - - cid := lnwire.NewChanIDFromOutPoint(&edge.ChannelPoint) - link, err := s.htlcSwitch.GetLink(cid) - if err != nil { - // If the link isn't online, then we'll report - // that it has zero bandwidth to the router. - return 0 - } - - // If the link is found within the switch, but it isn't - // yet eligible to forward any HTLCs, then we'll treat - // it as if it isn't online in the first place. - if !link.EligibleToForward() { - return 0 - } - - // Otherwise, we'll return the current best estimate - // for the available bandwidth for the link. - return link.Bandwidth() - }, + QueryBandwidth: queryBandwidth, AssumeChannelValid: cfg.Routing.UseAssumeChannelValid(), NextPaymentID: sequencer.NextID, })