etcd: add commit queue to effectively reduce transaction retries
This commit adds commitQueue which is a lightweight contention manager for STM transactions. The queue attempts to queue up transactions that conflict for sequential execution, while leaving all "unblocked" transactons to run freely in parallel.
This commit is contained in:
parent
b4b5a9d7de
commit
281c0b9d92
150
channeldb/kvdb/etcd/commit_queue.go
Normal file
150
channeldb/kvdb/etcd/commit_queue.go
Normal file
@ -0,0 +1,150 @@
|
||||
// +build kvdb_etcd
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// commitQueueSize is the maximum number of commits we let to queue up. All
|
||||
// remaining commits will block on commitQueue.Add().
|
||||
const commitQueueSize = 100
|
||||
|
||||
// commitQueue is a simple execution queue to manage conflicts for transactions
|
||||
// and thereby reduce the number of times conflicting transactions need to be
|
||||
// retried. When a new transaction is added to the queue, we first upgrade the
|
||||
// read/write counts in the queue's own accounting to decide whether the new
|
||||
// transaction has any conflicting dependencies. If the transaction does not
|
||||
// conflict with any other, then it is comitted immediately, otherwise it'll be
|
||||
// queued up for later exection.
|
||||
// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf
|
||||
type commitQueue struct {
|
||||
ctx context.Context
|
||||
mx sync.Mutex
|
||||
readerMap map[string]int
|
||||
writerMap map[string]int
|
||||
|
||||
commitMutex sync.RWMutex
|
||||
queue chan (func())
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewCommitQueue creates a new commit queue, with the passed abort context.
|
||||
func NewCommitQueue(ctx context.Context) *commitQueue {
|
||||
q := &commitQueue{
|
||||
ctx: ctx,
|
||||
readerMap: make(map[string]int),
|
||||
writerMap: make(map[string]int),
|
||||
queue: make(chan func(), commitQueueSize),
|
||||
}
|
||||
|
||||
// Start the queue consumer loop.
|
||||
q.wg.Add(1)
|
||||
go q.mainLoop()
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
// Wait waits for the queue to stop (after the queue context has been canceled).
|
||||
func (c *commitQueue) Wait() {
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
// Add increases lock counts and queues up tx commit closure for execution.
|
||||
// Transactions that don't have any conflicts are executed immediately by
|
||||
// "downgrading" the count mutex to allow concurrency.
|
||||
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
|
||||
c.mx.Lock()
|
||||
blocked := false
|
||||
|
||||
// Mark as blocked if there's any writer changing any of the keys in
|
||||
// the read set. Do not increment the reader counts yet as we'll need to
|
||||
// use the original reader counts when scanning through the write set.
|
||||
for key := range rset {
|
||||
if c.writerMap[key] > 0 {
|
||||
blocked = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Mark as blocked if there's any writer or reader for any of the keys
|
||||
// in the write set.
|
||||
for key := range wset {
|
||||
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
|
||||
|
||||
// Increment the writer count.
|
||||
c.writerMap[key] += 1
|
||||
}
|
||||
|
||||
// Finally we can increment the reader counts for keys in the read set.
|
||||
for key := range rset {
|
||||
c.readerMap[key] += 1
|
||||
}
|
||||
|
||||
if blocked {
|
||||
// Add the transaction to the queue if conflicts with an already
|
||||
// queued one.
|
||||
c.mx.Unlock()
|
||||
|
||||
select {
|
||||
case c.queue <- commitLoop:
|
||||
case <-c.ctx.Done():
|
||||
}
|
||||
} else {
|
||||
// To make sure we don't add a new tx to the queue that depends
|
||||
// on this "unblocked" tx, grab the commitMutex before lifting
|
||||
// the mutex guarding the lock maps.
|
||||
c.commitMutex.RLock()
|
||||
c.mx.Unlock()
|
||||
|
||||
// At this point we're safe to execute the "unblocked" tx, as
|
||||
// we cannot execute blocked tx that may have been read from the
|
||||
// queue until the commitMutex is held.
|
||||
commitLoop()
|
||||
|
||||
c.commitMutex.RUnlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Done decreases lock counts of the keys in the read/write sets.
|
||||
func (c *commitQueue) Done(rset readSet, wset writeSet) {
|
||||
c.mx.Lock()
|
||||
defer c.mx.Unlock()
|
||||
|
||||
for key := range rset {
|
||||
c.readerMap[key] -= 1
|
||||
if c.readerMap[key] == 0 {
|
||||
delete(c.readerMap, key)
|
||||
}
|
||||
}
|
||||
|
||||
for key := range wset {
|
||||
c.writerMap[key] -= 1
|
||||
if c.writerMap[key] == 0 {
|
||||
delete(c.writerMap, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mainLoop executes queued transaction commits for transactions that have
|
||||
// dependencies. The queue ensures that the top element doesn't conflict with
|
||||
// any other transactions and therefore can be executed freely.
|
||||
func (c *commitQueue) mainLoop() {
|
||||
defer c.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case top := <-c.queue:
|
||||
// Execute the next blocked transaction. As it is
|
||||
// the top element in the queue it means that it doesn't
|
||||
// depend on any other transactions anymore.
|
||||
c.commitMutex.Lock()
|
||||
top()
|
||||
c.commitMutex.Unlock()
|
||||
|
||||
case <-c.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
115
channeldb/kvdb/etcd/commit_queue_test.go
Normal file
115
channeldb/kvdb/etcd/commit_queue_test.go
Normal file
@ -0,0 +1,115 @@
|
||||
// +build kvdb_etcd
|
||||
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TestCommitQueue tests that non-conflicting transactions commit concurrently,
|
||||
// while conflicting transactions are queued up.
|
||||
func TestCommitQueue(t *testing.T) {
|
||||
// The duration of each commit.
|
||||
const commitDuration = time.Millisecond * 500
|
||||
const numCommits = 4
|
||||
|
||||
var wg sync.WaitGroup
|
||||
commits := make([]string, numCommits)
|
||||
idx := int32(-1)
|
||||
|
||||
commit := func(tag string, sleep bool) func() {
|
||||
return func() {
|
||||
defer wg.Done()
|
||||
|
||||
// Update our log of commit order. Avoid blocking
|
||||
// by preallocating the commit log and increasing
|
||||
// the log index atomically.
|
||||
i := atomic.AddInt32(&idx, 1)
|
||||
commits[i] = tag
|
||||
|
||||
if sleep {
|
||||
time.Sleep(commitDuration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to create a read set from the passed keys.
|
||||
makeReadSet := func(keys []string) readSet {
|
||||
rs := make(map[string]stmGet)
|
||||
|
||||
for _, key := range keys {
|
||||
rs[key] = stmGet{}
|
||||
}
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
// Helper function to create a write set from the passed keys.
|
||||
makeWriteSet := func(keys []string) writeSet {
|
||||
ws := make(map[string]stmPut)
|
||||
|
||||
for _, key := range keys {
|
||||
ws[key] = stmPut{}
|
||||
}
|
||||
|
||||
return ws
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
q := NewCommitQueue(ctx)
|
||||
defer q.Wait()
|
||||
defer cancel()
|
||||
|
||||
wg.Add(numCommits)
|
||||
t1 := time.Now()
|
||||
|
||||
// Tx1: reads: key1, key2, writes: key3, conflict: none
|
||||
q.Add(
|
||||
commit("free", true),
|
||||
makeReadSet([]string{"key1", "key2"}),
|
||||
makeWriteSet([]string{"key3"}),
|
||||
)
|
||||
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
|
||||
q.Add(
|
||||
commit("blocked1", false),
|
||||
makeReadSet([]string{"key1", "key2"}),
|
||||
makeWriteSet([]string{"key3"}),
|
||||
)
|
||||
// Tx3: reads: key1, writes: key4, conflict: none
|
||||
q.Add(
|
||||
commit("free", true),
|
||||
makeReadSet([]string{"key1", "key2"}),
|
||||
makeWriteSet([]string{"key4"}),
|
||||
)
|
||||
// Tx4: reads: key2, writes: key4 conflict: Tx3
|
||||
q.Add(
|
||||
commit("blocked2", false),
|
||||
makeReadSet([]string{"key2"}),
|
||||
makeWriteSet([]string{"key4"}),
|
||||
)
|
||||
|
||||
// Wait for all commits.
|
||||
wg.Wait()
|
||||
t2 := time.Now()
|
||||
|
||||
// Expected total execution time: delta.
|
||||
// 2 * commitDuration <= delta < 3 * commitDuration
|
||||
delta := t2.Sub(t1)
|
||||
require.LessOrEqual(t, int64(commitDuration*2), int64(delta))
|
||||
require.Greater(t, int64(commitDuration*3), int64(delta))
|
||||
|
||||
// Expect that the non-conflicting "free" transactions are executed
|
||||
// before the blocking ones, and the blocking ones are executed in
|
||||
// the order of addition.
|
||||
require.Equal(t,
|
||||
[]string{"free", "free", "blocked1", "blocked2"},
|
||||
commits,
|
||||
)
|
||||
}
|
Loading…
Reference in New Issue
Block a user