From d3545830c9e672c9a6b80f97cc147466506b6e74 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Mon, 25 May 2020 18:00:22 +0200 Subject: [PATCH] kvdb+etcd: integrate the abort context to the STM retry loop This commit extends the etcd.BackendConfig to also provide an abort context and integrates it with the STM retry loop in order to be able stop LND when conflicting transactions keep the loop running. --- channeldb/kvdb/etcd/db.go | 15 +++++++++++---- channeldb/kvdb/etcd/db_test.go | 31 +++++++++++++++++++++++++++++++ channeldb/kvdb/etcd/embed.go | 2 ++ channeldb/kvdb/etcd/stm.go | 24 +++++++++++++++++------- 4 files changed, 61 insertions(+), 11 deletions(-) diff --git a/channeldb/kvdb/etcd/db.go b/channeldb/kvdb/etcd/db.go index a082e610..3bd89c29 100644 --- a/channeldb/kvdb/etcd/db.go +++ b/channeldb/kvdb/etcd/db.go @@ -124,6 +124,9 @@ var _ walletdb.DB = (*db)(nil) // BackendConfig holds and etcd backend config and connection parameters. type BackendConfig struct { + // Ctx is the context we use to cancel operations upon exit. + Ctx context.Context + // Host holds the peer url of the etcd instance. Host string @@ -155,6 +158,10 @@ type BackendConfig struct { // newEtcdBackend returns a db object initialized with the passed backend // config. If etcd connection cannot be estabished, then returns error. func newEtcdBackend(config BackendConfig) (*db, error) { + if config.Ctx == nil { + config.Ctx = context.Background() + } + tlsInfo := transport.TLSInfo{ CertFile: config.CertFile, KeyFile: config.KeyFile, @@ -167,6 +174,7 @@ func newEtcdBackend(config BackendConfig) (*db, error) { } cli, err := clientv3.New(clientv3.Config{ + Context: config.Ctx, Endpoints: []string{config.Host}, DialTimeout: etcdConnectionTimeout, Username: config.User, @@ -192,7 +200,8 @@ func newEtcdBackend(config BackendConfig) (*db, error) { // getSTMOptions creats all STM options based on the backend config. func (db *db) getSTMOptions() []STMOptionFunc { - opts := []STMOptionFunc{} + opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)} + if db.config.CollectCommitStats { opts = append(opts, WithCommitStatsCallback(db.commitStatsCollector.callback), @@ -257,9 +266,7 @@ func (db *db) BeginReadTx() (walletdb.ReadTx, error) { // start a read-only transaction to perform all operations. // This function is part of the walletdb.Db interface implementation. func (db *db) Copy(w io.Writer) error { - ctx := context.Background() - - ctx, cancel := context.WithTimeout(ctx, etcdLongTimeout) + ctx, cancel := context.WithTimeout(db.config.Ctx, etcdLongTimeout) defer cancel() readCloser, err := db.cli.Snapshot(ctx) diff --git a/channeldb/kvdb/etcd/db_test.go b/channeldb/kvdb/etcd/db_test.go index 69342207..155d912e 100644 --- a/channeldb/kvdb/etcd/db_test.go +++ b/channeldb/kvdb/etcd/db_test.go @@ -4,6 +4,7 @@ package etcd import ( "bytes" + "context" "testing" "github.com/btcsuite/btcwallet/walletdb" @@ -42,3 +43,33 @@ func TestCopy(t *testing.T) { } assert.Equal(t, expected, f.Dump()) } + +func TestAbortContext(t *testing.T) { + t.Parallel() + + f := NewEtcdTestFixture(t) + defer f.Cleanup() + + ctx, cancel := context.WithCancel(context.Background()) + + config := f.BackendConfig() + config.Ctx = ctx + + // Pass abort context and abort right away. + db, err := newEtcdBackend(config) + assert.NoError(t, err) + cancel() + + // Expect that the update will fail. + err = db.Update(func(tx walletdb.ReadWriteTx) error { + _, err := tx.CreateTopLevelBucket([]byte("bucket")) + assert.NoError(t, err) + + return nil + }) + + assert.Error(t, err, "context canceled") + + // No changes in the DB. + assert.Equal(t, map[string]string{}, f.Dump()) +} diff --git a/channeldb/kvdb/etcd/embed.go b/channeldb/kvdb/etcd/embed.go index f19363f3..96ea71ab 100644 --- a/channeldb/kvdb/etcd/embed.go +++ b/channeldb/kvdb/etcd/embed.go @@ -3,6 +3,7 @@ package etcd import ( + "context" "fmt" "net" "net/url" @@ -63,6 +64,7 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) { } connConfig := &BackendConfig{ + Ctx: context.Background(), Host: "http://" + peerURL, User: "user", Pass: "pass", diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index b55e1227..bf769d11 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -235,22 +235,32 @@ func runSTM(s *stm, apply func(STM) error) error { err error ) - // In a loop try to apply and commit and roll back - // if the database has changed (CommitError). +loop: + // In a loop try to apply and commit and roll back if the database has + // changed (CommitError). for { - // Abort STM if there was an application error. + select { + // Check if the STM is aborted and break the retry loop if it is. + case <-s.options.ctx.Done(): + err = fmt.Errorf("aborted") + break loop + + default: + } + + // Apply the transaction closure and abort the STM if there was an + // application error. if err = apply(s); err != nil { - break + break loop } stats, err = s.commit() - // Re-apply only upon commit error - // (meaning the database was changed). + // Re-apply only upon commit error (meaning the database was changed). if _, ok := err.(CommitError); !ok { // Anything that's not a CommitError // aborts the STM run loop. - break + break loop } // Rollback before trying to re-apply.