diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 9d4a7172..85338a9c 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -36,6 +36,26 @@ var ( // stored within the invoiceIndexBucket. Within the invoiceBucket // invoices are uniquely identified by the invoice ID. numInvoicesKey = []byte("nik") + + // addIndexBucket is an index bucket that we'll use to create a + // monotonically increasing set of add indexes. Each time we add a new + // invoice, this sequence number will be incremented and then populated + // within the new invoice. + // + // In addition to this sequence number, we map: + // + // addIndexNo => invoiceIndex + addIndexBucket = []byte("invoice-add-index") + + // settleIndexBucket is an index bucket that we'll use to create a + // monotonically increasing integer for tracking a "settle index". Each + // time an invoice is settled, this sequence number will be incremented + // as populate within the newly settled invoice. + // + // In addition to this sequence number, we map: + // + // settleIndexNo => invoiceIndex + settleIndexBucket = []byte("invoice-settle-index") ) const ( @@ -111,6 +131,25 @@ type Invoice struct { // TODO(roasbeef): later allow for multiple terms to fulfill the final // invoice: payment fragmentation, etc. Terms ContractTerm + + // AddIndex is an auto-incrementing integer that acts as a + // monotonically increasing sequence number for all invoices created. + // Clients can then use this field as a "checkpoint" of sorts when + // implementing a streaming RPC to notify consumers of instances where + // an invoice has been added before they re-connected. + // + // NOTE: This index starts at 1. + AddIndex uint64 + + // SettleIndex is an auto-incrementing integer that acts as a + // monotonically increasing sequence number for all settled invoices. + // Clients can then use this field as a "checkpoint" of sorts when + // implementing a streaming RPC to notify consumers of instances where + // an invoice has been settled before they re-connected. + // + // NOTE: This index starts at 1. + SettleIndex uint64 + // AmtPaid is the final amount that we ultimately accepted for pay for // this invoice. We specify this value independently as it's possible // that the invoice originally didn't specify an amount, or the sender @@ -140,24 +179,35 @@ func validateInvoice(i *Invoice) error { // has *any* payment hashes which already exists within the database, then the // insertion will be aborted and rejected due to the strict policy banning any // duplicate payment hashes. -func (d *DB) AddInvoice(i *Invoice) error { - if err := validateInvoice(i); err != nil { +func (d *DB) AddInvoice(newInvoice *Invoice) error { + if err := validateInvoice(newInvoice); err != nil { return err } - return d.Update(func(tx *bolt.Tx) error { + + err := d.Update(func(tx *bolt.Tx) error { invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err } - invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket) + invoiceIndex, err := invoices.CreateBucketIfNotExists( + invoiceIndexBucket, + ) + if err != nil { + return err + } + addIndex, err := invoices.CreateBucketIfNotExists( + addIndexBucket, + ) if err != nil { return err } // Ensure that an invoice an identical payment hash doesn't // already exist within the index. - paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:]) + paymentHash := sha256.Sum256( + newInvoice.Terms.PaymentPreimage[:], + ) if invoiceIndex.Get(paymentHash[:]) != nil { return ErrDuplicateInvoice } @@ -169,14 +219,24 @@ func (d *DB) AddInvoice(i *Invoice) error { if invoiceCounter == nil { var scratch [4]byte byteOrder.PutUint32(scratch[:], invoiceNum) - if err := invoiceIndex.Put(numInvoicesKey, scratch[:]); err != nil { + err := invoiceIndex.Put(numInvoicesKey, scratch[:]) + if err != nil { return nil } } else { invoiceNum = byteOrder.Uint32(invoiceCounter) } - return putInvoice(invoices, invoiceIndex, i, invoiceNum) + return putInvoice( + invoices, invoiceIndex, addIndex, newInvoice, invoiceNum, + ) + }) + if err != nil { + return err + } + + return err +} }) } @@ -186,8 +246,8 @@ func (d *DB) AddInvoice(i *Invoice) error { // full invoice is returned. Before setting the incoming HTLC, the values // SHOULD be checked to ensure the payer meets the agreed upon contractual // terms of the payment. -func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { - var invoice *Invoice +func (d *DB) LookupInvoice(paymentHash [32]byte) (Invoice, error) { + var invoice Invoice err := d.View(func(tx *bolt.Tx) error { invoices := tx.Bucket(invoiceBucket) if invoices == nil { @@ -216,7 +276,7 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { return nil }) if err != nil { - return nil, err + return invoice, err } return invoice, nil @@ -225,8 +285,8 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) { // FetchAllInvoices returns all invoices currently stored within the database. // If the pendingOnly param is true, then only unsettled invoices will be // returned, skipping all invoices that are fully settled. -func (d *DB) FetchAllInvoices(pendingOnly bool) ([]*Invoice, error) { - var invoices []*Invoice +func (d *DB) FetchAllInvoices(pendingOnly bool) ([]Invoice, error) { + var invoices []Invoice err := d.View(func(tx *bolt.Tx) error { invoiceB := tx.Bucket(invoiceBucket) @@ -275,7 +335,15 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er if err != nil { return err } - invoiceIndex, err := invoices.CreateBucketIfNotExists(invoiceIndexBucket) + invoiceIndex, err := invoices.CreateBucketIfNotExists( + invoiceIndexBucket, + ) + if err != nil { + return err + } + settleIndex, err := invoices.CreateBucketIfNotExists( + settleIndexBucket, + ) if err != nil { return err } @@ -293,7 +361,6 @@ func (d *DB) SettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) er }) } -func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket, i *Invoice, invoiceNum uint32) error { // Create the invoice key which is just the big-endian representation @@ -314,10 +381,30 @@ func putInvoice(invoices *bolt.Bucket, invoiceIndex *bolt.Bucket, // identify if we can settle an incoming payment, and also to possibly // allow a single invoice to have multiple payment installations. paymentHash := sha256.Sum256(i.Terms.PaymentPreimage[:]) - if err := invoiceIndex.Put(paymentHash[:], invoiceKey[:]); err != nil { + err := invoiceIndex.Put(paymentHash[:], invoiceKey[:]) + if err != nil { return err } + // Next, we'll obtain the next add invoice index (sequence + // number), so we can properly place this invoice within this + // event stream. + nextAddSeqNo, err := addIndex.NextSequence() + if err != nil { + return err + } + + // With the next sequence obtained, we'll updating the event series in + // the add index bucket to map this current add counter to the index of + // this new invoice. + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo) + if err := addIndex.Put(seqNoBytes[:], invoiceKey[:]); err != nil { + return err + } + + i.AddIndex = nextAddSeqNo + // Finally, serialize the invoice itself to be written to the disk. var buf bytes.Buffer if err := serializeInvoice(&buf, i); err != nil { @@ -370,16 +457,23 @@ func serializeInvoice(w io.Writer, i *Invoice) error { return err } + if err := binary.Write(w, byteOrder, i.AddIndex); err != nil { + return err + } + if err := binary.Write(w, byteOrder, i.SettleIndex); err != nil { + return err + } if err := binary.Write(w, byteOrder, int64(i.AmtPaid)); err != nil { return err } + return nil } -func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) { +func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (Invoice, error) { invoiceBytes := invoices.Get(invoiceNum) if invoiceBytes == nil { - return nil, ErrInvoiceNotFound + return Invoice{}, ErrInvoiceNotFound } invoiceReader := bytes.NewReader(invoiceBytes) @@ -387,52 +481,60 @@ func fetchInvoice(invoiceNum []byte, invoices *bolt.Bucket) (*Invoice, error) { return deserializeInvoice(invoiceReader) } -func deserializeInvoice(r io.Reader) (*Invoice, error) { +func deserializeInvoice(r io.Reader) (Invoice, error) { var err error - invoice := &Invoice{} + invoice := Invoice{} // TODO(roasbeef): use read full everywhere invoice.Memo, err = wire.ReadVarBytes(r, 0, MaxMemoSize, "") if err != nil { - return nil, err + return invoice, err } invoice.Receipt, err = wire.ReadVarBytes(r, 0, MaxReceiptSize, "") if err != nil { - return nil, err + return invoice, err } invoice.PaymentRequest, err = wire.ReadVarBytes(r, 0, MaxPaymentRequestSize, "") if err != nil { - return nil, err + return invoice, err } birthBytes, err := wire.ReadVarBytes(r, 0, 300, "birth") if err != nil { - return nil, err + return invoice, err } if err := invoice.CreationDate.UnmarshalBinary(birthBytes); err != nil { - return nil, err + return invoice, err } settledBytes, err := wire.ReadVarBytes(r, 0, 300, "settled") if err != nil { - return nil, err + return invoice, err } if err := invoice.SettleDate.UnmarshalBinary(settledBytes); err != nil { - return nil, err + return invoice, err } if _, err := io.ReadFull(r, invoice.Terms.PaymentPreimage[:]); err != nil { - return nil, err + return invoice, err } var scratch [8]byte if _, err := io.ReadFull(r, scratch[:]); err != nil { - return nil, err + return invoice, err } invoice.Terms.Value = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:])) if err := binary.Read(r, byteOrder, &invoice.Terms.Settled); err != nil { - return nil, err + return invoice, err + } + + if err := binary.Read(r, byteOrder, &invoice.AddIndex); err != nil { + return invoice, err + } + if err := binary.Read(r, byteOrder, &invoice.SettleIndex); err != nil { + return invoice, err + } if err := binary.Read(r, byteOrder, &invoice.AmtPaid); err != nil { return invoice, err } @@ -454,12 +556,27 @@ func settleInvoice(invoices, settleIndex *bolt.Bucket, invoiceNum []byte, return nil } + // Now that we know the invoice hasn't already been settled, we'll + // update the settle index so we can place this settle event in the + // proper location within our time series. + nextSettleSeqNo, err := settleIndex.NextSequence() + if err != nil { + return err + } + + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo) + if err := settleIndex.Put(seqNoBytes[:], invoiceNum); err != nil { + return err + } + invoice.AmtPaid = amtPaid invoice.Terms.Settled = true invoice.SettleDate = time.Now() + invoice.SettleIndex = nextSettleSeqNo var buf bytes.Buffer - if err := serializeInvoice(&buf, invoice); err != nil { + if err := serializeInvoice(&buf, &invoice); err != nil { return nil }