channeldb: add a paginator struct to process generic pagination

We now use the same method of pagination for invoices and payments.
Rather than duplicate logic across calls, we add a pagnator struct
which can have query specific logic plugged into it. This commit also
addresses an existing issue where a reverse query for invoices with an
offset larger than our last offset would not return any invoices. We
update this behaviour to act more like c.Seek and just start from the
last entry. This behaviour change is covered by a unit test that
previously checked for the lack of invoices.
This commit is contained in:
carla 2020-06-10 12:34:27 +02:00
parent f4933c67fd
commit eea871b583
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
3 changed files with 169 additions and 67 deletions

@ -1008,8 +1008,8 @@ func TestQueryInvoices(t *testing.T) {
expected: pendingInvoices[len(pendingInvoices)-15:],
},
// Fetch all invoices paginating backwards, with an index offset
// that is beyond our last offset. We currently do not return
// anything if our index is greater than our last index.
// that is beyond our last offset. We expect all invoices to be
// returned.
{
query: InvoiceQuery{
IndexOffset: numInvoices * 2,
@ -1017,7 +1017,7 @@ func TestQueryInvoices(t *testing.T) {
Reversed: true,
NumMaxInvoices: numInvoices,
},
expected: nil,
expected: invoices,
},
}

@ -839,85 +839,47 @@ func (d *DB) QueryInvoices(q InvoiceQuery) (InvoiceSlice, error) {
if invoices == nil {
return ErrNoInvoicesCreated
}
// Get the add index bucket which we will use to iterate through
// our indexed invoices.
invoiceAddIndex := invoices.NestedReadBucket(addIndexBucket)
if invoiceAddIndex == nil {
return ErrNoInvoicesCreated
}
// keyForIndex is a helper closure that retrieves the invoice
// key for the given add index of an invoice.
keyForIndex := func(c kvdb.RCursor, index uint64) []byte {
var keyIndex [8]byte
byteOrder.PutUint64(keyIndex[:], index)
_, invoiceKey := c.Seek(keyIndex[:])
return invoiceKey
}
// Create a paginator which reads from our add index bucket with
// the parameters provided by the invoice query.
paginator := newPaginator(
invoiceAddIndex.ReadCursor(), q.Reversed, q.IndexOffset,
q.NumMaxInvoices,
)
// nextKey is a helper closure to determine what the next
// invoice key is when iterating over the invoice add index.
nextKey := func(c kvdb.RCursor) ([]byte, []byte) {
if q.Reversed {
return c.Prev()
}
return c.Next()
}
// We'll be using a cursor to seek into the database and return
// a slice of invoices. We'll need to determine where to start
// our cursor depending on the parameters set within the query.
c := invoiceAddIndex.ReadCursor()
invoiceKey := keyForIndex(c, q.IndexOffset+1)
// If the query is specifying reverse iteration, then we must
// handle a few offset cases.
if q.Reversed {
switch q.IndexOffset {
// This indicates the default case, where no offset was
// specified. In that case we just start from the last
// invoice.
case 0:
_, invoiceKey = c.Last()
// This indicates the offset being set to the very
// first invoice. Since there are no invoices before
// this offset, and the direction is reversed, we can
// return without adding any invoices to the response.
case 1:
return nil
// Otherwise we start iteration at the invoice prior to
// the offset.
default:
invoiceKey = keyForIndex(c, q.IndexOffset-1)
}
}
// If we know that a set of invoices exists, then we'll begin
// our seek through the bucket in order to satisfy the query.
// We'll continue until either we reach the end of the range, or
// reach our max number of invoices.
for ; invoiceKey != nil; _, invoiceKey = nextKey(c) {
// If our current return payload exceeds the max number
// of invoices, then we'll exit now.
if uint64(len(resp.Invoices)) >= q.NumMaxInvoices {
break
}
invoice, err := fetchInvoice(invoiceKey, invoices)
// accumulateInvoices looks up an invoice based on the index we
// are given, adds it to our set of invoices if it has the right
// characteristics for our query and returns the number of items
// we have added to our set of invoices.
accumulateInvoices := func(_, indexValue []byte) (bool, error) {
invoice, err := fetchInvoice(indexValue, invoices)
if err != nil {
return err
return false, err
}
// Skip any settled or canceled invoices if the caller is
// only interested in pending ones.
// Skip any settled or canceled invoices if the caller
// is only interested in pending ones.
if q.PendingOnly && !invoice.IsPending() {
continue
return false, nil
}
// At this point, we've exhausted the offset, so we'll
// begin collecting invoices found within the range.
resp.Invoices = append(resp.Invoices, invoice)
return true, nil
}
// Query our paginator using accumulateInvoices to build up a
// set of invoices.
if err := paginator.query(accumulateInvoices); err != nil {
return err
}
// If we iterated through the add index in reverse order, then

140
channeldb/paginate.go Normal file

@ -0,0 +1,140 @@
package channeldb
import "github.com/lightningnetwork/lnd/channeldb/kvdb"
type paginator struct {
// cursor is the cursor which we are using to iterate through a bucket.
cursor kvdb.RCursor
// reversed indicates whether we are paginating forwards or backwards.
reversed bool
// indexOffset is the index from which we will begin querying.
indexOffset uint64
// totalItems is the total number of items we allow in our response.
totalItems uint64
}
// newPaginator returns a struct which can be used to query an indexed bucket
// in pages.
func newPaginator(c kvdb.RCursor, reversed bool,
indexOffset, totalItems uint64) paginator {
return paginator{
cursor: c,
reversed: reversed,
indexOffset: indexOffset,
totalItems: totalItems,
}
}
// keyValueForIndex seeks our cursor to a given index and returns the key and
// value at that position.
func (p paginator) keyValueForIndex(index uint64) ([]byte, []byte) {
var keyIndex [8]byte
byteOrder.PutUint64(keyIndex[:], index)
return p.cursor.Seek(keyIndex[:])
}
// lastIndex returns the last value in our index, if our index is empty it
// returns 0.
func (p paginator) lastIndex() uint64 {
keyIndex, _ := p.cursor.Last()
if keyIndex == nil {
return 0
}
return byteOrder.Uint64(keyIndex)
}
// nextKey is a helper closure to determine what key we should use next when
// we are iterating, depending on whether we are iterating forwards or in
// reverse.
func (p paginator) nextKey() ([]byte, []byte) {
if p.reversed {
return p.cursor.Prev()
}
return p.cursor.Next()
}
// cursorStart gets the index key and value for the first item we are looking
// up, taking into account that we may be paginating in reverse. The index
// offset provided is *excusive* so we will start with the item after the offset
// for forwards queries, and the item before the index for backwards queries.
func (p paginator) cursorStart() ([]byte, []byte) {
indexKey, indexValue := p.keyValueForIndex(p.indexOffset + 1)
// If the query is specifying reverse iteration, then we must
// handle a few offset cases.
if p.reversed {
switch {
// This indicates the default case, where no offset was
// specified. In that case we just start from the last
// entry.
case p.indexOffset == 0:
indexKey, indexValue = p.cursor.Last()
// This indicates the offset being set to the very
// first entry. Since there are no entries before
// this offset, and the direction is reversed, we can
// return without adding any invoices to the response.
case p.indexOffset == 1:
return nil, nil
// If we have been given an index offset that is beyond our last
// index value, we just return the last indexed value in our set
// since we are querying in reverse. We do not cover the case
// where our index offset equals our last index value, because
// index offset is exclusive, so we would want to start at the
// value before our last index.
case p.indexOffset > p.lastIndex():
return p.cursor.Last()
// Otherwise we have an index offset which is within our set of
// indexed keys, and we want to start at the item before our
// offset. We seek to our index offset, then return the element
// before it. We do this rather than p.indexOffset-1 to account
// for indexes that have gaps.
default:
p.keyValueForIndex(p.indexOffset)
indexKey, indexValue = p.cursor.Prev()
}
}
return indexKey, indexValue
}
// query gets the start point for our index offset and iterates through keys
// in our index until we reach the total number of items required for the query
// or we run out of cursor values. This function takes a fetchAndAppend function
// which is responsible for looking up the entry at that index, adding the entry
// to its set of return items (if desired) and return a boolean which indicates
// whether the item was added. This is required to allow the paginator to
// determine when the response has the maximum number of required items.
func (p paginator) query(fetchAndAppend func(k, v []byte) (bool, error)) error {
indexKey, indexValue := p.cursorStart()
var totalItems int
for ; indexKey != nil; indexKey, indexValue = p.nextKey() {
// If our current return payload exceeds the max number
// of invoices, then we'll exit now.
if uint64(totalItems) >= p.totalItems {
break
}
added, err := fetchAndAppend(indexKey, indexValue)
if err != nil {
return err
}
// If we added an item to our set in the latest fetch and append
// we increment our total count.
if added {
totalItems++
}
}
return nil
}