Merge pull request #4318 from bhandras/etcd_fixes

lnd+kvdb+etcd: remove retry goroutine from the STM and integrate abort context
This commit is contained in:
András Bánki-Horváth 2020-06-03 19:21:01 +02:00 committed by GitHub
commit 595bb7c1ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 102 additions and 74 deletions

@ -124,6 +124,9 @@ var _ walletdb.DB = (*db)(nil)
// BackendConfig holds and etcd backend config and connection parameters. // BackendConfig holds and etcd backend config and connection parameters.
type BackendConfig struct { type BackendConfig struct {
// Ctx is the context we use to cancel operations upon exit.
Ctx context.Context
// Host holds the peer url of the etcd instance. // Host holds the peer url of the etcd instance.
Host string Host string
@ -155,6 +158,10 @@ type BackendConfig struct {
// newEtcdBackend returns a db object initialized with the passed backend // newEtcdBackend returns a db object initialized with the passed backend
// config. If etcd connection cannot be estabished, then returns error. // config. If etcd connection cannot be estabished, then returns error.
func newEtcdBackend(config BackendConfig) (*db, error) { func newEtcdBackend(config BackendConfig) (*db, error) {
if config.Ctx == nil {
config.Ctx = context.Background()
}
tlsInfo := transport.TLSInfo{ tlsInfo := transport.TLSInfo{
CertFile: config.CertFile, CertFile: config.CertFile,
KeyFile: config.KeyFile, KeyFile: config.KeyFile,
@ -167,6 +174,7 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
} }
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Context: config.Ctx,
Endpoints: []string{config.Host}, Endpoints: []string{config.Host},
DialTimeout: etcdConnectionTimeout, DialTimeout: etcdConnectionTimeout,
Username: config.User, Username: config.User,
@ -192,7 +200,8 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
// getSTMOptions creats all STM options based on the backend config. // getSTMOptions creats all STM options based on the backend config.
func (db *db) getSTMOptions() []STMOptionFunc { func (db *db) getSTMOptions() []STMOptionFunc {
opts := []STMOptionFunc{} opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)}
if db.config.CollectCommitStats { if db.config.CollectCommitStats {
opts = append(opts, opts = append(opts,
WithCommitStatsCallback(db.commitStatsCollector.callback), WithCommitStatsCallback(db.commitStatsCollector.callback),
@ -257,9 +266,7 @@ func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
// start a read-only transaction to perform all operations. // start a read-only transaction to perform all operations.
// This function is part of the walletdb.Db interface implementation. // This function is part of the walletdb.Db interface implementation.
func (db *db) Copy(w io.Writer) error { func (db *db) Copy(w io.Writer) error {
ctx := context.Background() ctx, cancel := context.WithTimeout(db.config.Ctx, etcdLongTimeout)
ctx, cancel := context.WithTimeout(ctx, etcdLongTimeout)
defer cancel() defer cancel()
readCloser, err := db.cli.Snapshot(ctx) readCloser, err := db.cli.Snapshot(ctx)

@ -4,6 +4,7 @@ package etcd
import ( import (
"bytes" "bytes"
"context"
"testing" "testing"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
@ -42,3 +43,33 @@ func TestCopy(t *testing.T) {
} }
assert.Equal(t, expected, f.Dump()) assert.Equal(t, expected, f.Dump())
} }
func TestAbortContext(t *testing.T) {
t.Parallel()
f := NewEtcdTestFixture(t)
defer f.Cleanup()
ctx, cancel := context.WithCancel(context.Background())
config := f.BackendConfig()
config.Ctx = ctx
// Pass abort context and abort right away.
db, err := newEtcdBackend(config)
assert.NoError(t, err)
cancel()
// Expect that the update will fail.
err = db.Update(func(tx walletdb.ReadWriteTx) error {
_, err := tx.CreateTopLevelBucket([]byte("bucket"))
assert.NoError(t, err)
return nil
})
assert.Error(t, err, "context canceled")
// No changes in the DB.
assert.Equal(t, map[string]string{}, f.Dump())
}

@ -3,6 +3,7 @@
package etcd package etcd
import ( import (
"context"
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
@ -63,6 +64,7 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) {
} }
connConfig := &BackendConfig{ connConfig := &BackendConfig{
Ctx: context.Background(),
Host: "http://" + peerURL, Host: "http://" + peerURL,
User: "user", User: "user",
Pass: "pass", Pass: "pass",

@ -229,47 +229,38 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
// errors and handling commit. The loop will quit on every error except // errors and handling commit. The loop will quit on every error except
// CommitError which is used to indicate a necessary retry. // CommitError which is used to indicate a necessary retry.
func runSTM(s *stm, apply func(STM) error) error { func runSTM(s *stm, apply func(STM) error) error {
out := make(chan error, 1)
go func() {
var ( var (
retries int retries int
stats CommitStats stats CommitStats
err error
) )
defer func() { loop:
// Recover DatabaseError panics so // In a loop try to apply and commit and roll back if the database has
// we can return them. // changed (CommitError).
if r := recover(); r != nil {
e, ok := r.(DatabaseError)
if !ok {
// Unknown panic.
panic(r)
}
// Return the error.
out <- e.Unwrap()
}
}()
var err error
// In a loop try to apply and commit and roll back
// if the database has changed (CommitError).
for { for {
// Abort STM if there was an application error. select {
// Check if the STM is aborted and break the retry loop if it is.
case <-s.options.ctx.Done():
err = fmt.Errorf("aborted")
break loop
default:
}
// Apply the transaction closure and abort the STM if there was an
// application error.
if err = apply(s); err != nil { if err = apply(s); err != nil {
break break loop
} }
stats, err = s.commit() stats, err = s.commit()
// Re-apply only upon commit error // Re-apply only upon commit error (meaning the database was changed).
// (meaning the database was changed).
if _, ok := err.(CommitError); !ok { if _, ok := err.(CommitError); !ok {
// Anything that's not a CommitError // Anything that's not a CommitError
// aborts the STM run loop. // aborts the STM run loop.
break break loop
} }
// Rollback before trying to re-apply. // Rollback before trying to re-apply.
@ -282,11 +273,7 @@ func runSTM(s *stm, apply func(STM) error) error {
s.options.commitStatsCallback(err == nil, stats) s.options.commitStatsCallback(err == nil, stats)
} }
// Return the error to the caller. return err
out <- err
}()
return <-out
} }
// add inserts a txn response to the read set. This is useful when the txn // add inserts a txn response to the read set. This is useful when the txn
@ -367,18 +354,10 @@ func (s *stm) fetch(key string, opts ...v3.OpOption) ([]KV, error) {
s.options.ctx, key, append(opts, s.getOpts...)..., s.options.ctx, key, append(opts, s.getOpts...)...,
) )
if err != nil { if err != nil {
dbErr := DatabaseError{ return nil, DatabaseError{
msg: "stm.fetch() failed", msg: "stm.fetch() failed",
err: err, err: err,
} }
// Do not panic when executing a manual transaction.
if s.manual {
return nil, dbErr
}
// Panic when executing inside the STM runloop.
panic(dbErr)
} }
// Set revison and serializable options upon first fetch // Set revison and serializable options upon first fetch

@ -3,6 +3,8 @@
package kvdb package kvdb
import ( import (
"context"
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd" "github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
) )
@ -12,10 +14,13 @@ const TestBackend = EtcdBackendName
// GetEtcdBackend returns an etcd backend configured according to the // GetEtcdBackend returns an etcd backend configured according to the
// passed etcdConfig. // passed etcdConfig.
func GetEtcdBackend(prefix string, etcdConfig *EtcdConfig) (Backend, error) { func GetEtcdBackend(ctx context.Context, prefix string,
etcdConfig *EtcdConfig) (Backend, error) {
// Config translation is needed here in order to keep the // Config translation is needed here in order to keep the
// etcd package fully independent from the rest of the source tree. // etcd package fully independent from the rest of the source tree.
backendConfig := etcd.BackendConfig{ backendConfig := etcd.BackendConfig{
Ctx: ctx,
Host: etcdConfig.Host, Host: etcdConfig.Host,
User: etcdConfig.User, User: etcdConfig.User,
Pass: etcdConfig.Pass, Pass: etcdConfig.Pass,

@ -3,6 +3,7 @@
package kvdb package kvdb
import ( import (
"context"
"fmt" "fmt"
) )
@ -13,7 +14,9 @@ const TestBackend = BoltBackendName
var errEtcdNotAvailable = fmt.Errorf("etcd backend not available") var errEtcdNotAvailable = fmt.Errorf("etcd backend not available")
// GetEtcdBackend is a stub returning nil and errEtcdNotAvailable error. // GetEtcdBackend is a stub returning nil and errEtcdNotAvailable error.
func GetEtcdBackend(prefix string, etcdConfig *EtcdConfig) (Backend, error) { func GetEtcdBackend(ctx context.Context, prefix string,
etcdConfig *EtcdConfig) (Backend, error) {
return nil, errEtcdNotAvailable return nil, errEtcdNotAvailable
} }

@ -1,6 +1,7 @@
package lncfg package lncfg
import ( import (
"context"
"fmt" "fmt"
"github.com/lightningnetwork/lnd/channeldb/kvdb" "github.com/lightningnetwork/lnd/channeldb/kvdb"
@ -50,12 +51,12 @@ func (db *DB) Validate() error {
} }
// GetBackend returns a kvdb.Backend as set in the DB config. // GetBackend returns a kvdb.Backend as set in the DB config.
func (db *DB) GetBackend(dbPath string, networkName string) ( func (db *DB) GetBackend(ctx context.Context, dbPath string,
kvdb.Backend, error) { networkName string) (kvdb.Backend, error) {
if db.Backend == etcdBackend { if db.Backend == etcdBackend {
// Prefix will separate key/values in the db. // Prefix will separate key/values in the db.
return kvdb.GetEtcdBackend(networkName, db.Etcd) return kvdb.GetEtcdBackend(ctx, networkName, db.Etcd)
} }
return kvdb.GetBoltBackend(dbPath, dbName, db.Bolt.NoFreeListSync) return kvdb.GetBoltBackend(dbPath, dbName, db.Bolt.NoFreeListSync)

10
lnd.go

@ -251,7 +251,11 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
"minutes...") "minutes...")
startOpenTime := time.Now() startOpenTime := time.Now()
chanDbBackend, err := cfg.DB.GetBackend( ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chanDbBackend, err := cfg.DB.GetBackend(ctx,
cfg.localDatabaseDir(), cfg.networkName(), cfg.localDatabaseDir(), cfg.networkName(),
) )
if err != nil { if err != nil {
@ -283,10 +287,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, shutdownChan <-chan struct{}) error {
ltndLog.Infof("Database now open (time_to_open=%v)!", openTime) ltndLog.Infof("Database now open (time_to_open=%v)!", openTime)
// Only process macaroons if --no-macaroons isn't set. // Only process macaroons if --no-macaroons isn't set.
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tlsCfg, restCreds, restProxyDest, err := getTLSConfig(cfg) tlsCfg, restCreds, restProxyDest, err := getTLSConfig(cfg)
if err != nil { if err != nil {
err := fmt.Errorf("unable to load TLS credentials: %v", err) err := fmt.Errorf("unable to load TLS credentials: %v", err)