104 lines
2.7 KiB
Go
104 lines
2.7 KiB
Go
|
package batch
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb"
|
||
|
)
|
||
|
|
||
|
// TimeScheduler is a batching engine that executes requests within a fixed
|
||
|
// horizon. When the first request is received, a TimeScheduler waits a
|
||
|
// configurable duration for other concurrent requests to join the batch. Once
|
||
|
// this time has elapsed, the batch is closed and executed. Subsequent requests
|
||
|
// are then added to a new batch which undergoes the same process.
|
||
|
type TimeScheduler struct {
|
||
|
db kvdb.Backend
|
||
|
locker sync.Locker
|
||
|
duration time.Duration
|
||
|
|
||
|
mu sync.Mutex
|
||
|
b *batch
|
||
|
}
|
||
|
|
||
|
// NewTimeScheduler initializes a new TimeScheduler with a fixed duration at
|
||
|
// which to schedule batches. If the operation needs to modify a higher-level
|
||
|
// cache, the cache's lock should be provided to so that external consistency
|
||
|
// can be maintained, as successful db operations will cause a request's
|
||
|
// OnCommit method to be executed while holding this lock.
|
||
|
func NewTimeScheduler(db kvdb.Backend, locker sync.Locker,
|
||
|
duration time.Duration) *TimeScheduler {
|
||
|
|
||
|
return &TimeScheduler{
|
||
|
db: db,
|
||
|
locker: locker,
|
||
|
duration: duration,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Execute schedules the provided request for batch execution along with other
|
||
|
// concurrent requests. The request will be executed within a fixed horizon,
|
||
|
// parameterizeed by the duration of the scheduler. The error from the
|
||
|
// underlying operation is returned to the caller.
|
||
|
//
|
||
|
// NOTE: Part of the Scheduler interface.
|
||
|
func (s *TimeScheduler) Execute(r *Request) error {
|
||
|
req := request{
|
||
|
Request: r,
|
||
|
errChan: make(chan error, 1),
|
||
|
}
|
||
|
|
||
|
// Add the request to the current batch. If the batch has been cleared
|
||
|
// or no batch exists, create a new one.
|
||
|
s.mu.Lock()
|
||
|
if s.b == nil {
|
||
|
s.b = &batch{
|
||
|
db: s.db,
|
||
|
clear: s.clear,
|
||
|
locker: s.locker,
|
||
|
}
|
||
|
time.AfterFunc(s.duration, s.b.trigger)
|
||
|
}
|
||
|
s.b.reqs = append(s.b.reqs, &req)
|
||
|
s.mu.Unlock()
|
||
|
|
||
|
// Wait for the batch to process the request. If the batch didn't
|
||
|
// ask us to execute the request individually, simply return the error.
|
||
|
err := <-req.errChan
|
||
|
if err != errSolo {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Obtain exclusive access to the cache if this scheduler needs to
|
||
|
// modify the cache in OnCommit.
|
||
|
if s.locker != nil {
|
||
|
s.locker.Lock()
|
||
|
defer s.locker.Unlock()
|
||
|
}
|
||
|
|
||
|
// Otherwise, run the request on its own.
|
||
|
commitErr := kvdb.Update(s.db, req.Update, func() {
|
||
|
if req.Reset != nil {
|
||
|
req.Reset()
|
||
|
}
|
||
|
})
|
||
|
|
||
|
// Finally, return the commit error directly or execute the OnCommit
|
||
|
// closure with the commit error if present.
|
||
|
if req.OnCommit != nil {
|
||
|
return req.OnCommit(commitErr)
|
||
|
}
|
||
|
|
||
|
return commitErr
|
||
|
}
|
||
|
|
||
|
// clear resets the scheduler's batch to nil so that no more requests can be
|
||
|
// added.
|
||
|
func (s *TimeScheduler) clear(b *batch) {
|
||
|
s.mu.Lock()
|
||
|
if s.b == b {
|
||
|
s.b = nil
|
||
|
}
|
||
|
s.mu.Unlock()
|
||
|
}
|