lnd.xprv/kvdb/etcd/commit_queue.go

151 lines
4.0 KiB
Go
Raw Normal View History

// +build kvdb_etcd
package etcd
import (
"context"
"sync"
)
// commitQueueSize is the maximum number of blocked commits that may wait in
// the queue at once. When the queue is full, any further blocked commit waits
// inside commitQueue.Add() until a slot frees up or the queue context is
// canceled.
const commitQueueSize = 100
// commitQueue is a simple execution queue that manages conflicts between
// transactions and thereby reduces the number of times conflicting
// transactions need to be retried. When a new transaction is added, the queue
// first updates its own per-key read/write counts to decide whether the
// transaction has any conflicting dependencies. A transaction that conflicts
// with no other one is committed immediately; otherwise it is queued up for
// later execution.
//
// The algorithm is described in:
// http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf
type commitQueue struct {
	// ctx is the abort context. Once it is done the consumer loop exits
	// and pending Add calls stop waiting for a free queue slot.
	ctx context.Context

	// mx guards readerMap and writerMap.
	mx sync.Mutex

	// readerMap and writerMap count, per key, the added transactions that
	// have the key in their read set or write set respectively.
	readerMap, writerMap map[string]int

	// commitMutex is held for reading while Add runs a non-conflicting
	// commit, and for writing while mainLoop runs a queued one.
	commitMutex sync.RWMutex

	// queue holds the commit closures of conflicting transactions until
	// mainLoop picks them up.
	queue chan func()

	// wg tracks the mainLoop goroutine so that Wait can block until it
	// has exited.
	wg sync.WaitGroup
}
// NewCommitQueue returns a commit queue bound to the passed abort context and
// launches its consumer goroutine. The goroutine exits once ctx is done; use
// Wait to block until it has.
func NewCommitQueue(ctx context.Context) *commitQueue {
	queue := new(commitQueue)
	queue.ctx = ctx
	queue.readerMap = map[string]int{}
	queue.writerMap = map[string]int{}
	queue.queue = make(chan func(), commitQueueSize)

	// Register the consumer loop with the wait group before spawning it,
	// so that the counter is already raised when the goroutine starts.
	queue.wg.Add(1)
	go queue.mainLoop()

	return queue
}
// Wait blocks until the queue's consumer goroutine has exited, which happens
// after the queue context has been canceled.
func (c *commitQueue) Wait() {
c.wg.Wait()
}
// Add increases the lock counts for every key in the passed read and write
// sets and then either runs the commit closure right away or queues it up.
//
// A transaction conflicts ("is blocked") if some already counted transaction
// writes a key it reads, or reads or writes a key it writes. A blocked
// transaction's closure is sent to the queue to be run by the consumer loop;
// the send waits while the queue is full and the closure is dropped without
// being run if the queue context is canceled first. A transaction without
// conflicts is run synchronously on the calling goroutine, concurrently with
// other non-conflicting ones.
//
// The counts raised here are only lowered again by Done, called with the same
// sets.
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
	c.mx.Lock()

	// Mark as blocked if there's any writer changing any of the keys in
	// the read set. Do not increment the reader counts yet, as the
	// original reader counts are needed when scanning the write set.
	blocked := false
	for key := range rset {
		if c.writerMap[key] > 0 {
			blocked = true
			break
		}
	}

	// Mark as blocked if there's any writer or reader for any of the keys
	// in the write set. There is no early exit here since every writer
	// count has to be incremented.
	for key := range wset {
		if c.readerMap[key] > 0 || c.writerMap[key] > 0 {
			blocked = true
		}
		c.writerMap[key]++
	}

	// Finally the reader counts for the keys in the read set can be
	// incremented.
	for key := range rset {
		c.readerMap[key]++
	}

	if blocked {
		// Queue the transaction up as it conflicts with an already
		// counted one. The count mutex is released first so that a
		// full queue does not stall other Add and Done calls.
		c.mx.Unlock()

		select {
		case c.queue <- commitLoop:
		case <-c.ctx.Done():
		}

		return
	}

	// To make sure the consumer loop cannot run a queued transaction that
	// depends on this unblocked one, grab the commitMutex for reading
	// before lifting the mutex guarding the lock maps ("downgrading" the
	// count mutex). The read lock is released with defer so that a panic
	// in commitLoop cannot leave it held and stall the consumer loop
	// forever.
	c.commitMutex.RLock()
	defer c.commitMutex.RUnlock()
	c.mx.Unlock()

	// At this point it is safe to execute the unblocked transaction:
	// queued transactions need the commitMutex exclusively and therefore
	// wait until this one has finished.
	commitLoop()
}
// Done lowers the lock counts of all keys in the passed read and write sets
// by one, dropping a key from the accounting once its count reaches zero.
func (c *commitQueue) Done(rset readSet, wset writeSet) {
	c.mx.Lock()
	defer c.mx.Unlock()

	// release decrements the count stored for key and removes the entry
	// entirely when nothing holds the key anymore.
	release := func(counts map[string]int, key string) {
		remaining := counts[key] - 1
		if remaining == 0 {
			delete(counts, key)
			return
		}
		counts[key] = remaining
	}

	for key := range rset {
		release(c.readerMap, key)
	}

	for key := range wset {
		release(c.writerMap, key)
	}
}
// mainLoop is the queue consumer: it runs the commit closures of transactions
// that had conflicts when they were added, one at a time, until the queue
// context is canceled. Each closure runs with the commitMutex held
// exclusively, so it never overlaps with a non-conflicting commit that Add is
// running under the read lock.
func (c *commitQueue) mainLoop() {
	defer c.wg.Done()

	// runExclusive executes a single queued commit while holding the
	// commitMutex for writing.
	runExclusive := func(commit func()) {
		c.commitMutex.Lock()
		commit()
		c.commitMutex.Unlock()
	}

	for {
		select {
		case <-c.ctx.Done():
			return

		case next := <-c.queue:
			// Closures are received in the order they were sent
			// to the queue.
			runExclusive(next)
		}
	}
}