channeldb: make database logic MPP compatible

This commit redefines how the control tower handles shard and payment
level settles and failures. We now consider the payment in flight as
long it has active shards, or it has no active shards but has not
reached a terminal condition (settle of one of the shards, or a payment
level failure has been encountered).

We also make it possible to settle/fail shards regardless of the payment
level status (since we must allow late shards recording their status
even though we have already settled/failed the payment).

Finally, we make it possible to Fail the payment when it is already
failed. This is to allow multiple concurrent shards that reach terminal
errors to mark the payment failed, without havinng to synchronize.
This commit is contained in:
Johan T. Halseth 2020-04-01 00:13:25 +02:00
parent f6c97daf0c
commit 70202be580
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
4 changed files with 138 additions and 83 deletions

@ -131,6 +131,19 @@ type MPPayment struct {
Status PaymentStatus Status PaymentStatus
} }
// TerminalInfo returns any HTLC settle info recorded. If no settle info is
// recorded, any payment level failure will be returned. If neither a settle
// nor a failure is recorded, both return values will be nil.
func (m *MPPayment) TerminalInfo() (*HTLCSettleInfo, *FailureReason) {
for _, h := range m.HTLCs {
if h.Settle != nil {
return h.Settle, nil
}
}
return nil, m.FailureReason
}
// InFlightHTLCs returns the HTLCs that are still in-flight, meaning they have // InFlightHTLCs returns the HTLCs that are still in-flight, meaning they have
// not been settled or failed. // not been settled or failed.
func (m *MPPayment) InFlightHTLCs() []HTLCAttempt { func (m *MPPayment) InFlightHTLCs() []HTLCAttempt {

@ -18,22 +18,33 @@ var (
// already "in flight" on the network. // already "in flight" on the network.
ErrPaymentInFlight = errors.New("payment is in transition") ErrPaymentInFlight = errors.New("payment is in transition")
// ErrPaymentNotInitiated is returned if payment wasn't initiated in // ErrPaymentNotInitiated is returned if the payment wasn't initiated.
// switch.
ErrPaymentNotInitiated = errors.New("payment isn't initiated") ErrPaymentNotInitiated = errors.New("payment isn't initiated")
// ErrPaymentAlreadySucceeded is returned in the event we attempt to // ErrPaymentAlreadySucceeded is returned in the event we attempt to
// change the status of a payment already succeeded. // change the status of a payment already succeeded.
ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded") ErrPaymentAlreadySucceeded = errors.New("payment is already succeeded")
// ErrPaymentAlreadyFailed is returned in the event we attempt to // ErrPaymentAlreadyFailed is returned in the event we attempt to alter
// re-fail a failed payment. // a failed payment.
ErrPaymentAlreadyFailed = errors.New("payment has already failed") ErrPaymentAlreadyFailed = errors.New("payment has already failed")
// ErrUnknownPaymentStatus is returned when we do not recognize the // ErrUnknownPaymentStatus is returned when we do not recognize the
// existing state of a payment. // existing state of a payment.
ErrUnknownPaymentStatus = errors.New("unknown payment status") ErrUnknownPaymentStatus = errors.New("unknown payment status")
// ErrPaymentTerminal is returned if we attempt to alter a payment that
// already has reached a terminal condition.
ErrPaymentTerminal = errors.New("payment has reached terminal condition")
// ErrAttemptAlreadySettled is returned if we try to alter an already
// settled HTLC attempt.
ErrAttemptAlreadySettled = errors.New("attempt already settled")
// ErrAttemptAlreadyFailed is returned if we try to alter an already
// failed HTLC attempt.
ErrAttemptAlreadyFailed = errors.New("attempt already failed")
// errNoAttemptInfo is returned when no attempt info is stored yet. // errNoAttemptInfo is returned when no attempt info is stored yet.
errNoAttemptInfo = errors.New("unable to find attempt info for " + errNoAttemptInfo = errors.New("unable to find attempt info for " +
"inflight payment") "inflight payment")
@ -52,7 +63,7 @@ func NewPaymentControl(db *DB) *PaymentControl {
} }
// InitPayment checks or records the given PaymentCreationInfo with the DB, // InitPayment checks or records the given PaymentCreationInfo with the DB,
// making sure it does not already exist as an in-flight payment. Then this // making sure it does not already exist as an in-flight payment. When this
// method returns successfully, the payment is guranteeed to be in the InFlight // method returns successfully, the payment is guranteeed to be in the InFlight
// state. // state.
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash, func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
@ -168,12 +179,21 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
return err return err
} }
// We can only register attempts for payments that are payment, err := fetchPayment(bucket)
// in-flight. if err != nil {
if err := ensureInFlight(bucket); err != nil {
return err return err
} }
// Ensure the payment is in-flight.
if err := ensureInFlight(payment); err != nil {
return err
}
settle, fail := payment.TerminalInfo()
if settle != nil || fail != nil {
return ErrPaymentTerminal
}
htlcsBucket, err := bucket.CreateBucketIfNotExists( htlcsBucket, err := bucket.CreateBucketIfNotExists(
paymentHtlcsBucket, paymentHtlcsBucket,
) )
@ -241,8 +261,15 @@ func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
return err return err
} }
// We can only update keys of in-flight payments. p, err := fetchPayment(bucket)
if err := ensureInFlight(bucket); err != nil { if err != nil {
return err
}
// We can only update keys of in-flight payments. We allow
// updating keys even if the payment has reached a terminal
// condition, since the HTLC outcomes must still be updated.
if err := ensureInFlight(p); err != nil {
return err return err
} }
@ -257,6 +284,15 @@ func (p *PaymentControl) updateHtlcKey(paymentHash lntypes.Hash,
attemptID) attemptID)
} }
// Make sure the shard is not already failed or settled.
if htlcBucket.Get(htlcFailInfoKey) != nil {
return ErrAttemptAlreadyFailed
}
if htlcBucket.Get(htlcSettleInfoKey) != nil {
return ErrAttemptAlreadySettled
}
// Add or update the key for this htlc. // Add or update the key for this htlc.
err = htlcBucket.Put(key, value) err = htlcBucket.Put(key, value)
if err != nil { if err != nil {
@ -299,9 +335,17 @@ func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
return err return err
} }
// We can only mark in-flight payments as failed. // We mark the payent as failed as long as it is known. This
if err := ensureInFlight(bucket); err != nil { // lets the last attempt to fail with a terminal write its
updateErr = err // failure to the PaymentControl without synchronizing with
// other attempts.
paymentStatus, err := fetchPaymentStatus(bucket)
if err != nil {
return err
}
if paymentStatus == StatusUnknown {
updateErr = ErrPaymentNotInitiated
return nil return nil
} }
@ -318,14 +362,6 @@ func (p *PaymentControl) Fail(paymentHash lntypes.Hash,
return err return err
} }
// Final sanity check to see if there are no in-flight htlcs.
for _, htlc := range payment.HTLCs {
if htlc.Settle == nil && htlc.Failure == nil {
return errors.New("payment failed with " +
"in-flight htlc(s)")
}
}
return nil return nil
}) })
if err != nil { if err != nil {
@ -428,45 +464,29 @@ func nextPaymentSequence(tx kvdb.RwTx) ([]byte, error) {
// fetchPaymentStatus fetches the payment status of the payment. If the payment // fetchPaymentStatus fetches the payment status of the payment. If the payment
// isn't found, it will default to "StatusUnknown". // isn't found, it will default to "StatusUnknown".
func fetchPaymentStatus(bucket kvdb.ReadBucket) (PaymentStatus, error) { func fetchPaymentStatus(bucket kvdb.ReadBucket) (PaymentStatus, error) {
htlcsBucket := bucket.NestedReadBucket(paymentHtlcsBucket) // Creation info should be set for all payments, regardless of state.
if htlcsBucket != nil { // If not, it is unknown.
htlcs, err := fetchHtlcAttempts(htlcsBucket) if bucket.Get(paymentCreationInfoKey) == nil {
if err != nil { return StatusUnknown, nil
return 0, err
}
// Go through all HTLCs, and return StatusSucceeded if any of
// them did succeed.
for _, h := range htlcs {
if h.Settle != nil {
return StatusSucceeded, nil
}
}
} }
if bucket.Get(paymentFailInfoKey) != nil { payment, err := fetchPayment(bucket)
return StatusFailed, nil if err != nil {
return 0, err
} }
if bucket.Get(paymentCreationInfoKey) != nil { return payment.Status, nil
return StatusInFlight, nil
}
return StatusUnknown, nil
} }
// ensureInFlight checks whether the payment found in the given bucket has // ensureInFlight checks whether the payment found in the given bucket has
// status InFlight, and returns an error otherwise. This should be used to // status InFlight, and returns an error otherwise. This should be used to
// ensure we only mark in-flight payments as succeeded or failed. // ensure we only mark in-flight payments as succeeded or failed.
func ensureInFlight(bucket kvdb.ReadBucket) error { func ensureInFlight(payment *MPPayment) error {
paymentStatus, err := fetchPaymentStatus(bucket) paymentStatus := payment.Status
if err != nil {
return err
}
switch { switch {
// The payment was indeed InFlight, return. // The payment was indeed InFlight.
case paymentStatus == StatusInFlight: case paymentStatus == StatusInFlight:
return nil return nil

@ -260,12 +260,6 @@ func fetchPayment(bucket kvdb.ReadBucket) (*MPPayment, error) {
sequenceNum := binary.BigEndian.Uint64(seqBytes) sequenceNum := binary.BigEndian.Uint64(seqBytes)
// Get the payment status.
paymentStatus, err := fetchPaymentStatus(bucket)
if err != nil {
return nil, err
}
// Get the PaymentCreationInfo. // Get the PaymentCreationInfo.
b := bucket.Get(paymentCreationInfoKey) b := bucket.Get(paymentCreationInfoKey)
if b == nil { if b == nil {
@ -297,6 +291,44 @@ func fetchPayment(bucket kvdb.ReadBucket) (*MPPayment, error) {
failureReason = &reason failureReason = &reason
} }
// Go through all HTLCs for this payment, noting whether we have any
// settled HTLC, and any still in-flight.
var inflight, settled bool
for _, h := range htlcs {
if h.Failure != nil {
continue
}
if h.Settle != nil {
settled = true
continue
}
// If any of the HTLCs are not failed nor settled, we
// still have inflight HTLCs.
inflight = true
}
// Use the DB state to determine the status of the payment.
var paymentStatus PaymentStatus
switch {
// If any of the the HTLCs did succeed and there are no HTLCs in
// flight, the payment succeeded.
case !inflight && settled:
paymentStatus = StatusSucceeded
// If we have no in-flight HTLCs, and the payment failure is set, the
// payment is considered failed.
case !inflight && failureReason != nil:
paymentStatus = StatusFailed
// Otherwise it is still in flight.
default:
paymentStatus = StatusInFlight
}
return &MPPayment{ return &MPPayment{
sequenceNum: sequenceNum, sequenceNum: sequenceNum,
Info: creationInfo, Info: creationInfo,
@ -412,6 +444,8 @@ func (db *DB) DeletePayments() error {
return err return err
} }
// If the status is InFlight, we cannot safely delete
// the payment information, so we return early.
if paymentStatus == StatusInFlight { if paymentStatus == StatusInFlight {
return nil return nil
} }

@ -290,16 +290,7 @@ func (m *mockControlTower) SettleAttempt(phash lntypes.Hash,
m.success <- successArgs{settleInfo.Preimage} m.success <- successArgs{settleInfo.Preimage}
} }
// Only allow setting attempts for payments not yet succeeded or // Only allow setting attempts if the payment is known.
// failed.
if _, ok := m.successful[phash]; ok {
return channeldb.ErrPaymentAlreadySucceeded
}
if _, ok := m.failed[phash]; ok {
return channeldb.ErrPaymentAlreadyFailed
}
p, ok := m.payments[phash] p, ok := m.payments[phash]
if !ok { if !ok {
return channeldb.ErrPaymentNotInitiated return channeldb.ErrPaymentNotInitiated
@ -311,6 +302,13 @@ func (m *mockControlTower) SettleAttempt(phash lntypes.Hash,
continue continue
} }
if a.Settle != nil {
return channeldb.ErrAttemptAlreadySettled
}
if a.Failure != nil {
return channeldb.ErrAttemptAlreadyFailed
}
p.attempts[i].Settle = settleInfo p.attempts[i].Settle = settleInfo
// Mark the payment successful on first settled attempt. // Mark the payment successful on first settled attempt.
@ -327,16 +325,7 @@ func (m *mockControlTower) FailAttempt(phash lntypes.Hash, pid uint64,
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
// Only allow failing attempts for payments not yet succeeded or // Only allow failing attempts if the payment is known.
// failed.
if _, ok := m.successful[phash]; ok {
return channeldb.ErrPaymentAlreadySucceeded
}
if _, ok := m.failed[phash]; ok {
return channeldb.ErrPaymentAlreadyFailed
}
p, ok := m.payments[phash] p, ok := m.payments[phash]
if !ok { if !ok {
return channeldb.ErrPaymentNotInitiated return channeldb.ErrPaymentNotInitiated
@ -348,6 +337,13 @@ func (m *mockControlTower) FailAttempt(phash lntypes.Hash, pid uint64,
continue continue
} }
if a.Settle != nil {
return channeldb.ErrAttemptAlreadySettled
}
if a.Failure != nil {
return channeldb.ErrAttemptAlreadyFailed
}
p.attempts[i].Failure = failInfo p.attempts[i].Failure = failInfo
return nil return nil
} }
@ -365,15 +361,7 @@ func (m *mockControlTower) Fail(phash lntypes.Hash,
m.fail <- failArgs{reason} m.fail <- failArgs{reason}
} }
// Cannot fail already successful or failed payments. // Payment must be known.
if _, ok := m.successful[phash]; ok {
return channeldb.ErrPaymentAlreadySucceeded
}
if _, ok := m.failed[phash]; ok {
return channeldb.ErrPaymentAlreadyFailed
}
if _, ok := m.payments[phash]; !ok { if _, ok := m.payments[phash]; !ok {
return channeldb.ErrPaymentNotInitiated return channeldb.ErrPaymentNotInitiated
} }