kvdb+etcd: extend kvdb and STM with stats

The commit itslef adds stats to the transactions such that we can see
how LND behaves with and etcd backend.
This commit is contained in:
Andras Banki-Horvath 2020-03-13 16:59:26 +01:00
parent 248a00f211
commit 859a457e48
3 changed files with 182 additions and 13 deletions

View File

@ -2,7 +2,10 @@ package etcd
import (
"context"
"fmt"
"io"
"runtime"
"sync"
"time"
"github.com/btcsuite/btcwallet/walletdb"
@ -18,9 +21,99 @@ const (
etcdLongTimeout = 30 * time.Second
)
// callerStats holds commit stats for a specific caller. Currently it only
// holds the max stat, meaning that for a particular caller the largest
// commit set is recorded.
type callerStats struct {
count int
commitStats CommitStats
}
func (s callerStats) String() string {
return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d",
s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset)
}
// commitStatsCollector collects commit stats for commits succeeding
// and also for commits failing.
type commitStatsCollector struct {
sync.RWMutex
succ map[string]*callerStats
fail map[string]*callerStats
}
// newCommitStatsColletor creates a new commitStatsCollector instance.
func newCommitStatsColletor() *commitStatsCollector {
return &commitStatsCollector{
succ: make(map[string]*callerStats),
fail: make(map[string]*callerStats),
}
}
// PrintStats returns collected stats pretty printed into a string.
func (c *commitStatsCollector) PrintStats() string {
c.RLock()
defer c.RUnlock()
s := "\nFailure:\n"
for k, v := range c.fail {
s += fmt.Sprintf("%s\t%s\n", k, v)
}
s += "\nSuccess:\n"
for k, v := range c.succ {
s += fmt.Sprintf("%s\t%s\n", k, v)
}
return s
}
// updateStatsMap updatess commit stats map for a caller.
func updateStatMap(
caller string, stats CommitStats, m map[string]*callerStats) {
if _, ok := m[caller]; !ok {
m[caller] = &callerStats{}
}
curr := m[caller]
curr.count++
// Update only if the total commit set is greater or equal.
currTotal := curr.commitStats.Rset + curr.commitStats.Wset
if currTotal <= (stats.Rset + stats.Wset) {
curr.commitStats = stats
}
}
// callback is an STM commit stats callback passed which can be passed
// using a WithCommitStatsCallback to the STM upon construction.
func (c *commitStatsCollector) callback(succ bool, stats CommitStats) {
caller := "unknown"
// Get the caller. As this callback is called from
// the backend interface that means we need to ascend
// 4 frames in the callstack.
_, file, no, ok := runtime.Caller(4)
if ok {
caller = fmt.Sprintf("%s#%d", file, no)
}
c.Lock()
defer c.Unlock()
if succ {
updateStatMap(caller, stats, c.succ)
} else {
updateStatMap(caller, stats, c.fail)
}
}
// db holds a reference to the etcd client connection.
type db struct {
cli *clientv3.Client
config BackendConfig
cli *clientv3.Client
commitStatsCollector *commitStatsCollector
}
// Enforce db implements the walletdb.DB interface.
@ -36,6 +129,9 @@ type BackendConfig struct {
// Pass is the password for the etcd peer.
Pass string
// CollectCommitStats indicates wheter to commit commit stats.
CollectCommitStats bool
}
// newEtcdBackend returns a db object initialized with the passed backend
@ -52,12 +148,29 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
}
backend := &db{
cli: cli,
cli: cli,
config: config,
}
if config.CollectCommitStats {
backend.commitStatsCollector = newCommitStatsColletor()
}
return backend, nil
}
// getSTMOptions creats all STM options based on the backend config.
func (db *db) getSTMOptions() []STMOptionFunc {
opts := []STMOptionFunc{}
if db.config.CollectCommitStats {
opts = append(opts,
WithCommitStatsCallback(db.commitStatsCollector.callback),
)
}
return opts
}
// View opens a database read transaction and executes the function f with the
// transaction passed as a parameter. After f exits, the transaction is rolled
// back. If f errors, its error is returned, not a rollback error (if any
@ -67,7 +180,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error {
return f(newReadWriteTx(stm))
}
return RunSTM(db.cli, apply)
return RunSTM(db.cli, apply, db.getSTMOptions()...)
}
// Update opens a database read/write transaction and executes the function f
@ -81,17 +194,26 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
return f(newReadWriteTx(stm))
}
return RunSTM(db.cli, apply)
return RunSTM(db.cli, apply, db.getSTMOptions()...)
}
// PrintStats returns all collected stats pretty printed into a string.
func (db *db) PrintStats() string {
if db.commitStatsCollector != nil {
return db.commitStatsCollector.PrintStats()
}
return ""
}
// BeginReadTx opens a database read transaction.
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
return newReadWriteTx(NewSTM(db.cli)), nil
return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil
}
// BeginReadWriteTx opens a database read+write transaction.
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
return newReadWriteTx(NewSTM(db.cli)), nil
return newReadWriteTx(NewSTM(db.cli, db.getSTMOptions()...)), nil
}
// Copy writes a copy of the database to the provided writer. This call will

