kvdb+etcd: remove retry goroutine from the STM

This commit removes the retry goroutine from the STM as the retry loop
is only running when the STM transaction is encapsulated in Update/View
whereas for self-standing transactions we use a different approach.
By removing the goroutine we won't catch panics thrown that are supposed
to be catched outside of the STM.
This commit is contained in:
Andras Banki-Horvath 2020-05-25 17:06:29 +02:00
parent 09bb9db782
commit 415de2f0c7

@ -229,64 +229,41 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
// errors and handling commit. The loop will quit on every error except // errors and handling commit. The loop will quit on every error except
// CommitError which is used to indicate a necessary retry. // CommitError which is used to indicate a necessary retry.
func runSTM(s *stm, apply func(STM) error) error { func runSTM(s *stm, apply func(STM) error) error {
out := make(chan error, 1) var (
retries int
stats CommitStats
err error
)
go func() { // In a loop try to apply and commit and roll back
var ( // if the database has changed (CommitError).
retries int for {
stats CommitStats // Abort STM if there was an application error.
) if err = apply(s); err != nil {
break
defer func() {
// Recover DatabaseError panics so
// we can return them.
if r := recover(); r != nil {
e, ok := r.(DatabaseError)
if !ok {
// Unknown panic.
panic(r)
}
// Return the error.
out <- e.Unwrap()
}
}()
var err error
// In a loop try to apply and commit and roll back
// if the database has changed (CommitError).
for {
// Abort STM if there was an application error.
if err = apply(s); err != nil {
break
}
stats, err = s.commit()
// Re-apply only upon commit error
// (meaning the database was changed).
if _, ok := err.(CommitError); !ok {
// Anything that's not a CommitError
// aborts the STM run loop.
break
}
// Rollback before trying to re-apply.
s.Rollback()
retries++
} }
if s.options.commitStatsCallback != nil { stats, err = s.commit()
stats.Retries = retries
s.options.commitStatsCallback(err == nil, stats) // Re-apply only upon commit error
// (meaning the database was changed).
if _, ok := err.(CommitError); !ok {
// Anything that's not a CommitError
// aborts the STM run loop.
break
} }
// Return the error to the caller. // Rollback before trying to re-apply.
out <- err s.Rollback()
}() retries++
}
return <-out if s.options.commitStatsCallback != nil {
stats.Retries = retries
s.options.commitStatsCallback(err == nil, stats)
}
return err
} }
// add inserts a txn response to the read set. This is useful when the txn // add inserts a txn response to the read set. This is useful when the txn
@ -367,18 +344,10 @@ func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
s.options.ctx, key, append(opts, s.getOpts...)..., s.options.ctx, key, append(opts, s.getOpts...)...,
) )
if err != nil { if err != nil {
dbErr := DatabaseError{ return nil, DatabaseError{
msg: "stm.fetch() failed", msg: "stm.fetch() failed",
err: err, err: err,
} }
// Do not panic when executing a manual transaction.
if s.manual {
return nil, dbErr
}
// Panic when executing inside the STM runloop.
panic(dbErr)
} }
// Set revison and serializable options upon first fetch // Set revison and serializable options upon first fetch