From 281c0b9d9205f16e115da49e856401da5644d943 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Mon, 10 Aug 2020 16:46:54 +0200 Subject: [PATCH] etcd: add commit queue to effectively reduce transaction retries This commit adds commitQueue which is a lightweight contention manager for STM transactions. The queue attempts to queue up transactions that conflict for sequential execution, while leaving all "unblocked" transactons to run freely in parallel. --- channeldb/kvdb/etcd/commit_queue.go | 150 +++++++++++++++++++++++ channeldb/kvdb/etcd/commit_queue_test.go | 115 +++++++++++++++++ 2 files changed, 265 insertions(+) create mode 100644 channeldb/kvdb/etcd/commit_queue.go create mode 100644 channeldb/kvdb/etcd/commit_queue_test.go diff --git a/channeldb/kvdb/etcd/commit_queue.go b/channeldb/kvdb/etcd/commit_queue.go new file mode 100644 index 00000000..f0384565 --- /dev/null +++ b/channeldb/kvdb/etcd/commit_queue.go @@ -0,0 +1,150 @@ +// +build kvdb_etcd + +package etcd + +import ( + "context" + "sync" +) + +// commitQueueSize is the maximum number of commits we let to queue up. All +// remaining commits will block on commitQueue.Add(). +const commitQueueSize = 100 + +// commitQueue is a simple execution queue to manage conflicts for transactions +// and thereby reduce the number of times conflicting transactions need to be +// retried. When a new transaction is added to the queue, we first upgrade the +// read/write counts in the queue's own accounting to decide whether the new +// transaction has any conflicting dependencies. If the transaction does not +// conflict with any other, then it is comitted immediately, otherwise it'll be +// queued up for later exection. +// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf +type commitQueue struct { + ctx context.Context + mx sync.Mutex + readerMap map[string]int + writerMap map[string]int + + commitMutex sync.RWMutex + queue chan (func()) + wg sync.WaitGroup +} + +// NewCommitQueue creates a new commit queue, with the passed abort context. +func NewCommitQueue(ctx context.Context) *commitQueue { + q := &commitQueue{ + ctx: ctx, + readerMap: make(map[string]int), + writerMap: make(map[string]int), + queue: make(chan func(), commitQueueSize), + } + + // Start the queue consumer loop. + q.wg.Add(1) + go q.mainLoop() + + return q +} + +// Wait waits for the queue to stop (after the queue context has been canceled). +func (c *commitQueue) Wait() { + c.wg.Wait() +} + +// Add increases lock counts and queues up tx commit closure for execution. +// Transactions that don't have any conflicts are executed immediately by +// "downgrading" the count mutex to allow concurrency. +func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) { + c.mx.Lock() + blocked := false + + // Mark as blocked if there's any writer changing any of the keys in + // the read set. Do not increment the reader counts yet as we'll need to + // use the original reader counts when scanning through the write set. + for key := range rset { + if c.writerMap[key] > 0 { + blocked = true + break + } + } + + // Mark as blocked if there's any writer or reader for any of the keys + // in the write set. + for key := range wset { + blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0 + + // Increment the writer count. + c.writerMap[key] += 1 + } + + // Finally we can increment the reader counts for keys in the read set. + for key := range rset { + c.readerMap[key] += 1 + } + + if blocked { + // Add the transaction to the queue if conflicts with an already + // queued one. + c.mx.Unlock() + + select { + case c.queue <- commitLoop: + case <-c.ctx.Done(): + } + } else { + // To make sure we don't add a new tx to the queue that depends + // on this "unblocked" tx, grab the commitMutex before lifting + // the mutex guarding the lock maps. + c.commitMutex.RLock() + c.mx.Unlock() + + // At this point we're safe to execute the "unblocked" tx, as + // we cannot execute blocked tx that may have been read from the + // queue until the commitMutex is held. + commitLoop() + + c.commitMutex.RUnlock() + } +} + +// Done decreases lock counts of the keys in the read/write sets. +func (c *commitQueue) Done(rset readSet, wset writeSet) { + c.mx.Lock() + defer c.mx.Unlock() + + for key := range rset { + c.readerMap[key] -= 1 + if c.readerMap[key] == 0 { + delete(c.readerMap, key) + } + } + + for key := range wset { + c.writerMap[key] -= 1 + if c.writerMap[key] == 0 { + delete(c.writerMap, key) + } + } +} + +// mainLoop executes queued transaction commits for transactions that have +// dependencies. The queue ensures that the top element doesn't conflict with +// any other transactions and therefore can be executed freely. +func (c *commitQueue) mainLoop() { + defer c.wg.Done() + + for { + select { + case top := <-c.queue: + // Execute the next blocked transaction. As it is + // the top element in the queue it means that it doesn't + // depend on any other transactions anymore. + c.commitMutex.Lock() + top() + c.commitMutex.Unlock() + + case <-c.ctx.Done(): + return + } + } +} diff --git a/channeldb/kvdb/etcd/commit_queue_test.go b/channeldb/kvdb/etcd/commit_queue_test.go new file mode 100644 index 00000000..16ff7100 --- /dev/null +++ b/channeldb/kvdb/etcd/commit_queue_test.go @@ -0,0 +1,115 @@ +// +build kvdb_etcd + +package etcd + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestCommitQueue tests that non-conflicting transactions commit concurrently, +// while conflicting transactions are queued up. +func TestCommitQueue(t *testing.T) { + // The duration of each commit. + const commitDuration = time.Millisecond * 500 + const numCommits = 4 + + var wg sync.WaitGroup + commits := make([]string, numCommits) + idx := int32(-1) + + commit := func(tag string, sleep bool) func() { + return func() { + defer wg.Done() + + // Update our log of commit order. Avoid blocking + // by preallocating the commit log and increasing + // the log index atomically. + i := atomic.AddInt32(&idx, 1) + commits[i] = tag + + if sleep { + time.Sleep(commitDuration) + } + } + } + + // Helper function to create a read set from the passed keys. + makeReadSet := func(keys []string) readSet { + rs := make(map[string]stmGet) + + for _, key := range keys { + rs[key] = stmGet{} + } + + return rs + } + + // Helper function to create a write set from the passed keys. + makeWriteSet := func(keys []string) writeSet { + ws := make(map[string]stmPut) + + for _, key := range keys { + ws[key] = stmPut{} + } + + return ws + } + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + q := NewCommitQueue(ctx) + defer q.Wait() + defer cancel() + + wg.Add(numCommits) + t1 := time.Now() + + // Tx1: reads: key1, key2, writes: key3, conflict: none + q.Add( + commit("free", true), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key3"}), + ) + // Tx2: reads: key1, key2, writes: key3, conflict: Tx1 + q.Add( + commit("blocked1", false), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key3"}), + ) + // Tx3: reads: key1, writes: key4, conflict: none + q.Add( + commit("free", true), + makeReadSet([]string{"key1", "key2"}), + makeWriteSet([]string{"key4"}), + ) + // Tx4: reads: key2, writes: key4 conflict: Tx3 + q.Add( + commit("blocked2", false), + makeReadSet([]string{"key2"}), + makeWriteSet([]string{"key4"}), + ) + + // Wait for all commits. + wg.Wait() + t2 := time.Now() + + // Expected total execution time: delta. + // 2 * commitDuration <= delta < 3 * commitDuration + delta := t2.Sub(t1) + require.LessOrEqual(t, int64(commitDuration*2), int64(delta)) + require.Greater(t, int64(commitDuration*3), int64(delta)) + + // Expect that the non-conflicting "free" transactions are executed + // before the blocking ones, and the blocking ones are executed in + // the order of addition. + require.Equal(t, + []string{"free", "free", "blocked1", "blocked2"}, + commits, + ) +}