View File

@ -9,6 +9,12 @@ import (
v3 "github.com/coreos/etcd/clientv3"
)
type CommitStats struct {
Rset int
Wset int
Retries int
}
// KV stores a key/value pair.
type KV struct {
key string
@ -152,7 +158,8 @@ type stm struct {
// when an STM is created.
type STMOptions struct {
// ctx holds an externally provided abort context.
ctx context.Context
ctx context.Context
commitStatsCallback func(bool, CommitStats)
}
// STMOptionFunc is a function that updates the passed STMOptions.
@ -166,6 +173,12 @@ func WithAbortContext(ctx context.Context) STMOptionFunc {
}
}
func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc {
return func(so *STMOptions) {
so.commitStatsCallback = cb
}
}
// RunSTM runs the apply function by creating an STM using serializable snapshot
// isolation, passing it to the apply and handling commit errors and retries.
func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error {
@ -209,6 +222,11 @@ func runSTM(s *stm, apply func(STM) error) error {
out := make(chan error, 1)
go func() {
var (
retries int
stats CommitStats
)
defer func() {
// Recover DatabaseError panics so
// we can return them.
@ -234,7 +252,7 @@ func runSTM(s *stm, apply func(STM) error) error {
break
}
err = s.Commit()
stats, err = s.commit()
// Re-apply only upon commit error
// (meaning the database was changed).
@ -246,6 +264,12 @@ func runSTM(s *stm, apply func(STM) error) error {
// Rollback before trying to re-apply.
s.Rollback()
retries++
}
if s.options.commitStatsCallback != nil {
stats.Retries = retries
s.options.commitStatsCallback(err == nil, stats)
}
// Return the error to the caller.
@ -674,10 +698,15 @@ func (s *stm) OnCommit(cb func()) {
s.onCommit = cb
}
// Commit builds the final transaction and tries to execute it. If commit fails
// commit builds the final transaction and tries to execute it. If commit fails
// because the keys have changed return a CommitError, otherwise return a
// DatabaseError.
func (s *stm) Commit() error {
func (s *stm) commit() (CommitStats, error) {
stats := CommitStats{
Rset: len(s.rset),
Wset: len(s.wset),
}
// Create the compare set.
cmps := append(s.rset.cmps(), s.wset.cmps(s.revision+1)...)
// Create a transaction with the optional abort context.
@ -693,7 +722,7 @@ func (s *stm) Commit() error {
txnresp, err := txn.Commit()
if err != nil {
return DatabaseError{
return stats, DatabaseError{
msg: "stm.Commit() failed",
err: err,
}
@ -706,7 +735,7 @@ func (s *stm) Commit() error {
s.onCommit()
}
return nil
return stats, nil
}
// Load prefetch before if commit failed.
@ -715,7 +744,18 @@ func (s *stm) Commit() error {
// Return CommitError indicating that the transaction
// can be retried.
return CommitError{}
return stats, CommitError{}
}
// Commit simply calls commit and the commit stats callback if set.
func (s *stm) Commit() error {
stats, err := s.commit()
if s.options.commitStatsCallback != nil {
s.options.commitStatsCallback(err == nil, stats)
}
return err
}
// Rollback resets the STM. This is useful for uncommitted transaction rollback

View File

@ -36,6 +36,13 @@ var Create = walletdb.Create
// through read or read+write transactions.
type Backend = walletdb.DB
// BackendWithStats is and interface to debug/uncover database access patterns.
type BackendWithStats interface {
Backend
PrintStats() string
}
// Open opens an existing database for the specified type. The arguments are
// specific to the database type driver. See the documentation for the database
// driver for further details.