routing+server: define PaymentSessionSource

This commit is contained in:
Johan T. Halseth 2019-05-23 20:05:30 +02:00
parent f4306b1178
commit 3f76bc0629
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
4 changed files with 90 additions and 48 deletions

@ -70,6 +70,10 @@ type MissionControl struct {
// TODO(roasbeef): also add favorable metrics for nodes // TODO(roasbeef): also add favorable metrics for nodes
} }
// A compile time assertion to ensure MissionControl meets the
// PaymentSessionSource interface.
var _ PaymentSessionSource = (*MissionControl)(nil)
// NewMissionControl returns a new instance of MissionControl. // NewMissionControl returns a new instance of MissionControl.
// //
// TODO(roasbeef): persist memory // TODO(roasbeef): persist memory

@ -147,6 +147,28 @@ type PaymentAttemptDispatcher interface {
<-chan *htlcswitch.PaymentResult, error) <-chan *htlcswitch.PaymentResult, error)
} }
// PaymentSessionSource is an interface that defines a source for the router to
// retrive new payment sessions.
type PaymentSessionSource interface {
// NewPaymentSession creates a new payment session that will produce
// routes to the given target. An optional set of routing hints can be
// provided in order to populate additional edges to explore when
// finding a path to the payment's destination.
NewPaymentSession(routeHints [][]zpay32.HopHint,
target route.Vertex) (PaymentSession, error)
// NewPaymentSessionForRoute creates a new paymentSession instance that
// is just used for failure reporting to missioncontrol, and will only
// attempt the given route.
NewPaymentSessionForRoute(preBuiltRoute *route.Route) PaymentSession
// NewPaymentSessionEmpty creates a new paymentSession instance that is
// empty, and will be exhausted immediately. Used for failure reporting
// to missioncontrol for resumed payment we don't want to make more
// attempts for.
NewPaymentSessionEmpty() PaymentSession
}
// FeeSchema is the set fee configuration for a Lightning Node on the network. // FeeSchema is the set fee configuration for a Lightning Node on the network.
// Using the coefficients described within the schema, the required fee to // Using the coefficients described within the schema, the required fee to
// forward outgoing payments can be derived. // forward outgoing payments can be derived.
@ -203,6 +225,15 @@ type Config struct {
// can properly resume them across restarts. // can properly resume them across restarts.
Control channeldb.ControlTower Control channeldb.ControlTower
// MissionControl is a shared memory of sorts that executions of
// payment path finding use in order to remember which vertexes/edges
// were pruned from prior attempts. During SendPayment execution,
// errors sent by nodes are mapped into a vertex or edge to be pruned.
// Each run will then take into account this set of pruned
// vertexes/edges to reduce route failure and pass on graph information
// gained to the next execution.
MissionControl PaymentSessionSource
// ChannelPruneExpiry is the duration used to determine if a channel // ChannelPruneExpiry is the duration used to determine if a channel
// should be pruned or not. If the delta between now and when the // should be pruned or not. If the delta between now and when the
// channel was last updated is greater than ChannelPruneExpiry, then // channel was last updated is greater than ChannelPruneExpiry, then
@ -342,15 +373,6 @@ type ChannelRouter struct {
// existing client. // existing client.
ntfnClientUpdates chan *topologyClientUpdate ntfnClientUpdates chan *topologyClientUpdate
// missionControl is a shared memory of sorts that executions of
// payment path finding use in order to remember which vertexes/edges
// were pruned from prior attempts. During SendPayment execution,
// errors sent by nodes are mapped into a vertex or edge to be pruned.
// Each run will then take into account this set of pruned
// vertexes/edges to reduce route failure and pass on graph information
// gained to the next execution.
missionControl *MissionControl
// channelEdgeMtx is a mutex we use to make sure we process only one // channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure // ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses. // consistency between the various database accesses.
@ -392,10 +414,6 @@ func New(cfg Config) (*ChannelRouter, error) {
quit: make(chan struct{}), quit: make(chan struct{}),
} }
r.missionControl = NewMissionControl(
cfg.Graph, selfNode, cfg.QueryBandwidth,
)
return r, nil return r, nil
} }
@ -508,7 +526,7 @@ func (r *ChannelRouter) Start() error {
// We create a dummy, empty payment session such that // We create a dummy, empty payment session such that
// we won't make another payment attempt when the // we won't make another payment attempt when the
// result for the in-flight attempt is received. // result for the in-flight attempt is received.
paySession := r.missionControl.NewPaymentSessionEmpty() paySession := r.cfg.MissionControl.NewPaymentSessionEmpty()
lPayment := &LightningPayment{ lPayment := &LightningPayment{
PaymentHash: payment.Info.PaymentHash, PaymentHash: payment.Info.PaymentHash,
@ -1582,7 +1600,7 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *route
// Before starting the HTLC routing attempt, we'll create a fresh // Before starting the HTLC routing attempt, we'll create a fresh
// payment session which will report our errors back to mission // payment session which will report our errors back to mission
// control. // control.
paySession, err := r.missionControl.NewPaymentSession( paySession, err := r.cfg.MissionControl.NewPaymentSession(
payment.RouteHints, payment.Target, payment.RouteHints, payment.Target,
) )
if err != nil { if err != nil {
@ -1615,7 +1633,7 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
lntypes.Preimage, error) { lntypes.Preimage, error) {
// Create a payment session for just this route. // Create a payment session for just this route.
paySession := r.missionControl.NewPaymentSessionForRoute(route) paySession := r.cfg.MissionControl.NewPaymentSessionForRoute(route)
// Create a (mostly) dummy payment, as the created payment session is // Create a (mostly) dummy payment, as the created payment session is
// not going to do path finding. // not going to do path finding.

@ -84,12 +84,25 @@ func createTestCtxFromGraphInstance(startingHeight uint32, graphInstance *testGr
// be populated. // be populated.
chain := newMockChain(startingHeight) chain := newMockChain(startingHeight)
chainView := newMockChainView(chain) chainView := newMockChainView(chain)
selfNode, err := graphInstance.graph.SourceNode()
if err != nil {
return nil, nil, err
}
mc := NewMissionControl(
graphInstance.graph, selfNode,
func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
return lnwire.NewMSatFromSatoshis(e.Capacity)
},
)
router, err := New(Config{ router, err := New(Config{
Graph: graphInstance.graph, Graph: graphInstance.graph,
Chain: chain, Chain: chain,
ChainView: chainView, ChainView: chainView,
Payer: &mockPaymentAttemptDispatcher{}, Payer: &mockPaymentAttemptDispatcher{},
Control: makeMockControlTower(), Control: makeMockControlTower(),
MissionControl: mc,
ChannelPruneExpiry: time.Hour * 24, ChannelPruneExpiry: time.Hour * 24,
GraphPruneInterval: time.Hour * 2, GraphPruneInterval: time.Hour * 2,
QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi { QueryBandwidth: func(e *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
@ -807,7 +820,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) {
return preImage, nil return preImage, nil
}) })
ctx.router.missionControl.ResetHistory() ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory()
// When we try to dispatch that payment, we should receive an error as // When we try to dispatch that payment, we should receive an error as
// both attempts should fail and cause both routes to be pruned. // both attempts should fail and cause both routes to be pruned.
@ -822,7 +835,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) {
t.Fatalf("expected UnknownNextPeer instead got: %v", err) t.Fatalf("expected UnknownNextPeer instead got: %v", err)
} }
ctx.router.missionControl.ResetHistory() ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory()
// Next, we'll modify the SendToSwitch method to indicate that luo ji // Next, we'll modify the SendToSwitch method to indicate that luo ji
// wasn't originally online. This should also halt the send all // wasn't originally online. This should also halt the send all
@ -865,7 +878,7 @@ func TestSendPaymentErrorPathPruning(t *testing.T) {
ctx.aliases)) ctx.aliases))
} }
ctx.router.missionControl.ResetHistory() ctx.router.cfg.MissionControl.(*MissionControl).ResetHistory()
// Finally, we'll modify the SendToSwitch function to indicate that the // Finally, we'll modify the SendToSwitch function to indicate that the
// roasbeef -> luoji channel has insufficient capacity. This should // roasbeef -> luoji channel has insufficient capacity. This should

