kvdb+etcd: assert on bucket/value key when putting value/bucket
This commit extends compatibility with the bbolt kvdb implementation, which returns ErrIncompatibleValue in case of a bucket/value key collision. Furthermore the commit also adds an extra precondition to the transaction when a key doesn't exist. This is needed as we fix reads to a snapshot revision and other writers may commit the key otherwise.
This commit is contained in:
parent
12a341ba59
commit
5346ed8a5c
@ -116,6 +116,20 @@ func (b *readWriteBucket) NestedReadWriteBucket(key []byte) walletdb.ReadWriteBu
|
|||||||
return newReadWriteBucket(b.tx, bucketKey, bucketVal)
|
return newReadWriteBucket(b.tx, bucketKey, bucketVal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// assertNoValue checks if the value for the passed key exists.
|
||||||
|
func (b *readWriteBucket) assertNoValue(key []byte) error {
|
||||||
|
val, err := b.tx.stm.Get(string(makeValueKey(b.id, key)))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if val != nil {
|
||||||
|
return walletdb.ErrIncompatibleValue
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// CreateBucket creates and returns a new nested bucket with the given
|
// CreateBucket creates and returns a new nested bucket with the given
|
||||||
// key. Returns ErrBucketExists if the bucket already exists,
|
// key. Returns ErrBucketExists if the bucket already exists,
|
||||||
// ErrBucketNameRequired if the key is empty, or ErrIncompatibleValue
|
// ErrBucketNameRequired if the key is empty, or ErrIncompatibleValue
|
||||||
@ -141,11 +155,15 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
|
|||||||
return nil, walletdb.ErrBucketExists
|
return nil, walletdb.ErrBucketExists
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := b.assertNoValue(key); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// Create a deterministic bucket id from the bucket key.
|
// Create a deterministic bucket id from the bucket key.
|
||||||
newID := makeBucketID(bucketKey)
|
newID := makeBucketID(bucketKey)
|
||||||
|
|
||||||
// Create the bucket.
|
// Create the bucket.
|
||||||
b.tx.put(string(bucketKey), string(newID[:]))
|
b.tx.stm.Put(string(bucketKey), string(newID[:]))
|
||||||
|
|
||||||
return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
|
return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
|
||||||
}
|
}
|
||||||
@ -171,8 +189,12 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !isValidBucketID(bucketVal) {
|
if !isValidBucketID(bucketVal) {
|
||||||
|
if err := b.assertNoValue(key); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
newID := makeBucketID(bucketKey)
|
newID := makeBucketID(bucketKey)
|
||||||
b.tx.put(string(bucketKey), string(newID[:]))
|
b.tx.stm.Put(string(bucketKey), string(newID[:]))
|
||||||
|
|
||||||
return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
|
return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil
|
||||||
}
|
}
|
||||||
@ -220,7 +242,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for kv != nil {
|
for kv != nil {
|
||||||
b.tx.del(kv.key)
|
b.tx.stm.Del(kv.key)
|
||||||
|
|
||||||
if isBucketKey(kv.key) {
|
if isBucketKey(kv.key) {
|
||||||
queue = append(queue, []byte(kv.val))
|
queue = append(queue, []byte(kv.val))
|
||||||
@ -233,12 +255,12 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Finally delete the sequence key for the bucket.
|
// Finally delete the sequence key for the bucket.
|
||||||
b.tx.del(string(makeSequenceKey(id)))
|
b.tx.stm.Del(string(makeSequenceKey(id)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the top level bucket and sequence key.
|
// Delete the top level bucket and sequence key.
|
||||||
b.tx.del(bucketKey)
|
b.tx.stm.Del(bucketKey)
|
||||||
b.tx.del(string(makeSequenceKey(bucketVal)))
|
b.tx.stm.Del(string(makeSequenceKey(bucketVal)))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -250,8 +272,17 @@ func (b *readWriteBucket) Put(key, value []byte) error {
|
|||||||
return walletdb.ErrKeyRequired
|
return walletdb.ErrKeyRequired
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val, err := b.tx.stm.Get(string(makeBucketKey(b.id, key)))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if val != nil {
|
||||||
|
return walletdb.ErrIncompatibleValue
|
||||||
|
}
|
||||||
|
|
||||||
// Update the transaction with the new value.
|
// Update the transaction with the new value.
|
||||||
b.tx.put(string(makeValueKey(b.id, key)), string(value))
|
b.tx.stm.Put(string(makeValueKey(b.id, key)), string(value))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -264,7 +295,7 @@ func (b *readWriteBucket) Delete(key []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Update the transaction to delete the key/value.
|
// Update the transaction to delete the key/value.
|
||||||
b.tx.del(string(makeValueKey(b.id, key)))
|
b.tx.stm.Del(string(makeValueKey(b.id, key)))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -294,7 +325,7 @@ func (b *readWriteBucket) SetSequence(v uint64) error {
|
|||||||
val := strconv.FormatUint(v, 10)
|
val := strconv.FormatUint(v, 10)
|
||||||
|
|
||||||
// Update the transaction with the new value for the sequence key.
|
// Update the transaction with the new value for the sequence key.
|
||||||
b.tx.put(string(makeSequenceKey(b.id)), val)
|
b.tx.stm.Put(string(makeSequenceKey(b.id)), val)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -403,3 +403,121 @@ func TestBucketSequence(t *testing.T) {
|
|||||||
|
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestKeyClash tests that one cannot create a bucket if a value with the same
|
||||||
|
// key exists and the same is true in reverse: that a value cannot be put if
|
||||||
|
// a bucket with the same key exists.
|
||||||
|
func TestKeyClash(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
f := NewEtcdTestFixture(t)
|
||||||
|
defer f.Cleanup()
|
||||||
|
|
||||||
|
db, err := newEtcdBackend(f.BackendConfig())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// First:
|
||||||
|
// put: /apple/key -> val
|
||||||
|
// create bucket: /apple/banana
|
||||||
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
|
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.NotNil(t, apple)
|
||||||
|
|
||||||
|
require.NoError(t, apple.Put([]byte("key"), []byte("val")))
|
||||||
|
|
||||||
|
banana, err := apple.CreateBucket([]byte("banana"))
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.NotNil(t, banana)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
// Next try to:
|
||||||
|
// put: /apple/banana -> val => will fail (as /apple/banana is a bucket)
|
||||||
|
// create bucket: /apple/key => will fail (as /apple/key is a value)
|
||||||
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
|
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.NotNil(t, apple)
|
||||||
|
|
||||||
|
require.Error(t,
|
||||||
|
walletdb.ErrIncompatibleValue,
|
||||||
|
apple.Put([]byte("banana"), []byte("val")),
|
||||||
|
)
|
||||||
|
|
||||||
|
b, err := apple.CreateBucket([]byte("key"))
|
||||||
|
require.Nil(t, b)
|
||||||
|
require.Error(t, walletdb.ErrIncompatibleValue, b)
|
||||||
|
|
||||||
|
b, err = apple.CreateBucketIfNotExists([]byte("key"))
|
||||||
|
require.Nil(t, b)
|
||||||
|
require.Error(t, walletdb.ErrIncompatibleValue, b)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
// Except that the only existing items in the db are:
|
||||||
|
// bucket: /apple
|
||||||
|
// bucket: /apple/banana
|
||||||
|
// value: /apple/key -> val
|
||||||
|
expected := map[string]string{
|
||||||
|
bkey("apple"): bval("apple"),
|
||||||
|
bkey("apple", "banana"): bval("apple", "banana"),
|
||||||
|
vkey("key", "apple"): "val",
|
||||||
|
}
|
||||||
|
require.Equal(t, expected, f.Dump())
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBucketCreateDelete tests that creating then deleting then creating a
|
||||||
|
// bucket suceeds.
|
||||||
|
func TestBucketCreateDelete(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
f := NewEtcdTestFixture(t)
|
||||||
|
defer f.Cleanup()
|
||||||
|
|
||||||
|
db, err := newEtcdBackend(f.BackendConfig())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
|
apple, err := tx.CreateTopLevelBucket([]byte("apple"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, apple)
|
||||||
|
|
||||||
|
banana, err := apple.CreateBucket([]byte("banana"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, banana)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
|
apple := tx.ReadWriteBucket([]byte("apple"))
|
||||||
|
require.NotNil(t, apple)
|
||||||
|
require.NoError(t, apple.DeleteNestedBucket([]byte("banana")))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
|
apple := tx.ReadWriteBucket([]byte("apple"))
|
||||||
|
require.NotNil(t, apple)
|
||||||
|
require.NoError(t, apple.Put([]byte("banana"), []byte("value")))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
expected := map[string]string{
|
||||||
|
vkey("banana", "apple"): "value",
|
||||||
|
bkey("apple"): bval("apple"),
|
||||||
|
}
|
||||||
|
require.Equal(t, expected, f.Dump())
|
||||||
|
}
|
||||||
|
@ -34,16 +34,6 @@ func rootBucket(tx *readWriteTx) *readWriteBucket {
|
|||||||
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
|
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// put updates the passed key/value.
|
|
||||||
func (tx *readWriteTx) put(key, val string) {
|
|
||||||
tx.stm.Put(key, val)
|
|
||||||
}
|
|
||||||
|
|
||||||
// del marks the passed key deleted.
|
|
||||||
func (tx *readWriteTx) del(key string) {
|
|
||||||
tx.stm.Del(key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadBucket opens the root bucket for read only access. If the bucket
|
// ReadBucket opens the root bucket for read only access. If the bucket
|
||||||
// described by the key does not exist, nil is returned.
|
// described by the key does not exist, nil is returned.
|
||||||
func (tx *readWriteTx) ReadBucket(key []byte) walletdb.ReadBucket {
|
func (tx *readWriteTx) ReadBucket(key []byte) walletdb.ReadBucket {
|
||||||
|
@ -352,6 +352,15 @@ func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(resp.Kvs) == 0 {
|
||||||
|
// Add assertion to the read set which will extend our commit
|
||||||
|
// constraint such that the commit will fail if the key is
|
||||||
|
// present in the database.
|
||||||
|
s.rset[key] = stmGet{
|
||||||
|
rev: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var result []KV
|
var result []KV
|
||||||
|
|
||||||
// Fill the read set with key/values returned.
|
// Fill the read set with key/values returned.
|
||||||
@ -395,12 +404,22 @@ func (s *stm) Get(key string) ([]byte, error) {
|
|||||||
// the prefetch set.
|
// the prefetch set.
|
||||||
if getValue, ok := s.prefetch[key]; ok {
|
if getValue, ok := s.prefetch[key]; ok {
|
||||||
delete(s.prefetch, key)
|
delete(s.prefetch, key)
|
||||||
s.rset[key] = getValue
|
|
||||||
|
// Use the prefetched value only if it is for
|
||||||
|
// an existing key.
|
||||||
|
if getValue.rev != 0 {
|
||||||
|
s.rset[key] = getValue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return value if alread in read set.
|
// Return value if alread in read set.
|
||||||
if getVal, ok := s.rset[key]; ok {
|
if getValue, ok := s.rset[key]; ok {
|
||||||
return []byte(getVal.val), nil
|
// Return the value if the rset contains an existing key.
|
||||||
|
if getValue.rev != 0 {
|
||||||
|
return []byte(getValue.val), nil
|
||||||
|
} else {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch and return value.
|
// Fetch and return value.
|
||||||
|
Loading…
Reference in New Issue
Block a user