etcd: add namespace support to separate key spaces
This commit extends etcd db with namespaces without additional storage space requirements. This is simply done by instead of using an all zero root bucket id, we use the sha256 hash of the name space as our root bucket id.
This commit is contained in:
parent
bce0597643
commit
c3fcfd1530
@ -16,12 +16,6 @@ var (
|
|||||||
sequencePrefix = []byte("$")
|
sequencePrefix = []byte("$")
|
||||||
)
|
)
|
||||||
|
|
||||||
// rootBucketId returns a zero filled 32 byte array
|
|
||||||
func rootBucketID() []byte {
|
|
||||||
var rootID [bucketIDLength]byte
|
|
||||||
return rootID[:]
|
|
||||||
}
|
|
||||||
|
|
||||||
// makeBucketID returns a deterministic key for the passed byte slice.
|
// makeBucketID returns a deterministic key for the passed byte slice.
|
||||||
// Currently it returns the sha256 hash of the slice.
|
// Currently it returns the sha256 hash of the slice.
|
||||||
func makeBucketID(key []byte) [bucketIDLength]byte {
|
func makeBucketID(key []byte) [bucketIDLength]byte {
|
||||||
|
@ -7,7 +7,8 @@ package etcd
|
|||||||
func bkey(buckets ...string) string {
|
func bkey(buckets ...string) string {
|
||||||
var bucketKey []byte
|
var bucketKey []byte
|
||||||
|
|
||||||
parent := rootBucketID()
|
rootID := makeBucketID([]byte(""))
|
||||||
|
parent := rootID[:]
|
||||||
|
|
||||||
for _, bucketName := range buckets {
|
for _, bucketName := range buckets {
|
||||||
bucketKey = makeBucketKey(parent, []byte(bucketName))
|
bucketKey = makeBucketKey(parent, []byte(bucketName))
|
||||||
@ -28,7 +29,8 @@ func bval(buckets ...string) string {
|
|||||||
// vkey is a helper function used in tests to create a value key from the
|
// vkey is a helper function used in tests to create a value key from the
|
||||||
// passed key and bucket list.
|
// passed key and bucket list.
|
||||||
func vkey(key string, buckets ...string) string {
|
func vkey(key string, buckets ...string) string {
|
||||||
bucket := rootBucketID()
|
rootID := makeBucketID([]byte(""))
|
||||||
|
bucket := rootID[:]
|
||||||
|
|
||||||
for _, bucketName := range buckets {
|
for _, bucketName := range buckets {
|
||||||
bucketKey := makeBucketKey(bucket, []byte(bucketName))
|
bucketKey := makeBucketKey(bucket, []byte(bucketName))
|
||||||
|
@ -143,6 +143,11 @@ type BackendConfig struct {
|
|||||||
// skip TLS verification.
|
// skip TLS verification.
|
||||||
InsecureSkipVerify bool
|
InsecureSkipVerify bool
|
||||||
|
|
||||||
|
// Prefix the hash of the prefix will be used as the root
|
||||||
|
// bucket id. This enables key space separation similar to
|
||||||
|
// name spaces.
|
||||||
|
Prefix string
|
||||||
|
|
||||||
// CollectCommitStats indicates wheter to commit commit stats.
|
// CollectCommitStats indicates wheter to commit commit stats.
|
||||||
CollectCommitStats bool
|
CollectCommitStats bool
|
||||||
}
|
}
|
||||||
@ -203,7 +208,7 @@ func (db *db) getSTMOptions() []STMOptionFunc {
|
|||||||
// occur).
|
// occur).
|
||||||
func (db *db) View(f func(tx walletdb.ReadTx) error) error {
|
func (db *db) View(f func(tx walletdb.ReadTx) error) error {
|
||||||
apply := func(stm STM) error {
|
apply := func(stm STM) error {
|
||||||
return f(newReadWriteTx(stm))
|
return f(newReadWriteTx(stm, db.config.Prefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
||||||
@ -217,7 +222,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error {
|
|||||||
// returned.
|
// returned.
|
||||||
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
|
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
|
||||||
apply := func(stm STM) error {
|
apply := func(stm STM) error {
|
||||||
return f(newReadWriteTx(stm))
|
return f(newReadWriteTx(stm, db.config.Prefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
||||||
@ -234,12 +239,18 @@ func (db *db) PrintStats() string {
|
|||||||
|
|
||||||
// BeginReadTx opens a database read transaction.
|
// BeginReadTx opens a database read transaction.
|
||||||
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
|
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
|
||||||
return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil
|
return newReadWriteTx(
|
||||||
|
NewSTM(db.cli, db.getSTMOptions()...),
|
||||||
|
db.config.Prefix,
|
||||||
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginReadWriteTx opens a database read+write transaction.
|
// BeginReadWriteTx opens a database read+write transaction.
|
||||||
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
|
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
|
||||||
return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil
|
return newReadWriteTx(
|
||||||
|
NewSTM(db.cli, db.getSTMOptions()...),
|
||||||
|
db.config.Prefix,
|
||||||
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy writes a copy of the database to the provided writer. This call will
|
// Copy writes a copy of the database to the provided writer. This call will
|
||||||
|
@ -24,7 +24,7 @@ type readWriteBucket struct {
|
|||||||
// newReadWriteBucket creates a new rw bucket with the passed transaction
|
// newReadWriteBucket creates a new rw bucket with the passed transaction
|
||||||
// and bucket id.
|
// and bucket id.
|
||||||
func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket {
|
func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket {
|
||||||
if !bytes.Equal(id, rootBucketID()) {
|
if !bytes.Equal(id, tx.rootBucketID[:]) {
|
||||||
// Add the bucket key/value to the lock set.
|
// Add the bucket key/value to the lock set.
|
||||||
tx.lock(string(key), string(id))
|
tx.lock(string(key), string(id))
|
||||||
}
|
}
|
||||||
|
@ -11,6 +11,10 @@ type readWriteTx struct {
|
|||||||
// stm is the reference to the parent STM.
|
// stm is the reference to the parent STM.
|
||||||
stm STM
|
stm STM
|
||||||
|
|
||||||
|
// rootBucketID holds the sha256 hash of the root bucket id, which is used
|
||||||
|
// for key space spearation.
|
||||||
|
rootBucketID [bucketIDLength]byte
|
||||||
|
|
||||||
// active is true if the transaction hasn't been committed yet.
|
// active is true if the transaction hasn't been committed yet.
|
||||||
active bool
|
active bool
|
||||||
|
|
||||||
@ -24,18 +28,19 @@ type readWriteTx struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newReadWriteTx creates an rw transaction with the passed STM.
|
// newReadWriteTx creates an rw transaction with the passed STM.
|
||||||
func newReadWriteTx(stm STM) *readWriteTx {
|
func newReadWriteTx(stm STM, prefix string) *readWriteTx {
|
||||||
return &readWriteTx{
|
return &readWriteTx{
|
||||||
stm: stm,
|
stm: stm,
|
||||||
active: true,
|
active: true,
|
||||||
|
rootBucketID: makeBucketID([]byte(prefix)),
|
||||||
lset: make(map[string]string),
|
lset: make(map[string]string),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// rooBucket is a helper function to return the always present
|
// rooBucket is a helper function to return the always present
|
||||||
// root bucket.
|
// pseudo root bucket.
|
||||||
func rootBucket(tx *readWriteTx) *readWriteBucket {
|
func rootBucket(tx *readWriteTx) *readWriteBucket {
|
||||||
return newReadWriteBucket(tx, rootBucketID(), rootBucketID())
|
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock adds a key value to the lock set.
|
// lock adds a key value to the lock set.
|
||||||
|
@ -12,7 +12,7 @@ const TestBackend = EtcdBackendName
|
|||||||
|
|
||||||
// GetEtcdBackend returns an etcd backend configured according to the
|
// GetEtcdBackend returns an etcd backend configured according to the
|
||||||
// passed etcdConfig.
|
// passed etcdConfig.
|
||||||
func GetEtcdBackend(etcdConfig *EtcdConfig) (Backend, error) {
|
func GetEtcdBackend(prefix string, etcdConfig *EtcdConfig) (Backend, error) {
|
||||||
// Config translation is needed here in order to keep the
|
// Config translation is needed here in order to keep the
|
||||||
// etcd package fully independent from the rest of the source tree.
|
// etcd package fully independent from the rest of the source tree.
|
||||||
backendConfig := etcd.BackendConfig{
|
backendConfig := etcd.BackendConfig{
|
||||||
@ -22,6 +22,7 @@ func GetEtcdBackend(etcdConfig *EtcdConfig) (Backend, error) {
|
|||||||
CertFile: etcdConfig.CertFile,
|
CertFile: etcdConfig.CertFile,
|
||||||
KeyFile: etcdConfig.KeyFile,
|
KeyFile: etcdConfig.KeyFile,
|
||||||
InsecureSkipVerify: etcdConfig.InsecureSkipVerify,
|
InsecureSkipVerify: etcdConfig.InsecureSkipVerify,
|
||||||
|
Prefix: prefix,
|
||||||
CollectCommitStats: etcdConfig.CollectStats,
|
CollectCommitStats: etcdConfig.CollectStats,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ const TestBackend = BoltBackendName
|
|||||||
var errEtcdNotAvailable = fmt.Errorf("etcd backend not available")
|
var errEtcdNotAvailable = fmt.Errorf("etcd backend not available")
|
||||||
|
|
||||||
// GetEtcdBackend is a stub returning nil and errEtcdNotAvailable error.
|
// GetEtcdBackend is a stub returning nil and errEtcdNotAvailable error.
|
||||||
func GetEtcdBackend(etcdConfig *EtcdConfig) (Backend, error) {
|
func GetEtcdBackend(prefix string, etcdConfig *EtcdConfig) (Backend, error) {
|
||||||
return nil, errEtcdNotAvailable
|
return nil, errEtcdNotAvailable
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package lncfg
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path"
|
||||||
|
|
||||||
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||||
)
|
)
|
||||||
@ -50,12 +51,14 @@ func (db *DB) Validate() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetBackend returns a kvdb.Backend as set in the DB config.
|
// GetBackend returns a kvdb.Backend as set in the DB config.
|
||||||
func (db *DB) GetBackend(path string) (kvdb.Backend, error) {
|
func (db *DB) GetBackend(dbPath string) (kvdb.Backend, error) {
|
||||||
if db.Backend == etcdBackend {
|
if db.Backend == etcdBackend {
|
||||||
return kvdb.GetEtcdBackend(db.Etcd)
|
// Prefix will separate key/values in the db.
|
||||||
|
prefix := path.Join(dbPath, dbName)
|
||||||
|
return kvdb.GetEtcdBackend(prefix, db.Etcd)
|
||||||
}
|
}
|
||||||
|
|
||||||
return kvdb.GetBoltBackend(path, dbName, db.Bolt.NoFreeListSync)
|
return kvdb.GetBoltBackend(dbPath, dbName, db.Bolt.NoFreeListSync)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compile-time constraint to ensure Workers implements the Validator interface.
|
// Compile-time constraint to ensure Workers implements the Validator interface.
|
||||||
|
Loading…
Reference in New Issue
Block a user