From 12a341ba5983b2f3a8fce13177de6eb53e1f73a7 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 10 Jul 2020 14:28:18 +0200 Subject: [PATCH] etcd: remove the lock set concept This commit removes the lock set which was used to only add bucket keys to the tx predicate while also bumping their mod version. This was useful to reduce the size of the compare set but wasn't useful to reduce contention as top level buckets were always in the lock set. --- channeldb/kvdb/etcd/readwrite_bucket.go | 6 --- channeldb/kvdb/etcd/readwrite_tx.go | 43 -------------------- channeldb/kvdb/etcd/stm.go | 52 +++++++------------------ 3 files changed, 13 insertions(+), 88 deletions(-) diff --git a/channeldb/kvdb/etcd/readwrite_bucket.go b/channeldb/kvdb/etcd/readwrite_bucket.go index 20af7d92..f97268d9 100644 --- a/channeldb/kvdb/etcd/readwrite_bucket.go +++ b/channeldb/kvdb/etcd/readwrite_bucket.go @@ -3,7 +3,6 @@ package etcd import ( - "bytes" "strconv" "github.com/btcsuite/btcwallet/walletdb" @@ -24,11 +23,6 @@ type readWriteBucket struct { // newReadWriteBucket creates a new rw bucket with the passed transaction // and bucket id. func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket { - if !bytes.Equal(id, tx.rootBucketID[:]) { - // Add the bucket key/value to the lock set. - tx.lock(string(key), string(id)) - } - return &readWriteBucket{ id: id, tx: tx, diff --git a/channeldb/kvdb/etcd/readwrite_tx.go b/channeldb/kvdb/etcd/readwrite_tx.go index 22d0ce42..aed00a17 100644 --- a/channeldb/kvdb/etcd/readwrite_tx.go +++ b/channeldb/kvdb/etcd/readwrite_tx.go @@ -17,14 +17,6 @@ type readWriteTx struct { // active is true if the transaction hasn't been committed yet. active bool - - // dirty is true if we intent to update a value in this transaction. - dirty bool - - // lset holds key/value set that we want to lock on. If upon commit the - // transaction is dirty and the lset is not empty, we'll bump the mod - // version of these key/values. - lset map[string]string } // newReadWriteTx creates an rw transaction with the passed STM. @@ -33,7 +25,6 @@ func newReadWriteTx(stm STM, prefix string) *readWriteTx { stm: stm, active: true, rootBucketID: makeBucketID([]byte(prefix)), - lset: make(map[string]string), } } @@ -43,48 +34,14 @@ func rootBucket(tx *readWriteTx) *readWriteBucket { return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:]) } -// lock adds a key value to the lock set. -func (tx *readWriteTx) lock(key, val string) { - tx.stm.Lock(key) - if !tx.dirty { - tx.lset[key] = val - } else { - // Bump the mod version of the key, - // leaving the value intact. - tx.stm.Put(key, val) - } -} - // put updates the passed key/value. func (tx *readWriteTx) put(key, val string) { tx.stm.Put(key, val) - tx.setDirty() } // del marks the passed key deleted. func (tx *readWriteTx) del(key string) { tx.stm.Del(key) - tx.setDirty() -} - -// setDirty marks the transaction dirty and bumps -// mod version for the existing lock set if it is -// not empty. -func (tx *readWriteTx) setDirty() { - // Bump the lock set. - if !tx.dirty && len(tx.lset) > 0 { - for key, val := range tx.lset { - // Bump the mod version of the key, - // leaving the value intact. - tx.stm.Put(key, val) - } - - // Clear the lock set. - tx.lset = make(map[string]string) - } - - // Set dirty. - tx.dirty = true } // ReadBucket opens the root bucket for read only access. If the bucket diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index 7a2f33b5..c13e8f96 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -32,11 +32,6 @@ type STM interface { // set. Returns nil if there's no matching key, or the key is empty. Get(key string) ([]byte, error) - // Lock adds a key to the lock set. If the lock set is not empty, we'll - // only check for conflicts in the lock set and the write set, instead - // of all read keys plus the write set. - Lock(key string) - // Put adds a value for a key to the txn's write set. Put(key, val string) @@ -151,9 +146,6 @@ type stm struct { // wset holds overwritten keys and their values. wset writeSet - // lset holds keys we intent to lock on. - lset map[string]interface{} - // getOpts are the opts used for gets. getOpts []v3.OpOption @@ -247,19 +239,19 @@ loop: default: } - - // Apply the transaction closure and abort the STM if there was an - // application error. + // Apply the transaction closure and abort the STM if there was + // an application error. if err = apply(s); err != nil { break loop } stats, err = s.commit() - // Re-apply only upon commit error (meaning the database was changed). + // Retry the apply closure only upon commit error (meaning the + // database was changed). if _, ok := err.(CommitError); !ok { - // Anything that's not a CommitError - // aborts the STM run loop. + // Anything that's not a CommitError aborts the STM + // run loop. break loop } @@ -303,24 +295,14 @@ func (rs readSet) gets() []v3.Op { return ops } -// cmps returns a cmp list testing values in read set didn't change. -func (rs readSet) cmps(lset map[string]interface{}) []v3.Cmp { - if len(lset) > 0 { - cmps := make([]v3.Cmp, 0, len(lset)) - for key := range lset { - if getValue, ok := rs[key]; ok { - cmps = append( - cmps, - v3.Compare(v3.ModRevision(key), "=", getValue.rev), - ) - } - } - return cmps - } - +// cmps returns a compare list which will serve as a precondition testing that +// the values in the read set didn't change. +func (rs readSet) cmps() []v3.Cmp { cmps := make([]v3.Cmp, 0, len(rs)) for key, getValue := range rs { - cmps = append(cmps, v3.Compare(v3.ModRevision(key), "=", getValue.rev)) + cmps = append(cmps, v3.Compare( + v3.ModRevision(key), "=", getValue.rev, + )) } return cmps @@ -435,13 +417,6 @@ func (s *stm) Get(key string) ([]byte, error) { return nil, nil } -// Lock adds a key to the lock set. If the lock set is -// not empty, we'll only check conflicts for the keys -// in the lock set. -func (s *stm) Lock(key string) { - s.lset[key] = nil -} - // First returns the first key/value matching prefix. If there's no key starting // with prefix, Last will return nil. func (s *stm) First(prefix string) (*KV, error) { @@ -711,7 +686,7 @@ func (s *stm) OnCommit(cb func()) { // because the keys have changed return a CommitError, otherwise return a // DatabaseError. func (s *stm) commit() (CommitStats, error) { - rset := s.rset.cmps(s.lset) + rset := s.rset.cmps() wset := s.wset.cmps(s.revision + 1) stats := CommitStats{ @@ -775,7 +750,6 @@ func (s *stm) Commit() error { func (s *stm) Rollback() { s.rset = make(map[string]stmGet) s.wset = make(map[string]stmPut) - s.lset = make(map[string]interface{}) s.getOpts = nil s.revision = math.MaxInt64 - 1 }