diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index a68c0289..a4acdeef 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -214,6 +214,7 @@ jobs: matrix: unit_type: - btcd unit-cover + - unit tags=kvdb_etcd - travis-race steps: - name: git checkout diff --git a/Makefile b/Makefile index 6d729bf0..e94ca358 100644 --- a/Makefile +++ b/Makefile @@ -43,8 +43,6 @@ GOTEST := GO111MODULE=on go test GOVERSION := $(shell go version | awk '{print $$3}') GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*") -GOLIST := go list -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/' -GOLISTCOVER := $(shell go list -deps -f '{{.ImportPath}}' ./... | grep '$(PKG)' | sed -e 's/^$(ESCPKG)/./') RM := rm -f CP := cp diff --git a/channeldb/kvdb/etcd/commit_queue.go b/channeldb/kvdb/etcd/commit_queue.go new file mode 100644 index 00000000..f0384565 --- /dev/null +++ b/channeldb/kvdb/etcd/commit_queue.go @@ -0,0 +1,150 @@ +// +build kvdb_etcd + +package etcd + +import ( + "context" + "sync" +) + +// commitQueueSize is the maximum number of commits allowed to queue up. All +// remaining commits will block on commitQueue.Add(). +const commitQueueSize = 100 + +// commitQueue is a simple execution queue to manage conflicts for transactions +// and thereby reduce the number of times conflicting transactions need to be +// retried. When a new transaction is added to the queue, we first upgrade the +// read/write counts in the queue's own accounting to decide whether the new +// transaction has any conflicting dependencies. If the transaction does not +// conflict with any other, then it is committed immediately, otherwise it'll +// be queued up for later execution. 
+// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf +type commitQueue struct { + ctx context.Context + mx sync.Mutex + readerMap map[string]int + writerMap map[string]int + + commitMutex sync.RWMutex + queue chan (func()) + wg sync.WaitGroup +} + +// NewCommitQueue creates a new commit queue, with the passed abort context. +func NewCommitQueue(ctx context.Context) *commitQueue { + q := &commitQueue{ + ctx: ctx, + readerMap: make(map[string]int), + writerMap: make(map[string]int), + queue: make(chan func(), commitQueueSize), + } + + // Start the queue consumer loop. + q.wg.Add(1) + go q.mainLoop() + + return q +} + +// Wait waits for the queue to stop (after the queue context has been canceled). +func (c *commitQueue) Wait() { + c.wg.Wait() +} + +// Add increases lock counts and queues up the tx commit closure for execution. +// Transactions that don't have any conflicts are executed immediately by +// "downgrading" the count mutex to allow concurrency. +func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { + c.mx.Lock() + blocked := false + + // Mark as blocked if there's any writer changing any of the keys in + // the read set. Do not increment the reader counts yet as we'll need to + // use the original reader counts when scanning through the write set. + for key := range rset { + if c.writerMap[key] > 0 { + blocked = true + break + } + } + + // Mark as blocked if there's any writer or reader for any of the keys + // in the write set. + for key := range wset { + blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0 + + // Increment the writer count. + c.writerMap[key] += 1 + } + + // Finally we can increment the reader counts for keys in the read set. + for key := range rset { + c.readerMap[key] += 1 + } + + if blocked { + // Add the transaction to the queue if it conflicts with an + // already queued one. 
+ c.mx.Unlock() + + select { + case c.queue <- commitLoop: + case <-c.ctx.Done(): + } + } else { + // To make sure we don't add a new tx to the queue that depends + // on this "unblocked" tx, grab the commitMutex before lifting + // the mutex guarding the lock maps. + c.commitMutex.RLock() + c.mx.Unlock() + + // At this point we're safe to execute the "unblocked" tx, as + // we cannot execute blocked tx that may have been read from the + // queue until the commitMutex is held. + commitLoop() + + c.commitMutex.RUnlock() + } +} + +// Done decreases lock counts of the keys in the read/write sets. +func (c *commitQueue) Done(rset readSet, wset writeSet) { + c.mx.Lock() + defer c.mx.Unlock() + + for key := range rset { + c.readerMap[key] -= 1 + if c.readerMap[key] == 0 { + delete(c.readerMap, key) + } + } + + for key := range wset { + c.writerMap[key] -= 1 + if c.writerMap[key] == 0 { + delete(c.writerMap, key) + } + } +} + +// mainLoop executes queued transaction commits for transactions that have +// dependencies. The queue ensures that the top element doesn't conflict with +// any other transactions and therefore can be executed freely. +func (c *commitQueue) mainLoop() { + defer c.wg.Done() + + for { + select { + case top := <-c.queue: + // Execute the next blocked transaction. As it is + // the top element in the queue it means that it doesn't + // depend on any other transactions anymore. 
+ c.commitMutex.Lock() + top() + c.commitMutex.Unlock() + + case <-c.ctx.Done(): + return + } + } +} diff --git a/channeldb/kvdb/etcd/commit_queue_test.go b/channeldb/kvdb/etcd/commit_queue_test.go new file mode 100644 index 00000000..16ff7100 --- /dev/null +++ b/channeldb/kvdb/etcd/commit_queue_test.go @@ -0,0 +1,115 @@ +// +build kvdb_etcd + +package etcd + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestCommitQueue tests that non-conflicting transactions commit concurrently, +// while conflicting transactions are queued up. +func TestCommitQueue(t *testing.T) { + // The duration of each commit. + const commitDuration = time.Millisecond * 500 + const numCommits = 4 + + var wg sync.WaitGroup + commits := make([]string, numCommits) + idx := int32(-1) + + commit := func(tag string, sleep bool) func() { + return func() { + defer wg.Done() + + // Update our log of commit order. Avoid blocking + // by preallocating the commit log and increasing + // the log index atomically. + i := atomic.AddInt32(&idx, 1) + commits[i] = tag + + if sleep { + time.Sleep(commitDuration) + } + } + } + + // Helper function to create a read set from the passed keys. + makeReadSet := func(keys []string) readSet { + rs := make(map[string]stmGet) + + for _, key := range keys { + rs[key] = stmGet{} + } + + return rs + } + + // Helper function to create a write set from the passed keys. 
+ makeWriteSet := func(keys []string) writeSet { + ws := make(map[string]stmPut) + + for _, key := range keys { + ws[key] = stmPut{} + } + + return ws + } + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + q := NewCommitQueue(ctx) + defer q.Wait() + defer cancel() + + wg.Add(numCommits) + t1 := time.Now() + + // Tx1: reads: key1, key2, writes: key3, conflict: none + q.Add( + commit("free", true), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key3"}), + ) + // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 + q.Add( + commit("blocked1", false), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key3"}), + ) + // Tx3: reads: key1, key2, writes: key4, conflict: none + q.Add( + commit("free", true), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key4"}), + ) + // Tx4: reads: key2, writes: key4, conflict: Tx3 + q.Add( + commit("blocked2", false), + makeReadSet([]string{"key2"}), + makeWriteSet([]string{"key4"}), + ) + + // Wait for all commits. + wg.Wait() + t2 := time.Now() + + // Expected total execution time: delta. + // 2 * commitDuration <= delta < 3 * commitDuration + delta := t2.Sub(t1) + require.LessOrEqual(t, int64(commitDuration*2), int64(delta)) + require.Greater(t, int64(commitDuration*3), int64(delta)) + + // Expect that the non-conflicting "free" transactions are executed + // before the blocked ones, and the blocked ones are executed in + // the order of addition. + require.Equal(t, + []string{"free", "free", "blocked1", "blocked2"}, + commits, + ) +} diff --git a/channeldb/kvdb/etcd/db.go b/channeldb/kvdb/etcd/db.go index 3bd89c29..9f52ad4e 100644 --- a/channeldb/kvdb/etcd/db.go +++ b/channeldb/kvdb/etcd/db.go @@ -16,8 +16,8 @@ import ( ) const ( - // etcdConnectionTimeout is the timeout until successful connection to the - // etcd instance. + // etcdConnectionTimeout is the timeout until successful connection to + // the etcd instance. 
etcdConnectionTimeout = 10 * time.Second // etcdLongTimeout is a timeout for longer taking etcd operatons. @@ -34,7 +34,8 @@ type callerStats struct { func (s callerStats) String() string { return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d", - s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset) + s.count, s.commitStats.Retries, s.commitStats.Rset, + s.commitStats.Wset) } // commitStatsCollector collects commit stats for commits succeeding @@ -117,6 +118,7 @@ type db struct { config BackendConfig cli *clientv3.Client commitStatsCollector *commitStatsCollector + txQueue *commitQueue } // Enforce db implements the walletdb.DB interface. @@ -174,12 +176,13 @@ func newEtcdBackend(config BackendConfig) (*db, error) { } cli, err := clientv3.New(clientv3.Config{ - Context: config.Ctx, - Endpoints: []string{config.Host}, - DialTimeout: etcdConnectionTimeout, - Username: config.User, - Password: config.Pass, - TLS: tlsConfig, + Context: config.Ctx, + Endpoints: []string{config.Host}, + DialTimeout: etcdConnectionTimeout, + Username: config.User, + Password: config.Pass, + TLS: tlsConfig, + MaxCallSendMsgSize: 16384*1024 - 1, }) if err != nil { @@ -187,8 +190,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) { } backend := &db{ - cli: cli, - config: config, + cli: cli, + config: config, + txQueue: NewCommitQueue(config.Ctx), } if config.CollectCommitStats { @@ -200,7 +204,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) { // getSTMOptions creats all STM options based on the backend config. func (db *db) getSTMOptions() []STMOptionFunc { - opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)} + opts := []STMOptionFunc{ + WithAbortContext(db.config.Ctx), + } if db.config.CollectCommitStats { opts = append(opts, @@ -220,7 +226,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error { return f(newReadWriteTx(stm, db.config.Prefix)) } - return RunSTM(db.cli, apply, db.getSTMOptions()...) 
+ return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) } // Update opens a database read/write transaction and executes the function f @@ -234,7 +240,7 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error { return f(newReadWriteTx(stm, db.config.Prefix)) } - return RunSTM(db.cli, apply, db.getSTMOptions()...) + return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) } // PrintStats returns all collected stats pretty printed into a string. @@ -246,18 +252,18 @@ func (db *db) PrintStats() string { return "" } -// BeginReadTx opens a database read transaction. +// BeginReadWriteTx opens a database read+write transaction. func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { return newReadWriteTx( - NewSTM(db.cli, db.getSTMOptions()...), + NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), db.config.Prefix, ), nil } -// BeginReadWriteTx opens a database read+write transaction. +// BeginReadTx opens a database read transaction. func (db *db) BeginReadTx() (walletdb.ReadTx, error) { return newReadWriteTx( - NewSTM(db.cli, db.getSTMOptions()...), + NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), db.config.Prefix, ), nil } diff --git a/channeldb/kvdb/etcd/db_test.go b/channeldb/kvdb/etcd/db_test.go index c4332db8..8a5d623e 100644 --- a/channeldb/kvdb/etcd/db_test.go +++ b/channeldb/kvdb/etcd/db_test.go @@ -63,7 +63,7 @@ func TestAbortContext(t *testing.T) { // Expect that the update will fail. err = db.Update(func(tx walletdb.ReadWriteTx) error { _, err := tx.CreateTopLevelBucket([]byte("bucket")) - require.NoError(t, err) + require.Error(t, err, "context canceled") return nil }) diff --git a/channeldb/kvdb/etcd/embed.go b/channeldb/kvdb/etcd/embed.go index 96ea71ab..d99b901e 100644 --- a/channeldb/kvdb/etcd/embed.go +++ b/channeldb/kvdb/etcd/embed.go @@ -42,7 +42,8 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) { cfg.Dir = path // To ensure that we can submit large transactions. 
+ cfg.MaxTxnOps = 8192 + cfg.MaxRequestBytes = 16384 * 1024 // Listen on random free ports. clientURL := fmt.Sprintf("127.0.0.1:%d", getFreePort()) @@ -63,8 +64,10 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) { fmt.Errorf("etcd failed to start after: %v", readyTimeout) } + ctx, cancel := context.WithCancel(context.Background()) + connConfig := &BackendConfig{ - Ctx: context.Background(), + Ctx: ctx, Host: "http://" + peerURL, User: "user", Pass: "pass", @@ -72,6 +75,7 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) { } return connConfig, func() { + cancel() etcd.Close() }, nil } diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index 14bb9ca9..59ac1f45 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -134,6 +134,10 @@ type stm struct { // execute in the STM run loop. manual bool + // txQueue is a lightweight contention manager, which is used to detect + // transaction conflicts and reduce retries. + txQueue *commitQueue + // options stores optional settings passed by the user. options *STMOptions @@ -183,18 +187,22 @@ func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc { // RunSTM runs the apply function by creating an STM using serializable snapshot // isolation, passing it to the apply and handling commit errors and retries. -func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error { - return runSTM(makeSTM(cli, false, so...), apply) +func RunSTM(cli *v3.Client, apply func(STM) error, txQueue *commitQueue, + so ...STMOptionFunc) error { + + return runSTM(makeSTM(cli, false, txQueue, so...), apply) } // NewSTM creates a new STM instance, using serializable snapshot isolation. -func NewSTM(cli *v3.Client, so ...STMOptionFunc) STM { - return makeSTM(cli, true, so...) +func NewSTM(cli *v3.Client, txQueue *commitQueue, so ...STMOptionFunc) STM { + return makeSTM(cli, true, txQueue, so...) 
} // makeSTM is the actual constructor of the stm. It first apply all passed // options then creates the stm object and resets it before returning. -func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { +func makeSTM(cli *v3.Client, manual bool, txQueue *commitQueue, + so ...STMOptionFunc) *stm { + opts := &STMOptions{ ctx: cli.Ctx(), } @@ -207,6 +215,7 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { s := &stm{ client: cli, manual: manual, + txQueue: txQueue, options: opts, prefetch: make(map[string]stmGet), } @@ -222,50 +231,72 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { // CommitError which is used to indicate a necessary retry. func runSTM(s *stm, apply func(STM) error) error { var ( - retries int - stats CommitStats - err error + retries int + stats CommitStats + executeErr error ) -loop: - // In a loop try to apply and commit and roll back if the database has - // changed (CommitError). - for { - select { - // Check if the STM is aborted and break the retry loop if it is. - case <-s.options.ctx.Done(): - err = fmt.Errorf("aborted") - break loop + done := make(chan struct{}) - default: + execute := func() { + defer close(done) + + for { + select { + // Check if the STM is aborted and break the retry loop + // if it is. + case <-s.options.ctx.Done(): + executeErr = fmt.Errorf("aborted") + return + + default: + } + + stats, executeErr = s.commit() + + // Re-apply only upon commit error (meaning the + // keys were changed). + if _, ok := executeErr.(CommitError); !ok { + // Anything that's not a CommitError + // aborts the transaction. + return + } + + // Rollback before trying to re-apply. + s.Rollback() + retries++ + + // Re-apply the transaction closure. + if executeErr = apply(s); executeErr != nil { + return + } } - // Apply the transaction closure and abort the STM if there was - // an application error. 
- if err = apply(s); err != nil { - break loop - } - - stats, err = s.commit() - - // Retry the apply closure only upon commit error (meaning the - // database was changed). - if _, ok := err.(CommitError); !ok { - // Anything that's not a CommitError aborts the STM - // run loop. - break loop - } - - // Rollback before trying to re-apply. - s.Rollback() - retries++ } + // Run the tx closure to construct the read and write sets. + // Also we expect that if there are no conflicting transactions + // in the queue, then we only run apply once. + if preApplyErr := apply(s); preApplyErr != nil { + return preApplyErr + } + + // Queue up the transaction for execution. + s.txQueue.Add(execute, s.rset, s.wset) + + // Wait for the transaction to execute, or break if aborted. + select { + case <-done: + case <-s.options.ctx.Done(): + } + + s.txQueue.Done(s.rset, s.wset) + if s.options.commitStatsCallback != nil { stats.Retries = retries - s.options.commitStatsCallback(err == nil, stats) + s.options.commitStatsCallback(executeErr == nil, stats) } - return err + return executeErr } // add inserts a txn response to the read set. 
This is useful when the txn diff --git a/channeldb/kvdb/etcd/stm_test.go b/channeldb/kvdb/etcd/stm_test.go index 6beffc28..cde4abf3 100644 --- a/channeldb/kvdb/etcd/stm_test.go +++ b/channeldb/kvdb/etcd/stm_test.go @@ -21,7 +21,11 @@ func TestPutToEmpty(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -31,7 +35,7 @@ func TestPutToEmpty(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "abc", f.Get("123")) @@ -41,7 +45,11 @@ func TestGetPutDel(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() testKeyValues := []KV{ {"a", "1"}, @@ -105,7 +113,7 @@ func TestGetPutDel(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "1", f.Get("a")) @@ -120,7 +128,11 @@ func TestFirstLastNextPrev(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() testKeyValues := []KV{ {"kb", "1"}, @@ -255,7 +267,7 @@ func TestFirstLastNextPrev(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "0", f.Get("ka")) @@ -271,7 +283,11 @@ func TestCommitError(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -301,7 +317,7 @@ func TestCommitError(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, 
apply, txQueue) require.NoError(t, err) require.Equal(t, 2, cnt) @@ -312,7 +328,11 @@ func TestManualTxError(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -320,7 +340,7 @@ func TestManualTxError(t *testing.T) { // Preset DB state. f.Put("123", "xyz") - stm := NewSTM(db.cli) + stm := NewSTM(db.cli, txQueue) val, err := stm.Get("123") require.NoError(t, err) diff --git a/make/testing_flags.mk b/make/testing_flags.mk index 80637a23..1443ab5b 100644 --- a/make/testing_flags.mk +++ b/make/testing_flags.mk @@ -28,6 +28,10 @@ ifneq ($(icase),) TEST_FLAGS += -test.run=TestLightningNetworkDaemon/$(icase) endif +ifneq ($(tags),) +DEV_TAGS += ${tags} +endif + # Define the log tags that will be applied only when running unit tests. If none # are provided, we default to "nolog" which will be silent. ifneq ($(log),) @@ -44,6 +48,9 @@ else TEST_FLAGS += -test.timeout=40m endif +GOLIST := go list -tags="$(DEV_TAGS)" -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/' +GOLISTCOVER := $(shell go list -tags="$(DEV_TAGS)" -deps -f '{{.ImportPath}}' ./... | grep '$(PKG)' | sed -e 's/^$(ESCPKG)/./') + # UNIT_TARGTED is undefined iff a specific package and/or unit test case is # not being targeted. UNIT_TARGETED ?= no