kvdb+etcd: change flattened bucket key derivation algorithm
This commit changes the key derivation algo we use to emulate buckets similar to bbolt. The issue with prefixing keys with either a bucket or a value prefix is that the cursor couldn't effectively iterate trough all keys in a bucket, as it skipped the bucket keys. While there are multiple ways to fix that issue (eg. two pointers, iterating value keys then bucket keys, etc), the cleanest is to instead of prefixes in keys we use a postfix indicating whether a key is a bucket or a value. This also simplifies all operations where we (recursively) iterate a bucket and is equivalent with the prefixing key derivation with the addition that bucket and value keys are now continous.
This commit is contained in:
parent
9173958f9c
commit
63e9d6102f
@ -11,9 +11,9 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
bucketPrefix = []byte("b")
|
||||
valuePrefix = []byte("v")
|
||||
sequencePrefix = []byte("$")
|
||||
valuePostfix = []byte{0x00}
|
||||
bucketPostfix = []byte{0xFF}
|
||||
sequencePrefix = []byte("$seq$")
|
||||
)
|
||||
|
||||
// makeBucketID returns a deterministic key for the passed byte slice.
|
||||
@ -28,52 +28,65 @@ func isValidBucketID(s []byte) bool {
|
||||
return len(s) == bucketIDLength
|
||||
}
|
||||
|
||||
// makeKey concatenates prefix, parent and key into one byte slice.
|
||||
// The prefix indicates the use of this key (whether bucket, value or sequence),
|
||||
// while parentID refers to the parent bucket.
|
||||
func makeKey(prefix, parent, key []byte) []byte {
|
||||
keyBuf := make([]byte, len(prefix)+len(parent)+len(key))
|
||||
copy(keyBuf, prefix)
|
||||
copy(keyBuf[len(prefix):], parent)
|
||||
copy(keyBuf[len(prefix)+len(parent):], key)
|
||||
// makeKey concatenates parent, key and postfix into one byte slice.
|
||||
// The postfix indicates the use of this key (whether bucket or value), while
|
||||
// parent refers to the parent bucket.
|
||||
func makeKey(parent, key, postfix []byte) []byte {
|
||||
keyBuf := make([]byte, len(parent)+len(key)+len(postfix))
|
||||
copy(keyBuf, parent)
|
||||
copy(keyBuf[len(parent):], key)
|
||||
copy(keyBuf[len(parent)+len(key):], postfix)
|
||||
|
||||
return keyBuf
|
||||
}
|
||||
|
||||
// makePrefix concatenates prefix with parent into one byte slice.
|
||||
func makePrefix(prefix []byte, parent []byte) []byte {
|
||||
prefixBuf := make([]byte, len(prefix)+len(parent))
|
||||
copy(prefixBuf, prefix)
|
||||
copy(prefixBuf[len(prefix):], parent)
|
||||
|
||||
return prefixBuf
|
||||
}
|
||||
|
||||
// makeBucketKey returns a bucket key from the passed parent bucket id and
|
||||
// the key.
|
||||
func makeBucketKey(parent []byte, key []byte) []byte {
|
||||
return makeKey(bucketPrefix, parent, key)
|
||||
return makeKey(parent, key, bucketPostfix)
|
||||
}
|
||||
|
||||
// makeValueKey returns a value key from the passed parent bucket id and
|
||||
// the key.
|
||||
func makeValueKey(parent []byte, key []byte) []byte {
|
||||
return makeKey(valuePrefix, parent, key)
|
||||
return makeKey(parent, key, valuePostfix)
|
||||
}
|
||||
|
||||
// makeSequenceKey returns a sequence key of the passed parent bucket id.
|
||||
func makeSequenceKey(parent []byte) []byte {
|
||||
return makeKey(sequencePrefix, parent, nil)
|
||||
keyBuf := make([]byte, len(sequencePrefix)+len(parent))
|
||||
copy(keyBuf, sequencePrefix)
|
||||
copy(keyBuf[len(sequencePrefix):], parent)
|
||||
return keyBuf
|
||||
}
|
||||
|
||||
// makeBucketPrefix returns the bucket prefix of the passed parent bucket id.
|
||||
// This prefix is used for all sub buckets.
|
||||
func makeBucketPrefix(parent []byte) []byte {
|
||||
return makePrefix(bucketPrefix, parent)
|
||||
// isBucketKey returns true if the passed key is a bucket key, meaning it
|
||||
// keys a bucket name.
|
||||
func isBucketKey(key string) bool {
|
||||
if len(key) < bucketIDLength+1 {
|
||||
return false
|
||||
}
|
||||
|
||||
return key[len(key)-1] == bucketPostfix[0]
|
||||
}
|
||||
|
||||
// makeValuePrefix returns the value prefix of the passed parent bucket id.
|
||||
// This prefix is used for all key/values in the bucket.
|
||||
func makeValuePrefix(parent []byte) []byte {
|
||||
return makePrefix(valuePrefix, parent)
|
||||
// getKey chops out the key from the raw key (by removing the bucket id
|
||||
// prefixing the key and the postfix indicating whether it is a bucket or
|
||||
// a value key)
|
||||
func getKey(rawKey string) []byte {
|
||||
return []byte(rawKey[bucketIDLength : len(rawKey)-1])
|
||||
}
|
||||
|
||||
// getKeyVal chops out the key from the raw key (by removing the bucket id
|
||||
// prefixing the key and the postfix indicating whether it is a bucket or
|
||||
// a value key) and also returns the appropriate value for the key, which is
|
||||
// nil in case of buckets (or the set value otherwise).
|
||||
func getKeyVal(kv *KV) ([]byte, []byte) {
|
||||
var val []byte
|
||||
|
||||
if !isBucketKey(kv.key) {
|
||||
val = []byte(kv.val)
|
||||
}
|
||||
|
||||
return getKey(kv.key), val
|
||||
}
|
||||
|
@ -46,44 +46,23 @@ func (b *readWriteBucket) NestedReadBucket(key []byte) walletdb.ReadBucket {
|
||||
// is nil, but it does not include the key/value pairs within those
|
||||
// nested buckets.
|
||||
func (b *readWriteBucket) ForEach(cb func(k, v []byte) error) error {
|
||||
prefix := makeValuePrefix(b.id)
|
||||
prefixLen := len(prefix)
|
||||
prefix := string(b.id)
|
||||
|
||||
// Get the first matching key that is in the bucket.
|
||||
kv, err := b.tx.stm.First(string(prefix))
|
||||
kv, err := b.tx.stm.First(prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for kv != nil {
|
||||
if err := cb([]byte(kv.key[prefixLen:]), []byte(kv.val)); err != nil {
|
||||
key, val := getKeyVal(kv)
|
||||
|
||||
if err := cb(key, val); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step to the next key.
|
||||
kv, err = b.tx.stm.Next(string(prefix), kv.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Make a bucket prefix. This prefixes all sub buckets.
|
||||
prefix = makeBucketPrefix(b.id)
|
||||
prefixLen = len(prefix)
|
||||
|
||||
// Get the first bucket.
|
||||
kv, err = b.tx.stm.First(string(prefix))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for kv != nil {
|
||||
if err := cb([]byte(kv.key[prefixLen:]), nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Step to the next bucket.
|
||||
kv, err = b.tx.stm.Next(string(prefix), kv.key)
|
||||
kv, err = b.tx.stm.Next(prefix, kv.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -241,10 +220,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
|
||||
id := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// Delete values in the current bucket
|
||||
valuePrefix := string(makeValuePrefix(id))
|
||||
|
||||
kv, err := b.tx.stm.First(valuePrefix)
|
||||
kv, err := b.tx.stm.First(string(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -252,35 +228,23 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
|
||||
for kv != nil {
|
||||
b.tx.del(kv.key)
|
||||
|
||||
kv, err = b.tx.stm.Next(valuePrefix, kv.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Iterate sub buckets
|
||||
bucketPrefix := string(makeBucketPrefix(id))
|
||||
|
||||
kv, err = b.tx.stm.First(bucketPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for kv != nil {
|
||||
// Delete sub bucket key.
|
||||
b.tx.del(kv.key)
|
||||
// Queue it for traversal.
|
||||
if isBucketKey(kv.key) {
|
||||
queue = append(queue, []byte(kv.val))
|
||||
}
|
||||
|
||||
kv, err = b.tx.stm.Next(bucketPrefix, kv.key)
|
||||
kv, err = b.tx.stm.Next(string(id), kv.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Finally delete the sequence key for the bucket.
|
||||
b.tx.del(string(makeSequenceKey(id)))
|
||||
}
|
||||
|
||||
// Delete the top level bucket.
|
||||
// Delete the top level bucket and sequence key.
|
||||
b.tx.del(bucketKey)
|
||||
b.tx.del(string(makeSequenceKey(bucketVal)))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -315,7 +315,7 @@ func TestBucketForEachWithError(t *testing.T) {
|
||||
i := 0
|
||||
// Error while iterating value keys.
|
||||
err = apple.ForEach(func(key, val []byte) error {
|
||||
if i == 1 {
|
||||
if i == 2 {
|
||||
return fmt.Errorf("error")
|
||||
}
|
||||
|
||||
@ -325,6 +325,7 @@ func TestBucketForEachWithError(t *testing.T) {
|
||||
})
|
||||
|
||||
expected := map[string]string{
|
||||
"banana": "",
|
||||
"key1": "val1",
|
||||
}
|
||||
|
||||
@ -345,9 +346,9 @@ func TestBucketForEachWithError(t *testing.T) {
|
||||
})
|
||||
|
||||
expected = map[string]string{
|
||||
"banana": "",
|
||||
"key1": "val1",
|
||||
"key2": "val2",
|
||||
"banana": "",
|
||||
}
|
||||
|
||||
require.Equal(t, expected, got)
|
||||
|
@ -19,7 +19,7 @@ type readWriteCursor struct {
|
||||
func newReadWriteCursor(bucket *readWriteBucket) *readWriteCursor {
|
||||
return &readWriteCursor{
|
||||
bucket: bucket,
|
||||
prefix: string(makeValuePrefix(bucket.id)),
|
||||
prefix: string(bucket.id),
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,8 +35,7 @@ func (c *readWriteCursor) First() (key, value []byte) {
|
||||
|
||||
if kv != nil {
|
||||
c.currKey = kv.key
|
||||
// Chop the prefix and return the key/value.
|
||||
return []byte(kv.key[len(c.prefix):]), []byte(kv.val)
|
||||
return getKeyVal(kv)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
@ -53,8 +52,7 @@ func (c *readWriteCursor) Last() (key, value []byte) {
|
||||
|
||||
if kv != nil {
|
||||
c.currKey = kv.key
|
||||
// Chop the prefix and return the key/value.
|
||||
return []byte(kv.key[len(c.prefix):]), []byte(kv.val)
|
||||
return getKeyVal(kv)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
@ -71,8 +69,7 @@ func (c *readWriteCursor) Next() (key, value []byte) {
|
||||
|
||||
if kv != nil {
|
||||
c.currKey = kv.key
|
||||
// Chop the prefix and return the key/value.
|
||||
return []byte(kv.key[len(c.prefix):]), []byte(kv.val)
|
||||
return getKeyVal(kv)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
@ -89,8 +86,7 @@ func (c *readWriteCursor) Prev() (key, value []byte) {
|
||||
|
||||
if kv != nil {
|
||||
c.currKey = kv.key
|
||||
// Chop the prefix and return the key/value.
|
||||
return []byte(kv.key[len(c.prefix):]), []byte(kv.val)
|
||||
return getKeyVal(kv)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
@ -115,8 +111,7 @@ func (c *readWriteCursor) Seek(seek []byte) (key, value []byte) {
|
||||
|
||||
if kv != nil {
|
||||
c.currKey = kv.key
|
||||
// Chop the prefix and return the key/value.
|
||||
return []byte(kv.key[len(c.prefix):]), []byte(kv.val)
|
||||
return getKeyVal(kv)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
@ -133,11 +128,14 @@ func (c *readWriteCursor) Delete() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete the current key.
|
||||
c.bucket.tx.stm.Del(c.currKey)
|
||||
if isBucketKey(c.currKey) {
|
||||
c.bucket.DeleteNestedBucket(getKey(c.currKey))
|
||||
} else {
|
||||
c.bucket.Delete(getKey(c.currKey))
|
||||
}
|
||||
|
||||
// Set current key to the next one if possible.
|
||||
if nextKey != nil {
|
||||
// Set current key to the next one.
|
||||
c.currKey = nextKey.key
|
||||
}
|
||||
|
||||
|
@ -291,3 +291,78 @@ func TestReadWriteCursor(t *testing.T) {
|
||||
}
|
||||
require.Equal(t, expected, f.Dump())
|
||||
}
|
||||
|
||||
// TestReadWriteCursorWithBucketAndValue tests that cursors are able to iterate
|
||||
// over both bucket and value keys if both are present in the iterated bucket.
|
||||
func TestReadWriteCursorWithBucketAndValue(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f := NewEtcdTestFixture(t)
|
||||
defer f.Cleanup()
|
||||
|
||||
db, err := newEtcdBackend(f.BackendConfig())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Pre-store the first half of the interval.
|
||||
require.NoError(t, db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||
b, err := tx.CreateTopLevelBucket([]byte("apple"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, b)
|
||||
|
||||
require.NoError(t, b.Put([]byte("key"), []byte("val")))
|
||||
|
||||
b1, err := b.CreateBucket([]byte("banana"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, b1)
|
||||
|
||||
b2, err := b.CreateBucket([]byte("pear"))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, b2)
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
err = db.View(func(tx walletdb.ReadTx) error {
|
||||
b := tx.ReadBucket([]byte("apple"))
|
||||
require.NotNil(t, b)
|
||||
|
||||
cursor := b.ReadCursor()
|
||||
|
||||
// First on valid interval.
|
||||
k, v := cursor.First()
|
||||
require.Equal(t, []byte("banana"), k)
|
||||
require.Nil(t, v)
|
||||
|
||||
k, v = cursor.Next()
|
||||
require.Equal(t, []byte("key"), k)
|
||||
require.Equal(t, []byte("val"), v)
|
||||
|
||||
k, v = cursor.Last()
|
||||
require.Equal(t, []byte("pear"), k)
|
||||
require.Nil(t, v)
|
||||
|
||||
k, v = cursor.Seek([]byte("k"))
|
||||
require.Equal(t, []byte("key"), k)
|
||||
require.Equal(t, []byte("val"), v)
|
||||
|
||||
k, v = cursor.Seek([]byte("banana"))
|
||||
require.Equal(t, []byte("banana"), k)
|
||||
require.Nil(t, v)
|
||||
|
||||
k, v = cursor.Next()
|
||||
require.Equal(t, []byte("key"), k)
|
||||
require.Equal(t, []byte("val"), v)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
expected := map[string]string{
|
||||
bkey("apple"): bval("apple"),
|
||||
bkey("apple", "banana"): bval("apple", "banana"),
|
||||
bkey("apple", "pear"): bval("apple", "pear"),
|
||||
vkey("key", "apple"): "val",
|
||||
}
|
||||
require.Equal(t, expected, f.Dump())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user