routing: move sendErr handling in shardHandler

This commit moves the handleSendError method from ChannelRouter to
shardHandler. In doing so, shardHandler can now apply updates to the
in-memory paymentSession if they are found in the error message.
This commit is contained in:
yyforyongyu 2021-04-15 18:26:20 +08:00
parent 1656611358
commit cf2b5744a1
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
2 changed files with 128 additions and 122 deletions

@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/davecgh/go-spew/spew"
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb"
@ -721,25 +722,145 @@ func (p *shardHandler) sendPaymentAttempt(
// handleSendError inspects the given error from the Switch and determines
// whether we should make another payment attempt, or if it should be
// considered a terminal error. Terminal errors will be recorded with the
// control tower.
// control tower. It analyzes the sendErr for the payment attempt received from
// the switch and updates mission control and/or channel policies. Depending on
// the error type, the error is either the final outcome of the payment or we
// need to continue with an alternative route. A final outcome is indicated by
// a non-nil reason value.
func (p *shardHandler) handleSendError(attempt *channeldb.HTLCAttemptInfo,
sendErr error) error {
reason := p.router.processSendError(
attempt.AttemptID, &attempt.Route, sendErr,
)
if reason == nil {
return nil
}
internalErrorReason := channeldb.FailureReasonError
// failPayment is a helper closure that fails the payment via the
// router's control tower, which marks the payment as failed in db.
failPayment := func(reason *channeldb.FailureReason,
sendErr error) error {
log.Infof("Payment %v failed: final_outcome=%v, raw_err=%v",
p.identifier, *reason, sendErr)
err := p.router.cfg.Control.Fail(p.identifier, *reason)
if err != nil {
// Fail the payment via control tower.
if err := p.router.cfg.Control.Fail(
p.identifier, *reason); err != nil {
return err
}
return *reason
}
// reportFail is a helper closure that reports the failure to the
// mission control, which helps us to decide whether we want to retry
// the payment or not. If a non nil reason is returned from mission
// control, it will further fail the payment via control tower.
reportFail := func(srcIdx *int, msg lnwire.FailureMessage) error {
// Report outcome to mission control.
reason, err := p.router.cfg.MissionControl.ReportPaymentFail(
attempt.AttemptID, &attempt.Route, srcIdx, msg,
)
if err != nil {
log.Errorf("Error reporting payment result to mc: %v",
err)
reason = &internalErrorReason
}
// Exit early if there's no reason.
if reason == nil {
return nil
}
return failPayment(reason, sendErr)
}
if sendErr == htlcswitch.ErrUnreadableFailureMessage {
log.Tracef("Unreadable failure when sending htlc")
return reportFail(nil, nil)
}
// If the error is a ClearTextError, we have received a valid wire
// failure message, either from our own outgoing link or from a node
// down the route. If the error is not related to the propagation of
// our payment, we can stop trying because an internal error has
// occurred.
rtErr, ok := sendErr.(htlcswitch.ClearTextError)
if !ok {
return failPayment(&internalErrorReason, sendErr)
}
// failureSourceIdx is the index of the node that the failure occurred
// at. If the ClearTextError received is not a ForwardingError the
// payment error occurred at our node, so we leave this value as 0
// to indicate that the failure occurred locally. If the error is a
// ForwardingError, it did not originate at our node, so we set
// failureSourceIdx to the index of the node where the failure occurred.
failureSourceIdx := 0
source, ok := rtErr.(*htlcswitch.ForwardingError)
if ok {
failureSourceIdx = source.FailureSourceIdx
}
// Extract the wire failure and apply channel update if it contains one.
// If we received an unknown failure message from a node along the
// route, the failure message will be nil.
failureMessage := rtErr.WireMessage()
err := p.handleFailureMessage(
&attempt.Route, failureSourceIdx, failureMessage,
)
if err != nil {
return failPayment(&internalErrorReason, sendErr)
}
log.Tracef("Node=%v reported failure when sending htlc",
failureSourceIdx)
return reportFail(&failureSourceIdx, failureMessage)
}
// handleFailureMessage tries to apply a channel update present in the failure
// message if any.
func (p *shardHandler) handleFailureMessage(rt *route.Route,
errorSourceIdx int, failure lnwire.FailureMessage) error {
if failure == nil {
return nil
}
// It makes no sense to apply our own channel updates.
if errorSourceIdx == 0 {
log.Errorf("Channel update of ourselves received")
return nil
}
// Extract channel update if the error contains one.
update := p.router.extractChannelUpdate(failure)
if update == nil {
return nil
}
// Parse pubkey to allow validation of the channel update. This should
// always succeed, otherwise there is something wrong in our
// implementation. Therefore return an error.
errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
errSource, err := btcec.ParsePubKey(
errVertex[:], btcec.S256(),
)
if err != nil {
log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
errorSourceIdx, errVertex)
return err
}
// Apply channel update.
if !p.router.applyChannelUpdate(update, errSource) {
log.Debugf("Invalid channel update received: node=%v",
errVertex)
}
return nil
}

@ -2228,121 +2228,6 @@ func (r *ChannelRouter) sendPayment(
}
// tryApplyChannelUpdate tries to apply a channel update present in the failure
// message if any.
func (r *ChannelRouter) tryApplyChannelUpdate(rt *route.Route,
errorSourceIdx int, failure lnwire.FailureMessage) error {
// It makes no sense to apply our own channel updates.
if errorSourceIdx == 0 {
log.Errorf("Channel update of ourselves received")
return nil
}
// Extract channel update if the error contains one.
update := r.extractChannelUpdate(failure)
if update == nil {
return nil
}
// Parse pubkey to allow validation of the channel update. This should
// always succeed, otherwise there is something wrong in our
// implementation. Therefore return an error.
errVertex := rt.Hops[errorSourceIdx-1].PubKeyBytes
errSource, err := btcec.ParsePubKey(
errVertex[:], btcec.S256(),
)
if err != nil {
log.Errorf("Cannot parse pubkey: idx=%v, pubkey=%v",
errorSourceIdx, errVertex)
return err
}
// Apply channel update.
if !r.applyChannelUpdate(update, errSource) {
log.Debugf("Invalid channel update received: node=%v",
errVertex)
}
return nil
}
// processSendError analyzes the error for the payment attempt received from the
// switch and updates mission control and/or channel policies. Depending on the
// error type, this error is either the final outcome of the payment or we need
// to continue with an alternative route. A final outcome is indicated by a
// non-nil return value.
func (r *ChannelRouter) processSendError(attemptID uint64, rt *route.Route,
sendErr error) *channeldb.FailureReason {
internalErrorReason := channeldb.FailureReasonError
reportFail := func(srcIdx *int,
msg lnwire.FailureMessage) *channeldb.FailureReason {
// Report outcome to mission control.
reason, err := r.cfg.MissionControl.ReportPaymentFail(
attemptID, rt, srcIdx, msg,
)
if err != nil {
log.Errorf("Error reporting payment result to mc: %v",
err)
return &internalErrorReason
}
return reason
}
if sendErr == htlcswitch.ErrUnreadableFailureMessage {
log.Tracef("Unreadable failure when sending htlc")
return reportFail(nil, nil)
}
// If the error is a ClearTextError, we have received a valid wire
// failure message, either from our own outgoing link or from a node
// down the route. If the error is not related to the propagation of
// our payment, we can stop trying because an internal error has
// occurred.
rtErr, ok := sendErr.(htlcswitch.ClearTextError)
if !ok {
return &internalErrorReason
}
// failureSourceIdx is the index of the node that the failure occurred
// at. If the ClearTextError received is not a ForwardingError the
// payment error occurred at our node, so we leave this value as 0
// to indicate that the failure occurred locally. If the error is a
// ForwardingError, it did not originate at our node, so we set
// failureSourceIdx to the index of the node where the failure occurred.
failureSourceIdx := 0
source, ok := rtErr.(*htlcswitch.ForwardingError)
if ok {
failureSourceIdx = source.FailureSourceIdx
}
// Extract the wire failure and apply channel update if it contains one.
// If we received an unknown failure message from a node along the
// route, the failure message will be nil.
failureMessage := rtErr.WireMessage()
if failureMessage != nil {
err := r.tryApplyChannelUpdate(
rt, failureSourceIdx, failureMessage,
)
if err != nil {
return &internalErrorReason
}
}
log.Tracef("Node=%v reported failure when sending htlc",
failureSourceIdx)
return reportFail(&failureSourceIdx, failureMessage)
}
// extractChannelUpdate examines the error and extracts the channel update.
func (r *ChannelRouter) extractChannelUpdate(
failure lnwire.FailureMessage) *lnwire.ChannelUpdate {