routing/router+lifecycle: remove LightningPayment from payment lifecycle

Now that the information needed is stored by the paymentSession, we no
longer need to pass the LightningPayment into the payment lifecycle.
This commit is contained in:
Johan T. Halseth 2020-04-01 00:13:22 +02:00
parent c2301c14b2
commit f9eeb6b41f
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 88 additions and 76 deletions

@ -8,6 +8,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion" sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
@ -30,7 +31,7 @@ func (e errNoRoute) Error() string {
// needed to resume if from any point. // needed to resume if from any point.
type paymentLifecycle struct { type paymentLifecycle struct {
router *ChannelRouter router *ChannelRouter
payment *LightningPayment paymentHash lntypes.Hash
paySession PaymentSession paySession PaymentSession
timeoutChan <-chan time.Time timeoutChan <-chan time.Time
currentHeight int32 currentHeight int32
@ -84,7 +85,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// from an invalid route, because the sphinx packet has // from an invalid route, because the sphinx packet has
// been successfully generated before. // been successfully generated before.
_, c, err := generateSphinxPacket( _, c, err := generateSphinxPacket(
&p.attempt.Route, p.payment.PaymentHash[:], &p.attempt.Route, p.paymentHash[:],
p.attempt.SessionKey, p.attempt.SessionKey,
) )
if err != nil { if err != nil {
@ -103,7 +104,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// Now ask the switch to return the result of the payment when // Now ask the switch to return the result of the payment when
// available. // available.
resultChan, err := p.router.cfg.Payer.GetPaymentResult( resultChan, err := p.router.cfg.Payer.GetPaymentResult(
p.attempt.AttemptID, p.payment.PaymentHash, errorDecryptor, p.attempt.AttemptID, p.paymentHash, errorDecryptor,
) )
switch { switch {
@ -114,7 +115,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
case err == htlcswitch.ErrPaymentIDNotFound: case err == htlcswitch.ErrPaymentIDNotFound:
log.Debugf("Payment ID %v for hash %x not found in "+ log.Debugf("Payment ID %v for hash %x not found in "+
"the Switch, retrying.", p.attempt.AttemptID, "the Switch, retrying.", p.attempt.AttemptID,
p.payment.PaymentHash) p.paymentHash)
err = p.failAttempt(err) err = p.failAttempt(err)
if err != nil { if err != nil {
@ -155,7 +156,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// whether we should retry. // whether we should retry.
if result.Error != nil { if result.Error != nil {
log.Errorf("Attempt to send payment %x failed: %v", log.Errorf("Attempt to send payment %x failed: %v",
p.payment.PaymentHash, result.Error) p.paymentHash, result.Error)
err = p.failAttempt(result.Error) err = p.failAttempt(result.Error)
if err != nil { if err != nil {
@ -177,7 +178,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// We successfully got a payment result back from the switch. // We successfully got a payment result back from the switch.
log.Debugf("Payment %x succeeded with pid=%v", log.Debugf("Payment %x succeeded with pid=%v",
p.payment.PaymentHash, p.attempt.AttemptID) p.paymentHash, p.attempt.AttemptID)
// Report success to mission control. // Report success to mission control.
err = p.router.cfg.MissionControl.ReportPaymentSuccess( err = p.router.cfg.MissionControl.ReportPaymentSuccess(
@ -191,7 +192,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// In case of success we atomically store the db payment and // In case of success we atomically store the db payment and
// move the payment to the success state. // move the payment to the success state.
err = p.router.cfg.Control.SettleAttempt( err = p.router.cfg.Control.SettleAttempt(
p.payment.PaymentHash, p.attempt.AttemptID, p.paymentHash, p.attempt.AttemptID,
&channeldb.HTLCSettleInfo{ &channeldb.HTLCSettleInfo{
Preimage: result.Preimage, Preimage: result.Preimage,
SettleTime: p.router.cfg.Clock.Now(), SettleTime: p.router.cfg.Clock.Now(),
@ -243,7 +244,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
// Mark the payment as failed because of the // Mark the payment as failed because of the
// timeout. // timeout.
err := p.router.cfg.Control.Fail( err := p.router.cfg.Control.Fail(
p.payment.PaymentHash, channeldb.FailureReasonTimeout, p.paymentHash, channeldb.FailureReasonTimeout,
) )
if err != nil { if err != nil {
return lnwire.ShortChannelID{}, nil, err return lnwire.ShortChannelID{}, nil, err
@ -269,7 +270,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
rt, err := p.paySession.RequestRoute(uint32(p.currentHeight)) rt, err := p.paySession.RequestRoute(uint32(p.currentHeight))
if err != nil { if err != nil {
log.Warnf("Failed to find route for payment %x: %v", log.Warnf("Failed to find route for payment %x: %v",
p.payment.PaymentHash, err) p.paymentHash, err)
// Convert error to payment-level failure. // Convert error to payment-level failure.
failure := errorToPaymentFailure(err) failure := errorToPaymentFailure(err)
@ -278,7 +279,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
// any of the routes we've found, then mark the payment // any of the routes we've found, then mark the payment
// as permanently failed. // as permanently failed.
saveErr := p.router.cfg.Control.Fail( saveErr := p.router.cfg.Control.Fail(
p.payment.PaymentHash, failure, p.paymentHash, failure,
) )
if saveErr != nil { if saveErr != nil {
return lnwire.ShortChannelID{}, nil, saveErr return lnwire.ShortChannelID{}, nil, saveErr
@ -304,7 +305,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
// with the htlcAdd message that we send directly to the // with the htlcAdd message that we send directly to the
// switch. // switch.
onionBlob, c, err := generateSphinxPacket( onionBlob, c, err := generateSphinxPacket(
rt, p.payment.PaymentHash[:], sessionKey, rt, p.paymentHash[:], sessionKey,
) )
// With SendToRoute, it can happen that the route exceeds protocol // With SendToRoute, it can happen that the route exceeds protocol
@ -313,10 +314,10 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
err == sphinx.ErrMaxRoutingInfoSizeExceeded { err == sphinx.ErrMaxRoutingInfoSizeExceeded {
log.Debugf("Invalid route provided for payment %x: %v", log.Debugf("Invalid route provided for payment %x: %v",
p.payment.PaymentHash, err) p.paymentHash, err)
controlErr := p.router.cfg.Control.Fail( controlErr := p.router.cfg.Control.Fail(
p.payment.PaymentHash, channeldb.FailureReasonError, p.paymentHash, channeldb.FailureReasonError,
) )
if controlErr != nil { if controlErr != nil {
return lnwire.ShortChannelID{}, nil, controlErr return lnwire.ShortChannelID{}, nil, controlErr
@ -338,7 +339,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
htlcAdd := &lnwire.UpdateAddHTLC{ htlcAdd := &lnwire.UpdateAddHTLC{
Amount: rt.TotalAmount, Amount: rt.TotalAmount,
Expiry: rt.TotalTimeLock, Expiry: rt.TotalTimeLock,
PaymentHash: p.payment.PaymentHash, PaymentHash: p.paymentHash,
} }
copy(htlcAdd.OnionBlob[:], onionBlob) copy(htlcAdd.OnionBlob[:], onionBlob)
@ -371,7 +372,7 @@ func (p *paymentLifecycle) createNewPaymentAttempt() (lnwire.ShortChannelID,
// such that we can query the Switch for its whereabouts. The // such that we can query the Switch for its whereabouts. The
// route is needed to handle the result when it eventually // route is needed to handle the result when it eventually
// comes back. // comes back.
err = p.router.cfg.Control.RegisterAttempt(p.payment.PaymentHash, p.attempt) err = p.router.cfg.Control.RegisterAttempt(p.paymentHash, p.attempt)
if err != nil { if err != nil {
return lnwire.ShortChannelID{}, nil, err return lnwire.ShortChannelID{}, nil, err
} }
@ -384,7 +385,7 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID,
htlcAdd *lnwire.UpdateAddHTLC) error { htlcAdd *lnwire.UpdateAddHTLC) error {
log.Tracef("Attempting to send payment %x (pid=%v), "+ log.Tracef("Attempting to send payment %x (pid=%v), "+
"using route: %v", p.payment.PaymentHash, p.attempt.AttemptID, "using route: %v", p.paymentHash, p.attempt.AttemptID,
newLogClosure(func() string { newLogClosure(func() string {
return spew.Sdump(p.attempt.Route) return spew.Sdump(p.attempt.Route)
}), }),
@ -400,12 +401,12 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID,
if err != nil { if err != nil {
log.Errorf("Failed sending attempt %d for payment "+ log.Errorf("Failed sending attempt %d for payment "+
"%x to switch: %v", p.attempt.AttemptID, "%x to switch: %v", p.attempt.AttemptID,
p.payment.PaymentHash, err) p.paymentHash, err)
return err return err
} }
log.Debugf("Payment %x (pid=%v) successfully sent to switch, route: %v", log.Debugf("Payment %x (pid=%v) successfully sent to switch, route: %v",
p.payment.PaymentHash, p.attempt.AttemptID, &p.attempt.Route) p.paymentHash, p.attempt.AttemptID, &p.attempt.Route)
return nil return nil
} }
@ -426,14 +427,14 @@ func (p *paymentLifecycle) handleSendError(sendErr error) error {
} }
log.Debugf("Payment %x failed: final_outcome=%v, raw_err=%v", log.Debugf("Payment %x failed: final_outcome=%v, raw_err=%v",
p.payment.PaymentHash, *reason, sendErr) p.paymentHash, *reason, sendErr)
// Mark the payment failed with no route. // Mark the payment failed with no route.
// //
// TODO(halseth): make payment codes for the actual reason we don't // TODO(halseth): make payment codes for the actual reason we don't
// continue path finding. // continue path finding.
err := p.router.cfg.Control.Fail( err := p.router.cfg.Control.Fail(
p.payment.PaymentHash, *reason, p.paymentHash, *reason,
) )
if err != nil { if err != nil {
return err return err
@ -451,7 +452,7 @@ func (p *paymentLifecycle) failAttempt(sendError error) error {
) )
return p.router.cfg.Control.FailAttempt( return p.router.cfg.Control.FailAttempt(
p.payment.PaymentHash, p.attempt.AttemptID, p.paymentHash, p.attempt.AttemptID,
failInfo, failInfo,
) )
} }

@ -531,15 +531,8 @@ func (r *ChannelRouter) Start() error {
// We create a dummy, empty payment session such that // We create a dummy, empty payment session such that
// we won't make another payment attempt when the // we won't make another payment attempt when the
// result for the in-flight attempt is received. // result for the in-flight attempt is received.
//
// PayAttemptTime doesn't need to be set, as there is
// only a single attempt.
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
lPayment := &LightningPayment{
PaymentHash: payment.Info.PaymentHash,
}
// TODO(joostjager): For mpp, possibly relaunch multiple // TODO(joostjager): For mpp, possibly relaunch multiple
// in-flight htlcs here. // in-flight htlcs here.
var attempt *channeldb.HTLCAttemptInfo var attempt *channeldb.HTLCAttemptInfo
@ -547,7 +540,13 @@ func (r *ChannelRouter) Start() error {
attempt = &payment.Attempts[0] attempt = &payment.Attempts[0]
} }
_, _, err := r.sendPayment(attempt, lPayment, paySession) // We pass in a zero timeout value, to indicate we
// don't need it to timeout. It will stop immediately
// after the existing attempt has finished anyway.
_, _, err := r.sendPayment(
attempt,
payment.Info.PaymentHash, 0, paySession,
)
if err != nil { if err != nil {
log.Errorf("Resuming payment with hash %v "+ log.Errorf("Resuming payment with hash %v "+
"failed: %v.", payment.Info.PaymentHash, err) "failed: %v.", payment.Info.PaymentHash, err)
@ -1639,9 +1638,15 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte,
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
log.Tracef("Dispatching SendPayment for lightning payment: %v",
spewPayment(payment))
// Since this is the first time this payment is being made, we pass nil // Since this is the first time this payment is being made, we pass nil
// for the existing attempt. // for the existing attempt.
return r.sendPayment(nil, payment, paySession) return r.sendPayment(
nil, payment.PaymentHash,
payment.PayAttemptTimeout, paySession,
)
} }
// SendPaymentAsync is the non-blocking version of SendPayment. The payment // SendPaymentAsync is the non-blocking version of SendPayment. The payment
@ -1658,7 +1663,13 @@ func (r *ChannelRouter) SendPaymentAsync(payment *LightningPayment) error {
go func() { go func() {
defer r.wg.Done() defer r.wg.Done()
_, _, err := r.sendPayment(nil, payment, paySession) log.Tracef("Dispatching SendPayment for lightning payment: %v",
spewPayment(payment))
_, _, err := r.sendPayment(
nil, payment.PaymentHash,
payment.PayAttemptTimeout, paySession,
)
if err != nil { if err != nil {
log.Errorf("Payment with hash %x failed: %v", log.Errorf("Payment with hash %x failed: %v",
payment.PaymentHash, err) payment.PaymentHash, err)
@ -1668,6 +1679,28 @@ func (r *ChannelRouter) SendPaymentAsync(payment *LightningPayment) error {
return nil return nil
} }
// spewPayment returns a log closures that provides a spewed string
// representation of the passed payment.
func spewPayment(payment *LightningPayment) logClosure {
return newLogClosure(func() string {
// Make a copy of the payment with a nilled Curve
// before spewing.
var routeHints [][]zpay32.HopHint
for _, routeHint := range payment.RouteHints {
var hopHints []zpay32.HopHint
for _, hopHint := range routeHint {
h := hopHint.Copy()
h.NodeID.Curve = nil
hopHints = append(hopHints, h)
}
routeHints = append(routeHints, hopHints)
}
p := *payment
p.RouteHints = routeHints
return spew.Sdump(p)
})
}
// preparePayment creates the payment session and registers the payment with the // preparePayment creates the payment session and registers the payment with the
// control tower. // control tower.
func (r *ChannelRouter) preparePayment(payment *LightningPayment) ( func (r *ChannelRouter) preparePayment(payment *LightningPayment) (
@ -1726,20 +1759,17 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
return [32]byte{}, err return [32]byte{}, err
} }
// Create a (mostly) dummy payment, as the created payment session is log.Tracef("Dispatching SendToRoute for hash %v: %v",
// not going to do path finding. hash, newLogClosure(func() string {
// TODO(halseth): sendPayment doesn't really need LightningPayment, make return spew.Sdump(route)
// it take just needed fields instead. }),
// )
// PayAttemptTime doesn't need to be set, as there is only a single
// attempt.
payment := &LightningPayment{
PaymentHash: hash,
}
// Since this is the first time this payment is being made, we pass nil // We set the timeout to a zero value, indicating the payment shouldn't
// for the existing attempt. // timeout. It is only a single attempt, so no more attempts will be
preimage, _, err := r.sendPayment(nil, payment, paySession) // done anyway. Since this is the first time this payment is being
// made, we pass nil for the existing attempt.
preimage, _, err := r.sendPayment(nil, hash, 0, paySession)
if err != nil { if err != nil {
// SendToRoute should return a structured error. In case the // SendToRoute should return a structured error. In case the
// provided route fails, payment lifecycle will return a // provided route fails, payment lifecycle will return a
@ -1759,13 +1789,13 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
return preimage, nil return preimage, nil
} }
// sendPayment attempts to send a payment as described within the passed // sendPayment attempts to send a payment to the passed payment hash. This
// LightningPayment. This function is blocking and will return either: when the // function is blocking and will return either: when the payment is successful,
// payment is successful, or all candidates routes have been attempted and // or all candidates routes have been attempted and resulted in a failed
// resulted in a failed payment. If the payment succeeds, then a non-nil Route // payment. If the payment succeeds, then a non-nil Route will be returned
// will be returned which describes the path the successful payment traversed // which describes the path the successful payment traversed within the network
// within the network to reach the destination. Additionally, the payment // to reach the destination. Additionally, the payment preimage will also be
// preimage will also be returned. // returned.
// //
// The existing attempt argument should be set to nil if this is a payment that // The existing attempt argument should be set to nil if this is a payment that
// haven't had any payment attempt sent to the switch yet. If it has had an // haven't had any payment attempt sent to the switch yet. If it has had an
@ -1777,28 +1807,9 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
// the ControlTower. // the ControlTower.
func (r *ChannelRouter) sendPayment( func (r *ChannelRouter) sendPayment(
existingAttempt *channeldb.HTLCAttemptInfo, existingAttempt *channeldb.HTLCAttemptInfo,
payment *LightningPayment, paySession PaymentSession) ( paymentHash lntypes.Hash,
[32]byte, *route.Route, error) { timeout time.Duration,
paySession PaymentSession) ([32]byte, *route.Route, error) {
log.Tracef("Dispatching route for lightning payment: %v",
newLogClosure(func() string {
// Make a copy of the payment with a nilled Curve
// before spewing.
var routeHints [][]zpay32.HopHint
for _, routeHint := range payment.RouteHints {
var hopHints []zpay32.HopHint
for _, hopHint := range routeHint {
h := hopHint.Copy()
h.NodeID.Curve = nil
hopHints = append(hopHints, h)
}
routeHints = append(routeHints, hopHints)
}
p := *payment
p.RouteHints = routeHints
return spew.Sdump(p)
}),
)
// We'll also fetch the current block height so we can properly // We'll also fetch the current block height so we can properly
// calculate the required HTLC time locks within the route. // calculate the required HTLC time locks within the route.
@ -1811,7 +1822,7 @@ func (r *ChannelRouter) sendPayment(
// can resume the payment from the current state. // can resume the payment from the current state.
p := &paymentLifecycle{ p := &paymentLifecycle{
router: r, router: r,
payment: payment, paymentHash: paymentHash,
paySession: paySession, paySession: paySession,
currentHeight: currentHeight, currentHeight: currentHeight,
attempt: existingAttempt, attempt: existingAttempt,
@ -1822,8 +1833,8 @@ func (r *ChannelRouter) sendPayment(
// If a timeout is specified, create a timeout channel. If no timeout is // If a timeout is specified, create a timeout channel. If no timeout is
// specified, the channel is left nil and will never abort the payment // specified, the channel is left nil and will never abort the payment
// loop. // loop.
if payment.PayAttemptTimeout != 0 { if timeout != 0 {
p.timeoutChan = time.After(payment.PayAttemptTimeout) p.timeoutChan = time.After(timeout)
} }
return p.resumePayment() return p.resumePayment()