lnd.xprv/channeldb/invoices.go
Olaoluwa Osuntokun 24ae13d3a5
channeldb: add two methods to allow seeking into the invoice time series
In this commit, we add two new methods: InvoicesAddedSince and
InvoicesSettledSince. These methods will be used by higher level
sub-systems that implement notifications to deliver any notifications
backlog based on the last add index, and last settle index that the
client knows of.

It's important to note that care has been taken to ensure that this new
API can be used in a backwards compatible manner. If a client specifies
and index of 0 for either of the methods, then no backlog will be sent.
This is due to the fact that current users of the API don't expect any
backlog notifications to be sent. Additionally, the index actually
starts at 1, instead of 0.
2018-07-06 12:21:57 -07:00

712 lines
21 KiB
Go

package channeldb
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"fmt"
"io"
"time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/wire"
)
var (
// invoiceBucket is the name of the bucket within the database that
// stores all data related to invoices no matter their final state.
// Within the invoice bucket, each invoice is keyed by its invoice ID
// which is a monotonically increasing uint32.
invoiceBucket = []byte("invoices")
// paymentHashIndexBucket is the name of the sub-bucket within the
// invoiceBucket which indexes all invoices by their payment hash. The
// payment hash is the sha256 of the invoice's payment preimage. This
// index is used to detect duplicates, and also to provide a fast path
// for looking up incoming HTLCs to determine if we're able to settle
// them fully.
//
// maps: payHash => invoiceIndex
invoiceIndexBucket = []byte("paymenthashes")
// numInvoicesKey is the name of key which houses the auto-incrementing
// invoice ID which is essentially used as a primary key. With each
// invoice inserted, the primary key is incremented by one. This key is
// stored within the invoiceIndexBucket. Within the invoiceBucket
// invoices are uniquely identified by the invoice ID.
numInvoicesKey = []byte("nik")
// addIndexBucket is an index bucket that we'll use to create a
// monotonically increasing set of add indexes. Each time we add a new
// invoice, this sequence number will be incremented and then populated
// within the new invoice.
//
// In addition to this sequence number, we map:
//
// addIndexNo => invoiceIndex
addIndexBucket = []byte("invoice-add-index")
// settleIndexBucket is an index bucket that we'll use to create a
// monotonically increasing integer for tracking a "settle index". Each
// time an invoice is settled, this sequence number will be incremented
// as populate within the newly settled invoice.
//
// In addition to this sequence number, we map:
//
// settleIndexNo => invoiceIndex
settleIndexBucket = []byte("invoice-settle-index")
)
const (
// MaxMemoSize is maximum size of the memo field within invoices stored
// in the database.
MaxMemoSize = 1024
// MaxReceiptSize is the maximum size of the payment receipt stored
// within the database along side incoming/outgoing invoices.
MaxReceiptSize = 1024
// MaxPaymentRequestSize is the max size of a payment request for
// this invoice.
// TODO(halseth): determine the max length payment request when field
// lengths are final.
MaxPaymentRequestSize = 4096
)
// ContractTerm is a companion struct to the Invoice struct. This struct houses
// the necessary conditions required before the invoice can be considered fully
// settled by the payee.
type ContractTerm struct {
// PaymentPreimage is the preimage which is to be revealed in the
// occasion that an HTLC paying to the hash of this preimage is
// extended.
PaymentPreimage [32]byte
// Value is the expected amount of milli-satoshis to be paid to an HTLC
// which can be satisfied by the above preimage.
Value lnwire.MilliSatoshi
// Settled indicates if this particular contract term has been fully
// settled by the payer.
Settled bool
}
// Invoice is a payment invoice generated by a payee in order to request
// payment for some good or service. The inclusion of invoices within Lightning
// creates a payment work flow for merchants very similar to that of the
// existing financial system within PayPal, etc. Invoices are added to the
// database when a payment is requested, then can be settled manually once the
// payment is received at the upper layer. For record keeping purposes,
// invoices are never deleted from the database, instead a bit is toggled
// denoting the invoice has been fully settled. Within the database, all
// invoices must have a unique payment hash which is generated by taking the
// sha256 of the payment preimage.
type Invoice struct {
// Memo is an optional memo to be stored along side an invoice. The
// memo may contain further details pertaining to the invoice itself,
// or any other message which fits within the size constraints.
Memo []byte
// Receipt is an optional field dedicated for storing a
// cryptographically binding receipt of payment.
//
// TODO(roasbeef): document scheme.
Receipt []byte
// PaymentRequest is an optional field where a payment request created
// for this invoice can be stored.
PaymentRequest []byte
// CreationDate is the exact time the invoice was created.
CreationDate time.Time
// SettleDate is the exact time the invoice was settled.
SettleDate time.Time
// Terms are the contractual payment terms of the invoice. Once all the
// terms have been satisfied by the payer, then the invoice can be
// considered fully fulfilled.
//
// TODO(roasbeef): later allow for multiple terms to fulfill the final
// invoice: payment fragmentation, etc.
Terms ContractTerm
// AddIndex is an auto-incrementing integer that acts as a
// monotonically increasing sequence number for all invoices created.
// Clients can then use this field as a "checkpoint" of sorts when
// implementing a streaming RPC to notify consumers of instances where
// an invoice has been added before they re-connected.
//
// NOTE: This index starts at 1.
AddIndex uint64
// SettleIndex is an auto-incrementing integer that acts as a
// monotonically increasing sequence number for all settled invoices.
// Clients can then use this field as a "checkpoint" of sorts when
// implementing a streaming RPC to notify consumers of instances where
// an invoice has been settled before they re-connected.
//
// NOTE: This index starts at 1.
SettleIndex uint64
// AmtPaid is the final amount that we ultimately accepted for pay for
// this invoice. We specify this value independently as it's possible
// that the invoice originally didn't specify an amount, or the sender
// overpaid.
AmtPaid lnwire.MilliSatoshi
}
func validateInvoice(i *Invoice) error {
if len(i.Memo) > MaxMemoSize {
return fmt.Errorf("max length a memo is %v, and invoice "+
"of length %v was provided", MaxMemoSize, len(i.Memo))
}
if len(i.Receipt) > MaxReceiptSize {
return fmt.Errorf("max length a receipt is %v, and invoice "+
"of length %v was provided", MaxReceiptSize,
len(i.Receipt))
}
if len(i.PaymentRequest) > MaxPaymentRequestSize {
return fmt.Errorf("max length of payment request is %v, length "+
"provided was %v", MaxPaymentRequestSize,
len(i.PaymentRequest))
}
return nil
}
// AddInvoice inserts the targeted invoice into the database. If the invoice
// has *any* payment hashes which already exists within the database, then the
// insertion will be aborted and rejected due to the strict policy banning any
// duplicate payment hashes.
func (d *DB) AddInvoice(newInvoice *Invoice) error {
if err := validateInvoice(newInvoice); err != nil {
return err
}
err := d.Update(func(tx *bolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
}
invoiceIndex, err := invoices.CreateBucketIfNotExists(
invoiceIndexBucket,
)
if err != nil {
return err
}
addIndex, err := invoices.CreateBucketIfNotExists(
addIndexBucket,
)
if err != nil {
return err
}
// Ensure that an invoice an identical payment hash doesn't
// already exist within the index.
paymentHash := sha256.Sum256(
newInvoice.Terms.PaymentPreimage[:],
)
if invoiceIndex.Get(paymentHash[:]) != nil {
return ErrDuplicateInvoice
}
// If the current running payment ID counter hasn't yet been
// created, then create it now.
var invoiceNum uint32
invoiceCounter := invoiceIndex.Get(numInvoicesKey)
if invoiceCounter == nil {
var scratch [4]byte
byteOrder.PutUint32(scratch[:], invoiceNum)
err := invoiceIndex.Put(numInvoicesKey, scratch[:])
if err != nil {
return nil
}
} else {
invoiceNum = byteOrder.Uint32(invoiceCounter)
}
return putInvoice(
invoices, invoiceIndex, addIndex, newInvoice, invoiceNum,
)
})
if err != nil {
return err
}
return err
}
// InvoicesAddedSince can be used by callers to seek into the event time series
// of all the invoices added in the database. The specified sinceAddIndex
// should be the highest add index that the caller knows of. This method will
// return all invoices with an add index greater than the specified
// sinceAddIndex.
//
// NOTE: The index starts from 1, as a result. We enforce that specifying a
// value below the starting index value is a noop.
func (d *DB) InvoicesAddedSince(sinceAddIndex uint64) ([]Invoice, error) {
var newInvoices []Invoice
// If an index of zero was specified, then in order to maintain
// backwards compat, we won't send out any new invoices.
if sinceAddIndex == 0 {
return newInvoices, nil
}
var startIndex [8]byte
byteOrder.PutUint64(startIndex[:], sinceAddIndex)
err := d.DB.View(func(tx *bolt.Tx) error {
invoices := tx.Bucket(invoiceBucket)
if invoices == nil {
return ErrNoInvoicesCreated
}
addIndex := invoices.Bucket(addIndexBucket)
if addIndex == nil {
return ErrNoInvoicesCreated
}
// We'll now run through each entry in the add index starting
// at our starting index. We'll continue until we reach the
// very end of the current key space.
invoiceCursor := addIndex.Cursor()
// We'll seek to the starting index, then manually advance the
// cursor in order to skip the entry with the since add index.
invoiceCursor.Seek(startIndex[:])
addSeqNo, invoiceKey := invoiceCursor.Next()
for ; addSeqNo != nil && bytes.Compare(addSeqNo, startIndex[:]) > 0; addSeqNo, invoiceKey = invoiceCursor.Next() {
// For each key found, we'll look up the actual
// invoice, then accumulate it into our return value.
invoice, err := fetchInvoice(invoiceKey, invoices)
if err != nil {
return err
}
newInvoices = append(newInvoices, invoice)
}
return nil
})
switch {
// If no invoices have been created, then we'll return the empty set of
// invoices.
case err == ErrNoInvoicesCreated:
case err != nil:
return nil, err
}
return newInvoices, nil
}
// LookupInvoice attempts to look up an invoice according to its 32 byte
// payment hash. If an invoice which can settle the HTLC identified by the
// passed payment hash isn't found, then an error is returned. Otherwise, the
// full invoice is returned. Before setting the incoming HTLC, the values
// SHOULD be checked to ensure the payer meets the agreed upon contractual
// terms of the payment.
func (d *DB) LookupInvoice(paymentHash [32]byte) (Invoice, error) {
var invoice Invoice
err := d.View(func(tx *bolt.Tx) error {
invoices := tx.Bucket(invoiceBucket)
if invoices == nil {
return ErrNoInvoicesCreated
}
invoiceIndex := invoices.Bucket(invoiceIndexBucket)
if invoiceIndex == nil {
return ErrNoInvoicesCreated
}
// Check the invoice index to see if an invoice paying to this
// hash exists within the DB.
invoiceNum := invoiceIndex.Get(paymentHash[:])
if invoiceNum == nil {
return ErrInvoiceNotFound
}
// An invoice matching the payment hash has been found, so
// retrieve the record of the invoice itself.
i, err := fetchInvoice(invoiceNum, invoices)
if err != nil {
return err
}
invoice = i
return nil
})
if err != nil {
return invoice, err
}
return invoice, nil
}
// FetchAllInvoices returns all invoices currently stored within the database.
// If the pendingOnly param is true, then only unsettled invoices will be
// returned, skipping all invoices that are fully settled.
func (d *DB) FetchAllInvoices(pendingOnly bool) ([]Invoice, error) {
var invoices []Invoice
err := d.View(func(tx *bolt.Tx) error {
invoiceB := tx.Bucket(invoiceBucket)
if invoiceB == nil {
return ErrNoInvoicesCreated
}
// Iterate through the entire key space of the top-level
// invoice bucket. If key with a non-nil value stores the next
// invoice ID which maps to the corresponding invoice.
return invoiceB.ForEach(func(k, v []byte) error {
if v == nil {
return nil
}
invoiceReader := bytes.NewReader(v)
invoice, err := deserializeInvoice(invoiceReader)
if err != nil {
return err
}
if pendingOnly && invoice.Terms.Settled {
return nil
}
invoices = append(invoices, invoice)
return nil
})
})
if err != nil {
return nil, err
}
return invoices, nil
}
// SettleInvoice attempts to mark an invoice corresponding to the passed
// payment hash as fully settled. If an invoice matching the passed payment
// hash doesn't existing within the database, then the action will fail with a
// "not found" error.
func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) error {
return d.Update(func(tx *bolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
}
invoiceIndex, err := invoices.CreateBucketIfNotExists(
invoiceIndexBucket,
)
if err != nil {
return err
}
settleIndex, err := invoices.CreateBucketIfNotExists(
settleIndexBucket,
)
if err != nil {
return err
}
// Check the invoice index to see if an invoice paying to this
// hash exists within the DB.
invoiceNum := invoiceIndex.Get(paymentHash[:])
if invoiceNum == nil {
return ErrInvoiceNotFound
}
return settleInvoice(
invoices, settleIndex, invoiceNum, amtPaid,
)
})
}
// InvoicesSettledSince can be used by callers to catch up any settled invoices
// they missed within the settled invoice time series. We'll return all known
// settled invoice that have a settle index higher than the passed
// sinceSettleIndex.
//
// NOTE: The index starts from 1, as a result. We enforce that specifying a
// value below the starting index value is a noop.
func (d *DB) InvoicesSettledSince(sinceSettleIndex uint64) ([]Invoice, error) {
var settledInvoices []Invoice
// If an index of zero was specified, then in order to maintain
// backwards compat, we won't send out any new invoices.
if sinceSettleIndex == 0 {
return settledInvoices, nil
}
var startIndex [8]byte
byteOrder.PutUint64(startIndex[:], sinceSettleIndex)
err := d.DB.View(func(tx *bolt.Tx) error {
invoices := tx.Bucket(invoiceBucket)
if invoices == nil {
return ErrNoInvoicesCreated
}
settleIndex := invoices.Bucket(settleIndexBucket)
if settleIndex == nil {
return ErrNoInvoicesCreated
}
// We'll now run through each entry in the add index starting
// at our starting index. We'll continue until we reach the
// very end of the current key space.
invoiceCursor := settleIndex.Cursor()
// We'll seek to the starting index, then manually advance the
// cursor in order to skip the entry with the since add index.
invoiceCursor.Seek(startIndex[:])
seqNo, invoiceKey := invoiceCursor.Next()
for ; seqNo != nil && bytes.Compare(seqNo, startIndex[:]) > 0; seqNo, invoiceKey = invoiceCursor.Next() {
// For each key found, we'll look up the actual
// invoice, then accumulate it into our return value.
invoice, err := fetchInvoice(invoiceKey, invoices)
if err != nil {
return err
}
settledInvoices = append(settledInvoices, invoice)
}
return nil
})
if err != nil {
return nil, err
}
return settledInvoices, nil
}
func putInvoice(invoices, invoiceIndex, addIndex *bolt.Bucket,
i *Invoice, invoiceNum uint32) error {
// Create the invoice key which is just the big-endian representation
// of the invoice number.
var invoiceKey [4]byte
byteOrder.PutUint32(invoiceKey[:], invoiceNum)
// Increment the num invoice counter index so the next invoice bares
// the proper ID.
var scratch [4]byte
invoiceCounter := invoiceNum + 1
byteOrder.PutUint32(scratch[:], invoiceCounter)
if err := invoiceIndex.Put(numInvoicesKey, scratch[:]); err != nil {
return err
}
// Add the payment hash to the invoice index. This will let us quickly
// identify if we can settle an incoming payment, and also to possibly
// allow a single invoice to have multiple payment installations.
paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:])
err := invoiceIndex.Put(paymentHash[:], invoiceKey[:])
if err != nil {
return err
}
// Next, we'll obtain the next add invoice index (sequence
// number), so we can properly place this invoice within this
// event stream.
nextAddSeqNo, err := addIndex.NextSequence()
if err != nil {
return err
}
// With the next sequence obtained, we'll updating the event series in
// the add index bucket to map this current add counter to the index of
// this new invoice.
var seqNoBytes [8]byte
byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
if err := addIndex.Put(seqNoBytes[:], invoiceKey[:]); err != nil {
return err
}
i.AddIndex = nextAddSeqNo
// Finally, serialize the invoice itself to be written to the disk.
var buf bytes.Buffer
if err := serializeInvoice(&buf, i); err != nil {
return nil
}
return invoices.Put(invoiceKey[:], buf.Bytes())
}
func serializeInvoice(w io.Writer, i *Invoice) error {
if err := wire.WriteVarBytes(w, 0, i.Memo[:]); err != nil {
return err
}
if err := wire.WriteVarBytes(w, 0, i.Receipt[:]); err != nil {
return err
}
if err := wire.WriteVarBytes(w, 0, i.PaymentRequest[:]); err != nil {
return err
}
birthBytes, err := i.CreationDate.MarshalBinary()
if err != nil {
return err
}
if err := wire.WriteVarBytes(w, 0, birthBytes); err != nil {
return err
}
settleBytes, err := i.SettleDate.MarshalBinary()
if err != nil {
return err
}
if err := wire.WriteVarBytes(w, 0, settleBytes); err != nil {
return err
}
if _, err := w.Write(i.Terms.PaymentPreimage[:]); err != nil {
return err
}
var scratch [8]byte
byteOrder.PutUint64(scratch[:], uint64(i.Terms.Value))
if _, err := w.Write(scratch[:]); err != nil {
return err
}
if err := binary.Write(w, byteOrder, i.Terms.Settled); err != nil {
return err
}
if err := binary.Write(w, byteOrder, i.AddIndex); err != nil {
return err
}
if err := binary.Write(w, byteOrder, i.SettleIndex); err != nil {
return err
}
if err := binary.Write(w, byteOrder, int64(i.AmtPaid)); err != nil {
return err
}
return nil
}
func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (Invoice, error) {
invoiceBytes := invoices.Get(invoiceNum)
if invoiceBytes == nil {
return Invoice{}, ErrInvoiceNotFound
}
invoiceReader := bytes.NewReader(invoiceBytes)
return deserializeInvoice(invoiceReader)
}
func deserializeInvoice(r io.Reader) (Invoice, error) {
var err error
invoice := Invoice{}
// TODO(roasbeef): use read full everywhere
invoice.Memo, err = wire.ReadVarBytes(r, 0, MaxMemoSize, "")
if err != nil {
return invoice, err
}
invoice.Receipt, err = wire.ReadVarBytes(r, 0, MaxReceiptSize, "")
if err != nil {
return invoice, err
}
invoice.PaymentRequest, err = wire.ReadVarBytes(r, 0, MaxPaymentRequestSize, "")
if err != nil {
return invoice, err
}
birthBytes, err := wire.ReadVarBytes(r, 0, 300, "birth")
if err != nil {
return invoice, err
}
if err := invoice.CreationDate.UnmarshalBinary(birthBytes); err != nil {
return invoice, err
}
settledBytes, err := wire.ReadVarBytes(r, 0, 300, "settled")
if err != nil {
return invoice, err
}
if err := invoice.SettleDate.UnmarshalBinary(settledBytes); err != nil {
return invoice, err
}
if _, err := io.ReadFull(r, invoice.Terms.PaymentPreimage[:]); err != nil {
return invoice, err
}
var scratch [8]byte
if _, err := io.ReadFull(r, scratch[:]); err != nil {
return invoice, err
}
invoice.Terms.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
if err := binary.Read(r, byteOrder, &invoice.Terms.Settled); err != nil {
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.AddIndex); err != nil {
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.SettleIndex); err != nil {
return invoice, err
}
if err := binary.Read(r, byteOrder, &invoice.AmtPaid); err != nil {
return invoice, err
}
return invoice, nil
}
func settleInvoice(invoices, settleIndex *bolt.Bucket, invoiceNum []byte,
amtPaid lnwire.MilliSatoshi) error {
invoice, err := fetchInvoice(invoiceNum, invoices)
if err != nil {
return err
}
// Add idempotency to duplicate settles, return here to avoid
// overwriting the previous info.
if invoice.Terms.Settled {
return nil
}
// Now that we know the invoice hasn't already been settled, we'll
// update the settle index so we can place this settle event in the
// proper location within our time series.
nextSettleSeqNo, err := settleIndex.NextSequence()
if err != nil {
return err
}
var seqNoBytes [8]byte
byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil {
return err
}
invoice.AmtPaid = amtPaid
invoice.Terms.Settled = true
invoice.SettleDate = time.Now()
invoice.SettleIndex = nextSettleSeqNo
var buf bytes.Buffer
if err := serializeInvoice(&buf, &invoice); err != nil {
return nil
}
return invoices.Put(invoiceNum[:], buf.Bytes())
}