Merge pull request #4457 from bhandras/etcd_tx_queue
etcd: STM transaction queue to effectively reduce retries for conflicting transactions
This commit is contained in:
commit
111db801e7
1
.github/workflows/main.yml
vendored
1
.github/workflows/main.yml
vendored
@ -214,6 +214,7 @@ jobs:
|
|||||||
matrix:
|
matrix:
|
||||||
unit_type:
|
unit_type:
|
||||||
- btcd unit-cover
|
- btcd unit-cover
|
||||||
|
- unit tags=kvdb_etcd
|
||||||
- travis-race
|
- travis-race
|
||||||
steps:
|
steps:
|
||||||
- name: git checkout
|
- name: git checkout
|
||||||
|
2
Makefile
2
Makefile
@ -43,8 +43,6 @@ GOTEST := GO111MODULE=on go test
|
|||||||
|
|
||||||
GOVERSION := $(shell go version | awk '{print $$3}')
|
GOVERSION := $(shell go version | awk '{print $$3}')
|
||||||
GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*")
|
GOFILES_NOVENDOR = $(shell find . -type f -name '*.go' -not -path "./vendor/*")
|
||||||
GOLIST := go list -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/'
|
|
||||||
GOLISTCOVER := $(shell go list -deps -f '{{.ImportPath}}' ./... | grep '$(PKG)' | sed -e 's/^$(ESCPKG)/./')
|
|
||||||
|
|
||||||
RM := rm -f
|
RM := rm -f
|
||||||
CP := cp
|
CP := cp
|
||||||
|
150
channeldb/kvdb/etcd/commit_queue.go
Normal file
150
channeldb/kvdb/etcd/commit_queue.go
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
// +build kvdb_etcd
|
||||||
|
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// commitQueueSize is the maximum number of commits we let to queue up. All
|
||||||
|
// remaining commits will block on commitQueue.Add().
|
||||||
|
const commitQueueSize = 100
|
||||||
|
|
||||||
|
// commitQueue is a simple execution queue to manage conflicts for transactions
|
||||||
|
// and thereby reduce the number of times conflicting transactions need to be
|
||||||
|
// retried. When a new transaction is added to the queue, we first upgrade the
|
||||||
|
// read/write counts in the queue's own accounting to decide whether the new
|
||||||
|
// transaction has any conflicting dependencies. If the transaction does not
|
||||||
|
// conflict with any other, then it is comitted immediately, otherwise it'll be
|
||||||
|
// queued up for later exection.
|
||||||
|
// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf
|
||||||
|
type commitQueue struct {
|
||||||
|
ctx context.Context
|
||||||
|
mx sync.Mutex
|
||||||
|
readerMap map[string]int
|
||||||
|
writerMap map[string]int
|
||||||
|
|
||||||
|
commitMutex sync.RWMutex
|
||||||
|
queue chan (func())
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCommitQueue creates a new commit queue, with the passed abort context.
|
||||||
|
func NewCommitQueue(ctx context.Context) *commitQueue {
|
||||||
|
q := &commitQueue{
|
||||||
|
ctx: ctx,
|
||||||
|
readerMap: make(map[string]int),
|
||||||
|
writerMap: make(map[string]int),
|
||||||
|
queue: make(chan func(), commitQueueSize),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the queue consumer loop.
|
||||||
|
q.wg.Add(1)
|
||||||
|
go q.mainLoop()
|
||||||
|
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait waits for the queue to stop (after the queue context has been canceled).
|
||||||
|
func (c *commitQueue) Wait() {
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add increases lock counts and queues up tx commit closure for execution.
|
||||||
|
// Transactions that don't have any conflicts are executed immediately by
|
||||||
|
// "downgrading" the count mutex to allow concurrency.
|
||||||
|
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
|
||||||
|
c.mx.Lock()
|
||||||
|
blocked := false
|
||||||
|
|
||||||
|
// Mark as blocked if there's any writer changing any of the keys in
|
||||||
|
// the read set. Do not increment the reader counts yet as we'll need to
|
||||||
|
// use the original reader counts when scanning through the write set.
|
||||||
|
for key := range rset {
|
||||||
|
if c.writerMap[key] > 0 {
|
||||||
|
blocked = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as blocked if there's any writer or reader for any of the keys
|
||||||
|
// in the write set.
|
||||||
|
for key := range wset {
|
||||||
|
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
|
||||||
|
|
||||||
|
// Increment the writer count.
|
||||||
|
c.writerMap[key] += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally we can increment the reader counts for keys in the read set.
|
||||||
|
for key := range rset {
|
||||||
|
c.readerMap[key] += 1
|
||||||
|
}
|
||||||
|
|
||||||
|
if blocked {
|
||||||
|
// Add the transaction to the queue if conflicts with an already
|
||||||
|
// queued one.
|
||||||
|
c.mx.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case c.queue <- commitLoop:
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// To make sure we don't add a new tx to the queue that depends
|
||||||
|
// on this "unblocked" tx, grab the commitMutex before lifting
|
||||||
|
// the mutex guarding the lock maps.
|
||||||
|
c.commitMutex.RLock()
|
||||||
|
c.mx.Unlock()
|
||||||
|
|
||||||
|
// At this point we're safe to execute the "unblocked" tx, as
|
||||||
|
// we cannot execute blocked tx that may have been read from the
|
||||||
|
// queue until the commitMutex is held.
|
||||||
|
commitLoop()
|
||||||
|
|
||||||
|
c.commitMutex.RUnlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done decreases lock counts of the keys in the read/write sets.
|
||||||
|
func (c *commitQueue) Done(rset readSet, wset writeSet) {
|
||||||
|
c.mx.Lock()
|
||||||
|
defer c.mx.Unlock()
|
||||||
|
|
||||||
|
for key := range rset {
|
||||||
|
c.readerMap[key] -= 1
|
||||||
|
if c.readerMap[key] == 0 {
|
||||||
|
delete(c.readerMap, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for key := range wset {
|
||||||
|
c.writerMap[key] -= 1
|
||||||
|
if c.writerMap[key] == 0 {
|
||||||
|
delete(c.writerMap, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// mainLoop executes queued transaction commits for transactions that have
|
||||||
|
// dependencies. The queue ensures that the top element doesn't conflict with
|
||||||
|
// any other transactions and therefore can be executed freely.
|
||||||
|
func (c *commitQueue) mainLoop() {
|
||||||
|
defer c.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case top := <-c.queue:
|
||||||
|
// Execute the next blocked transaction. As it is
|
||||||
|
// the top element in the queue it means that it doesn't
|
||||||
|
// depend on any other transactions anymore.
|
||||||
|
c.commitMutex.Lock()
|
||||||
|
top()
|
||||||
|
c.commitMutex.Unlock()
|
||||||
|
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
115
channeldb/kvdb/etcd/commit_queue_test.go
Normal file
115
channeldb/kvdb/etcd/commit_queue_test.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
// +build kvdb_etcd
|
||||||
|
|
||||||
|
package etcd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestCommitQueue tests that non-conflicting transactions commit concurrently,
|
||||||
|
// while conflicting transactions are queued up.
|
||||||
|
func TestCommitQueue(t *testing.T) {
|
||||||
|
// The duration of each commit.
|
||||||
|
const commitDuration = time.Millisecond * 500
|
||||||
|
const numCommits = 4
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
commits := make([]string, numCommits)
|
||||||
|
idx := int32(-1)
|
||||||
|
|
||||||
|
commit := func(tag string, sleep bool) func() {
|
||||||
|
return func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
// Update our log of commit order. Avoid blocking
|
||||||
|
// by preallocating the commit log and increasing
|
||||||
|
// the log index atomically.
|
||||||
|
i := atomic.AddInt32(&idx, 1)
|
||||||
|
commits[i] = tag
|
||||||
|
|
||||||
|
if sleep {
|
||||||
|
time.Sleep(commitDuration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to create a read set from the passed keys.
|
||||||
|
makeReadSet := func(keys []string) readSet {
|
||||||
|
rs := make(map[string]stmGet)
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
rs[key] = stmGet{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to create a write set from the passed keys.
|
||||||
|
makeWriteSet := func(keys []string) writeSet {
|
||||||
|
ws := make(map[string]stmPut)
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
ws[key] = stmPut{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ws
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
q := NewCommitQueue(ctx)
|
||||||
|
defer q.Wait()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
wg.Add(numCommits)
|
||||||
|
t1 := time.Now()
|
||||||
|
|
||||||
|
// Tx1: reads: key1, key2, writes: key3, conflict: none
|
||||||
|
q.Add(
|
||||||
|
commit("free", true),
|
||||||
|
makeReadSet([]string{"key1", "key2"}),
|
||||||
|
makeWriteSet([]string{"key3"}),
|
||||||
|
)
|
||||||
|
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
|
||||||
|
q.Add(
|
||||||
|
commit("blocked1", false),
|
||||||
|
makeReadSet([]string{"key1", "key2"}),
|
||||||
|
makeWriteSet([]string{"key3"}),
|
||||||
|
)
|
||||||
|
// Tx3: reads: key1, writes: key4, conflict: none
|
||||||
|
q.Add(
|
||||||
|
commit("free", true),
|
||||||
|
makeReadSet([]string{"key1", "key2"}),
|
||||||
|
makeWriteSet([]string{"key4"}),
|
||||||
|
)
|
||||||
|
// Tx4: reads: key2, writes: key4 conflict: Tx3
|
||||||
|
q.Add(
|
||||||
|
commit("blocked2", false),
|
||||||
|
makeReadSet([]string{"key2"}),
|
||||||
|
makeWriteSet([]string{"key4"}),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Wait for all commits.
|
||||||
|
wg.Wait()
|
||||||
|
t2 := time.Now()
|
||||||
|
|
||||||
|
// Expected total execution time: delta.
|
||||||
|
// 2 * commitDuration <= delta < 3 * commitDuration
|
||||||
|
delta := t2.Sub(t1)
|
||||||
|
require.LessOrEqual(t, int64(commitDuration*2), int64(delta))
|
||||||
|
require.Greater(t, int64(commitDuration*3), int64(delta))
|
||||||
|
|
||||||
|
// Expect that the non-conflicting "free" transactions are executed
|
||||||
|
// before the blocking ones, and the blocking ones are executed in
|
||||||
|
// the order of addition.
|
||||||
|
require.Equal(t,
|
||||||
|
[]string{"free", "free", "blocked1", "blocked2"},
|
||||||
|
commits,
|
||||||
|
)
|
||||||
|
}
|
@ -16,8 +16,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// etcdConnectionTimeout is the timeout until successful connection to the
|
// etcdConnectionTimeout is the timeout until successful connection to
|
||||||
// etcd instance.
|
// the etcd instance.
|
||||||
etcdConnectionTimeout = 10 * time.Second
|
etcdConnectionTimeout = 10 * time.Second
|
||||||
|
|
||||||
// etcdLongTimeout is a timeout for longer taking etcd operatons.
|
// etcdLongTimeout is a timeout for longer taking etcd operatons.
|
||||||
@ -34,7 +34,8 @@ type callerStats struct {
|
|||||||
|
|
||||||
func (s callerStats) String() string {
|
func (s callerStats) String() string {
|
||||||
return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d",
|
return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d",
|
||||||
s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset)
|
s.count, s.commitStats.Retries, s.commitStats.Rset,
|
||||||
|
s.commitStats.Wset)
|
||||||
}
|
}
|
||||||
|
|
||||||
// commitStatsCollector collects commit stats for commits succeeding
|
// commitStatsCollector collects commit stats for commits succeeding
|
||||||
@ -117,6 +118,7 @@ type db struct {
|
|||||||
config BackendConfig
|
config BackendConfig
|
||||||
cli *clientv3.Client
|
cli *clientv3.Client
|
||||||
commitStatsCollector *commitStatsCollector
|
commitStatsCollector *commitStatsCollector
|
||||||
|
txQueue *commitQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enforce db implements the walletdb.DB interface.
|
// Enforce db implements the walletdb.DB interface.
|
||||||
@ -174,12 +176,13 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cli, err := clientv3.New(clientv3.Config{
|
cli, err := clientv3.New(clientv3.Config{
|
||||||
Context: config.Ctx,
|
Context: config.Ctx,
|
||||||
Endpoints: []string{config.Host},
|
Endpoints: []string{config.Host},
|
||||||
DialTimeout: etcdConnectionTimeout,
|
DialTimeout: etcdConnectionTimeout,
|
||||||
Username: config.User,
|
Username: config.User,
|
||||||
Password: config.Pass,
|
Password: config.Pass,
|
||||||
TLS: tlsConfig,
|
TLS: tlsConfig,
|
||||||
|
MaxCallSendMsgSize: 16384*1024 - 1,
|
||||||
})
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -187,8 +190,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
backend := &db{
|
backend := &db{
|
||||||
cli: cli,
|
cli: cli,
|
||||||
config: config,
|
config: config,
|
||||||
|
txQueue: NewCommitQueue(config.Ctx),
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.CollectCommitStats {
|
if config.CollectCommitStats {
|
||||||
@ -200,7 +204,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) {
|
|||||||
|
|
||||||
// getSTMOptions creats all STM options based on the backend config.
|
// getSTMOptions creats all STM options based on the backend config.
|
||||||
func (db *db) getSTMOptions() []STMOptionFunc {
|
func (db *db) getSTMOptions() []STMOptionFunc {
|
||||||
opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)}
|
opts := []STMOptionFunc{
|
||||||
|
WithAbortContext(db.config.Ctx),
|
||||||
|
}
|
||||||
|
|
||||||
if db.config.CollectCommitStats {
|
if db.config.CollectCommitStats {
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
@ -220,7 +226,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error {
|
|||||||
return f(newReadWriteTx(stm, db.config.Prefix))
|
return f(newReadWriteTx(stm, db.config.Prefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update opens a database read/write transaction and executes the function f
|
// Update opens a database read/write transaction and executes the function f
|
||||||
@ -234,7 +240,7 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error {
|
|||||||
return f(newReadWriteTx(stm, db.config.Prefix))
|
return f(newReadWriteTx(stm, db.config.Prefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
return RunSTM(db.cli, apply, db.getSTMOptions()...)
|
return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrintStats returns all collected stats pretty printed into a string.
|
// PrintStats returns all collected stats pretty printed into a string.
|
||||||
@ -246,18 +252,18 @@ func (db *db) PrintStats() string {
|
|||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginReadTx opens a database read transaction.
|
// BeginReadWriteTx opens a database read+write transaction.
|
||||||
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
|
func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) {
|
||||||
return newReadWriteTx(
|
return newReadWriteTx(
|
||||||
NewSTM(db.cli, db.getSTMOptions()...),
|
NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
|
||||||
db.config.Prefix,
|
db.config.Prefix,
|
||||||
), nil
|
), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// BeginReadWriteTx opens a database read+write transaction.
|
// BeginReadTx opens a database read transaction.
|
||||||
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
|
func (db *db) BeginReadTx() (walletdb.ReadTx, error) {
|
||||||
return newReadWriteTx(
|
return newReadWriteTx(
|
||||||
NewSTM(db.cli, db.getSTMOptions()...),
|
NewSTM(db.cli, db.txQueue, db.getSTMOptions()...),
|
||||||
db.config.Prefix,
|
db.config.Prefix,
|
||||||
), nil
|
), nil
|
||||||
}
|
}
|
||||||
|
@ -63,7 +63,7 @@ func TestAbortContext(t *testing.T) {
|
|||||||
// Expect that the update will fail.
|
// Expect that the update will fail.
|
||||||
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
err = db.Update(func(tx walletdb.ReadWriteTx) error {
|
||||||
_, err := tx.CreateTopLevelBucket([]byte("bucket"))
|
_, err := tx.CreateTopLevelBucket([]byte("bucket"))
|
||||||
require.NoError(t, err)
|
require.Error(t, err, "context canceled")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -42,7 +42,8 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) {
|
|||||||
cfg.Dir = path
|
cfg.Dir = path
|
||||||
|
|
||||||
// To ensure that we can submit large transactions.
|
// To ensure that we can submit large transactions.
|
||||||
cfg.MaxTxnOps = 1000
|
cfg.MaxTxnOps = 8192
|
||||||
|
cfg.MaxRequestBytes = 16384 * 1024
|
||||||
|
|
||||||
// Listen on random free ports.
|
// Listen on random free ports.
|
||||||
clientURL := fmt.Sprintf("127.0.0.1:%d", getFreePort())
|
clientURL := fmt.Sprintf("127.0.0.1:%d", getFreePort())
|
||||||
@ -63,8 +64,10 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) {
|
|||||||
fmt.Errorf("etcd failed to start after: %v", readyTimeout)
|
fmt.Errorf("etcd failed to start after: %v", readyTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
connConfig := &BackendConfig{
|
connConfig := &BackendConfig{
|
||||||
Ctx: context.Background(),
|
Ctx: ctx,
|
||||||
Host: "http://" + peerURL,
|
Host: "http://" + peerURL,
|
||||||
User: "user",
|
User: "user",
|
||||||
Pass: "pass",
|
Pass: "pass",
|
||||||
@ -72,6 +75,7 @@ func NewEmbeddedEtcdInstance(path string) (*BackendConfig, func(), error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return connConfig, func() {
|
return connConfig, func() {
|
||||||
|
cancel()
|
||||||
etcd.Close()
|
etcd.Close()
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -134,6 +134,10 @@ type stm struct {
|
|||||||
// execute in the STM run loop.
|
// execute in the STM run loop.
|
||||||
manual bool
|
manual bool
|
||||||
|
|
||||||
|
// txQueue is lightweight contention manager, which is used to detect
|
||||||
|
// transaction conflicts and reduce retries.
|
||||||
|
txQueue *commitQueue
|
||||||
|
|
||||||
// options stores optional settings passed by the user.
|
// options stores optional settings passed by the user.
|
||||||
options *STMOptions
|
options *STMOptions
|
||||||
|
|
||||||
@ -183,18 +187,22 @@ func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc {
|
|||||||
|
|
||||||
// RunSTM runs the apply function by creating an STM using serializable snapshot
|
// RunSTM runs the apply function by creating an STM using serializable snapshot
|
||||||
// isolation, passing it to the apply and handling commit errors and retries.
|
// isolation, passing it to the apply and handling commit errors and retries.
|
||||||
func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error {
|
func RunSTM(cli *v3.Client, apply func(STM) error, txQueue *commitQueue,
|
||||||
return runSTM(makeSTM(cli, false, so...), apply)
|
so ...STMOptionFunc) error {
|
||||||
|
|
||||||
|
return runSTM(makeSTM(cli, false, txQueue, so...), apply)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSTM creates a new STM instance, using serializable snapshot isolation.
|
// NewSTM creates a new STM instance, using serializable snapshot isolation.
|
||||||
func NewSTM(cli *v3.Client, so ...STMOptionFunc) STM {
|
func NewSTM(cli *v3.Client, txQueue *commitQueue, so ...STMOptionFunc) STM {
|
||||||
return makeSTM(cli, true, so...)
|
return makeSTM(cli, true, txQueue, so...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeSTM is the actual constructor of the stm. It first apply all passed
|
// makeSTM is the actual constructor of the stm. It first apply all passed
|
||||||
// options then creates the stm object and resets it before returning.
|
// options then creates the stm object and resets it before returning.
|
||||||
func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
|
func makeSTM(cli *v3.Client, manual bool, txQueue *commitQueue,
|
||||||
|
so ...STMOptionFunc) *stm {
|
||||||
|
|
||||||
opts := &STMOptions{
|
opts := &STMOptions{
|
||||||
ctx: cli.Ctx(),
|
ctx: cli.Ctx(),
|
||||||
}
|
}
|
||||||
@ -207,6 +215,7 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
|
|||||||
s := &stm{
|
s := &stm{
|
||||||
client: cli,
|
client: cli,
|
||||||
manual: manual,
|
manual: manual,
|
||||||
|
txQueue: txQueue,
|
||||||
options: opts,
|
options: opts,
|
||||||
prefetch: make(map[string]stmGet),
|
prefetch: make(map[string]stmGet),
|
||||||
}
|
}
|
||||||
@ -222,50 +231,72 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm {
|
|||||||
// CommitError which is used to indicate a necessary retry.
|
// CommitError which is used to indicate a necessary retry.
|
||||||
func runSTM(s *stm, apply func(STM) error) error {
|
func runSTM(s *stm, apply func(STM) error) error {
|
||||||
var (
|
var (
|
||||||
retries int
|
retries int
|
||||||
stats CommitStats
|
stats CommitStats
|
||||||
err error
|
executeErr error
|
||||||
)
|
)
|
||||||
|
|
||||||
loop:
|
done := make(chan struct{})
|
||||||
// In a loop try to apply and commit and roll back if the database has
|
|
||||||
// changed (CommitError).
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
// Check if the STM is aborted and break the retry loop if it is.
|
|
||||||
case <-s.options.ctx.Done():
|
|
||||||
err = fmt.Errorf("aborted")
|
|
||||||
break loop
|
|
||||||
|
|
||||||
default:
|
execute := func() {
|
||||||
|
defer close(done)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// Check if the STM is aborted and break the retry loop
|
||||||
|
// if it is.
|
||||||
|
case <-s.options.ctx.Done():
|
||||||
|
executeErr = fmt.Errorf("aborted")
|
||||||
|
return
|
||||||
|
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
stats, executeErr = s.commit()
|
||||||
|
|
||||||
|
// Re-apply only upon commit error (meaning the
|
||||||
|
// keys were changed).
|
||||||
|
if _, ok := executeErr.(CommitError); !ok {
|
||||||
|
// Anything that's not a CommitError
|
||||||
|
// aborts the transaction.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rollback before trying to re-apply.
|
||||||
|
s.Rollback()
|
||||||
|
retries++
|
||||||
|
|
||||||
|
// Re-apply the transaction closure.
|
||||||
|
if executeErr = apply(s); executeErr != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Apply the transaction closure and abort the STM if there was
|
|
||||||
// an application error.
|
|
||||||
if err = apply(s); err != nil {
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
stats, err = s.commit()
|
|
||||||
|
|
||||||
// Retry the apply closure only upon commit error (meaning the
|
|
||||||
// database was changed).
|
|
||||||
if _, ok := err.(CommitError); !ok {
|
|
||||||
// Anything that's not a CommitError aborts the STM
|
|
||||||
// run loop.
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
// Rollback before trying to re-apply.
|
|
||||||
s.Rollback()
|
|
||||||
retries++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run the tx closure to construct the read and write sets.
|
||||||
|
// Also we expect that if there are no conflicting transactions
|
||||||
|
// in the queue, then we only run apply once.
|
||||||
|
if preApplyErr := apply(s); preApplyErr != nil {
|
||||||
|
return preApplyErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue up the transaction for execution.
|
||||||
|
s.txQueue.Add(execute, s.rset, s.wset)
|
||||||
|
|
||||||
|
// Wait for the transaction to execute, or break if aborted.
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-s.options.ctx.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
s.txQueue.Done(s.rset, s.wset)
|
||||||
|
|
||||||
if s.options.commitStatsCallback != nil {
|
if s.options.commitStatsCallback != nil {
|
||||||
stats.Retries = retries
|
stats.Retries = retries
|
||||||
s.options.commitStatsCallback(err == nil, stats)
|
s.options.commitStatsCallback(executeErr == nil, stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return executeErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// add inserts a txn response to the read set. This is useful when the txn
|
// add inserts a txn response to the read set. This is useful when the txn
|
||||||
|
@ -21,7 +21,11 @@ func TestPutToEmpty(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
f := NewEtcdTestFixture(t)
|
f := NewEtcdTestFixture(t)
|
||||||
defer f.Cleanup()
|
txQueue := NewCommitQueue(f.config.Ctx)
|
||||||
|
defer func() {
|
||||||
|
f.Cleanup()
|
||||||
|
txQueue.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
db, err := newEtcdBackend(f.BackendConfig())
|
db, err := newEtcdBackend(f.BackendConfig())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -31,7 +35,7 @@ func TestPutToEmpty(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = RunSTM(db.cli, apply)
|
err = RunSTM(db.cli, apply, txQueue)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "abc", f.Get("123"))
|
require.Equal(t, "abc", f.Get("123"))
|
||||||
@ -41,7 +45,11 @@ func TestGetPutDel(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
f := NewEtcdTestFixture(t)
|
f := NewEtcdTestFixture(t)
|
||||||
defer f.cleanup()
|
txQueue := NewCommitQueue(f.config.Ctx)
|
||||||
|
defer func() {
|
||||||
|
f.Cleanup()
|
||||||
|
txQueue.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
testKeyValues := []KV{
|
testKeyValues := []KV{
|
||||||
{"a", "1"},
|
{"a", "1"},
|
||||||
@ -105,7 +113,7 @@ func TestGetPutDel(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = RunSTM(db.cli, apply)
|
err = RunSTM(db.cli, apply, txQueue)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "1", f.Get("a"))
|
require.Equal(t, "1", f.Get("a"))
|
||||||
@ -120,7 +128,11 @@ func TestFirstLastNextPrev(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
f := NewEtcdTestFixture(t)
|
f := NewEtcdTestFixture(t)
|
||||||
defer f.Cleanup()
|
txQueue := NewCommitQueue(f.config.Ctx)
|
||||||
|
defer func() {
|
||||||
|
f.Cleanup()
|
||||||
|
txQueue.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
testKeyValues := []KV{
|
testKeyValues := []KV{
|
||||||
{"kb", "1"},
|
{"kb", "1"},
|
||||||
@ -255,7 +267,7 @@ func TestFirstLastNextPrev(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = RunSTM(db.cli, apply)
|
err = RunSTM(db.cli, apply, txQueue)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, "0", f.Get("ka"))
|
require.Equal(t, "0", f.Get("ka"))
|
||||||
@ -271,7 +283,11 @@ func TestCommitError(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
f := NewEtcdTestFixture(t)
|
f := NewEtcdTestFixture(t)
|
||||||
defer f.Cleanup()
|
txQueue := NewCommitQueue(f.config.Ctx)
|
||||||
|
defer func() {
|
||||||
|
f.Cleanup()
|
||||||
|
txQueue.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
db, err := newEtcdBackend(f.BackendConfig())
|
db, err := newEtcdBackend(f.BackendConfig())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -301,7 +317,7 @@ func TestCommitError(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = RunSTM(db.cli, apply)
|
err = RunSTM(db.cli, apply, txQueue)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 2, cnt)
|
require.Equal(t, 2, cnt)
|
||||||
|
|
||||||
@ -312,7 +328,11 @@ func TestManualTxError(t *testing.T) {
|
|||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
f := NewEtcdTestFixture(t)
|
f := NewEtcdTestFixture(t)
|
||||||
defer f.Cleanup()
|
txQueue := NewCommitQueue(f.config.Ctx)
|
||||||
|
defer func() {
|
||||||
|
f.Cleanup()
|
||||||
|
txQueue.Wait()
|
||||||
|
}()
|
||||||
|
|
||||||
db, err := newEtcdBackend(f.BackendConfig())
|
db, err := newEtcdBackend(f.BackendConfig())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -320,7 +340,7 @@ func TestManualTxError(t *testing.T) {
|
|||||||
// Preset DB state.
|
// Preset DB state.
|
||||||
f.Put("123", "xyz")
|
f.Put("123", "xyz")
|
||||||
|
|
||||||
stm := NewSTM(db.cli)
|
stm := NewSTM(db.cli, txQueue)
|
||||||
|
|
||||||
val, err := stm.Get("123")
|
val, err := stm.Get("123")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -28,6 +28,10 @@ ifneq ($(icase),)
|
|||||||
TEST_FLAGS += -test.run=TestLightningNetworkDaemon/$(icase)
|
TEST_FLAGS += -test.run=TestLightningNetworkDaemon/$(icase)
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
ifneq ($(tags),)
|
||||||
|
DEV_TAGS += ${tags}
|
||||||
|
endif
|
||||||
|
|
||||||
# Define the log tags that will be applied only when running unit tests. If none
|
# Define the log tags that will be applied only when running unit tests. If none
|
||||||
# are provided, we default to "nolog" which will be silent.
|
# are provided, we default to "nolog" which will be silent.
|
||||||
ifneq ($(log),)
|
ifneq ($(log),)
|
||||||
@ -44,6 +48,9 @@ else
|
|||||||
TEST_FLAGS += -test.timeout=40m
|
TEST_FLAGS += -test.timeout=40m
|
||||||
endif
|
endif
|
||||||
|
|
||||||
|
GOLIST := go list -tags="$(DEV_TAGS)" -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/'
|
||||||
|
GOLISTCOVER := $(shell go list -tags="$(DEV_TAGS)" -deps -f '{{.ImportPath}}' ./... | grep '$(PKG)' | sed -e 's/^$(ESCPKG)/./')
|
||||||
|
|
||||||
# UNIT_TARGTED is undefined iff a specific package and/or unit test case is
|
# UNIT_TARGTED is undefined iff a specific package and/or unit test case is
|
||||||
# not being targeted.
|
# not being targeted.
|
||||||
UNIT_TARGETED ?= no
|
UNIT_TARGETED ?= no
|
||||||
|
Loading…
Reference in New Issue
Block a user