186 lines
4.8 KiB
Go
186 lines
4.8 KiB
Go
|
package wtclient
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// taskPipeline implements a reliable, in-order queue that ensures its queue
|
||
|
// fully drained before exiting. Stopping the taskPipeline prevents the pipeline
|
||
|
// from accepting any further tasks, and will cause the pipeline to exit after
|
||
|
// all updates have been delivered to the downstream receiver. If this process
|
||
|
// hangs and is unable to make progress, users can optionally call ForceQuit to
|
||
|
// abandon the reliable draining of the queue in order to permit shutdown.
|
||
|
type taskPipeline struct {
|
||
|
started sync.Once
|
||
|
stopped sync.Once
|
||
|
forced sync.Once
|
||
|
|
||
|
queueMtx sync.Mutex
|
||
|
queueCond *sync.Cond
|
||
|
queue *list.List
|
||
|
|
||
|
newBackupTasks chan *backupTask
|
||
|
|
||
|
quit chan struct{}
|
||
|
forceQuit chan struct{}
|
||
|
shutdown chan struct{}
|
||
|
}
|
||
|
|
||
|
// newTaskPipeline initializes a new taskPipeline.
|
||
|
func newTaskPipeline() *taskPipeline {
|
||
|
rq := &taskPipeline{
|
||
|
queue: list.New(),
|
||
|
newBackupTasks: make(chan *backupTask),
|
||
|
quit: make(chan struct{}),
|
||
|
forceQuit: make(chan struct{}),
|
||
|
shutdown: make(chan struct{}),
|
||
|
}
|
||
|
rq.queueCond = sync.NewCond(&rq.queueMtx)
|
||
|
|
||
|
return rq
|
||
|
}
|
||
|
|
||
|
// Start spins up the taskPipeline, making it eligible to begin receiving backup
|
||
|
// tasks and deliver them to the receiver of NewBackupTasks.
|
||
|
func (q *taskPipeline) Start() {
|
||
|
q.started.Do(func() {
|
||
|
go q.queueManager()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Stop begins a graceful shutdown of the taskPipeline. This method returns once
|
||
|
// all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes
|
||
|
// the delivery of pending tasks to be interrupted.
|
||
|
func (q *taskPipeline) Stop() {
|
||
|
q.stopped.Do(func() {
|
||
|
log.Debugf("Stopping task pipeline")
|
||
|
|
||
|
close(q.quit)
|
||
|
q.signalUntilShutdown()
|
||
|
|
||
|
// Skip log if we also force quit.
|
||
|
select {
|
||
|
case <-q.forceQuit:
|
||
|
default:
|
||
|
log.Debugf("Task pipeline stopped successfully")
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// ForceQuit signals the taskPipeline to immediately exit, dropping any
|
||
|
// backupTasks that have not been delivered via NewBackupTasks.
|
||
|
func (q *taskPipeline) ForceQuit() {
|
||
|
q.forced.Do(func() {
|
||
|
log.Infof("Force quitting task pipeline")
|
||
|
|
||
|
close(q.forceQuit)
|
||
|
q.signalUntilShutdown()
|
||
|
|
||
|
log.Infof("Task pipeline unclean shutdown complete")
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// NewBackupTasks returns a read-only channel for enqueue backupTasks. The
|
||
|
// channel will be closed after a call to Stop and all pending tasks have been
|
||
|
// delivered, or if a call to ForceQuit is called before the pending entries
|
||
|
// have been drained.
|
||
|
func (q *taskPipeline) NewBackupTasks() <-chan *backupTask {
|
||
|
return q.newBackupTasks
|
||
|
}
|
||
|
|
||
|
// QueueBackupTask enqueues a backupTask for reliable delivery to the consumer
|
||
|
// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is
|
||
|
// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be
|
||
|
// delivered via NewBackupTasks unless ForceQuit is called before completion.
|
||
|
func (q *taskPipeline) QueueBackupTask(task *backupTask) error {
|
||
|
q.queueCond.L.Lock()
|
||
|
select {
|
||
|
|
||
|
// Reject new tasks after quit has been signaled.
|
||
|
case <-q.quit:
|
||
|
q.queueCond.L.Unlock()
|
||
|
return ErrClientExiting
|
||
|
|
||
|
// Reject new tasks after force quit has been signaled.
|
||
|
case <-q.forceQuit:
|
||
|
q.queueCond.L.Unlock()
|
||
|
return ErrClientExiting
|
||
|
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
// Queue the new task and signal the queue's condition variable to wake up
|
||
|
// the queueManager for processing.
|
||
|
q.queue.PushBack(task)
|
||
|
q.queueCond.L.Unlock()
|
||
|
|
||
|
q.queueCond.Signal()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// queueManager processes all incoming backup requests that get added via
|
||
|
// QueueBackupTask. The manager will exit
|
||
|
//
|
||
|
// NOTE: This method MUST be run as a goroutine.
|
||
|
func (q *taskPipeline) queueManager() {
|
||
|
defer close(q.shutdown)
|
||
|
defer close(q.newBackupTasks)
|
||
|
|
||
|
for {
|
||
|
q.queueCond.L.Lock()
|
||
|
for q.queue.Front() == nil {
|
||
|
q.queueCond.Wait()
|
||
|
|
||
|
select {
|
||
|
case <-q.quit:
|
||
|
// Exit only after the queue has been fully drained.
|
||
|
if q.queue.Len() == 0 {
|
||
|
q.queueCond.L.Unlock()
|
||
|
log.Debugf("Revoked state pipeline flushed.")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
case <-q.forceQuit:
|
||
|
q.queueCond.L.Unlock()
|
||
|
log.Debugf("Revoked state pipeline force quit.")
|
||
|
return
|
||
|
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Pop the first element from the queue.
|
||
|
e := q.queue.Front()
|
||
|
task := q.queue.Remove(e).(*backupTask)
|
||
|
q.queueCond.L.Unlock()
|
||
|
|
||
|
select {
|
||
|
|
||
|
// Backup task submitted to dispatcher. We don't select on quit to
|
||
|
// ensure that we still drain tasks while shutting down.
|
||
|
case q.newBackupTasks <- task:
|
||
|
|
||
|
// Force quit, return immediately to allow the client to exit.
|
||
|
case <-q.forceQuit:
|
||
|
log.Debugf("Revoked state pipeline force quit.")
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// signalUntilShutdown strobes the queue's condition variable to ensure the
|
||
|
// queueManager reliably unblocks to check for the exit condition.
|
||
|
func (q *taskPipeline) signalUntilShutdown() {
|
||
|
for {
|
||
|
select {
|
||
|
case <-time.After(time.Millisecond):
|
||
|
q.queueCond.Signal()
|
||
|
case <-q.shutdown:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|