htlcswitch/control_tower: use one db txn for transitions

Composes the new payment status helper methods such that
we only require one db txn per state transition. This
also allows us to remove the exclusive lock from the
control tower, and enable more concurrent requests.
This commit is contained in:
Conner Fromknecht 2018-08-20 21:14:52 -07:00
parent 86b347c996
commit 5dc2a4a4b8
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -2,8 +2,8 @@ package htlcswitch
import ( import (
"errors" "errors"
"sync"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -59,11 +59,17 @@ type ControlTower interface {
type paymentControl struct { type paymentControl struct {
strict bool strict bool
mx sync.Mutex
db *channeldb.DB db *channeldb.DB
} }
// NewPaymentControl creates a new instance of the paymentControl. // NewPaymentControl creates a new instance of the paymentControl. The strict
// flag indicates whether the controller should require "strict" state
// transitions, which would be otherwise intolerant to older databases that may
// already have duplicate payments to the same payment hash. It should be
// enabled only after sufficient checks have been made to ensure the db does not
// contain such payments. In the meantime, non-strict mode enforces a superset
// of the state transitions that prevent additional payments to a given payment
// hash from being added.
func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower { func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower {
return &paymentControl{ return &paymentControl{
strict: strict, strict: strict,
@ -74,114 +80,166 @@ func NewPaymentControl(strict bool, db *channeldb.DB) ControlTower {
// ClearForTakeoff checks that we don't already have an InFlight or Completed // ClearForTakeoff checks that we don't already have an InFlight or Completed
// payment identified by the same payment hash. // payment identified by the same payment hash.
func (p *paymentControl) ClearForTakeoff(htlc *lnwire.UpdateAddHTLC) error { func (p *paymentControl) ClearForTakeoff(htlc *lnwire.UpdateAddHTLC) error {
p.mx.Lock() var takeoffErr error
defer p.mx.Unlock() err := p.db.Batch(func(tx *bolt.Tx) error {
// Retrieve current status of payment from local database. // Retrieve current status of payment from local database.
paymentStatus, err := p.db.FetchPaymentStatus(htlc.PaymentHash) paymentStatus, err := channeldb.FetchPaymentStatusTx(
tx, htlc.PaymentHash,
)
if err != nil { if err != nil {
return err return err
} }
// Reset the takeoff error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
takeoffErr = nil
switch paymentStatus { switch paymentStatus {
case channeldb.StatusGrounded: case channeldb.StatusGrounded:
// It is safe to reattempt a payment if we know that we haven't // It is safe to reattempt a payment if we know that we
// left one in flight. Since this one is grounded, Transition // haven't left one in flight. Since this one is
// the payment status to InFlight to prevent others. // grounded, Transition the payment status to InFlight
return p.db.UpdatePaymentStatus(htlc.PaymentHash, channeldb.StatusInFlight) // to prevent others.
return channeldb.UpdatePaymentStatusTx(
tx, htlc.PaymentHash, channeldb.StatusInFlight,
)
case channeldb.StatusInFlight: case channeldb.StatusInFlight:
// We already have an InFlight payment on the network. We will // We already have an InFlight payment on the network. We will
// disallow any more payment until a response is received. // disallow any more payment until a response is received.
return ErrPaymentInFlight takeoffErr = ErrPaymentInFlight
case channeldb.StatusCompleted: case channeldb.StatusCompleted:
// We've already completed a payment to this payment hash, // We've already completed a payment to this payment hash,
// forbid the switch from sending another. // forbid the switch from sending another.
return ErrAlreadyPaid takeoffErr = ErrAlreadyPaid
default: default:
return ErrUnknownPaymentStatus takeoffErr = ErrUnknownPaymentStatus
} }
return nil
})
if err != nil {
return err
}
return takeoffErr
} }
// Success transitions an InFlight payment to Completed, otherwise it returns an // Success transitions an InFlight payment to Completed, otherwise it returns an
// error. After calling Success, ClearForTakeoff should prevent any further // error. After calling Success, ClearForTakeoff should prevent any further
// attempts for the same payment hash. // attempts for the same payment hash.
func (p *paymentControl) Success(paymentHash [32]byte) error { func (p *paymentControl) Success(paymentHash [32]byte) error {
p.mx.Lock() var updateErr error
defer p.mx.Unlock() err := p.db.Batch(func(tx *bolt.Tx) error {
paymentStatus, err := channeldb.FetchPaymentStatusTx(
paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) tx, paymentHash,
)
if err != nil { if err != nil {
return err return err
} }
// Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
updateErr = nil
switch { switch {
case paymentStatus == channeldb.StatusGrounded && p.strict: case paymentStatus == channeldb.StatusGrounded && p.strict:
// Our records show the payment as still being grounded, meaning // Our records show the payment as still being grounded,
// it never should have left the switch. // meaning it never should have left the switch.
return ErrPaymentNotInitiated updateErr = ErrPaymentNotInitiated
case paymentStatus == channeldb.StatusGrounded && !p.strict: case paymentStatus == channeldb.StatusGrounded && !p.strict:
// Our records show the payment as still being grounded, meaning // Though our records show the payment as still being
// it never should have left the switch. // grounded, meaning it never should have left the
// switch, we permit this transition in non-strict mode
// to handle inconsistent db states.
fallthrough fallthrough
case paymentStatus == channeldb.StatusInFlight: case paymentStatus == channeldb.StatusInFlight:
// A successful response was received for an InFlight payment, // A successful response was received for an InFlight
// mark it as completed to prevent sending to this payment hash // payment, mark it as completed to prevent sending to
// again. // this payment hash again.
return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusCompleted) return channeldb.UpdatePaymentStatusTx(
tx, paymentHash, channeldb.StatusCompleted,
)
case paymentStatus == channeldb.StatusCompleted: case paymentStatus == channeldb.StatusCompleted:
// The payment was completed previously, alert the caller that // The payment was completed previously, alert the
// this may be a duplicate call. // caller that this may be a duplicate call.
return ErrPaymentAlreadyCompleted updateErr = ErrPaymentAlreadyCompleted
default: default:
return ErrUnknownPaymentStatus updateErr = ErrUnknownPaymentStatus
} }
return nil
})
if err != nil {
return err
}
return updateErr
} }
// Fail transitions an InFlight payment to Grounded, otherwise it returns an // Fail transitions an InFlight payment to Grounded, otherwise it returns an
// error. After calling Fail, ClearForTakeoff should fail any further attempts // error. After calling Fail, ClearForTakeoff should fail any further attempts
// for the same payment hash. // for the same payment hash.
func (p *paymentControl) Fail(paymentHash [32]byte) error { func (p *paymentControl) Fail(paymentHash [32]byte) error {
p.mx.Lock() var updateErr error
defer p.mx.Unlock() err := p.db.Batch(func(tx *bolt.Tx) error {
paymentStatus, err := channeldb.FetchPaymentStatusTx(
paymentStatus, err := p.db.FetchPaymentStatus(paymentHash) tx, paymentHash,
)
if err != nil { if err != nil {
return err return err
} }
// Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction.
updateErr = nil
switch { switch {
case paymentStatus == channeldb.StatusGrounded && p.strict: case paymentStatus == channeldb.StatusGrounded && p.strict:
// Our records show the payment as still being grounded, meaning // Our records show the payment as still being grounded,
// it never should have left the switch. // meaning it never should have left the switch.
return ErrPaymentNotInitiated updateErr = ErrPaymentNotInitiated
case paymentStatus == channeldb.StatusGrounded && !p.strict: case paymentStatus == channeldb.StatusGrounded && !p.strict:
// Our records show the payment as still being grounded, meaning // Though our records show the payment as still being
// it never should have left the switch. // grounded, meaning it never should have left the
// switch, we permit this transition in non-strict mode
// to handle inconsistent db states.
fallthrough fallthrough
case paymentStatus == channeldb.StatusInFlight: case paymentStatus == channeldb.StatusInFlight:
// A failed response was received for an InFlight payment, mark // A failed response was received for an InFlight
// it as Grounded again to allow subsequent attempts. // payment, mark it as Grounded again to allow
return p.db.UpdatePaymentStatus(paymentHash, channeldb.StatusGrounded) // subsequent attempts.
return channeldb.UpdatePaymentStatusTx(
tx, paymentHash, channeldb.StatusGrounded,
)
case paymentStatus == channeldb.StatusCompleted: case paymentStatus == channeldb.StatusCompleted:
// The payment was completed previously, and we are now // The payment was completed previously, and we are now
// reporting that it has failed. Leave the status as completed, // reporting that it has failed. Leave the status as
// but alert the user that something is wrong. // completed, but alert the user that something is
return ErrPaymentAlreadyCompleted // wrong.
updateErr = ErrPaymentAlreadyCompleted
default: default:
return ErrUnknownPaymentStatus updateErr = ErrUnknownPaymentStatus
} }
return nil
})
if err != nil {
return err
}
return updateErr
} }