routing/payment_lifecycle+channeldb: enable multi shard send

This commit finally enables MP payments within the payment lifecycle
(used for SendPayment). This is done by letting the loop launch shards
as long as there is value remaining to send, inspecting the outcomes for
the sent shards when the full payment amount has been filled.

The method channeldb.MPPayment.SentAmt() is added to easily look up how
much value we have sent for the payment.
This commit is contained in:
Johan T. Halseth 2020-04-01 00:13:26 +02:00
parent 0fd71cd596
commit 7b318a4be7
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 316 additions and 128 deletions

@ -144,6 +144,24 @@ func (m *MPPayment) TerminalInfo() (*HTLCSettleInfo, *FailureReason) {
return nil, m.FailureReason
}
// SentAmt returns the sum of sent amount and fees for HTLCs that are either
// settled or still in flight.
func (m *MPPayment) SentAmt() (lnwire.MilliSatoshi, lnwire.MilliSatoshi) {
var sent, fees lnwire.MilliSatoshi
for _, h := range m.HTLCs {
if h.Failure != nil {
continue
}
// The attempt was not failed, meaning the amount was
// potentially sent to the receiver.
sent += h.Route.ReceiverAmt()
fees += h.Route.TotalFees()
}
return sent, fees
}
// InFlightHTLCs returns the HTLCs that are still in-flight, meaning they have
// not been settled or failed.
func (m *MPPayment) InFlightHTLCs() []HTLCAttempt {

@ -1,6 +1,8 @@
package routing
import (
"fmt"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
@ -24,14 +26,74 @@ type paymentLifecycle struct {
currentHeight int32
}
// payemntState holds a number of key insights learned from a given MPPayment
// that we use to determine what to do on each payment loop iteration.
type paymentState struct {
numShardsInFlight int
remainingAmt lnwire.MilliSatoshi
remainingFees lnwire.MilliSatoshi
terminate bool
}
// paymentState uses the passed payment to find the latest information we need
// to act on every iteration of the payment loop.
func (p *paymentLifecycle) paymentState(payment *channeldb.MPPayment) (
*paymentState, error) {
// Fetch the total amount and fees that has already been sent in
// settled and still in-flight shards.
sentAmt, fees := payment.SentAmt()
// Sanity check we haven't sent a value larger than the payment amount.
if sentAmt > p.totalAmount {
return nil, fmt.Errorf("amount sent %v exceeds "+
"total amount %v", sentAmt, p.totalAmount)
}
// We'll subtract the used fee from our fee budget, but allow the fees
// of the already sent shards to exceed our budget (can happen after
// restarts).
feeBudget := p.feeLimit
if fees <= feeBudget {
feeBudget -= fees
} else {
feeBudget = 0
}
// Get any terminal info for this payment.
settle, failure := payment.TerminalInfo()
// If either an HTLC settled, or the payment has a payment level
// failure recorded, it means we should terminate the moment all shards
// have returned with a result.
terminate := settle != nil || failure != nil
activeShards := payment.InFlightHTLCs()
return &paymentState{
numShardsInFlight: len(activeShards),
remainingAmt: p.totalAmount - sentAmt,
remainingFees: feeBudget,
terminate: terminate,
}, nil
}
// resumePayment resumes the paymentLifecycle from the current state.
func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
shardHandler := &shardHandler{
router: p.router,
paymentHash: p.paymentHash,
shardErrors: make(chan error),
quit: make(chan struct{}),
}
// If we have an existing attempt, we'll start by collecting its result.
// When the payment lifecycle loop exits, we make sure to signal any
// sub goroutine of the shardHandler to exit, then wait for them to
// return.
defer shardHandler.stop()
// If we had any existing attempts outstanding, we'll start by spinning
// up goroutines that'll collect their results and deliver them to the
// lifecycle loop below.
payment, err := p.router.cfg.Control.FetchPayment(
p.paymentHash,
)
@ -42,15 +104,21 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
for _, a := range payment.InFlightHTLCs() {
a := a
_, err := shardHandler.collectResult(&a.HTLCAttemptInfo)
if err != nil {
return [32]byte{}, nil, err
}
log.Debugf("Resuming payment shard %v for hash %v",
a.AttemptID, p.paymentHash)
shardHandler.collectResultAsync(&a.HTLCAttemptInfo)
}
// We'll continue until either our payment succeeds, or we encounter a
// critical error during path finding.
for {
// Start by quickly checking if there are any outcomes already
// available to handle before we reevaluate our state.
if err := shardHandler.checkShards(); err != nil {
return [32]byte{}, nil, err
}
// We start every iteration by fetching the lastest state of
// the payment from the ControlTower. This ensures that we will
// act on the latest available information, whether we are
@ -62,90 +130,97 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
return [32]byte{}, nil, err
}
// Go through the HTLCs for this payment, determining if there
// are any in flight or settled.
var (
attempt *channeldb.HTLCAttemptInfo
settle *channeldb.HTLCAttempt
)
for _, a := range payment.HTLCs {
a := a
// We have a settled HTLC, and should return when all
// shards are back.
if a.Settle != nil {
settle = &a
continue
}
// This HTLC already failed, ignore.
if a.Failure != nil {
continue
}
// HTLC was neither setteld nor failed, it is still in
// flight.
attempt = &a.HTLCAttemptInfo
break
}
// Terminal state, return the preimage and the route taken.
if attempt == nil && settle != nil {
return settle.Settle.Preimage, &settle.Route, nil
}
// If the payment already is failed, and there is no in-flight
// HTLC, return immediately.
if attempt == nil && payment.FailureReason != nil {
return [32]byte{}, nil, *payment.FailureReason
}
// If this payment had no existing payment attempt, we create
// and send one now.
if attempt == nil {
// Before we attempt this next payment, we'll check to see if either
// we've gone past the payment attempt timeout, or the router is
// exiting. In either case, we'll stop this payment attempt short. If a
// timeout is not applicable, timeoutChan will be nil.
select {
case <-p.timeoutChan:
// Mark the payment as failed because of the
// timeout.
err := p.router.cfg.Control.Fail(
p.paymentHash, channeldb.FailureReasonTimeout,
)
// Using this latest state of the payment, calculate
// information about our active shards and terminal conditions.
state, err := p.paymentState(payment)
if err != nil {
return [32]byte{}, nil, err
}
log.Debugf("Payment %v in state terminate=%v, "+
"active_shards=%v, rem_value=%v, fee_limit=%v",
p.paymentHash, state.terminate, state.numShardsInFlight,
state.remainingAmt, state.remainingFees)
switch {
// We have a terminal condition and no active shards, we are
// ready to exit.
case state.terminate && state.numShardsInFlight == 0:
// Find the first successful shard and return
// the preimage and route.
for _, a := range payment.HTLCs {
if a.Settle != nil {
return a.Settle.Preimage, &a.Route, nil
}
}
// Payment failed.
return [32]byte{}, nil, *payment.FailureReason
// If we either reached a terminal error condition (but had
// active shards still) or there is no remaining value to send,
// we'll wait for a shard outcome.
case state.terminate || state.remainingAmt == 0:
// We still have outstanding shards, so wait for a new
// outcome to be available before re-evaluating our
// state.
if err := shardHandler.waitForShard(); err != nil {
return [32]byte{}, nil, err
}
continue
}
// Before we attempt any new shard, we'll check to see if
// either we've gone past the payment attempt timeout, or the
// router is exiting. In either case, we'll stop this payment
// attempt short. If a timeout is not applicable, timeoutChan
// will be nil.
select {
case <-p.timeoutChan:
log.Warnf("payment attempt not completed before " +
"timeout")
// By marking the payment failed with the control
// tower, no further shards will be launched and we'll
// return with an error the moment all active shards
// have finished.
saveErr := p.router.cfg.Control.Fail(
p.paymentHash, channeldb.FailureReasonTimeout,
)
if saveErr != nil {
return [32]byte{}, nil, saveErr
}
continue
// The payment will be resumed from the current state
// after restart.
case <-p.router.quit:
return [32]byte{}, nil, ErrRouterShuttingDown
// Fall through if we haven't hit our time limit or are
// exiting.
// Fall through if we haven't hit our time limit.
default:
}
// Create a new payment attempt from the given payment session.
rt, err := p.paySession.RequestRoute(
p.totalAmount, p.feeLimit, 0, uint32(p.currentHeight),
state.remainingAmt, state.remainingFees,
uint32(state.numShardsInFlight), uint32(p.currentHeight),
)
if err != nil {
log.Warnf("Failed to find route for payment %x: %v",
p.paymentHash, err)
// Convert error to payment-level failure.
failure := errorToPaymentFailure(err)
// There is no route to try, and we have no active
// shards. This means that there is no way for us to
// send the payment, so mark it failed with no route.
if state.numShardsInFlight == 0 {
failureCode := errorToPaymentFailure(err)
log.Debugf("Marking payment %v permanently "+
"failed with no route: %v",
p.paymentHash, failureCode)
// If we're unable to successfully make a payment using
// any of the routes we've found, then mark the payment
// as permanently failed.
saveErr := p.router.cfg.Control.Fail(
p.paymentHash, failure,
p.paymentHash, failureCode,
)
if saveErr != nil {
return [32]byte{}, nil, saveErr
@ -154,56 +229,45 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
continue
}
// With the route in hand, launch a new shard.
var outcome *launchOutcome
attempt, outcome, err = shardHandler.launchShard(rt)
// We still have active shards, we'll wait for an
// outcome to be available before retrying.
if err := shardHandler.waitForShard(); err != nil {
return [32]byte{}, nil, err
}
continue
}
// We found a route to try, launch a new shard.
attempt, outcome, err := shardHandler.launchShard(rt)
if err != nil {
return [32]byte{}, nil, err
}
// We ew encountered a non-critical error when
// launching the shard, handle it
// If we encountered a non-critical error when launching the
// shard, handle it.
if outcome.err != nil {
// We must inspect the error to know whether it
// was critical or not, to decide whether we
// should continue trying.
err = shardHandler.handleSendError(
log.Warnf("Failed to launch shard %v for "+
"payment %v: %v", attempt.AttemptID,
p.paymentHash, outcome.err)
// We must inspect the error to know whether it was
// critical or not, to decide whether we should
// continue trying.
err := shardHandler.handleSendError(
attempt, outcome.err,
)
if err != nil {
return [32]byte{}, nil, err
}
// Error was handled successfully, continue to
// make a new attempt.
// Error was handled successfully, continue to make a
// new attempt.
continue
}
// We'll collect the result of the shard just sent. We
// ignore the result for now if it is a success, as we
// will look it up in the control tower on the next
// loop iteration.
result, err := shardHandler.collectResult(attempt)
if err != nil {
return [32]byte{}, nil, err
}
if result.err != nil {
// We must inspect the error to know whether it
// was critical or not, to decide whether we
// should continue trying.
err := shardHandler.handleSendError(
attempt, result.err,
)
if err != nil {
return [32]byte{}, nil, err
}
// Error was handled successfully, continue to
// make a new attempt.
continue
}
}
// Now that the shard was successfully sent, launch a go
// routine that will handle its result when its back.
shardHandler.collectResultAsync(attempt)
}
}
@ -212,6 +276,60 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
type shardHandler struct {
paymentHash lntypes.Hash
router *ChannelRouter
// shardErrors is a channel where errors collected by calling
// collectResultAsync will be delivered. These results are meant to be
// inspected by calling waitForShard or checkShards, and the channel
// doesn't need to be initiated if the caller is using the sync
// collectResult directly.
shardErrors chan error
// quit is closed to signal the sub goroutines of the payment lifecycle
// to stop.
quit chan struct{}
wg sync.WaitGroup
}
// stop signals any active shard goroutine to exit and waits for them to exit.
func (p *shardHandler) stop() {
close(p.quit)
p.wg.Wait()
}
// waitForShard blocks until any of the outstanding shards return.
func (p *shardHandler) waitForShard() error {
select {
case err := <-p.shardErrors:
return err
case <-p.quit:
return fmt.Errorf("shard handler quitting")
case <-p.router.quit:
return ErrRouterShuttingDown
}
}
// checkShards is a non-blocking method that check if any shards has finished
// their execution.
func (p *shardHandler) checkShards() error {
for {
select {
case err := <-p.shardErrors:
if err != nil {
return err
}
case <-p.quit:
return fmt.Errorf("shard handler quitting")
case <-p.router.quit:
return ErrRouterShuttingDown
default:
return nil
}
}
}
// launchOutcome is a type returned from launchShard that indicates whether the
@ -283,6 +401,55 @@ type shardResult struct {
err error
}
// collectResultAsync launches a goroutine that will wait for the result of the
// given HTLC attempt to be available then handle its result. Note that it will
// fail the payment with the control tower if a terminal error is encountered.
func (p *shardHandler) collectResultAsync(attempt *channeldb.HTLCAttemptInfo) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
// Block until the result is available.
result, err := p.collectResult(attempt)
if err != nil {
if err != ErrRouterShuttingDown &&
err != htlcswitch.ErrSwitchExiting {
log.Errorf("Error collecting result for "+
"shard %v for payment %v: %v",
attempt.AttemptID, p.paymentHash, err)
}
select {
case p.shardErrors <- err:
case <-p.router.quit:
case <-p.quit:
}
return
}
// If a non-critical error was encountered handle it and mark
// the payment failed if the failure was terminal.
if result.err != nil {
err := p.handleSendError(attempt, result.err)
if err != nil {
select {
case p.shardErrors <- err:
case <-p.router.quit:
case <-p.quit:
}
return
}
}
select {
case p.shardErrors <- nil:
case <-p.router.quit:
case <-p.quit:
}
}()
}
// collectResult waits for the result for the given attempt to be available
// from the Switch, then records the attempt outcome with the control tower. A
// shardResult is returned, indicating the final outcome of this HTLC attempt.
@ -353,14 +520,14 @@ func (p *shardHandler) collectResult(attempt *channeldb.HTLCAttemptInfo) (
case <-p.router.quit:
return nil, ErrRouterShuttingDown
case <-p.quit:
return nil, fmt.Errorf("shard handler exiting")
}
// In case of a payment failure, fail the attempt with the control
// tower and return.
if result.Error != nil {
log.Errorf("Attempt to send payment %x failed: %v",
p.paymentHash, result.Error)
err := p.failAttempt(attempt, result.Error)
if err != nil {
return nil, err
@ -540,6 +707,9 @@ func (p *shardHandler) handleSendError(attempt *channeldb.HTLCAttemptInfo,
func (p *shardHandler) failAttempt(attempt *channeldb.HTLCAttemptInfo,
sendError error) error {
log.Warnf("Attempt %v for payment %v failed: %v", attempt.AttemptID,
p.paymentHash, sendError)
failInfo := marshallError(
sendError,
p.router.cfg.Clock.Now(),