diff --git a/channeldb/mp_payment.go b/channeldb/mp_payment.go index bf2f65ef..e359ff03 100644 --- a/channeldb/mp_payment.go +++ b/channeldb/mp_payment.go @@ -144,6 +144,24 @@ func (m *MPPayment) TerminalInfo() (*HTLCSettleInfo, *FailureReason) { return nil, m.FailureReason } +// SentAmt returns the sum of sent amount and fees for HTLCs that are either +// settled or still in flight. +func (m *MPPayment) SentAmt() (lnwire.MilliSatoshi, lnwire.MilliSatoshi) { + var sent, fees lnwire.MilliSatoshi + for _, h := range m.HTLCs { + if h.Failure != nil { + continue + } + + // The attempt was not failed, meaning the amount was + // potentially sent to the receiver. + sent += h.Route.ReceiverAmt() + fees += h.Route.TotalFees() + } + + return sent, fees +} + // InFlightHTLCs returns the HTLCs that are still in-flight, meaning they have // not been settled or failed. func (m *MPPayment) InFlightHTLCs() []HTLCAttempt { diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 6e98eb1b..3a0eb75b 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -1,6 +1,8 @@ package routing import ( + "fmt" + "sync" "time" "github.com/davecgh/go-spew/spew" @@ -24,14 +26,74 @@ type paymentLifecycle struct { currentHeight int32 } +// payemntState holds a number of key insights learned from a given MPPayment +// that we use to determine what to do on each payment loop iteration. +type paymentState struct { + numShardsInFlight int + remainingAmt lnwire.MilliSatoshi + remainingFees lnwire.MilliSatoshi + terminate bool +} + +// paymentState uses the passed payment to find the latest information we need +// to act on every iteration of the payment loop. +func (p *paymentLifecycle) paymentState(payment *channeldb.MPPayment) ( + *paymentState, error) { + + // Fetch the total amount and fees that has already been sent in + // settled and still in-flight shards. + sentAmt, fees := payment.SentAmt() + + // Sanity check we haven't sent a value larger than the payment amount. + if sentAmt > p.totalAmount { + return nil, fmt.Errorf("amount sent %v exceeds "+ + "total amount %v", sentAmt, p.totalAmount) + } + + // We'll subtract the used fee from our fee budget, but allow the fees + // of the already sent shards to exceed our budget (can happen after + // restarts). + feeBudget := p.feeLimit + if fees <= feeBudget { + feeBudget -= fees + } else { + feeBudget = 0 + } + + // Get any terminal info for this payment. + settle, failure := payment.TerminalInfo() + + // If either an HTLC settled, or the payment has a payment level + // failure recorded, it means we should terminate the moment all shards + // have returned with a result. + terminate := settle != nil || failure != nil + + activeShards := payment.InFlightHTLCs() + return &paymentState{ + numShardsInFlight: len(activeShards), + remainingAmt: p.totalAmount - sentAmt, + remainingFees: feeBudget, + terminate: terminate, + }, nil +} + // resumePayment resumes the paymentLifecycle from the current state. func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { shardHandler := &shardHandler{ router: p.router, paymentHash: p.paymentHash, + shardErrors: make(chan error), + quit: make(chan struct{}), } - // If we have an existing attempt, we'll start by collecting its result. + // When the payment lifecycle loop exits, we make sure to signal any + // sub goroutine of the shardHandler to exit, then wait for them to + // return. + defer shardHandler.stop() + + // If we had any existing attempts outstanding, we'll start by spinning + // up goroutines that'll collect their results and deliver them to the + // lifecycle loop below. payment, err := p.router.cfg.Control.FetchPayment( p.paymentHash, ) @@ -42,15 +104,21 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { for _, a := range payment.InFlightHTLCs() { a := a - _, err := shardHandler.collectResult(&a.HTLCAttemptInfo) - if err != nil { - return [32]byte{}, nil, err - } + log.Debugf("Resuming payment shard %v for hash %v", + a.AttemptID, p.paymentHash) + + shardHandler.collectResultAsync(&a.HTLCAttemptInfo) } // We'll continue until either our payment succeeds, or we encounter a // critical error during path finding. for { + // Start by quickly checking if there are any outcomes already + // available to handle before we reevaluate our state. + if err := shardHandler.checkShards(); err != nil { + return [32]byte{}, nil, err + } + // We start every iteration by fetching the lastest state of // the payment from the ControlTower. This ensures that we will // act on the latest available information, whether we are @@ -62,90 +130,97 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { return [32]byte{}, nil, err } - // Go through the HTLCs for this payment, determining if there - // are any in flight or settled. - var ( - attempt *channeldb.HTLCAttemptInfo - settle *channeldb.HTLCAttempt - ) - for _, a := range payment.HTLCs { - a := a - - // We have a settled HTLC, and should return when all - // shards are back. - if a.Settle != nil { - settle = &a - continue - } - - // This HTLC already failed, ignore. - if a.Failure != nil { - continue - } - - // HTLC was neither setteld nor failed, it is still in - // flight. - attempt = &a.HTLCAttemptInfo - break + // Using this latest state of the payment, calculate + // information about our active shards and terminal conditions. + state, err := p.paymentState(payment) + if err != nil { + return [32]byte{}, nil, err } - // Terminal state, return the preimage and the route taken. - if attempt == nil && settle != nil { - return settle.Settle.Preimage, &settle.Route, nil - } + log.Debugf("Payment %v in state terminate=%v, "+ + "active_shards=%v, rem_value=%v, fee_limit=%v", + p.paymentHash, state.terminate, state.numShardsInFlight, + state.remainingAmt, state.remainingFees) - // If the payment already is failed, and there is no in-flight - // HTLC, return immediately. - if attempt == nil && payment.FailureReason != nil { - return [32]byte{}, nil, *payment.FailureReason - } + switch { - // If this payment had no existing payment attempt, we create - // and send one now. - if attempt == nil { - // Before we attempt this next payment, we'll check to see if either - // we've gone past the payment attempt timeout, or the router is - // exiting. In either case, we'll stop this payment attempt short. If a - // timeout is not applicable, timeoutChan will be nil. - select { - case <-p.timeoutChan: - // Mark the payment as failed because of the - // timeout. - err := p.router.cfg.Control.Fail( - p.paymentHash, channeldb.FailureReasonTimeout, - ) - if err != nil { - return [32]byte{}, nil, err + // We have a terminal condition and no active shards, we are + // ready to exit. + case state.terminate && state.numShardsInFlight == 0: + // Find the first successful shard and return + // the preimage and route. + for _, a := range payment.HTLCs { + if a.Settle != nil { + return a.Settle.Preimage, &a.Route, nil } - - continue - - // The payment will be resumed from the current state - // after restart. - case <-p.router.quit: - return [32]byte{}, nil, ErrRouterShuttingDown - - // Fall through if we haven't hit our time limit or are - // exiting. - default: } - // Create a new payment attempt from the given payment session. - rt, err := p.paySession.RequestRoute( - p.totalAmount, p.feeLimit, 0, uint32(p.currentHeight), + // Payment failed. + return [32]byte{}, nil, *payment.FailureReason + + // If we either reached a terminal error condition (but had + // active shards still) or there is no remaining value to send, + // we'll wait for a shard outcome. + case state.terminate || state.remainingAmt == 0: + // We still have outstanding shards, so wait for a new + // outcome to be available before re-evaluating our + // state. + if err := shardHandler.waitForShard(); err != nil { + return [32]byte{}, nil, err + } + continue + } + + // Before we attempt any new shard, we'll check to see if + // either we've gone past the payment attempt timeout, or the + // router is exiting. In either case, we'll stop this payment + // attempt short. If a timeout is not applicable, timeoutChan + // will be nil. + select { + case <-p.timeoutChan: + log.Warnf("payment attempt not completed before " + + "timeout") + + // By marking the payment failed with the control + // tower, no further shards will be launched and we'll + // return with an error the moment all active shards + // have finished. + saveErr := p.router.cfg.Control.Fail( + p.paymentHash, channeldb.FailureReasonTimeout, ) - if err != nil { - log.Warnf("Failed to find route for payment %x: %v", - p.paymentHash, err) + if saveErr != nil { + return [32]byte{}, nil, saveErr + } - // Convert error to payment-level failure. - failure := errorToPaymentFailure(err) + continue + + case <-p.router.quit: + return [32]byte{}, nil, ErrRouterShuttingDown + + // Fall through if we haven't hit our time limit. + default: + } + + // Create a new payment attempt from the given payment session. + rt, err := p.paySession.RequestRoute( + state.remainingAmt, state.remainingFees, + uint32(state.numShardsInFlight), uint32(p.currentHeight), + ) + if err != nil { + log.Warnf("Failed to find route for payment %x: %v", + p.paymentHash, err) + + // There is no route to try, and we have no active + // shards. This means that there is no way for us to + // send the payment, so mark it failed with no route. + if state.numShardsInFlight == 0 { + failureCode := errorToPaymentFailure(err) + log.Debugf("Marking payment %v permanently "+ + "failed with no route: %v", + p.paymentHash, failureCode) - // If we're unable to successfully make a payment using - // any of the routes we've found, then mark the payment - // as permanently failed. saveErr := p.router.cfg.Control.Fail( - p.paymentHash, failure, + p.paymentHash, failureCode, ) if saveErr != nil { return [32]byte{}, nil, saveErr @@ -154,56 +229,45 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { continue } - // With the route in hand, launch a new shard. - var outcome *launchOutcome - attempt, outcome, err = shardHandler.launchShard(rt) - if err != nil { + // We still have active shards, we'll wait for an + // outcome to be available before retrying. + if err := shardHandler.waitForShard(); err != nil { return [32]byte{}, nil, err } - - // We ew encountered a non-critical error when - // launching the shard, handle it - if outcome.err != nil { - // We must inspect the error to know whether it - // was critical or not, to decide whether we - // should continue trying. - err = shardHandler.handleSendError( - attempt, outcome.err, - ) - if err != nil { - return [32]byte{}, nil, err - } - - // Error was handled successfully, continue to - // make a new attempt. - continue - } - - // We'll collect the result of the shard just sent. We - // ignore the result for now if it is a success, as we - // will look it up in the control tower on the next - // loop iteration. - result, err := shardHandler.collectResult(attempt) - if err != nil { - return [32]byte{}, nil, err - } - - if result.err != nil { - // We must inspect the error to know whether it - // was critical or not, to decide whether we - // should continue trying. - err := shardHandler.handleSendError( - attempt, result.err, - ) - if err != nil { - return [32]byte{}, nil, err - } - - // Error was handled successfully, continue to - // make a new attempt. - continue - } + continue } + + // We found a route to try, launch a new shard. + attempt, outcome, err := shardHandler.launchShard(rt) + if err != nil { + return [32]byte{}, nil, err + } + + // If we encountered a non-critical error when launching the + // shard, handle it. + if outcome.err != nil { + log.Warnf("Failed to launch shard %v for "+ + "payment %v: %v", attempt.AttemptID, + p.paymentHash, outcome.err) + + // We must inspect the error to know whether it was + // critical or not, to decide whether we should + // continue trying. + err := shardHandler.handleSendError( + attempt, outcome.err, + ) + if err != nil { + return [32]byte{}, nil, err + } + + // Error was handled successfully, continue to make a + // new attempt. + continue + } + + // Now that the shard was successfully sent, launch a go + // routine that will handle its result when its back. + shardHandler.collectResultAsync(attempt) } } @@ -212,6 +276,60 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) { type shardHandler struct { paymentHash lntypes.Hash router *ChannelRouter + + // shardErrors is a channel where errors collected by calling + // collectResultAsync will be delivered. These results are meant to be + // inspected by calling waitForShard or checkShards, and the channel + // doesn't need to be initiated if the caller is using the sync + // collectResult directly. + shardErrors chan error + + // quit is closed to signal the sub goroutines of the payment lifecycle + // to stop. + quit chan struct{} + wg sync.WaitGroup +} + +// stop signals any active shard goroutine to exit and waits for them to exit. +func (p *shardHandler) stop() { + close(p.quit) + p.wg.Wait() +} + +// waitForShard blocks until any of the outstanding shards return. +func (p *shardHandler) waitForShard() error { + select { + case err := <-p.shardErrors: + return err + + case <-p.quit: + return fmt.Errorf("shard handler quitting") + + case <-p.router.quit: + return ErrRouterShuttingDown + } +} + +// checkShards is a non-blocking method that check if any shards has finished +// their execution. +func (p *shardHandler) checkShards() error { + for { + select { + case err := <-p.shardErrors: + if err != nil { + return err + } + + case <-p.quit: + return fmt.Errorf("shard handler quitting") + + case <-p.router.quit: + return ErrRouterShuttingDown + + default: + return nil + } + } } // launchOutcome is a type returned from launchShard that indicates whether the @@ -283,6 +401,55 @@ type shardResult struct { err error } +// collectResultAsync launches a goroutine that will wait for the result of the +// given HTLC attempt to be available then handle its result. Note that it will +// fail the payment with the control tower if a terminal error is encountered. +func (p *shardHandler) collectResultAsync(attempt *channeldb.HTLCAttemptInfo) { + p.wg.Add(1) + go func() { + defer p.wg.Done() + + // Block until the result is available. + result, err := p.collectResult(attempt) + if err != nil { + if err != ErrRouterShuttingDown && + err != htlcswitch.ErrSwitchExiting { + + log.Errorf("Error collecting result for "+ + "shard %v for payment %v: %v", + attempt.AttemptID, p.paymentHash, err) + } + + select { + case p.shardErrors <- err: + case <-p.router.quit: + case <-p.quit: + } + return + } + + // If a non-critical error was encountered handle it and mark + // the payment failed if the failure was terminal. + if result.err != nil { + err := p.handleSendError(attempt, result.err) + if err != nil { + select { + case p.shardErrors <- err: + case <-p.router.quit: + case <-p.quit: + } + return + } + } + + select { + case p.shardErrors <- nil: + case <-p.router.quit: + case <-p.quit: + } + }() +} + // collectResult waits for the result for the given attempt to be available // from the Switch, then records the attempt outcome with the control tower. A // shardResult is returned, indicating the final outcome of this HTLC attempt. @@ -353,14 +520,14 @@ func (p *shardHandler) collectResult(attempt *channeldb.HTLCAttemptInfo) ( case <-p.router.quit: return nil, ErrRouterShuttingDown + + case <-p.quit: + return nil, fmt.Errorf("shard handler exiting") } // In case of a payment failure, fail the attempt with the control // tower and return. if result.Error != nil { - log.Errorf("Attempt to send payment %x failed: %v", - p.paymentHash, result.Error) - err := p.failAttempt(attempt, result.Error) if err != nil { return nil, err @@ -540,6 +707,9 @@ func (p *shardHandler) handleSendError(attempt *channeldb.HTLCAttemptInfo, func (p *shardHandler) failAttempt(attempt *channeldb.HTLCAttemptInfo, sendError error) error { + log.Warnf("Attempt %v for payment %v failed: %v", attempt.AttemptID, + p.paymentHash, sendError) + failInfo := marshallError( sendError, p.router.cfg.Clock.Now(),