etcd: remove the lock set concept
This commit removes the lock set which was used to only add bucket keys to the tx predicate while also bumping their mod version. This was useful to reduce the size of the compare set but wasn't useful to reduce contention as top level buckets were always in the lock set.
This commit is contained in:
parent
3cdbb341da
commit
12a341ba59
@ -3,7 +3,6 @@
|
|||||||
package etcd
|
package etcd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/btcsuite/btcwallet/walletdb"
|
"github.com/btcsuite/btcwallet/walletdb"
|
||||||
@ -24,11 +23,6 @@ type readWriteBucket struct {
|
|||||||
// newReadWriteBucket creates a new rw bucket with the passed transaction
|
// newReadWriteBucket creates a new rw bucket with the passed transaction
|
||||||
// and bucket id.
|
// and bucket id.
|
||||||
func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket {
|
func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket {
|
||||||
if !bytes.Equal(id, tx.rootBucketID[:]) {
|
|
||||||
// Add the bucket key/value to the lock set.
|
|
||||||
tx.lock(string(key), string(id))
|
|
||||||
}
|
|
||||||
|
|
||||||
return &readWriteBucket{
|
return &readWriteBucket{
|
||||||
id: id,
|
id: id,
|
||||||
tx: tx,
|
tx: tx,
|
||||||
|
@ -17,14 +17,6 @@ type readWriteTx struct {
|
|||||||
|
|
||||||
// active is true if the transaction hasn't been committed yet.
|
// active is true if the transaction hasn't been committed yet.
|
||||||
active bool
|
active bool
|
||||||
|
|
||||||
// dirty is true if we intent to update a value in this transaction.
|
|
||||||
dirty bool
|
|
||||||
|
|
||||||
// lset holds key/value set that we want to lock on. If upon commit the
|
|
||||||
// transaction is dirty and the lset is not empty, we'll bump the mod
|
|
||||||
// version of these key/values.
|
|
||||||
lset map[string]string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newReadWriteTx creates an rw transaction with the passed STM.
|
// newReadWriteTx creates an rw transaction with the passed STM.
|
||||||
@ -33,7 +25,6 @@ func newReadWriteTx(stm STM, prefix string) *readWriteTx {
|
|||||||
stm: stm,
|
stm: stm,
|
||||||
active: true,
|
active: true,
|
||||||
rootBucketID: makeBucketID([]byte(prefix)),
|
rootBucketID: makeBucketID([]byte(prefix)),
|
||||||
lset: make(map[string]string),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -43,48 +34,14 @@ func rootBucket(tx *readWriteTx) *readWriteBucket {
|
|||||||
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
|
return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock adds a key value to the lock set.
|
|
||||||
func (tx *readWriteTx) lock(key, val string) {
|
|
||||||
tx.stm.Lock(key)
|
|
||||||
if !tx.dirty {
|
|
||||||
tx.lset[key] = val
|
|
||||||
} else {
|
|
||||||
// Bump the mod version of the key,
|
|
||||||
// leaving the value intact.
|
|
||||||
tx.stm.Put(key, val)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// put updates the passed key/value.
|
// put updates the passed key/value.
|
||||||
func (tx *readWriteTx) put(key, val string) {
|
func (tx *readWriteTx) put(key, val string) {
|
||||||
tx.stm.Put(key, val)
|
tx.stm.Put(key, val)
|
||||||
tx.setDirty()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// del marks the passed key deleted.
|
// del marks the passed key deleted.
|
||||||
func (tx *readWriteTx) del(key string) {
|
func (tx *readWriteTx) del(key string) {
|
||||||
tx.stm.Del(key)
|
tx.stm.Del(key)
|
||||||
tx.setDirty()
|
|
||||||
}
|
|
||||||
|
|
||||||
// setDirty marks the transaction dirty and bumps
|
|
||||||
// mod version for the existing lock set if it is
|
|
||||||
// not empty.
|
|
||||||
func (tx *readWriteTx) setDirty() {
|
|
||||||
// Bump the lock set.
|
|
||||||
if !tx.dirty && len(tx.lset) > 0 {
|
|
||||||
for key, val := range tx.lset {
|
|
||||||
// Bump the mod version of the key,
|
|
||||||
// leaving the value intact.
|
|
||||||
tx.stm.Put(key, val)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear the lock set.
|
|
||||||
tx.lset = make(map[string]string)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set dirty.
|
|
||||||
tx.dirty = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadBucket opens the root bucket for read only access. If the bucket
|
// ReadBucket opens the root bucket for read only access. If the bucket
|
||||||
|
@ -32,11 +32,6 @@ type STM interface {
|
|||||||
// set. Returns nil if there's no matching key, or the key is empty.
|
// set. Returns nil if there's no matching key, or the key is empty.
|
||||||
Get(key string) ([]byte, error)
|
Get(key string) ([]byte, error)
|
||||||
|
|
||||||
// Lock adds a key to the lock set. If the lock set is not empty, we'll
|
|
||||||
// only check for conflicts in the lock set and the write set, instead
|
|
||||||
// of all read keys plus the write set.
|
|
||||||
Lock(key string)
|
|
||||||
|
|
||||||
// Put adds a value for a key to the txn's write set.
|
// Put adds a value for a key to the txn's write set.
|
||||||
Put(key, val string)
|
Put(key, val string)
|
||||||
|
|
||||||
@ -151,9 +146,6 @@ type stm struct {
|
|||||||
// wset holds overwritten keys and their values.
|
// wset holds overwritten keys and their values.
|
||||||
wset writeSet
|
wset writeSet
|
||||||
|
|
||||||
// lset holds keys we intent to lock on.
|
|
||||||
lset map[string]interface{}
|
|
||||||
|
|
||||||
// getOpts are the opts used for gets.
|
// getOpts are the opts used for gets.
|
||||||
getOpts []v3.OpOption
|
getOpts []v3.OpOption
|
||||||
|
|
||||||
@ -247,19 +239,19 @@ loop:
|
|||||||
|
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
// Apply the transaction closure and abort the STM if there was
|
||||||
// Apply the transaction closure and abort the STM if there was an
|
// an application error.
|
||||||
// application error.
|
|
||||||
if err = apply(s); err != nil {
|
if err = apply(s); err != nil {
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, err = s.commit()
|
stats, err = s.commit()
|
||||||
|
|
||||||
// Re-apply only upon commit error (meaning the database was changed).
|
// Retry the apply closure only upon commit error (meaning the
|
||||||
|
// database was changed).
|
||||||
if _, ok := err.(CommitError); !ok {
|
if _, ok := err.(CommitError); !ok {
|
||||||
// Anything that's not a CommitError
|
// Anything that's not a CommitError aborts the STM
|
||||||
// aborts the STM run loop.
|
// run loop.
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -303,24 +295,14 @@ func (rs readSet) gets() []v3.Op {
|
|||||||
return ops
|
return ops
|
||||||
}
|
}
|
||||||
|
|
||||||
// cmps returns a cmp list testing values in read set didn't change.
|
// cmps returns a compare list which will serve as a precondition testing that
|
||||||
func (rs readSet) cmps(lset map[string]interface{}) []v3.Cmp {
|
// the values in the read set didn't change.
|
||||||
if len(lset) > 0 {
|
func (rs readSet) cmps() []v3.Cmp {
|
||||||
cmps := make([]v3.Cmp, 0, len(lset))
|
|
||||||
for key := range lset {
|
|
||||||
if getValue, ok := rs[key]; ok {
|
|
||||||
cmps = append(
|
|
||||||
cmps,
|
|
||||||
v3.Compare(v3.ModRevision(key), "=", getValue.rev),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return cmps
|
|
||||||
}
|
|
||||||
|
|
||||||
cmps := make([]v3.Cmp, 0, len(rs))
|
cmps := make([]v3.Cmp, 0, len(rs))
|
||||||
for key, getValue := range rs {
|
for key, getValue := range rs {
|
||||||
cmps = append(cmps, v3.Compare(v3.ModRevision(key), "=", getValue.rev))
|
cmps = append(cmps, v3.Compare(
|
||||||
|
v3.ModRevision(key), "=", getValue.rev,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
return cmps
|
return cmps
|
||||||
@ -435,13 +417,6 @@ func (s *stm) Get(key string) ([]byte, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock adds a key to the lock set. If the lock set is
|
|
||||||
// not empty, we'll only check conflicts for the keys
|
|
||||||
// in the lock set.
|
|
||||||
func (s *stm) Lock(key string) {
|
|
||||||
s.lset[key] = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// First returns the first key/value matching prefix. If there's no key starting
|
// First returns the first key/value matching prefix. If there's no key starting
|
||||||
// with prefix, Last will return nil.
|
// with prefix, Last will return nil.
|
||||||
func (s *stm) First(prefix string) (*KV, error) {
|
func (s *stm) First(prefix string) (*KV, error) {
|
||||||
@ -711,7 +686,7 @@ func (s *stm) OnCommit(cb func()) {
|
|||||||
// because the keys have changed return a CommitError, otherwise return a
|
// because the keys have changed return a CommitError, otherwise return a
|
||||||
// DatabaseError.
|
// DatabaseError.
|
||||||
func (s *stm) commit() (CommitStats, error) {
|
func (s *stm) commit() (CommitStats, error) {
|
||||||
rset := s.rset.cmps(s.lset)
|
rset := s.rset.cmps()
|
||||||
wset := s.wset.cmps(s.revision + 1)
|
wset := s.wset.cmps(s.revision + 1)
|
||||||
|
|
||||||
stats := CommitStats{
|
stats := CommitStats{
|
||||||
@ -775,7 +750,6 @@ func (s *stm) Commit() error {
|
|||||||
func (s *stm) Rollback() {
|
func (s *stm) Rollback() {
|
||||||
s.rset = make(map[string]stmGet)
|
s.rset = make(map[string]stmGet)
|
||||||
s.wset = make(map[string]stmPut)
|
s.wset = make(map[string]stmPut)
|
||||||
s.lset = make(map[string]interface{})
|
|
||||||
s.getOpts = nil
|
s.getOpts = nil
|
||||||
s.revision = math.MaxInt64 - 1
|
s.revision = math.MaxInt64 - 1
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user