@ -616,15 +616,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, err return nil, err
} }
s.chanRouter, err = routing.New(routing.Config{ queryBandwidth := func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
Graph: chanGraph,
Chain: cc.chainIO,
ChainView: cc.chainView,
Payer: s.htlcSwitch,
Control: channeldb.NewPaymentControl(chanDB),
ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: func(edge *channeldb.ChannelEdgeInfo) lnwire.MilliSatoshi {
// If we aren't on either side of this edge, then we'll // If we aren't on either side of this edge, then we'll
// just thread through the capacity of the edge as we // just thread through the capacity of the edge as we
// know it. // know it.
@ -652,7 +644,22 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
// Otherwise, we'll return the current best estimate // Otherwise, we'll return the current best estimate
// for the available bandwidth for the link. // for the available bandwidth for the link.
return link.Bandwidth() return link.Bandwidth()
}, }
missionControl := routing.NewMissionControl(
chanGraph, selfNode, queryBandwidth,
)
s.chanRouter, err = routing.New(routing.Config{
Graph: chanGraph,
Chain: cc.chainIO,
ChainView: cc.chainView,
Payer: s.htlcSwitch,
Control: channeldb.NewPaymentControl(chanDB),
MissionControl: missionControl,
ChannelPruneExpiry: routing.DefaultChannelPruneExpiry,
GraphPruneInterval: time.Duration(time.Hour),
QueryBandwidth: queryBandwidth,
AssumeChannelValid: cfg.Routing.UseAssumeChannelValid(), AssumeChannelValid: cfg.Routing.UseAssumeChannelValid(),
NextPaymentID: sequencer.NextID, NextPaymentID: sequencer.NextID,
}) })