diff --git a/batch/interface.go b/batch/interface.go
index b9ab8b77..e07cbd49 100644
--- a/batch/interface.go
+++ b/batch/interface.go
@@ -25,6 +25,23 @@ type Request struct {
 	//
 	// NOTE: This field is optional.
 	OnCommit func(commitErr error) error
+
+	// lazy should be true if we don't have to immediately execute this
+	// request when it comes in. This means that it can be scheduled later,
+	// allowing larger batches.
+	lazy bool
+}
+
+// SchedulerOption is a type that can be used to supply options to a scheduled
+// request.
+type SchedulerOption func(r *Request)
+
+// LazyAdd will make the request be executed lazily, added to the next batch to
+// reduce db contention.
+func LazyAdd() SchedulerOption {
+	return func(r *Request) {
+		r.lazy = true
+	}
 }
 
 // Scheduler abstracts a generic batching engine that accumulates an incoming
@@ -32,7 +49,7 @@ type Request struct {
 type Scheduler interface {
 	// Execute schedules a Request for execution with the next available
 	// batch. This method blocks until the the underlying closure has been
-	// run against the databse. The resulting error is returned to the
+	// run against the database. The resulting error is returned to the
 	// caller.
 	Execute(req *Request) error
 }
diff --git a/batch/scheduler.go b/batch/scheduler.go
index 7d681376..941c6f37 100644
--- a/batch/scheduler.go
+++ b/batch/scheduler.go
@@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error {
 		time.AfterFunc(s.duration, s.b.trigger)
 	}
 	s.b.reqs = append(s.b.reqs, &req)
+
+	// If this is a non-lazy request, we'll execute the batch immediately.
+	if !r.lazy {
+		go s.b.trigger()
+	}
+
 	s.mu.Unlock()
 
 	// Wait for the batch to process the request. If the batch didn't
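
For context, a small caller-side sketch (not part of the patch) of how the new option might be consumed: the caller applies any supplied SchedulerOptions to the Request before handing it to the Scheduler, so passing LazyAdd() marks the request as lazy and Execute leaves it to the duration-based trigger instead of kicking off a batch right away. The store type, addItem method, variadic-options signature, and the example.com import path are placeholders; only Request, OnCommit, SchedulerOption, LazyAdd, and Scheduler.Execute come from the diff above.

package example

// Placeholder module path standing in for wherever the batch package lives.
import "example.com/yourproject/batch"

// store stands in for any database-backed type that owns a batch.Scheduler.
type store struct {
	scheduler batch.Scheduler
}

// addItem builds a batch.Request, applies any caller-supplied scheduler
// options to it, and hands it off. Passing batch.LazyAdd() sets the
// request's lazy flag, so Execute waits for the next timed batch; omitting
// it takes the immediate-trigger path added in scheduler.go above.
func (s *store) addItem(opts ...batch.SchedulerOption) error {
	r := &batch.Request{
		// The request's update/reset closures would be populated here.
		OnCommit: func(commitErr error) error {
			return commitErr
		},
	}

	for _, opt := range opts {
		opt(r)
	}

	return s.scheduler.Execute(r)
}

A caller that can tolerate a short delay would use s.addItem(batch.LazyAdd()) to reduce db contention, while a latency-sensitive caller would use s.addItem() with no options.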