kvdb+etcd: integrate the abort context to the STM retry loop

This commit extends the etcd.BackendConfig to also provide an abort
context and integrates it with the STM retry loop in order to be able
stop LND when conflicting transactions keep the loop running.
This commit is contained in:
Andras Banki-Horvath 2020-05-25 18:00:22 +02:00
parent 415de2f0c7
commit d3545830c9
4 changed files with 61 additions and 11 deletions

@ -124,6 +124,9 @@ var _ walletdb.DB = (*db)(nil)
// BackendConfig holds and etcd backend config and connection parameters. // BackendConfig holds and etcd backend config and connection parameters.
type BackendConfig struct { type BackendConfig struct {
// Ctx is the context we use to cancel operations upon exit.
Ctx context.Context
// Host holds the peer url of the etcd instance. // Host holds the peer url of the etcd instance.
Host string Host string
@ -155,6 +158,10 @@ type BackendConfig struct {
// newEtcdBackend returns a db object initialized with the passed backend // newEtcdBackend returns a db object initialized with the passed backend
// config. If etcd connection cannot be estabished, then returns error. // config. If etcd connection cannot be estabished, then returns error.
func newEtcdBackend(config BackendConfig) (*db, error) { func newEtcdBackend(config BackendConfig) (*db, error) {
if config.Ctx == nil {
config.Ctx = context.Background()
}
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: config.CertFile, CertFile: config.CertFile,
KeyFile: config.KeyFile, KeyFile: config.KeyFile,
@ -167,6 +174,7 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
} }
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Context: config.Ctx,
Endpoints: []string{config.Host}, Endpoints: []string{config.Host},
DialTimeout: etcdConnectionTimeout, DialTimeout: etcdConnectionTimeout,
Username: config.User, Username: config.User,
@ -192,7 +200,8 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
// getSTMOptions creats all STM options based on the backend config. // getSTMOptions creats all STM options based on the backend config.
func (db *db) getSTMOptions() []STMOptionFunc { func (db *db) getSTMOptions() []STMOptionFunc {
opts := []STMOptionFunc{} opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)}
if db.config.CollectCommitStats { if db.config.CollectCommitStats {
opts = append(opts, opts = append(opts,
WithCommitStatsCallback(db.commitStatsCollector.callback), WithCommitStatsCallback(db.commitStatsCollector.callback),
@ -257,9 +266,7 @@ func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
// start a read-only transaction to perform all operations. // start a read-only transaction to perform all operations.
// This function is part of the walletdb.Db interface implementation. // This function is part of the walletdb.Db interface implementation.
func (db *db) Copy(w io.Writer) error { func (db *db) Copy(w io.Writer) error {
ctx := context.Background() ctx, cancel := context.WithTimeout(db.config.Ctx, etcdLongTimeout)
ctx, cancel := context.WithTimeout(ctx, etcdLongTimeout)
defer cancel() defer cancel()
readCloser, err := db.cli.Snapshot(ctx) readCloser, err := db.cli.Snapshot(ctx)

@ -4,6 +4,7 @@ package etcd
import ( import (
"bytes" "bytes"
"context"
"testing" "testing"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
@ -42,3 +43,33 @@ func TestCopy(t *testing.T) {
} }
assert.Equal(t, expected, f.Dump()) assert.Equal(t, expected, f.Dump())
} }
func TestAbortContext(t *testing.T) {
t.Parallel()
f := NewEtcdTestFixture(t)
defer f.Cleanup()
ctx, cancel := context.WithCancel(context.Background())
config := f.BackendConfig()
config.Ctx = ctx
// Pass abort context and abort right away.
db, err := newEtcdBackend(config)
assert.NoError(t, err)
cancel()
// Expect that the update will fail.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
_, err := tx.CreateTopLevelBucket([]byte("bucket"))
assert.NoError(t, err)
return nil
})
assert.Error(t, err, "context canceled")
// No changes in the DB.
assert.Equal(t, map[string]string{}, f.Dump())
}

@ -3,6 +3,7 @@
package etcd package etcd
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
@ -63,6 +64,7 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) {
} }
connConfig := &BackendConfig{ connConfig := &BackendConfig{
Ctx: context.Background(),
Host: "http://" + peerURL, Host: "http://" + peerURL,
User: "user", User: "user",
Pass: "pass", Pass: "pass",

@ -235,22 +235,32 @@ func runSTM(s *stm, apply func(STM) error) error {
err error err error
) )
// In a loop try to apply and commit and roll back loop:
// if the database has changed (CommitError). // In a loop try to apply and commit and roll back if the database has
// changed (CommitError).
for { for {
// Abort STM if there was an application error. select {
// Check if the STM is aborted and break the retry loop if it is.
case <-s.options.ctx.Done():
err = fmt.Errorf("aborted")
break loop
default:
}
// Apply the transaction closure and abort the STM if there was an
// application error.
if err = apply(s); err != nil { if err = apply(s); err != nil {
break break loop
} }
stats, err = s.commit() stats, err = s.commit()
// Re-apply only upon commit error // Re-apply only upon commit error (meaning the database was changed).
// (meaning the database was changed).
if _, ok := err.(CommitError); !ok { if _, ok := err.(CommitError); !ok {
// Anything that's not a CommitError // Anything that's not a CommitError
// aborts the STM run loop. // aborts the STM run loop.
break break loop
} }
// Rollback before trying to re-apply. // Rollback before trying to re-apply.