From 5346ed8a5c1aba8bdec44f5dc37544487a215982 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Tue, 21 Jul 2020 18:26:13 +0200 Subject: [PATCH] kvdb+etcd: assert on bucket/value key when putting value/bucket This commit extends compatibility with the bbolt kvdb implementation, which returns ErrIncompatibleValue in case of a bucket/value key collision. Furthermore the commit also adds an extra precondition to the transaction when a key doesn't exist. This is needed as we fix reads to a snapshot revision and other writers may commit the key otherwise. --- channeldb/kvdb/etcd/readwrite_bucket.go | 49 ++++++-- channeldb/kvdb/etcd/readwrite_bucket_test.go | 118 +++++++++++++++++++ channeldb/kvdb/etcd/readwrite_tx.go | 10 -- channeldb/kvdb/etcd/stm.go | 25 +++- 4 files changed, 180 insertions(+), 22 deletions(-) diff --git a/channeldb/kvdb/etcd/readwrite_bucket.go b/channeldb/kvdb/etcd/readwrite_bucket.go index f97268d9..dafab5ff 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket.go +++ b/channeldb/kvdb/etcd/readwrite_bucket.go @@ -116,6 +116,20 @@ func (b *readWriteBucket) NestedReadWriteBucket(key []byte) walletdb.ReadWriteBu return newReadWriteBucket(b.tx, bucketKey, bucketVal) } +// assertNoValue checks if the value for the passed key exists. +func (b *readWriteBucket) assertNoValue(key []byte) error { + val, err := b.tx.stm.Get(string(makeValueKey(b.id, key))) + if err != nil { + return err + } + + if val != nil { + return walletdb.ErrIncompatibleValue + } + + return nil +} + // CreateBucket creates and returns a new nested bucket with the given // key. Returns ErrBucketExists if the bucket already exists, // ErrBucketNameRequired if the key is empty, or ErrIncompatibleValue @@ -141,11 +155,15 @@ func (b *readWriteBucket) CreateBucket(key []byte) ( return nil, walletdb.ErrBucketExists } + if err := b.assertNoValue(key); err != nil { + return nil, err + } + // Create a deterministic bucket id from the bucket key. newID := makeBucketID(bucketKey) // Create the bucket. - b.tx.put(string(bucketKey), string(newID[:])) + b.tx.stm.Put(string(bucketKey), string(newID[:])) return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil } @@ -171,8 +189,12 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) ( } if !isValidBucketID(bucketVal) { + if err := b.assertNoValue(key); err != nil { + return nil, err + } + newID := makeBucketID(bucketKey) - b.tx.put(string(bucketKey), string(newID[:])) + b.tx.stm.Put(string(bucketKey), string(newID[:])) return newReadWriteBucket(b.tx, bucketKey, newID[:]), nil } @@ -220,7 +242,7 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error { } for kv != nil { - b.tx.del(kv.key) + b.tx.stm.Del(kv.key) if isBucketKey(kv.key) { queue = append(queue, []byte(kv.val)) @@ -233,12 +255,12 @@ func (b *readWriteBucket) DeleteNestedBucket(key []byte) error { } // Finally delete the sequence key for the bucket. - b.tx.del(string(makeSequenceKey(id))) + b.tx.stm.Del(string(makeSequenceKey(id))) } // Delete the top level bucket and sequence key. - b.tx.del(bucketKey) - b.tx.del(string(makeSequenceKey(bucketVal))) + b.tx.stm.Del(bucketKey) + b.tx.stm.Del(string(makeSequenceKey(bucketVal))) return nil } @@ -250,8 +272,17 @@ func (b *readWriteBucket) Put(key, value []byte) error { return walletdb.ErrKeyRequired } + val, err := b.tx.stm.Get(string(makeBucketKey(b.id, key))) + if err != nil { + return err + } + + if val != nil { + return walletdb.ErrIncompatibleValue + } + // Update the transaction with the new value. - b.tx.put(string(makeValueKey(b.id, key)), string(value)) + b.tx.stm.Put(string(makeValueKey(b.id, key)), string(value)) return nil } @@ -264,7 +295,7 @@ func (b *readWriteBucket) Delete(key []byte) error { } // Update the transaction to delete the key/value. - b.tx.del(string(makeValueKey(b.id, key))) + b.tx.stm.Del(string(makeValueKey(b.id, key))) return nil } @@ -294,7 +325,7 @@ func (b *readWriteBucket) SetSequence(v uint64) error { val := strconv.FormatUint(v, 10) // Update the transaction with the new value for the sequence key. - b.tx.put(string(makeSequenceKey(b.id)), val) + b.tx.stm.Put(string(makeSequenceKey(b.id)), val) return nil } diff --git a/channeldb/kvdb/etcd/readwrite_bucket_test.go b/channeldb/kvdb/etcd/readwrite_bucket_test.go index 6fb32136..2795dce3 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket_test.go +++ b/channeldb/kvdb/etcd/readwrite_bucket_test.go @@ -403,3 +403,121 @@ func TestBucketSequence(t *testing.T) { require.Nil(t, err) } + +// TestKeyClash tests that one cannot create a bucket if a value with the same +// key exists and the same is true in reverse: that a value cannot be put if +// a bucket with the same key exists. +func TestKeyClash(t *testing.T) { + t.Parallel() + + f := NewEtcdTestFixture(t) + defer f.Cleanup() + + db, err := newEtcdBackend(f.BackendConfig()) + require.NoError(t, err) + + // First: + // put: /apple/key -> val + // create bucket: /apple/banana + err = db.Update(func(tx walletdb.ReadWriteTx) error { + apple, err := tx.CreateTopLevelBucket([]byte("apple")) + require.Nil(t, err) + require.NotNil(t, apple) + + require.NoError(t, apple.Put([]byte("key"), []byte("val"))) + + banana, err := apple.CreateBucket([]byte("banana")) + require.Nil(t, err) + require.NotNil(t, banana) + + return nil + }) + + require.Nil(t, err) + + // Next try to: + // put: /apple/banana -> val => will fail (as /apple/banana is a bucket) + // create bucket: /apple/key => will fail (as /apple/key is a value) + err = db.Update(func(tx walletdb.ReadWriteTx) error { + apple, err := tx.CreateTopLevelBucket([]byte("apple")) + require.Nil(t, err) + require.NotNil(t, apple) + + require.Error(t, + walletdb.ErrIncompatibleValue, + apple.Put([]byte("banana"), []byte("val")), + ) + + b, err := apple.CreateBucket([]byte("key")) + require.Nil(t, b) + require.Error(t, walletdb.ErrIncompatibleValue, b) + + b, err = apple.CreateBucketIfNotExists([]byte("key")) + require.Nil(t, b) + require.Error(t, walletdb.ErrIncompatibleValue, b) + + return nil + }) + + require.Nil(t, err) + + // Except that the only existing items in the db are: + // bucket: /apple + // bucket: /apple/banana + // value: /apple/key -> val + expected := map[string]string{ + bkey("apple"): bval("apple"), + bkey("apple", "banana"): bval("apple", "banana"), + vkey("key", "apple"): "val", + } + require.Equal(t, expected, f.Dump()) + +} + +// TestBucketCreateDelete tests that creating then deleting then creating a +// bucket suceeds. +func TestBucketCreateDelete(t *testing.T) { + t.Parallel() + f := NewEtcdTestFixture(t) + defer f.Cleanup() + + db, err := newEtcdBackend(f.BackendConfig()) + require.NoError(t, err) + + err = db.Update(func(tx walletdb.ReadWriteTx) error { + apple, err := tx.CreateTopLevelBucket([]byte("apple")) + require.NoError(t, err) + require.NotNil(t, apple) + + banana, err := apple.CreateBucket([]byte("banana")) + require.NoError(t, err) + require.NotNil(t, banana) + + return nil + }) + require.NoError(t, err) + + err = db.Update(func(tx walletdb.ReadWriteTx) error { + apple := tx.ReadWriteBucket([]byte("apple")) + require.NotNil(t, apple) + require.NoError(t, apple.DeleteNestedBucket([]byte("banana"))) + + return nil + }) + require.NoError(t, err) + + err = db.Update(func(tx walletdb.ReadWriteTx) error { + apple := tx.ReadWriteBucket([]byte("apple")) + require.NotNil(t, apple) + require.NoError(t, apple.Put([]byte("banana"), []byte("value"))) + + return nil + }) + require.NoError(t, err) + + expected := map[string]string{ + vkey("banana", "apple"): "value", + bkey("apple"): bval("apple"), + } + require.Equal(t, expected, f.Dump()) +} diff --git a/channeldb/kvdb/etcd/readwrite_tx.go b/channeldb/kvdb/etcd/readwrite_tx.go index aed00a17..81c27323 100644 --- a/channeldb/kvdb/etcd/readwrite_tx.go +++ b/channeldb/kvdb/etcd/readwrite_tx.go @@ -34,16 +34,6 @@ func rootBucket(tx *readWriteTx) *readWriteBucket { return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:]) } -// put updates the passed key/value. -func (tx *readWriteTx) put(key, val string) { - tx.stm.Put(key, val) -} - -// del marks the passed key deleted. -func (tx *readWriteTx) del(key string) { - tx.stm.Del(key) -} - // ReadBucket opens the root bucket for read only access. If the bucket // described by the key does not exist, nil is returned. func (tx *readWriteTx) ReadBucket(key []byte) walletdb.ReadBucket { diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index c13e8f96..14bb9ca9 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -352,6 +352,15 @@ func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) { } } + if len(resp.Kvs) == 0 { + // Add assertion to the read set which will extend our commit + // constraint such that the commit will fail if the key is + // present in the database. + s.rset[key] = stmGet{ + rev: 0, + } + } + var result []KV // Fill the read set with key/values returned. @@ -395,12 +404,22 @@ func (s *stm) Get(key string) ([]byte, error) { // the prefetch set. if getValue, ok := s.prefetch[key]; ok { delete(s.prefetch, key) - s.rset[key] = getValue + + // Use the prefetched value only if it is for + // an existing key. + if getValue.rev != 0 { + s.rset[key] = getValue + } } // Return value if alread in read set. - if getVal, ok := s.rset[key]; ok { - return []byte(getVal.val), nil + if getValue, ok := s.rset[key]; ok { + // Return the value if the rset contains an existing key. + if getValue.rev != 0 { + return []byte(getValue.val), nil + } else { + return nil, nil + } } // Fetch and return value.