diff --git a/batch/batch.go b/batch/batch.go new file mode 100644 index 00000000..6b1fa2ab --- /dev/null +++ b/batch/batch.go @@ -0,0 +1,102 @@ +package batch + +import ( + "errors" + "sync" + + "github.com/lightningnetwork/lnd/channeldb/kvdb" +) + +// errSolo is a sentinel error indicating that the requester should re-run the +// operation in isolation. +var errSolo = errors.New( + "batch function returned an error and should be re-run solo", +) + +type request struct { + *Request + errChan chan error +} + +type batch struct { + db kvdb.Backend + start sync.Once + reqs []*request + clear func(b *batch) + locker sync.Locker +} + +// trigger is the entry point for the batch and ensures that run is started at +// most once. +func (b *batch) trigger() { + b.start.Do(b.run) +} + +// run executes the current batch of requests. If any individual requests fail +// alongside others they will be retried by the caller. +func (b *batch) run() { + // Clear the batch from its scheduler, ensuring that no new requests are + // added to this batch. + b.clear(b) + + // If a cache lock was provided, hold it until this method returns. + // This is critical for ensuring external consistency of the operation, + // so that caches don't get out of sync with the on-disk state. + if b.locker != nil { + b.locker.Lock() + defer b.locker.Unlock() + } + + // Apply the batch until a subset succeeds or all of them fail. Requests + // that fail will be retried individually. + for len(b.reqs) > 0 { + var failIdx = -1 + err := kvdb.Update(b.db, func(tx kvdb.RwTx) error { + for i, req := range b.reqs { + err := req.Update(tx) + if err != nil { + failIdx = i + return err + } + } + return nil + }, func() { + for _, req := range b.reqs { + if req.Reset != nil { + req.Reset() + } + } + }) + + // If a request's Update failed, extract it and re-run the + // batch. The removed request will be retried individually by + // the caller. 
+ if failIdx >= 0 { + req := b.reqs[failIdx] + + // It's safe to shorten b.reqs here because the + // scheduler's batch no longer points to us. + b.reqs[failIdx] = b.reqs[len(b.reqs)-1] + b.reqs = b.reqs[:len(b.reqs)-1] + + // Tell the submitter to re-run it solo, then continue + // with the rest of the batch. + req.errChan <- errSolo + continue + } + + // None of the remaining requests failed, process the errors + // using each request's OnCommit closure and return the error + // to the requester. If no OnCommit closure is provided, simply + // return the error directly. + for _, req := range b.reqs { + if req.OnCommit != nil { + req.errChan <- req.OnCommit(err) + } else { + req.errChan <- err + } + } + + return + } +} diff --git a/batch/interface.go b/batch/interface.go new file mode 100644 index 00000000..b9ab8b77 --- /dev/null +++ b/batch/interface.go @@ -0,0 +1,38 @@ +package batch + +import "github.com/lightningnetwork/lnd/channeldb/kvdb" + +// Request defines an operation that can be batched into a single bbolt +// transaction. +type Request struct { + // Reset is called before each invocation of Update and is used to clear + // any possible modifications to local state as a result of previous + // calls to Update that were not committed due to a concurrent batch + // failure. + // + // NOTE: This field is optional. + Reset func() + + // Update is applied alongside other operations in the batch. + // + // NOTE: This method MUST NOT acquire any mutexes. + Update func(tx kvdb.RwTx) error + + // OnCommit is called if the batch or a subset of the batch including + // this request all succeeded without failure. The passed error should + // contain the result of the transaction commit, as that can still fail + // even if none of the closures returned an error. + // + // NOTE: This field is optional. 
+ OnCommit func(commitErr error) error +} + +// Scheduler abstracts a generic batching engine that accumulates an incoming +// set of Requests, executes them, and returns the error from the operation. +type Scheduler interface { + // Execute schedules a Request for execution with the next available + // batch. This method blocks until the underlying closure has been + // run against the database. The resulting error is returned to the + // caller. + Execute(req *Request) error +} diff --git a/batch/scheduler.go b/batch/scheduler.go new file mode 100644 index 00000000..7d681376 --- /dev/null +++ b/batch/scheduler.go @@ -0,0 +1,103 @@ +package batch + +import ( + "sync" + "time" + + "github.com/lightningnetwork/lnd/channeldb/kvdb" +) + +// TimeScheduler is a batching engine that executes requests within a fixed +// horizon. When the first request is received, a TimeScheduler waits a +// configurable duration for other concurrent requests to join the batch. Once +// this time has elapsed, the batch is closed and executed. Subsequent requests +// are then added to a new batch which undergoes the same process. +type TimeScheduler struct { + db kvdb.Backend + locker sync.Locker + duration time.Duration + + mu sync.Mutex + b *batch +} + +// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at +// which to schedule batches. If the operation needs to modify a higher-level +// cache, the cache's lock should be provided so that external consistency +// can be maintained, as successful db operations will cause a request's +// OnCommit method to be executed while holding this lock. +func NewTimeScheduler(db kvdb.Backend, locker sync.Locker, + duration time.Duration) *TimeScheduler { + + return &TimeScheduler{ + db: db, + locker: locker, + duration: duration, + } +} + +// Execute schedules the provided request for batch execution along with other +// concurrent requests. 
The request will be executed within a fixed horizon, +// parameterized by the duration of the scheduler. The error from the +// underlying operation is returned to the caller. +// +// NOTE: Part of the Scheduler interface. +func (s *TimeScheduler) Execute(r *Request) error { + req := request{ + Request: r, + errChan: make(chan error, 1), + } + + // Add the request to the current batch. If the batch has been cleared + // or no batch exists, create a new one. + s.mu.Lock() + if s.b == nil { + s.b = &batch{ + db: s.db, + clear: s.clear, + locker: s.locker, + } + time.AfterFunc(s.duration, s.b.trigger) + } + s.b.reqs = append(s.b.reqs, &req) + s.mu.Unlock() + + // Wait for the batch to process the request. If the batch didn't + // ask us to execute the request individually, simply return the error. + err := <-req.errChan + if err != errSolo { + return err + } + + // Obtain exclusive access to the cache if this scheduler needs to + // modify the cache in OnCommit. + if s.locker != nil { + s.locker.Lock() + defer s.locker.Unlock() + } + + // Otherwise, run the request on its own. + commitErr := kvdb.Update(s.db, req.Update, func() { + if req.Reset != nil { + req.Reset() + } + }) + + // Finally, return the commit error directly or execute the OnCommit + // closure with the commit error if present. + if req.OnCommit != nil { + return req.OnCommit(commitErr) + } + + return commitErr +} + +// clear resets the scheduler's batch to nil so that no more requests can be +// added. +func (s *TimeScheduler) clear(b *batch) { + s.mu.Lock() + if s.b == b { + s.b = nil + } + s.mu.Unlock() +}