batch: add option for executing requests immediately
We make the default non-lazy, and will make the incoming gossip requests lazy.
This commit is contained in:
parent
503536c1e4
commit
e3b529939e
@ -25,6 +25,23 @@ type Request struct {
|
|||||||
//
|
//
|
||||||
// NOTE: This field is optional.
|
// NOTE: This field is optional.
|
||||||
OnCommit func(commitErr error) error
|
OnCommit func(commitErr error) error
|
||||||
|
|
||||||
|
// lazy should be true if we don't have to immediately execute this
|
||||||
|
// request when it comes in. This means that it can be scheduled later,
|
||||||
|
// allowing larger batches.
|
||||||
|
lazy bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulerOption is a type that can be used to supply options to a scheduled
|
||||||
|
// request.
|
||||||
|
type SchedulerOption func(r *Request)
|
||||||
|
|
||||||
|
// LazyAdd will make the request be executed lazily, added to the next batch to
|
||||||
|
// reduce db contention.
|
||||||
|
func LazyAdd() SchedulerOption {
|
||||||
|
return func(r *Request) {
|
||||||
|
r.lazy = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scheduler abstracts a generic batching engine that accumulates an incoming
|
// Scheduler abstracts a generic batching engine that accumulates an incoming
|
||||||
@ -32,7 +49,7 @@ type Request struct {
|
|||||||
type Scheduler interface {
|
type Scheduler interface {
|
||||||
// Execute schedules a Request for execution with the next available
|
// Execute schedules a Request for execution with the next available
|
||||||
// batch. This method blocks until the the underlying closure has been
|
// batch. This method blocks until the the underlying closure has been
|
||||||
// run against the databse. The resulting error is returned to the
|
// run against the database. The resulting error is returned to the
|
||||||
// caller.
|
// caller.
|
||||||
Execute(req *Request) error
|
Execute(req *Request) error
|
||||||
}
|
}
|
||||||
|
@ -60,6 +60,12 @@ func (s *TimeScheduler) Execute(r *Request) error {
|
|||||||
time.AfterFunc(s.duration, s.b.trigger)
|
time.AfterFunc(s.duration, s.b.trigger)
|
||||||
}
|
}
|
||||||
s.b.reqs = append(s.b.reqs, &req)
|
s.b.reqs = append(s.b.reqs, &req)
|
||||||
|
|
||||||
|
// If this is a non-lazy request, we'll execute the batch immediately.
|
||||||
|
if !r.lazy {
|
||||||
|
go s.b.trigger()
|
||||||
|
}
|
||||||
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
// Wait for the batch to process the request. If the batch didn't
|
// Wait for the batch to process the request. If the batch didn't
|
||||||
|
Loading…
Reference in New Issue
Block a user