etcd: integrate the commitQueue to the STM commit loop

This commit integrates an externally passed commitQueue instance with
the STM to reduce retries for conflicting transactions.
This commit is contained in:
Andras Banki-Horvath 2020-08-10 16:50:06 +02:00
parent 6f3a45b75f
commit 9c47392dfa
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
4 changed files with 118 additions and 62 deletions

@ -16,8 +16,8 @@ import (
) )
const ( const (
// etcdConnectionTimeout is the timeout until successful connection to the // etcdConnectionTimeout is the timeout until successful connection to
// etcd instance. // the etcd instance.
etcdConnectionTimeout = 10 * time.Second etcdConnectionTimeout = 10 * time.Second
// etcdLongTimeout is a timeout for longer taking etcd operatons. // etcdLongTimeout is a timeout for longer taking etcd operatons.
@ -34,7 +34,8 @@ type callerStats struct {
func (s callerStats) String() string { func (s callerStats) String() string {
return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d", return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d",
s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset) s.count, s.commitStats.Retries, s.commitStats.Rset,
s.commitStats.Wset)
} }
// commitStatsCollector collects commit stats for commits succeeding // commitStatsCollector collects commit stats for commits succeeding
@ -117,6 +118,7 @@ type db struct {
config BackendConfig config BackendConfig
cli *clientv3.Client cli *clientv3.Client
commitStatsCollector *commitStatsCollector commitStatsCollector *commitStatsCollector
txQueue *commitQueue
} }
// Enforce db implements the walletdb.DB interface. // Enforce db implements the walletdb.DB interface.
@ -188,8 +190,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
} }
backend := &db{ backend := &db{
cli: cli, cli: cli,
config: config, config: config,
txQueue: NewCommitQueue(config.Ctx),
} }
if config.CollectCommitStats { if config.CollectCommitStats {
@ -201,7 +204,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
// getSTMOptions creats all STM options based on the backend config. // getSTMOptions creats all STM options based on the backend config.
func (db *db) getSTMOptions() []STMOptionFunc { func (db *db) getSTMOptions() []STMOptionFunc {
opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)} opts := []STMOptionFunc{
WithAbortContext(db.config.Ctx),
}
if db.config.CollectCommitStats { if db.config.CollectCommitStats {
opts = append(opts, opts = append(opts,
@ -221,7 +226,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error {
return f(newReadWriteTx(stm, db.config.Prefix)) return f(newReadWriteTx(stm, db.config.Prefix))
} }
return RunSTM(db.cli, apply, db.getSTMOptions()...) return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
} }
// Update opens a database read/write transaction and executes the function f // Update opens a database read/write transaction and executes the function f
@ -235,7 +240,7 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
return f(newReadWriteTx(stm, db.config.Prefix)) return f(newReadWriteTx(stm, db.config.Prefix))
} }
return RunSTM(db.cli, apply, db.getSTMOptions()...) return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
} }
// PrintStats returns all collected stats pretty printed into a string. // PrintStats returns all collected stats pretty printed into a string.
@ -247,18 +252,18 @@ func (db *db) PrintStats() string {
return "" return ""
} }
// BeginReadTx opens a database read transaction. // BeginReadWriteTx opens a database read+write transaction.
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
return newReadWriteTx( return newReadWriteTx(
NewSTM(db.cli, db.getSTMOptions()...), NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
db.config.Prefix, db.config.Prefix,
), nil ), nil
} }
// BeginReadWriteTx opens a database read+write transaction. // BeginReadTx opens a database read transaction.
func (db *db) BeginReadTx() (walletdb.ReadTx, error) { func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
return newReadWriteTx( return newReadWriteTx(
NewSTM(db.cli, db.getSTMOptions()...), NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
db.config.Prefix, db.config.Prefix,
), nil ), nil
} }

