diff --git a/channeldb/kvdb/etcd/bucket.go b/channeldb/kvdb/etcd/bucket.go index 3bc087db..8a1ff071 100644 --- a/channeldb/kvdb/etcd/bucket.go +++ b/channeldb/kvdb/etcd/bucket.go @@ -11,9 +11,9 @@ const ( ) var ( - bucketPrefix = []byte("b") - valuePrefix = []byte("v") - sequencePrefix = []byte("$") + valuePostfix = []byte{0x00} + bucketPostfix = []byte{0xFF} + sequencePrefix = []byte("$seq$") ) // makeBucketID returns a deterministic key for the passed byte slice. @@ -28,52 +28,65 @@ func isValidBucketID(s []byte) bool { return len(s) == bucketIDLength } -// makeKey concatenates prefix, parent and key into one byte slice. -// The prefix indicates the use of this key (whether bucket, value or sequence), -// while parentID refers to the parent bucket. -func makeKey(prefix, parent, key []byte) []byte { - keyBuf := make([]byte, len(prefix)+len(parent)+len(key)) - copy(keyBuf, prefix) - copy(keyBuf[len(prefix):], parent) - copy(keyBuf[len(prefix)+len(parent):], key) +// makeKey concatenates parent, key and postfix into one byte slice. +// The postfix indicates the use of this key (whether bucket or value), while +// parent refers to the parent bucket. +func makeKey(parent, key, postfix []byte) []byte { + keyBuf := make([]byte, len(parent)+len(key)+len(postfix)) + copy(keyBuf, parent) + copy(keyBuf[len(parent):], key) + copy(keyBuf[len(parent)+len(key):], postfix) return keyBuf } -// makePrefix concatenates prefix with parent into one byte slice. -func makePrefix(prefix []byte, parent []byte) []byte { - prefixBuf := make([]byte, len(prefix)+len(parent)) - copy(prefixBuf, prefix) - copy(prefixBuf[len(prefix):], parent) - - return prefixBuf -} - // makeBucketKey returns a bucket key from the passed parent bucket id and // the key. func makeBucketKey(parent []byte, key []byte) []byte { - return makeKey(bucketPrefix, parent, key) + return makeKey(parent, key, bucketPostfix) } // makeValueKey returns a value key from the passed parent bucket id and // the key. func makeValueKey(parent []byte, key []byte) []byte { - return makeKey(valuePrefix, parent, key) + return makeKey(parent, key, valuePostfix) } // makeSequenceKey returns a sequence key of the passed parent bucket id. func makeSequenceKey(parent []byte) []byte { - return makeKey(sequencePrefix, parent, nil) + keyBuf := make([]byte, len(sequencePrefix)+len(parent)) + copy(keyBuf, sequencePrefix) + copy(keyBuf[len(sequencePrefix):], parent) + return keyBuf } -// makeBucketPrefix returns the bucket prefix of the passed parent bucket id. -// This prefix is used for all sub buckets. -func makeBucketPrefix(parent []byte) []byte { - return makePrefix(bucketPrefix, parent) +// isBucketKey returns true if the passed key is a bucket key, meaning it +// keys a bucket name. +func isBucketKey(key string) bool { + if len(key) < bucketIDLength+1 { + return false + } + + return key[len(key)-1] == bucketPostfix[0] } -// makeValuePrefix returns the value prefix of the passed parent bucket id. -// This prefix is used for all key/values in the bucket. -func makeValuePrefix(parent []byte) []byte { - return makePrefix(valuePrefix, parent) +// getKey chops out the key from the raw key (by removing the bucket id +// prefixing the key and the postfix indicating whether it is a bucket or +// a value key) +func getKey(rawKey string) []byte { + return []byte(rawKey[bucketIDLength : len(rawKey)-1]) +} + +// getKeyVal chops out the key from the raw key (by removing the bucket id +// prefixing the key and the postfix indicating whether it is a bucket or +// a value key) and also returns the appropriate value for the key, which is +// nil in case of buckets (or the set value otherwise). +func getKeyVal(kv *KV) ([]byte, []byte) { + var val []byte + + if !isBucketKey(kv.key) { + val = []byte(kv.val) + } + + return getKey(kv.key), val } diff --git a/channeldb/kvdb/etcd/readwrite_bucket.go b/channeldb/kvdb/etcd/readwrite_bucket.go index e60d2cec..20af7d92 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket.go +++ b/channeldb/kvdb/etcd/readwrite_bucket.go @@ -46,44 +46,23 @@ func (b *readWriteBucket) NestedReadBucket(key []byte) walletdb.ReadBucket { // is nil, but it does not include the key/value pairs within those // nested buckets. func (b *readWriteBucket) ForEach(cb func(k, v []byte) error) error { - prefix := makeValuePrefix(b.id) - prefixLen := len(prefix) + prefix := string(b.id) // Get the first matching key that is in the bucket. - kv, err := b.tx.stm.First(string(prefix)) + kv, err := b.tx.stm.First(prefix) if err != nil { return err } for kv != nil { - if err := cb([]byte(kv.key[prefixLen:]), []byte(kv.val)); err != nil { + key, val := getKeyVal(kv) + + if err := cb(key, val); err != nil { return err } // Step to the next key. - kv, err = b.tx.stm.Next(string(prefix), kv.key) - if err != nil { - return err - } - } - - // Make a bucket prefix. This prefixes all sub buckets. - prefix = makeBucketPrefix(b.id) - prefixLen = len(prefix) - - // Get the first bucket. - kv, err = b.tx.stm.First(string(prefix)) - if err != nil { - return err - } - - for kv != nil { - if err := cb([]byte(kv.key[prefixLen:]), nil); err != nil { - return err - } - - // Step to the next bucket. - kv, err = b.tx.stm.Next(string(prefix), kv.key) + kv, err = b.tx.stm.Next(prefix, kv.key) if err != nil { return err } @@ -241,10 +220,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error { id := queue[0] queue = queue[1:] - // Delete values in the current bucket - valuePrefix := string(makeValuePrefix(id)) - - kv, err := b.tx.stm.First(valuePrefix) + kv, err := b.tx.stm.First(string(id)) if err != nil { return err } @@ -252,35 +228,23 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error { for kv != nil { b.tx.del(kv.key) - kv, err = b.tx.stm.Next(valuePrefix, kv.key) + if isBucketKey(kv.key) { + queue = append(queue, []byte(kv.val)) + } + + kv, err = b.tx.stm.Next(string(id), kv.key) if err != nil { return err } } - // Iterate sub buckets - bucketPrefix := string(makeBucketPrefix(id)) - - kv, err = b.tx.stm.First(bucketPrefix) - if err != nil { - return err - } - - for kv != nil { - // Delete sub bucket key. - b.tx.del(kv.key) - // Queue it for traversal. - queue = append(queue, []byte(kv.val)) - - kv, err = b.tx.stm.Next(bucketPrefix, kv.key) - if err != nil { - return err - } - } + // Finally delete the sequence key for the bucket. + b.tx.del(string(makeSequenceKey(id))) } - // Delete the top level bucket. + // Delete the top level bucket and sequence key. b.tx.del(bucketKey) + b.tx.del(string(makeSequenceKey(bucketVal))) return nil } diff --git a/channeldb/kvdb/etcd/readwrite_bucket_test.go b/channeldb/kvdb/etcd/readwrite_bucket_test.go index f5de23b5..6fb32136 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket_test.go +++ b/channeldb/kvdb/etcd/readwrite_bucket_test.go @@ -315,7 +315,7 @@ func TestBucketForEachWithError(t *testing.T) { i := 0 // Error while iterating value keys. err = apple.ForEach(func(key, val []byte) error { - if i == 1 { + if i == 2 { return fmt.Errorf("error") } @@ -325,7 +325,8 @@ func TestBucketForEachWithError(t *testing.T) { }) expected := map[string]string{ - "key1": "val1", + "banana": "", + "key1": "val1", } require.Equal(t, expected, got) @@ -345,9 +346,9 @@ func TestBucketForEachWithError(t *testing.T) { }) expected = map[string]string{ + "banana": "", "key1": "val1", "key2": "val2", - "banana": "", } require.Equal(t, expected, got) diff --git a/channeldb/kvdb/etcd/readwrite_cursor.go b/channeldb/kvdb/etcd/readwrite_cursor.go index 98965693..75c0456d 100644 --- a/channeldb/kvdb/etcd/readwrite_cursor.go +++ b/channeldb/kvdb/etcd/readwrite_cursor.go @@ -19,7 +19,7 @@ type readWriteCursor struct { func newReadWriteCursor(bucket *readWriteBucket) *readWriteCursor { return &readWriteCursor{ bucket: bucket, - prefix: string(makeValuePrefix(bucket.id)), + prefix: string(bucket.id), } } @@ -35,8 +35,7 @@ func (c *readWriteCursor) First() (key, value []byte) { if kv != nil { c.currKey = kv.key - // Chop the prefix and return the key/value. - return []byte(kv.key[len(c.prefix):]), []byte(kv.val) + return getKeyVal(kv) } return nil, nil @@ -53,8 +52,7 @@ func (c *readWriteCursor) Last() (key, value []byte) { if kv != nil { c.currKey = kv.key - // Chop the prefix and return the key/value. - return []byte(kv.key[len(c.prefix):]), []byte(kv.val) + return getKeyVal(kv) } return nil, nil @@ -71,8 +69,7 @@ func (c *readWriteCursor) Next() (key, value []byte) { if kv != nil { c.currKey = kv.key - // Chop the prefix and return the key/value. - return []byte(kv.key[len(c.prefix):]), []byte(kv.val) + return getKeyVal(kv) } return nil, nil @@ -89,8 +86,7 @@ func (c *readWriteCursor) Prev() (key, value []byte) { if kv != nil { c.currKey = kv.key - // Chop the prefix and return the key/value. - return []byte(kv.key[len(c.prefix):]), []byte(kv.val) + return getKeyVal(kv) } return nil, nil @@ -115,8 +111,7 @@ func (c *readWriteCursor) Seek(seek []byte) (key, value []byte) { if kv != nil { c.currKey = kv.key - // Chop the prefix and return the key/value. - return []byte(kv.key[len(c.prefix):]), []byte(kv.val) + return getKeyVal(kv) } return nil, nil @@ -133,11 +128,14 @@ func (c *readWriteCursor) Delete() error { return err } - // Delete the current key. - c.bucket.tx.stm.Del(c.currKey) + if isBucketKey(c.currKey) { + c.bucket.DeleteNestedBucket(getKey(c.currKey)) + } else { + c.bucket.Delete(getKey(c.currKey)) + } - // Set current key to the next one if possible. if nextKey != nil { + // Set current key to the next one. c.currKey = nextKey.key } diff --git a/channeldb/kvdb/etcd/readwrite_cursor_test.go b/channeldb/kvdb/etcd/readwrite_cursor_test.go index bc457e7e..216b47c4 100644 --- a/channeldb/kvdb/etcd/readwrite_cursor_test.go +++ b/channeldb/kvdb/etcd/readwrite_cursor_test.go @@ -291,3 +291,78 @@ func TestReadWriteCursor(t *testing.T) { } require.Equal(t, expected, f.Dump()) } + +// TestReadWriteCursorWithBucketAndValue tests that cursors are able to iterate +// over both bucket and value keys if both are present in the iterated bucket. +func TestReadWriteCursorWithBucketAndValue(t *testing.T) { + t.Parallel() + + f := NewEtcdTestFixture(t) + defer f.Cleanup() + + db, err := newEtcdBackend(f.BackendConfig()) + require.NoError(t, err) + + // Pre-store the first half of the interval. + require.NoError(t, db.Update(func(tx walletdb.ReadWriteTx) error { + b, err := tx.CreateTopLevelBucket([]byte("apple")) + require.NoError(t, err) + require.NotNil(t, b) + + require.NoError(t, b.Put([]byte("key"), []byte("val"))) + + b1, err := b.CreateBucket([]byte("banana")) + require.NoError(t, err) + require.NotNil(t, b1) + + b2, err := b.CreateBucket([]byte("pear")) + require.NoError(t, err) + require.NotNil(t, b2) + + return nil + })) + + err = db.View(func(tx walletdb.ReadTx) error { + b := tx.ReadBucket([]byte("apple")) + require.NotNil(t, b) + + cursor := b.ReadCursor() + + // First on valid interval. + k, v := cursor.First() + require.Equal(t, []byte("banana"), k) + require.Nil(t, v) + + k, v = cursor.Next() + require.Equal(t, []byte("key"), k) + require.Equal(t, []byte("val"), v) + + k, v = cursor.Last() + require.Equal(t, []byte("pear"), k) + require.Nil(t, v) + + k, v = cursor.Seek([]byte("k")) + require.Equal(t, []byte("key"), k) + require.Equal(t, []byte("val"), v) + + k, v = cursor.Seek([]byte("banana")) + require.Equal(t, []byte("banana"), k) + require.Nil(t, v) + + k, v = cursor.Next() + require.Equal(t, []byte("key"), k) + require.Equal(t, []byte("val"), v) + + return nil + }) + + require.NoError(t, err) + + expected := map[string]string{ + bkey("apple"): bval("apple"), + bkey("apple", "banana"): bval("apple", "banana"), + bkey("apple", "pear"): bval("apple", "pear"), + vkey("key", "apple"): "val", + } + require.Equal(t, expected, f.Dump()) +}