etcd: optionally reduce concurrency to single writer for legacy code

This commit is contained in:
Andras Banki-Horvath 2021-07-20 15:09:47 +02:00
parent bc98bb3f88
commit bc4ffb489d
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
5 changed files with 70 additions and 17 deletions

@ -25,4 +25,8 @@ type Config struct {
InsecureSkipVerify bool `long:"insecure_skip_verify" description:"Whether we intend to skip TLS verification"` InsecureSkipVerify bool `long:"insecure_skip_verify" description:"Whether we intend to skip TLS verification"`
CollectStats bool `long:"collect_stats" description:"Whether to collect etcd commit stats."` CollectStats bool `long:"collect_stats" description:"Whether to collect etcd commit stats."`
// SingleWriter should be set to true if we intend to only allow a
// single writer to the database at a time.
SingleWriter bool
} }

@ -125,6 +125,7 @@ type db struct {
cli *clientv3.Client cli *clientv3.Client
commitStatsCollector *commitStatsCollector commitStatsCollector *commitStatsCollector
txQueue *commitQueue txQueue *commitQueue
txMutex sync.RWMutex
} }
// Enforce db implements the walletdb.DB interface. // Enforce db implements the walletdb.DB interface.
@ -204,9 +205,14 @@ func (db *db) getSTMOptions() []STMOptionFunc {
// expect retries of the f closure (depending on the database backend used), the // expect retries of the f closure (depending on the database backend used), the
// reset function will be called before each retry respectively. // reset function will be called before each retry respectively.
func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error { func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
if db.cfg.SingleWriter {
db.txMutex.RLock()
defer db.txMutex.RUnlock()
}
apply := func(stm STM) error { apply := func(stm STM) error {
reset() reset()
return f(newReadWriteTx(stm, etcdDefaultRootBucketId)) return f(newReadWriteTx(stm, etcdDefaultRootBucketId, nil))
} }
return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
@ -220,9 +226,14 @@ func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error {
// returned. As callers may expect retries of the f closure, the reset function // returned. As callers may expect retries of the f closure, the reset function
// will be called before each retry respectively. // will be called before each retry respectively.
func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error { func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) error {
if db.cfg.SingleWriter {
db.txMutex.Lock()
defer db.txMutex.Unlock()
}
apply := func(stm STM) error { apply := func(stm STM) error {
reset() reset()
return f(newReadWriteTx(stm, etcdDefaultRootBucketId)) return f(newReadWriteTx(stm, etcdDefaultRootBucketId, nil))
} }
return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
@ -239,17 +250,29 @@ func (db *db) PrintStats() string {
// BeginReadWriteTx opens a database read+write transaction. // BeginReadWriteTx opens a database read+write transaction.
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
var locker sync.Locker
if db.cfg.SingleWriter {
db.txMutex.Lock()
locker = &db.txMutex
}
return newReadWriteTx( return newReadWriteTx(
NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
etcdDefaultRootBucketId, etcdDefaultRootBucketId, locker,
), nil ), nil
} }
// BeginReadTx opens a database read transaction. // BeginReadTx opens a database read transaction.
func (db *db) BeginReadTx() (walletdb.ReadTx, error) { func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
var locker sync.Locker
if db.cfg.SingleWriter {
db.txMutex.RLock()
locker = db.txMutex.RLocker()
}
return newReadWriteTx( return newReadWriteTx(
NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
etcdDefaultRootBucketId, etcdDefaultRootBucketId, locker,
), nil ), nil
} }

@ -78,8 +78,13 @@ func NewEtcdTestFixture(t *testing.T) *EtcdTestFixture {
} }
} }
func (f *EtcdTestFixture) NewBackend() walletdb.DB { func (f *EtcdTestFixture) NewBackend(singleWriter bool) walletdb.DB {
db, err := newEtcdBackend(context.TODO(), f.BackendConfig()) cfg := f.BackendConfig()
if singleWriter {
cfg.SingleWriter = true
}
db, err := newEtcdBackend(context.TODO(), cfg)
require.NoError(f.t, err) require.NoError(f.t, err)
return db return db

@ -3,6 +3,8 @@
package etcd package etcd
import ( import (
"sync"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
) )
@ -17,13 +19,18 @@ type readWriteTx struct {
// active is true if the transaction hasn't been committed yet. // active is true if the transaction hasn't been committed yet.
active bool active bool
// lock is passed on for manual txns when the backend is instantiated
// such that we read/write lock transactions to ensure a single writer.
lock sync.Locker
} }
// newReadWriteTx creates an rw transaction with the passed STM. // newReadWriteTx creates an rw transaction with the passed STM.
func newReadWriteTx(stm STM, prefix string) *readWriteTx { func newReadWriteTx(stm STM, prefix string, lock sync.Locker) *readWriteTx {
return &readWriteTx{ return &readWriteTx{
stm: stm, stm: stm,
active: true, active: true,
lock: lock,
rootBucketID: makeBucketID([]byte(prefix)), rootBucketID: makeBucketID([]byte(prefix)),
} }
} }
@ -65,6 +72,10 @@ func (tx *readWriteTx) Rollback() error {
return walletdb.ErrTxClosed return walletdb.ErrTxClosed
} }
if tx.lock != nil {
defer tx.lock.Unlock()
}
// Rollback the STM and set the tx to inactive. // Rollback the STM and set the tx to inactive.
tx.stm.Rollback() tx.stm.Rollback()
tx.active = false tx.active = false
@ -99,6 +110,10 @@ func (tx *readWriteTx) Commit() error {
return walletdb.ErrTxClosed return walletdb.ErrTxClosed
} }
if tx.lock != nil {
defer tx.lock.Unlock()
}
// Try committing the transaction. // Try committing the transaction.
if err := tx.stm.Commit(); err != nil { if err := tx.stm.Commit(); err != nil {
return err return err

@ -3,6 +3,7 @@
package kvdb package kvdb
import ( import (
"fmt"
"testing" "testing"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
@ -143,13 +144,17 @@ func TestEtcd(t *testing.T) {
continue continue
} }
t.Run(test.name, func(t *testing.T) { rwLock := []bool{false, true}
for _, doRwLock := range rwLock {
name := fmt.Sprintf("%v/RWLock=%v", test.name, doRwLock)
t.Run(name, func(t *testing.T) {
t.Parallel() t.Parallel()
f := etcd.NewEtcdTestFixture(t) f := etcd.NewEtcdTestFixture(t)
defer f.Cleanup() defer f.Cleanup()
test.test(t, f.NewBackend()) test.test(t, f.NewBackend(doRwLock))
if test.expectedDb != nil { if test.expectedDb != nil {
dump := f.Dump() dump := f.Dump()
@ -158,3 +163,4 @@ func TestEtcd(t *testing.T) {
}) })
} }
} }
}