@ -63,7 +63,7 @@ func TestAbortContext(t *testing.T) {
// Expect that the update will fail. // Expect that the update will fail.
err = db.Update(func(tx walletdb.ReadWriteTx) error { err = db.Update(func(tx walletdb.ReadWriteTx) error {
_, err := tx.CreateTopLevelBucket([]byte("bucket")) _, err := tx.CreateTopLevelBucket([]byte("bucket"))
require.NoError(t, err) require.Error(t, err, "context canceled")
return nil return nil
}) })

@ -134,6 +134,10 @@ type stm struct {
// execute in the STM run loop. // execute in the STM run loop.
manual bool manual bool
// txQueue is lightweight contention manager, which is used to detect
// transaction conflicts and reduce retries.
txQueue *commitQueue
// options stores optional settings passed by the user. // options stores optional settings passed by the user.
options *STMOptions options *STMOptions
@ -183,18 +187,22 @@ func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc {
// RunSTM runs the apply function by creating an STM using serializable snapshot // RunSTM runs the apply function by creating an STM using serializable snapshot
// isolation, passing it to the apply and handling commit errors and retries. // isolation, passing it to the apply and handling commit errors and retries.
func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error { func RunSTM(cli *v3.Client, apply func(STM) error, txQueue *commitQueue,
return runSTM(makeSTM(cli, false, so...), apply) so ...STMOptionFunc) error {
return runSTM(makeSTM(cli, false, txQueue, so...), apply)
} }
// NewSTM creates a new STM instance, using serializable snapshot isolation. // NewSTM creates a new STM instance, using serializable snapshot isolation.
func NewSTM(cli *v3.Client, so ...STMOptionFunc) STM { func NewSTM(cli *v3.Client, txQueue *commitQueue, so ...STMOptionFunc) STM {
return makeSTM(cli, true, so...) return makeSTM(cli, true, txQueue, so...)
} }
// makeSTM is the actual constructor of the stm. It first apply all passed // makeSTM is the actual constructor of the stm. It first apply all passed
// options then creates the stm object and resets it before returning. // options then creates the stm object and resets it before returning.
func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { func makeSTM(cli *v3.Client, manual bool, txQueue *commitQueue,
so ...STMOptionFunc) *stm {
opts := &STMOptions{ opts := &STMOptions{
ctx: cli.Ctx(), ctx: cli.Ctx(),
} }
@ -207,6 +215,7 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
s := &stm{ s := &stm{
client: cli, client: cli,
manual: manual, manual: manual,
txQueue: txQueue,
options: opts, options: opts,
prefetch: make(map[string]stmGet), prefetch: make(map[string]stmGet),
} }
@ -222,50 +231,72 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
// CommitError which is used to indicate a necessary retry. // CommitError which is used to indicate a necessary retry.
func runSTM(s *stm, apply func(STM) error) error { func runSTM(s *stm, apply func(STM) error) error {
var ( var (
retries int retries int
stats CommitStats stats CommitStats
err error executeErr error
) )
loop: done := make(chan struct{})
// In a loop try to apply and commit and roll back if the database has
// changed (CommitError).
for {
select {
// Check if the STM is aborted and break the retry loop if it is.
case <-s.options.ctx.Done():
err = fmt.Errorf("aborted")
break loop
default: execute := func() {
defer close(done)
for {
select {
// Check if the STM is aborted and break the retry loop
// if it is.
case <-s.options.ctx.Done():
executeErr = fmt.Errorf("aborted")
return
default:
}
stats, executeErr = s.commit()
// Re-apply only upon commit error (meaning the
// keys were changed).
if _, ok := executeErr.(CommitError); !ok {
// Anything that's not a CommitError
// aborts the transaction.
return
}
// Rollback before trying to re-apply.
s.Rollback()
retries++
// Re-apply the transaction closure.
if executeErr = apply(s); executeErr != nil {
return
}
} }
// Apply the transaction closure and abort the STM if there was
// an application error.
if err = apply(s); err != nil {
break loop
}
stats, err = s.commit()
// Retry the apply closure only upon commit error (meaning the
// database was changed).
if _, ok := err.(CommitError); !ok {
// Anything that's not a CommitError aborts the STM
// run loop.
break loop
}
// Rollback before trying to re-apply.
s.Rollback()
retries++
} }
// Run the tx closure to construct the read and write sets.
// Also we expect that if there are no conflicting transactions
// in the queue, then we only run apply once.
if preApplyErr := apply(s); preApplyErr != nil {
return preApplyErr
}
// Queue up the transaction for execution.
s.txQueue.Add(execute, s.rset, s.wset)
// Wait for the transaction to execute, or break if aborted.
select {
case <-done:
case <-s.options.ctx.Done():
}
s.txQueue.Done(s.rset, s.wset)
if s.options.commitStatsCallback != nil { if s.options.commitStatsCallback != nil {
stats.Retries = retries stats.Retries = retries
s.options.commitStatsCallback(err == nil, stats) s.options.commitStatsCallback(executeErr == nil, stats)
} }
return err return executeErr
} }
// add inserts a txn response to the read set. This is useful when the txn // add inserts a txn response to the read set. This is useful when the txn

@ -21,7 +21,11 @@ func TestPutToEmpty(t *testing.T) {
t.Parallel() t.Parallel()
f := NewEtcdTestFixture(t) f := NewEtcdTestFixture(t)
defer f.Cleanup() txQueue := NewCommitQueue(f.config.Ctx)
defer func() {
f.Cleanup()
txQueue.Wait()
}()
db, err := newEtcdBackend(f.BackendConfig()) db, err := newEtcdBackend(f.BackendConfig())
require.NoError(t, err) require.NoError(t, err)
@ -31,7 +35,7 @@ func TestPutToEmpty(t *testing.T) {
return nil return nil
} }
err = RunSTM(db.cli, apply) err = RunSTM(db.cli, apply, txQueue)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "abc", f.Get("123")) require.Equal(t, "abc", f.Get("123"))
@ -41,7 +45,11 @@ func TestGetPutDel(t *testing.T) {
t.Parallel() t.Parallel()
f := NewEtcdTestFixture(t) f := NewEtcdTestFixture(t)
defer f.cleanup() txQueue := NewCommitQueue(f.config.Ctx)
defer func() {
f.Cleanup()
txQueue.Wait()
}()
testKeyValues := []KV{ testKeyValues := []KV{
{"a", "1"}, {"a", "1"},
@ -105,7 +113,7 @@ func TestGetPutDel(t *testing.T) {
return nil return nil
} }
err = RunSTM(db.cli, apply) err = RunSTM(db.cli, apply, txQueue)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "1", f.Get("a")) require.Equal(t, "1", f.Get("a"))
@ -120,7 +128,11 @@ func TestFirstLastNextPrev(t *testing.T) {
t.Parallel() t.Parallel()
f := NewEtcdTestFixture(t) f := NewEtcdTestFixture(t)
defer f.Cleanup() txQueue := NewCommitQueue(f.config.Ctx)
defer func() {
f.Cleanup()
txQueue.Wait()
}()
testKeyValues := []KV{ testKeyValues := []KV{
{"kb", "1"}, {"kb", "1"},
@ -255,7 +267,7 @@ func TestFirstLastNextPrev(t *testing.T) {
return nil return nil
} }
err = RunSTM(db.cli, apply) err = RunSTM(db.cli, apply, txQueue)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "0", f.Get("ka")) require.Equal(t, "0", f.Get("ka"))
@ -271,7 +283,11 @@ func TestCommitError(t *testing.T) {
t.Parallel() t.Parallel()
f := NewEtcdTestFixture(t) f := NewEtcdTestFixture(t)
defer f.Cleanup() txQueue := NewCommitQueue(f.config.Ctx)
defer func() {
f.Cleanup()
txQueue.Wait()
}()
db, err := newEtcdBackend(f.BackendConfig()) db, err := newEtcdBackend(f.BackendConfig())
require.NoError(t, err) require.NoError(t, err)
@ -301,7 +317,7 @@ func TestCommitError(t *testing.T) {
return nil return nil
} }
err = RunSTM(db.cli, apply) err = RunSTM(db.cli, apply, txQueue)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 2, cnt) require.Equal(t, 2, cnt)
@ -312,7 +328,11 @@ func TestManualTxError(t *testing.T) {
t.Parallel() t.Parallel()
f := NewEtcdTestFixture(t) f := NewEtcdTestFixture(t)
defer f.Cleanup() txQueue := NewCommitQueue(f.config.Ctx)
defer func() {
f.Cleanup()
txQueue.Wait()
}()
db, err := newEtcdBackend(f.BackendConfig()) db, err := newEtcdBackend(f.BackendConfig())
require.NoError(t, err) require.NoError(t, err)
@ -320,7 +340,7 @@ func TestManualTxError(t *testing.T) {
// Preset DB state. // Preset DB state.
f.Put("123", "xyz") f.Put("123", "xyz")
stm := NewSTM(db.cli) stm := NewSTM(db.cli, txQueue)
val, err := stm.Get("123") val, err := stm.Get("123")
require.NoError(t, err) require.NoError(t, err)