channeldb: add new add+settle index to invoice database

In this commit, we add two new indexes to the invoice database: the add
index, and the settle index. These to indexes essentially form a time
series index on top of the existing primary index bucket. Each time an
invoice is added, we'll advance the addIndex seqno, and then create a
mapping from seqNo -> invoiceNum. Each time an invoice is settled, we'll
do the same, but within the settle index.

This change is required in order to allow callers to effectively seek
into the current invoice database in order to obtain notifications for
any invoices they may have missed out on while they were disconnected.
This will allow us to implement robust streaming invoice notifications
within lnd to ensure that clients never miss an event.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-24 20:57:30 -07:00
parent fc0f0d33b2
commit 5d20c02ea8
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -36,6 +36,26 @@ var (
// stored within the invoiceIndexBucket. Within the invoiceBucket
// invoices are uniquely identified by the invoice ID.
numInvoicesKey = []byte("nik")
// addIndexBucket is an index bucket that we'll use to create a
// monotonically increasing set of add indexes. Each time we add a new
// invoice, this sequence number will be incremented and then populated
// within the new invoice.
//
// In addition to this sequence number, we map:
//
// addIndexNo => invoiceIndex
addIndexBucket = []byte("invoice-add-index")
// settleIndexBucket is an index bucket that we'll use to create a
// monotonically increasing integer for tracking a "settle index". Each
// time an invoice is settled, this sequence number will be incremented
// as populate within the newly settled invoice.
//
// In addition to this sequence number, we map:
//
// settleIndexNo => invoiceIndex
settleIndexBucket = []byte("invoice-settle-index")
)
const (
@ -111,6 +131,25 @@ type Invoice struct {
// TODO(roasbeef): later allow for multiple terms to fulfill the final
// invoice: payment fragmentation, etc.
Terms ContractTerm
// AddIndex is an auto-incrementing integer that acts as a
// monotonically increasing sequence number for all invoices created.
// Clients can then use this field as a "checkpoint" of sorts when
// implementing a streaming RPC to notify consumers of instances where
// an invoice has been added before they re-connected.
//
// NOTE: This index starts at 1.
AddIndex uint64
// SettleIndex is an auto-incrementing integer that acts as a
// monotonically increasing sequence number for all settled invoices.
// Clients can then use this field as a "checkpoint" of sorts when
// implementing a streaming RPC to notify consumers of instances where
// an invoice has been settled before they re-connected.
//
// NOTE: This index starts at 1.
SettleIndex uint64
// AmtPaid is the final amount that we ultimately accepted for pay for
// this invoice. We specify this value independently as it's possible
// that the invoice originally didn't specify an amount, or the sender
@ -140,24 +179,35 @@ func validateInvoice(i *Invoice) error {
// has *any* payment hashes which already exists within the database, then the
// insertion will be aborted and rejected due to the strict policy banning any
// duplicate payment hashes.
func (d *DB) AddInvoice(i *Invoice) error {
if err := validateInvoice(i); err != nil {
func (d *DB) AddInvoice(newInvoice *Invoice) error {
if err := validateInvoice(newInvoice); err != nil {
return err
}
return d.Update(func(tx *bolt.Tx) error {
err := d.Update(func(tx *bolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
}
invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket)
invoiceIndex, err := invoices.CreateBucketIfNotExists(
invoiceIndexBucket,
)
if err != nil {
return err
}
addIndex, err := invoices.CreateBucketIfNotExists(
addIndexBucket,
)
if err != nil {
return err
}
// Ensure that an invoice an identical payment hash doesn't
// already exist within the index.
paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:])
paymentHash := sha256.Sum256(
newInvoice.Terms.PaymentPreimage[:],
)
if invoiceIndex.Get(paymentHash[:]) != nil {
return ErrDuplicateInvoice
}
@ -169,14 +219,24 @@ func (d *DB) AddInvoice(i *Invoice) error {
if invoiceCounter == nil {
var scratch [4]byte
byteOrder.PutUint32(scratch[:], invoiceNum)
if err := invoiceIndex.Put(numInvoicesKey, scratch[:]); err != nil {
err := invoiceIndex.Put(numInvoicesKey, scratch[:])
if err != nil {
return nil
}
} else {
invoiceNum = byteOrder.Uint32(invoiceCounter)
}
return putInvoice(invoices, invoiceIndex, i, invoiceNum)
return putInvoice(
invoices, invoiceIndex, addIndex, newInvoice, invoiceNum,
)
})
if err != nil {
return err
}
return err
}
})
}
@ -186,8 +246,8 @@ func (d *DB) AddInvoice(i *Invoice) error {
// full invoice is returned. Before setting the incoming HTLC, the values
// SHOULD be checked to ensure the payer meets the agreed upon contractual
// terms of the payment.
func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) {
var invoice *Invoice
func (d *DB) LookupInvoice(paymentHash [32]byte) (Invoice, error) {
var invoice Invoice
err := d.View(func(tx *bolt.Tx) error {
invoices := tx.Bucket(invoiceBucket)
if invoices == nil {
@ -216,7 +276,7 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) {
return nil
})
if err != nil {
return nil, err
return invoice, err
}
return invoice, nil
@ -225,8 +285,8 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) {
// FetchAllInvoices returns all invoices currently stored within the database.
// If the pendingOnly param is true, then only unsettled invoices will be
// returned, skipping all invoices that are fully settled.
func (d *DB) FetchAllInvoices(pendingOnly bool) ([]*Invoice, error) {
var invoices []*Invoice
func (d *DB) FetchAllInvoices(pendingOnly bool) ([]Invoice, error) {
var invoices []Invoice
err := d.View(func(tx *bolt.Tx) error {
invoiceB := tx.Bucket(invoiceBucket)
@ -275,7 +335,15 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er
if err != nil {
return err
}
invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket)
invoiceIndex, err := invoices.CreateBucketIfNotExists(
invoiceIndexBucket,
)
if err != nil {
return err
}
settleIndex, err := invoices.CreateBucketIfNotExists(
settleIndexBucket,
)
if err != nil {
return err
}
@ -293,7 +361,6 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er
})
}
func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket,
i *Invoice, invoiceNum uint32) error {
// Create the invoice key which is just the big-endian representation
@ -314,10 +381,30 @@ func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket,
// identify if we can settle an incoming payment, and also to possibly
// allow a single invoice to have multiple payment installations.
paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:])
if err := invoiceIndex.Put(paymentHash[:], invoiceKey[:]); err != nil {
err := invoiceIndex.Put(paymentHash[:], invoiceKey[:])
if err != nil {
return err
}
// Next, we'll obtain the next add invoice index (sequence
// number), so we can properly place this invoice within this
// event stream.
nextAddSeqNo, err := addIndex.NextSequence()
if err != nil {
return err
}
// With the next sequence obtained, we'll updating the event series in
// the add index bucket to map this current add counter to the index of
// this new invoice.
var seqNoBytes [8]byte
byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
if err := addIndex.Put(seqNoBytes[:], invoiceKey[:]); err != nil {
return err
}
i.AddIndex = nextAddSeqNo
// Finally, serialize the invoice itself to be written to the disk.
var buf bytes.Buffer
if err := serializeInvoice(&buf, i); err != nil {
@ -370,16 +457,23 @@ func serializeInvoice(w io.Writer, i *Invoice) error {
return err
}
if err := binary.Write(w, byteOrder, i.AddIndex); err != nil {
return err
}
if err := binary.Write(w, byteOrder, i.SettleIndex); err != nil {
return err
}
if err := binary.Write(w, byteOrder, int64(i.AmtPaid)); err != nil {
return err
}
return nil
}
func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) {
func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (Invoice, error) {
invoiceBytes := invoices.Get(invoiceNum)
if invoiceBytes == nil {
return nil, ErrInvoiceNotFound
return Invoice{}, ErrInvoiceNotFound
}
invoiceReader := bytes.NewReader(invoiceBytes)
@ -387,52 +481,60 @@ func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) {
return deserializeInvoice(invoiceReader)
}
func deserializeInvoice(r io.Reader) (*Invoice, error) {
func deserializeInvoice(r io.Reader) (Invoice, error) {
var err error
invoice := &Invoice{}
invoice := Invoice{}
// TODO(roasbeef): use read full everywhere
invoice.Memo, err = wire.ReadVarBytes(r, 0, MaxMemoSize, "")
if err != nil {
return nil, err
return invoice, err
}
invoice.Receipt, err = wire.ReadVarBytes(r, 0, MaxReceiptSize, "")
if err != nil {
return nil, err
return invoice, err
}
invoice.PaymentRequest, err = wire.ReadVarBytes(r, 0, MaxPaymentRequestSize, "")
if err != nil {
return nil, err
return invoice, err
}
birthBytes, err := wire.ReadVarBytes(r, 0, 300, "birth")
if err != nil {
return nil, err
return invoice, err
}
if err := invoice.CreationDate.UnmarshalBinary(birthBytes); err != nil {
return nil, err
return invoice, err
}
settledBytes, err := wire.ReadVarBytes(r, 0, 300, "settled")
if err != nil {
return nil, err
return invoice, err
}
if err := invoice.SettleDate.UnmarshalBinary(settledBytes); err != nil {
return nil, err
return invoice, err
}
if _, err := io.ReadFull(r, invoice.Terms.PaymentPreimage[:]); err != nil {
return nil, err
return invoice, err
}
var scratch [8]byte
if _, err := io.ReadFull(r, scratch[:]); err != nil {
return nil, err
return invoice, err
}
invoice.Terms.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
if err := binary.Read(r, byteOrder, &invoice.Terms.Settled); err != nil {
return nil, err
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.AddIndex); err != nil {
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.SettleIndex); err != nil {
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.AmtPaid); err != nil {
return invoice, err
}
@ -454,12 +556,27 @@ func settleInvoice(invoices, settleIndex *bolt.Bucket, invoiceNum []byte,
return nil
}
// Now that we know the invoice hasn't already been settled, we'll
// update the settle index so we can place this settle event in the
// proper location within our time series.
nextSettleSeqNo, err := settleIndex.NextSequence()
if err != nil {
return err
}
var seqNoBytes [8]byte
byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil {
return err
}
invoice.AmtPaid = amtPaid
invoice.Terms.Settled = true
invoice.SettleDate = time.Now()
invoice.SettleIndex = nextSettleSeqNo
var buf bytes.Buffer
if err := serializeInvoice(&buf, invoice); err != nil {
if err := serializeInvoice(&buf, &invoice); err != nil {
return nil
}