routing/payment_lifeycle: remove payment level attempt and circuit

We replace the cached attempt, and instead use the control tower
(database) to fetch any in-flight attempt. This is done as a
preparation for having multiple attempts in flight.

In addition we remove the cached circuit, as it won't be applicable when
multiple shards are in flight.

Instead of tracking the attemp we consult the database on every
iteration, and pick up any existing attempt. This also let us avoid
having to pass in the existing attempts from the payment loop, as we
just fetch them direclty.
This commit is contained in:
Johan T. Halseth 2020-04-01 00:13:24 +02:00
parent 9bcf41d401
commit bcca1ab821
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 90 additions and 72 deletions

@ -37,8 +37,6 @@ type paymentLifecycle struct {
paySession PaymentSession paySession PaymentSession
timeoutChan <-chan time.Time timeoutChan <-chan time.Time
currentHeight int32 currentHeight int32
attempt *channeldb.HTLCAttemptInfo
circuit *sphinx.Circuit
lastError error lastError error
} }
@ -47,10 +45,52 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// We'll continue until either our payment succeeds, or we encounter a // We'll continue until either our payment succeeds, or we encounter a
// critical error during path finding. // critical error during path finding.
for { for {
// We start every iteration by fetching the lastest state of
// the payment from the ControlTower. This ensures that we will
// act on the latest available information, whether we are
// resuming an existing payment or just sent a new attempt.
payment, err := p.router.cfg.Control.FetchPayment(
p.paymentHash,
)
if err != nil {
return [32]byte{}, nil, err
}
// Go through the HTLCs for this payment, determining if there
// are any in flight or settled.
var (
attempt *channeldb.HTLCAttemptInfo
settle *channeldb.HTLCAttempt
)
for _, a := range payment.HTLCs {
a := a
// We have a settled HTLC, and should return when all
// shards are back.
if a.Settle != nil {
settle = &a
continue
}
// This HTLC already failed, ignore.
if a.Failure != nil {
continue
}
// HTLC was neither setteld nor failed, it is still in
// flight.
attempt = &a.HTLCAttemptInfo
break
}
// Terminal state, return the preimage and the route taken.
if attempt == nil && settle != nil {
return settle.Settle.Preimage, &settle.Route, nil
}
// If this payment had no existing payment attempt, we create // If this payment had no existing payment attempt, we create
// and send one now. // and send one now.
if p.attempt == nil { if attempt == nil {
// Before we attempt this next payment, we'll check to see if either // Before we attempt this next payment, we'll check to see if either
// we've gone past the payment attempt timeout, or the router is // we've gone past the payment attempt timeout, or the router is
// exiting. In either case, we'll stop this payment attempt short. If a // exiting. In either case, we'll stop this payment attempt short. If a
@ -114,7 +154,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// Using the route received from the payment session, // Using the route received from the payment session,
// create a new shard to send. // create a new shard to send.
firstHop, htlcAdd, attempt, err := p.createNewPaymentAttempt( firstHop, htlcAdd, a, err := p.createNewPaymentAttempt(
rt, rt,
) )
// With SendToRoute, it can happen that the route exceeds protocol // With SendToRoute, it can happen that the route exceeds protocol
@ -138,7 +178,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
p.attempt = attempt attempt = a
// Before sending this HTLC to the switch, we checkpoint the // Before sending this HTLC to the switch, we checkpoint the
// fresh paymentID and route to the DB. This lets us know on // fresh paymentID and route to the DB. This lets us know on
@ -153,11 +193,13 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// Now that the attempt is created and checkpointed to // Now that the attempt is created and checkpointed to
// the DB, we send it. // the DB, we send it.
sendErr := p.sendPaymentAttempt(firstHop, htlcAdd) sendErr := p.sendPaymentAttempt(
attempt, firstHop, htlcAdd,
)
if sendErr != nil { if sendErr != nil {
// TODO(joostjager): Distinguish unexpected // TODO(joostjager): Distinguish unexpected
// internal errors from real send errors. // internal errors from real send errors.
err = p.failAttempt(sendErr) err = p.failAttempt(attempt, sendErr)
if err != nil { if err != nil {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
@ -165,7 +207,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// We must inspect the error to know whether it // We must inspect the error to know whether it
// was critical or not, to decide whether we // was critical or not, to decide whether we
// should continue trying. // should continue trying.
err := p.handleSendError(sendErr) err := p.handleSendError(attempt, sendErr)
if err != nil { if err != nil {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
@ -173,35 +215,29 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// Error was handled successfully, reset the // Error was handled successfully, reset the
// attempt to indicate we want to make a new // attempt to indicate we want to make a new
// attempt. // attempt.
p.attempt = nil
continue continue
} }
} else { }
// If this was a resumed attempt, we must regenerate the
// circuit. We don't need to check for errors resulting // Regenerate the circuit for this attempt.
// from an invalid route, because the sphinx packet has _, circuit, err := generateSphinxPacket(
// been successfully generated before. &attempt.Route, p.paymentHash[:], attempt.SessionKey,
_, c, err := generateSphinxPacket( )
&p.attempt.Route, p.paymentHash[:], if err != nil {
p.attempt.SessionKey, return [32]byte{}, nil, err
)
if err != nil {
return [32]byte{}, nil, err
}
p.circuit = c
} }
// Using the created circuit, initialize the error decrypter so we can // Using the created circuit, initialize the error decrypter so we can
// parse+decode any failures incurred by this payment within the // parse+decode any failures incurred by this payment within the
// switch. // switch.
errorDecryptor := &htlcswitch.SphinxErrorDecrypter{ errorDecryptor := &htlcswitch.SphinxErrorDecrypter{
OnionErrorDecrypter: sphinx.NewOnionErrorDecrypter(p.circuit), OnionErrorDecrypter: sphinx.NewOnionErrorDecrypter(circuit),
} }
// Now ask the switch to return the result of the payment when // Now ask the switch to return the result of the payment when
// available. // available.
resultChan, err := p.router.cfg.Payer.GetPaymentResult( resultChan, err := p.router.cfg.Payer.GetPaymentResult(
p.attempt.AttemptID, p.paymentHash, errorDecryptor, attempt.AttemptID, p.paymentHash, errorDecryptor,
) )
switch { switch {
@ -211,23 +247,22 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// attempt, and wait for its result to be available. // attempt, and wait for its result to be available.
case err == htlcswitch.ErrPaymentIDNotFound: case err == htlcswitch.ErrPaymentIDNotFound:
log.Debugf("Payment ID %v for hash %x not found in "+ log.Debugf("Payment ID %v for hash %x not found in "+
"the Switch, retrying.", p.attempt.AttemptID, "the Switch, retrying.", attempt.AttemptID,
p.paymentHash) p.paymentHash)
err = p.failAttempt(err) err = p.failAttempt(attempt, err)
if err != nil { if err != nil {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
// Reset the attempt to indicate we want to make a new // Reset the attempt to indicate we want to make a new
// attempt. // attempt.
p.attempt = nil
continue continue
// A critical, unexpected error was encountered. // A critical, unexpected error was encountered.
case err != nil: case err != nil:
log.Errorf("Failed getting result for attemptID %d "+ log.Errorf("Failed getting result for attemptID %d "+
"from switch: %v", p.attempt.AttemptID, err) "from switch: %v", attempt.AttemptID, err)
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
@ -255,7 +290,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
log.Errorf("Attempt to send payment %x failed: %v", log.Errorf("Attempt to send payment %x failed: %v",
p.paymentHash, result.Error) p.paymentHash, result.Error)
err = p.failAttempt(result.Error) err = p.failAttempt(attempt, result.Error)
if err != nil { if err != nil {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
@ -263,23 +298,22 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// We must inspect the error to know whether it was // We must inspect the error to know whether it was
// critical or not, to decide whether we should // critical or not, to decide whether we should
// continue trying. // continue trying.
if err := p.handleSendError(result.Error); err != nil { if err := p.handleSendError(attempt, result.Error); err != nil {
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
// Error was handled successfully, reset the attempt to // Error was handled successfully, reset the attempt to
// indicate we want to make a new attempt. // indicate we want to make a new attempt.
p.attempt = nil
continue continue
} }
// We successfully got a payment result back from the switch. // We successfully got a payment result back from the switch.
log.Debugf("Payment %x succeeded with pid=%v", log.Debugf("Payment %x succeeded with pid=%v",
p.paymentHash, p.attempt.AttemptID) p.paymentHash, attempt.AttemptID)
// Report success to mission control. // Report success to mission control.
err = p.router.cfg.MissionControl.ReportPaymentSuccess( err = p.router.cfg.MissionControl.ReportPaymentSuccess(
p.attempt.AttemptID, &p.attempt.Route, attempt.AttemptID, &attempt.Route,
) )
if err != nil { if err != nil {
log.Errorf("Error reporting payment success to mc: %v", log.Errorf("Error reporting payment success to mc: %v",
@ -289,7 +323,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
// In case of success we atomically store the db payment and // In case of success we atomically store the db payment and
// move the payment to the success state. // move the payment to the success state.
err = p.router.cfg.Control.SettleAttempt( err = p.router.cfg.Control.SettleAttempt(
p.paymentHash, p.attempt.AttemptID, p.paymentHash, attempt.AttemptID,
&channeldb.HTLCSettleInfo{ &channeldb.HTLCSettleInfo{
Preimage: result.Preimage, Preimage: result.Preimage,
SettleTime: p.router.cfg.Clock.Now(), SettleTime: p.router.cfg.Clock.Now(),
@ -300,12 +334,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
"attempt: %v", err) "attempt: %v", err)
return [32]byte{}, nil, err return [32]byte{}, nil, err
} }
// Terminal state, return the preimage and the route
// taken.
return result.Preimage, &p.attempt.Route, nil
} }
} }
// errorToPaymentFailure takes a path finding error and converts it into a // errorToPaymentFailure takes a path finding error and converts it into a
@ -341,17 +370,13 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route) (
// Generate the raw encoded sphinx packet to be included along // Generate the raw encoded sphinx packet to be included along
// with the htlcAdd message that we send directly to the // with the htlcAdd message that we send directly to the
// switch. // switch.
onionBlob, c, err := generateSphinxPacket( onionBlob, _, err := generateSphinxPacket(
rt, p.paymentHash[:], sessionKey, rt, p.paymentHash[:], sessionKey,
) )
if err != nil { if err != nil {
return lnwire.ShortChannelID{}, nil, nil, err return lnwire.ShortChannelID{}, nil, nil, err
} }
// Update our cached circuit with the newly generated
// one.
p.circuit = c
// Craft an HTLC packet to send to the layer 2 switch. The // Craft an HTLC packet to send to the layer 2 switch. The
// metadata within this packet will be used to route the // metadata within this packet will be used to route the
// payment through the network, starting with the first-hop. // payment through the network, starting with the first-hop.
@ -389,13 +414,14 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route) (
} }
// sendPaymentAttempt attempts to send the current attempt to the switch. // sendPaymentAttempt attempts to send the current attempt to the switch.
func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID, func (p *paymentLifecycle) sendPaymentAttempt(
attempt *channeldb.HTLCAttemptInfo, firstHop lnwire.ShortChannelID,
htlcAdd *lnwire.UpdateAddHTLC) error { htlcAdd *lnwire.UpdateAddHTLC) error {
log.Tracef("Attempting to send payment %x (pid=%v), "+ log.Tracef("Attempting to send payment %x (pid=%v), "+
"using route: %v", p.paymentHash, p.attempt.AttemptID, "using route: %v", p.paymentHash, attempt.AttemptID,
newLogClosure(func() string { newLogClosure(func() string {
return spew.Sdump(p.attempt.Route) return spew.Sdump(attempt.Route)
}), }),
) )
@ -404,27 +430,28 @@ func (p *paymentLifecycle) sendPaymentAttempt(firstHop lnwire.ShortChannelID,
// such that we can resume waiting for the result after a // such that we can resume waiting for the result after a
// restart. // restart.
err := p.router.cfg.Payer.SendHTLC( err := p.router.cfg.Payer.SendHTLC(
firstHop, p.attempt.AttemptID, htlcAdd, firstHop, attempt.AttemptID, htlcAdd,
) )
if err != nil { if err != nil {
log.Errorf("Failed sending attempt %d for payment "+ log.Errorf("Failed sending attempt %d for payment "+
"%x to switch: %v", p.attempt.AttemptID, "%x to switch: %v", attempt.AttemptID,
p.paymentHash, err) p.paymentHash, err)
return err return err
} }
log.Debugf("Payment %x (pid=%v) successfully sent to switch, route: %v", log.Debugf("Payment %x (pid=%v) successfully sent to switch, route: %v",
p.paymentHash, p.attempt.AttemptID, &p.attempt.Route) p.paymentHash, attempt.AttemptID, &attempt.Route)
return nil return nil
} }
// handleSendError inspects the given error from the Switch and determines // handleSendError inspects the given error from the Switch and determines
// whether we should make another payment attempt. // whether we should make another payment attempt.
func (p *paymentLifecycle) handleSendError(sendErr error) error { func (p *paymentLifecycle) handleSendError(attempt *channeldb.HTLCAttemptInfo,
sendErr error) error {
reason := p.router.processSendError( reason := p.router.processSendError(
p.attempt.AttemptID, &p.attempt.Route, sendErr, attempt.AttemptID, &attempt.Route, sendErr,
) )
if reason == nil { if reason == nil {
// Save the forwarding error so it can be returned if // Save the forwarding error so it can be returned if
@ -453,14 +480,16 @@ func (p *paymentLifecycle) handleSendError(sendErr error) error {
} }
// failAttempt calls control tower to fail the current payment attempt. // failAttempt calls control tower to fail the current payment attempt.
func (p *paymentLifecycle) failAttempt(sendError error) error { func (p *paymentLifecycle) failAttempt(attempt *channeldb.HTLCAttemptInfo,
sendError error) error {
failInfo := marshallError( failInfo := marshallError(
sendError, sendError,
p.router.cfg.Clock.Now(), p.router.cfg.Clock.Now(),
) )
return p.router.cfg.Control.FailAttempt( return p.router.cfg.Control.FailAttempt(
p.paymentHash, p.attempt.AttemptID, p.paymentHash, attempt.AttemptID,
failInfo, failInfo,
) )
} }

@ -533,20 +533,13 @@ func (r *ChannelRouter) Start() error {
// result for the in-flight attempt is received. // result for the in-flight attempt is received.
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
// TODO(joostjager): For mpp, possibly relaunch multiple
// in-flight htlcs here.
var attempt *channeldb.HTLCAttemptInfo
if len(payment.Attempts) > 0 {
attempt = &payment.Attempts[0]
}
// We pass in a zero timeout value, to indicate we // We pass in a zero timeout value, to indicate we
// don't need it to timeout. It will stop immediately // don't need it to timeout. It will stop immediately
// after the existing attempt has finished anyway. We // after the existing attempt has finished anyway. We
// also set a zero fee limit, as no more routes should // also set a zero fee limit, as no more routes should
// be tried. // be tried.
_, _, err := r.sendPayment( _, _, err := r.sendPayment(
attempt, payment.Info.Value, 0, payment.Info.Value, 0,
payment.Info.PaymentHash, 0, paySession, payment.Info.PaymentHash, 0, paySession,
) )
if err != nil { if err != nil {
@ -1646,7 +1639,7 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte,
// Since this is the first time this payment is being made, we pass nil // Since this is the first time this payment is being made, we pass nil
// for the existing attempt. // for the existing attempt.
return r.sendPayment( return r.sendPayment(
nil, payment.Amount, payment.FeeLimit, payment.PaymentHash, payment.Amount, payment.FeeLimit, payment.PaymentHash,
payment.PayAttemptTimeout, paySession, payment.PayAttemptTimeout, paySession,
) )
} }
@ -1669,9 +1662,8 @@ func (r *ChannelRouter) SendPaymentAsync(payment *LightningPayment) error {
spewPayment(payment)) spewPayment(payment))
_, _, err := r.sendPayment( _, _, err := r.sendPayment(
nil, payment.Amount, payment.FeeLimit, payment.Amount, payment.FeeLimit, payment.PaymentHash,
payment.PaymentHash, payment.PayAttemptTimeout, payment.PayAttemptTimeout, paySession,
paySession,
) )
if err != nil { if err != nil {
log.Errorf("Payment with hash %x failed: %v", log.Errorf("Payment with hash %x failed: %v",
@ -1770,14 +1762,14 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
// We set the timeout to a zero value, indicating the payment shouldn't // We set the timeout to a zero value, indicating the payment shouldn't
// timeout. It is only a single attempt, so no more attempts will be // timeout. It is only a single attempt, so no more attempts will be
// done anyway. Since this is the first time this payment is being // done anyway.
// made, we pass nil for the existing attempt. //
// We pass the route receiver amount as the total payment amount such // We pass the route receiver amount as the total payment amount such
// that the payment loop will request a route for this amount. As fee // that the payment loop will request a route for this amount. As fee
// limit we pass the route's total fees, since we already know this is // limit we pass the route's total fees, since we already know this is
// the route that is going to be used. // the route that is going to be used.
preimage, _, err := r.sendPayment( preimage, _, err := r.sendPayment(
nil, amt, route.TotalFees(), hash, 0, paySession, amt, route.TotalFees(), hash, 0, paySession,
) )
if err != nil { if err != nil {
// SendToRoute should return a structured error. In case the // SendToRoute should return a structured error. In case the
@ -1815,7 +1807,6 @@ func (r *ChannelRouter) SendToRoute(hash lntypes.Hash, route *route.Route) (
// router will call this method for every payment still in-flight according to // router will call this method for every payment still in-flight according to
// the ControlTower. // the ControlTower.
func (r *ChannelRouter) sendPayment( func (r *ChannelRouter) sendPayment(
existingAttempt *channeldb.HTLCAttemptInfo,
totalAmt, feeLimit lnwire.MilliSatoshi, paymentHash lntypes.Hash, totalAmt, feeLimit lnwire.MilliSatoshi, paymentHash lntypes.Hash,
timeout time.Duration, timeout time.Duration,
paySession PaymentSession) ([32]byte, *route.Route, error) { paySession PaymentSession) ([32]byte, *route.Route, error) {
@ -1836,8 +1827,6 @@ func (r *ChannelRouter) sendPayment(
paymentHash: paymentHash, paymentHash: paymentHash,
paySession: paySession, paySession: paySession,
currentHeight: currentHeight, currentHeight: currentHeight,
attempt: existingAttempt,
circuit: nil,
lastError: nil, lastError: nil,
} }