diff --git a/channeldb/kvdb/etcd/db.go b/channeldb/kvdb/etcd/db.go index a5d844b0..3b961273 100644 --- a/channeldb/kvdb/etcd/db.go +++ b/channeldb/kvdb/etcd/db.go @@ -2,7 +2,10 @@ package etcd import ( "context" + "fmt" "io" + "runtime" + "sync" "time" "github.com/btcsuite/btcwallet/walletdb" @@ -18,9 +21,99 @@ const ( etcdLongTimeout = 30 * time.Second ) +// callerStats holds commit stats for a specific caller. Currently it only +// holds the max stat, meaning that for a particular caller the largest +// commit set is recorded. +type callerStats struct { + count int + commitStats CommitStats +} + +func (s callerStats) String() string { + return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d", + s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset) +} + +// commitStatsCollector collects commit stats for commits succeeding +// and also for commits failing. +type commitStatsCollector struct { + sync.RWMutex + succ map[string]*callerStats + fail map[string]*callerStats +} + +// newCommitStatsColletor creates a new commitStatsCollector instance. +func newCommitStatsColletor() *commitStatsCollector { + return &commitStatsCollector{ + succ: make(map[string]*callerStats), + fail: make(map[string]*callerStats), + } +} + +// PrintStats returns collected stats pretty printed into a string. +func (c *commitStatsCollector) PrintStats() string { + c.RLock() + defer c.RUnlock() + + s := "\nFailure:\n" + for k, v := range c.fail { + s += fmt.Sprintf("%s\t%s\n", k, v) + } + + s += "\nSuccess:\n" + for k, v := range c.succ { + s += fmt.Sprintf("%s\t%s\n", k, v) + } + + return s +} + +// updateStatsMap updatess commit stats map for a caller. +func updateStatMap( + caller string, stats CommitStats, m map[string]*callerStats) { + + if _, ok := m[caller]; !ok { + m[caller] = &callerStats{} + } + + curr := m[caller] + curr.count++ + + // Update only if the total commit set is greater or equal. + currTotal := curr.commitStats.Rset + curr.commitStats.Wset + if currTotal <= (stats.Rset + stats.Wset) { + curr.commitStats = stats + } +} + +// callback is an STM commit stats callback passed which can be passed +// using a WithCommitStatsCallback to the STM upon construction. +func (c *commitStatsCollector) callback(succ bool, stats CommitStats) { + caller := "unknown" + + // Get the caller. As this callback is called from + // the backend interface that means we need to ascend + // 4 frames in the callstack. + _, file, no, ok := runtime.Caller(4) + if ok { + caller = fmt.Sprintf("%s#%d", file, no) + } + + c.Lock() + defer c.Unlock() + + if succ { + updateStatMap(caller, stats, c.succ) + } else { + updateStatMap(caller, stats, c.fail) + } +} + // db holds a reference to the etcd client connection. type db struct { - cli *clientv3.Client + config BackendConfig + cli *clientv3.Client + commitStatsCollector *commitStatsCollector } // Enforce db implements the walletdb.DB interface. @@ -36,6 +129,9 @@ type BackendConfig struct { // Pass is the password for the etcd peer. Pass string + + // CollectCommitStats indicates wheter to commit commit stats. + CollectCommitStats bool } // newEtcdBackend returns a db object initialized with the passed backend @@ -52,12 +148,29 @@ func newEtcdBackend(config BackendConfig) (*db, error) { } backend := &db{ - cli: cli, + cli: cli, + config: config, + } + + if config.CollectCommitStats { + backend.commitStatsCollector = newCommitStatsColletor() } return backend, nil } +// getSTMOptions creats all STM options based on the backend config. +func (db *db) getSTMOptions() []STMOptionFunc { + opts := []STMOptionFunc{} + if db.config.CollectCommitStats { + opts = append(opts, + WithCommitStatsCallback(db.commitStatsCollector.callback), + ) + } + + return opts +} + // View opens a database read transaction and executes the function f with the // transaction passed as a parameter. After f exits, the transaction is rolled // back. If f errors, its error is returned, not a rollback error (if any @@ -67,7 +180,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error { return f(newReadWriteTx(stm)) } - return RunSTM(db.cli, apply) + return RunSTM(db.cli, apply, db.getSTMOptions()...) } // Update opens a database read/write transaction and executes the function f @@ -81,17 +194,26 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error { return f(newReadWriteTx(stm)) } - return RunSTM(db.cli, apply) + return RunSTM(db.cli, apply, db.getSTMOptions()...) +} + +// PrintStats returns all collected stats pretty printed into a string. +func (db *db) PrintStats() string { + if db.commitStatsCollector != nil { + return db.commitStatsCollector.PrintStats() + } + + return "" } // BeginReadTx opens a database read transaction. func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { - return newReadWriteTx(NewSTM(db.cli)), nil + return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil } // BeginReadWriteTx opens a database read+write transaction. func (db *db) BeginReadTx() (walletdb.ReadTx, error) { - return newReadWriteTx(NewSTM(db.cli)), nil + return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil } // Copy writes a copy of the database to the provided writer. This call will diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index 8d8c96e0..124f3616 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -9,6 +9,12 @@ import ( v3 "github.com/coreos/etcd/clientv3" ) +type CommitStats struct { + Rset int + Wset int + Retries int +} + // KV stores a key/value pair. type KV struct { key string @@ -152,7 +158,8 @@ type stm struct { // when an STM is created. type STMOptions struct { // ctx holds an externally provided abort context. - ctx context.Context + ctx context.Context + commitStatsCallback func(bool, CommitStats) } // STMOptionFunc is a function that updates the passed STMOptions. @@ -166,6 +173,12 @@ func WithAbortContext(ctx context.Context) STMOptionFunc { } } +func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc { + return func(so *STMOptions) { + so.commitStatsCallback = cb + } +} + // RunSTM runs the apply function by creating an STM using serializable snapshot // isolation, passing it to the apply and handling commit errors and retries. func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error { @@ -209,6 +222,11 @@ func runSTM(s *stm, apply func(STM) error) error { out := make(chan error, 1) go func() { + var ( + retries int + stats CommitStats + ) + defer func() { // Recover DatabaseError panics so // we can return them. @@ -234,7 +252,7 @@ func runSTM(s *stm, apply func(STM) error) error { break } - err = s.Commit() + stats, err = s.commit() // Re-apply only upon commit error // (meaning the database was changed). @@ -246,6 +264,12 @@ func runSTM(s *stm, apply func(STM) error) error { // Rollback before trying to re-apply. s.Rollback() + retries++ + } + + if s.options.commitStatsCallback != nil { + stats.Retries = retries + s.options.commitStatsCallback(err == nil, stats) } // Return the error to the caller. @@ -674,10 +698,15 @@ func (s *stm) OnCommit(cb func()) { s.onCommit = cb } -// Commit builds the final transaction and tries to execute it. If commit fails +// commit builds the final transaction and tries to execute it. If commit fails // because the keys have changed return a CommitError, otherwise return a // DatabaseError. -func (s *stm) Commit() error { +func (s *stm) commit() (CommitStats, error) { + stats := CommitStats{ + Rset: len(s.rset), + Wset: len(s.wset), + } + // Create the compare set. cmps := append(s.rset.cmps(), s.wset.cmps(s.revision+1)...) // Create a transaction with the optional abort context. @@ -693,7 +722,7 @@ func (s *stm) Commit() error { txnresp, err := txn.Commit() if err != nil { - return DatabaseError{ + return stats, DatabaseError{ msg: "stm.Commit() failed", err: err, } @@ -706,7 +735,7 @@ func (s *stm) Commit() error { s.onCommit() } - return nil + return stats, nil } // Load prefetch before if commit failed. @@ -715,7 +744,18 @@ func (s *stm) Commit() error { // Return CommitError indicating that the transaction // can be retried. - return CommitError{} + return stats, CommitError{} +} + +// Commit simply calls commit and the commit stats callback if set. +func (s *stm) Commit() error { + stats, err := s.commit() + + if s.options.commitStatsCallback != nil { + s.options.commitStatsCallback(err == nil, stats) + } + + return err } // Rollback resets the STM. This is useful for uncommitted transaction rollback diff --git a/channeldb/kvdb/interface.go b/channeldb/kvdb/interface.go index ec426410..227cb006 100644 --- a/channeldb/kvdb/interface.go +++ b/channeldb/kvdb/interface.go @@ -36,6 +36,13 @@ var Create = walletdb.Create // through read or read+write transactions. type Backend = walletdb.DB +// BackendWithStats is and interface to debug/uncover database access patterns. +type BackendWithStats interface { + Backend + + PrintStats() string +} + // Open opens an existing database for the specified type. The arguments are // specific to the database type driver. See the documentation for the database // driver for further details.