batch: add external batching engine for bbolt operations
This commit is contained in:
parent
fb9218d100
commit
d1634b5e13
102
batch/batch.go
Normal file
102
batch/batch.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
package batch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// errSolo is a sentinel error indicating that the requester should re-run the
|
||||||
|
// operation in isolation.
|
||||||
|
var errSolo = errors.New(
|
||||||
|
"batch function returned an error and should be re-run solo",
|
||||||
|
)
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
*Request
|
||||||
|
errChan chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
type batch struct {
|
||||||
|
db kvdb.Backend
|
||||||
|
start sync.Once
|
||||||
|
reqs []*request
|
||||||
|
clear func(b *batch)
|
||||||
|
locker sync.Locker
|
||||||
|
}
|
||||||
|
|
||||||
|
// trigger is the entry point for the batch and ensures that run is started at
|
||||||
|
// most once.
|
||||||
|
func (b *batch) trigger() {
|
||||||
|
b.start.Do(b.run)
|
||||||
|
}
|
||||||
|
|
||||||
|
// run executes the current batch of requests. If any individual requests fail
|
||||||
|
// alongside others they will be retried by the caller.
|
||||||
|
func (b *batch) run() {
|
||||||
|
// Clear the batch from its scheduler, ensuring that no new requests are
|
||||||
|
// added to this batch.
|
||||||
|
b.clear(b)
|
||||||
|
|
||||||
|
// If a cache lock was provided, hold it until the this method returns.
|
||||||
|
// This is critical for ensuring external consistency of the operation,
|
||||||
|
// so that caches don't get out of sync with the on disk state.
|
||||||
|
if b.locker != nil {
|
||||||
|
b.locker.Lock()
|
||||||
|
defer b.locker.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the batch until a subset succeeds or all of them fail. Requests
|
||||||
|
// that fail will be retried individually.
|
||||||
|
for len(b.reqs) > 0 {
|
||||||
|
var failIdx = -1
|
||||||
|
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
||||||
|
for i, req := range b.reqs {
|
||||||
|
err := req.Update(tx)
|
||||||
|
if err != nil {
|
||||||
|
failIdx = i
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, func() {
|
||||||
|
for _, req := range b.reqs {
|
||||||
|
if req.Reset != nil {
|
||||||
|
req.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// If a request's Update failed, extract it and re-run the
|
||||||
|
// batch. The removed request will be retried individually by
|
||||||
|
// the caller.
|
||||||
|
if failIdx >= 0 {
|
||||||
|
req := b.reqs[failIdx]
|
||||||
|
|
||||||
|
// It's safe to shorten b.reqs here because the
|
||||||
|
// scheduler's batch no longer points to us.
|
||||||
|
b.reqs[failIdx] = b.reqs[len(b.reqs)-1]
|
||||||
|
b.reqs = b.reqs[:len(b.reqs)-1]
|
||||||
|
|
||||||
|
// Tell the submitter re-run it solo, continue with the
|
||||||
|
// rest of the batch.
|
||||||
|
req.errChan <- errSolo
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// None of the remaining requests failed, process the errors
|
||||||
|
// using each request's OnCommit closure and return the error
|
||||||
|
// to the requester. If no OnCommit closure is provided, simply
|
||||||
|
// return the error directly.
|
||||||
|
for _, req := range b.reqs {
|
||||||
|
if req.OnCommit != nil {
|
||||||
|
req.errChan <- req.OnCommit(err)
|
||||||
|
} else {
|
||||||
|
req.errChan <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
38
batch/interface.go
Normal file
38
batch/interface.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package batch
|
||||||
|
|
||||||
|
import "github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||||
|
|
||||||
|
// Request defines an operation that can be batched into a single bbolt
|
||||||
|
// transaction.
|
||||||
|
type Request struct {
|
||||||
|
// Reset is called before each invocation of Update and is used to clear
|
||||||
|
// any possible modifications to local state as a result of previous
|
||||||
|
// calls to Update that were not committed due to a concurrent batch
|
||||||
|
// failure.
|
||||||
|
//
|
||||||
|
// NOTE: This field is optional.
|
||||||
|
Reset func()
|
||||||
|
|
||||||
|
// Update is applied alongside other operations in the batch.
|
||||||
|
//
|
||||||
|
// NOTE: This method MUST NOT acquire any mutexes.
|
||||||
|
Update func(tx kvdb.RwTx) error
|
||||||
|
|
||||||
|
// OnCommit is called if the batch or a subset of the batch including
|
||||||
|
// this request all succeeded without failure. The passed error should
|
||||||
|
// contain the result of the transaction commit, as that can still fail
|
||||||
|
// even if none of the closures returned an error.
|
||||||
|
//
|
||||||
|
// NOTE: This field is optional.
|
||||||
|
OnCommit func(commitErr error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scheduler abstracts a generic batching engine that accumulates an incoming
|
||||||
|
// set of Requests, executes them, and returns the error from the operation.
|
||||||
|
type Scheduler interface {
|
||||||
|
// Execute schedules a Request for execution with the next available
|
||||||
|
// batch. This method blocks until the the underlying closure has been
|
||||||
|
// run against the databse. The resulting error is returned to the
|
||||||
|
// caller.
|
||||||
|
Execute(req *Request) error
|
||||||
|
}
|
103
batch/scheduler.go
Normal file
103
batch/scheduler.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
package batch
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TimeScheduler is a batching engine that executes requests within a fixed
|
||||||
|
// horizon. When the first request is received, a TimeScheduler waits a
|
||||||
|
// configurable duration for other concurrent requests to join the batch. Once
|
||||||
|
// this time has elapsed, the batch is closed and executed. Subsequent requests
|
||||||
|
// are then added to a new batch which undergoes the same process.
|
||||||
|
type TimeScheduler struct {
|
||||||
|
db kvdb.Backend
|
||||||
|
locker sync.Locker
|
||||||
|
duration time.Duration
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
b *batch
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
|
||||||
|
// which to schedule batches. If the operation needs to modify a higher-level
|
||||||
|
// cache, the cache's lock should be provided to so that external consistency
|
||||||
|
// can be maintained, as successful db operations will cause a request's
|
||||||
|
// OnCommit method to be executed while holding this lock.
|
||||||
|
func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
|
||||||
|
duration time.Duration) *TimeScheduler {
|
||||||
|
|
||||||
|
return &TimeScheduler{
|
||||||
|
db: db,
|
||||||
|
locker: locker,
|
||||||
|
duration: duration,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute schedules the provided request for batch execution along with other
|
||||||
|
// concurrent requests. The request will be executed within a fixed horizon,
|
||||||
|
// parameterizeed by the duration of the scheduler. The error from the
|
||||||
|
// underlying operation is returned to the caller.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Scheduler interface.
|
||||||
|
func (s *TimeScheduler) Execute(r *Request) error {
|
||||||
|
req := request{
|
||||||
|
Request: r,
|
||||||
|
errChan: make(chan error, 1),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the request to the current batch. If the batch has been cleared
|
||||||
|
// or no batch exists, create a new one.
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.b == nil {
|
||||||
|
s.b = &batch{
|
||||||
|
db: s.db,
|
||||||
|
clear: s.clear,
|
||||||
|
locker: s.locker,
|
||||||
|
}
|
||||||
|
time.AfterFunc(s.duration, s.b.trigger)
|
||||||
|
}
|
||||||
|
s.b.reqs = append(s.b.reqs, &req)
|
||||||
|
s.mu.Unlock()
|
||||||
|
|
||||||
|
// Wait for the batch to process the request. If the batch didn't
|
||||||
|
// ask us to execute the request individually, simply return the error.
|
||||||
|
err := <-req.errChan
|
||||||
|
if err != errSolo {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain exclusive access to the cache if this scheduler needs to
|
||||||
|
// modify the cache in OnCommit.
|
||||||
|
if s.locker != nil {
|
||||||
|
s.locker.Lock()
|
||||||
|
defer s.locker.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, run the request on its own.
|
||||||
|
commitErr := kvdb.Update(s.db, req.Update, func() {
|
||||||
|
if req.Reset != nil {
|
||||||
|
req.Reset()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// Finally, return the commit error directly or execute the OnCommit
|
||||||
|
// closure with the commit error if present.
|
||||||
|
if req.OnCommit != nil {
|
||||||
|
return req.OnCommit(commitErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return commitErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear resets the scheduler's batch to nil so that no more requests can be
|
||||||
|
// added.
|
||||||
|
func (s *TimeScheduler) clear(b *batch) {
|
||||||
|
s.mu.Lock()
|
||||||
|
if s.b == b {
|
||||||
|
s.b = nil
|
||||||
|
}
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user