Merge pull request #5514 from bhandras/payment_seq_blocks

payments: allocate payment sequences in blocks
This commit is contained in:
András Bánki-Horváth 2021-07-22 19:21:44 +02:00 committed by GitHub
commit 275eca1640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 20 deletions

@ -6,11 +6,18 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"sync"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
) )
const (
// paymentSeqBlockSize is the block size used when we batch allocate
// payment sequences for future payments.
paymentSeqBlockSize = 1000
)
var ( var (
// ErrAlreadyPaid signals we have already paid this payment hash. // ErrAlreadyPaid signals we have already paid this payment hash.
ErrAlreadyPaid = errors.New("invoice is already paid") ErrAlreadyPaid = errors.New("invoice is already paid")
@ -84,6 +91,9 @@ var (
// PaymentControl implements persistence for payments and payment attempts. // PaymentControl implements persistence for payments and payment attempts.
type PaymentControl struct { type PaymentControl struct {
paymentSeqMx sync.Mutex
currPaymentSeq uint64
storedPaymentSeq uint64
db *DB db *DB
} }
@ -101,6 +111,14 @@ func NewPaymentControl(db *DB) *PaymentControl {
func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash, func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
info *PaymentCreationInfo) error { info *PaymentCreationInfo) error {
// Obtain a new sequence number for this payment. This is used
// to sort the payments in order of creation, and also acts as
// a unique identifier for each payment.
sequenceNum, err := p.nextPaymentSequence()
if err != nil {
return err
}
var b bytes.Buffer var b bytes.Buffer
if err := serializePaymentCreationInfo(&b, info); err != nil { if err := serializePaymentCreationInfo(&b, info); err != nil {
return err return err
@ -108,7 +126,7 @@ func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
infoBytes := b.Bytes() infoBytes := b.Bytes()
var updateErr error var updateErr error
err := kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error { err = kvdb.Batch(p.db.Backend, func(tx kvdb.RwTx) error {
// Reset the update error, to avoid carrying over an error // Reset the update error, to avoid carrying over an error
// from a previous execution of the batched db transaction. // from a previous execution of the batched db transaction.
updateErr = nil updateErr = nil
@ -150,14 +168,6 @@ func (p *PaymentControl) InitPayment(paymentHash lntypes.Hash,
return nil return nil
} }
// Obtain a new sequence number for this payment. This is used
// to sort the payments in order of creation, and also acts as
// a unique identifier for each payment.
sequenceNum, err := nextPaymentSequence(tx)
if err != nil {
return err
}
// Before we set our new sequence number, we check whether this // Before we set our new sequence number, we check whether this
// payment has a previously set sequence number and remove its // payment has a previously set sequence number and remove its
// index entry if it exists. This happens in the case where we // index entry if it exists. This happens in the case where we
@ -615,19 +625,45 @@ func fetchPaymentBucketUpdate(tx kvdb.RwTx, paymentHash lntypes.Hash) (
// nextPaymentSequence returns the next sequence number to store for a new // nextPaymentSequence returns the next sequence number to store for a new
// payment. // payment.
func nextPaymentSequence(tx kvdb.RwTx) ([]byte, error) { func (p *PaymentControl) nextPaymentSequence() ([]byte, error) {
payments, err := tx.CreateTopLevelBucket(paymentsRootBucket) p.paymentSeqMx.Lock()
defer p.paymentSeqMx.Unlock()
// Set a new upper bound in the DB every 1000 payments to avoid
// conflicts on the sequence when using etcd.
if p.currPaymentSeq == p.storedPaymentSeq {
var currPaymentSeq, newUpperBound uint64
if err := kvdb.Update(p.db.Backend, func(tx kvdb.RwTx) error {
paymentsBucket, err := tx.CreateTopLevelBucket(
paymentsRootBucket,
)
if err != nil { if err != nil {
return err
}
currPaymentSeq = paymentsBucket.Sequence()
newUpperBound = currPaymentSeq + paymentSeqBlockSize
return paymentsBucket.SetSequence(newUpperBound)
}, func() {}); err != nil {
return nil, err return nil, err
} }
seq, err := payments.NextSequence() // We lazy initialize the cached currPaymentSeq here using the
if err != nil { // first nextPaymentSequence() call. This if statement will auto
return nil, err // initialize our stored currPaymentSeq, since by default both
// this variable and storedPaymentSeq are zero which in turn
// will have us fetch the current values from the DB.
if p.currPaymentSeq == 0 {
p.currPaymentSeq = currPaymentSeq
} }
p.storedPaymentSeq = newUpperBound
}
p.currPaymentSeq++
b := make([]byte, 8) b := make([]byte, 8)
binary.BigEndian.PutUint64(b, seq) binary.BigEndian.PutUint64(b, p.currPaymentSeq)
return b, nil return b, nil
} }

@ -36,6 +36,9 @@ the release notes folder that at leasts links to PR being added.
code](https://github.com/lightningnetwork/lnd/pull/5547) when using etcd code](https://github.com/lightningnetwork/lnd/pull/5547) when using etcd
backend. backend.
[Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
to make LNDs payment throughput (and latency) with better when using etcd.
# Contributors (Alphabetical Order) # Contributors (Alphabetical Order)
* ErikEk * ErikEk
* Zero-1729 * Zero-1729