From 65d09fca6439c62ae0b73fdd4b7b28c119666a82 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 15 Mar 2019 02:31:50 -0700 Subject: [PATCH] watchtower/wtclient/task_pipeline: add reliable task aggregator --- watchtower/wtclient/task_pipeline.go | 185 +++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 watchtower/wtclient/task_pipeline.go diff --git a/watchtower/wtclient/task_pipeline.go b/watchtower/wtclient/task_pipeline.go new file mode 100644 index 00000000..076c93f6 --- /dev/null +++ b/watchtower/wtclient/task_pipeline.go @@ -0,0 +1,185 @@ +package wtclient + +import ( + "container/list" + "sync" + "time" +) + +// taskPipeline implements a reliable, in-order queue that ensures its queue +// fully drained before exiting. Stopping the taskPipeline prevents the pipeline +// from accepting any further tasks, and will cause the pipeline to exit after +// all updates have been delivered to the downstream receiver. If this process +// hangs and is unable to make progress, users can optionally call ForceQuit to +// abandon the reliable draining of the queue in order to permit shutdown. +type taskPipeline struct { + started sync.Once + stopped sync.Once + forced sync.Once + + queueMtx sync.Mutex + queueCond *sync.Cond + queue *list.List + + newBackupTasks chan *backupTask + + quit chan struct{} + forceQuit chan struct{} + shutdown chan struct{} +} + +// newTaskPipeline initializes a new taskPipeline. +func newTaskPipeline() *taskPipeline { + rq := &taskPipeline{ + queue: list.New(), + newBackupTasks: make(chan *backupTask), + quit: make(chan struct{}), + forceQuit: make(chan struct{}), + shutdown: make(chan struct{}), + } + rq.queueCond = sync.NewCond(&rq.queueMtx) + + return rq +} + +// Start spins up the taskPipeline, making it eligible to begin receiving backup +// tasks and deliver them to the receiver of NewBackupTasks. +func (q *taskPipeline) Start() { + q.started.Do(func() { + go q.queueManager() + }) +} + +// Stop begins a graceful shutdown of the taskPipeline. This method returns once +// all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes +// the delivery of pending tasks to be interrupted. +func (q *taskPipeline) Stop() { + q.stopped.Do(func() { + log.Debugf("Stopping task pipeline") + + close(q.quit) + q.signalUntilShutdown() + + // Skip log if we also force quit. + select { + case <-q.forceQuit: + default: + log.Debugf("Task pipeline stopped successfully") + } + }) +} + +// ForceQuit signals the taskPipeline to immediately exit, dropping any +// backupTasks that have not been delivered via NewBackupTasks. +func (q *taskPipeline) ForceQuit() { + q.forced.Do(func() { + log.Infof("Force quitting task pipeline") + + close(q.forceQuit) + q.signalUntilShutdown() + + log.Infof("Task pipeline unclean shutdown complete") + }) +} + +// NewBackupTasks returns a read-only channel for enqueue backupTasks. The +// channel will be closed after a call to Stop and all pending tasks have been +// delivered, or if a call to ForceQuit is called before the pending entries +// have been drained. +func (q *taskPipeline) NewBackupTasks() <-chan *backupTask { + return q.newBackupTasks +} + +// QueueBackupTask enqueues a backupTask for reliable delivery to the consumer +// of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is +// returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be +// delivered via NewBackupTasks unless ForceQuit is called before completion. +func (q *taskPipeline) QueueBackupTask(task *backupTask) error { + q.queueCond.L.Lock() + select { + + // Reject new tasks after quit has been signaled. + case <-q.quit: + q.queueCond.L.Unlock() + return ErrClientExiting + + // Reject new tasks after force quit has been signaled. + case <-q.forceQuit: + q.queueCond.L.Unlock() + return ErrClientExiting + + default: + } + + // Queue the new task and signal the queue's condition variable to wake up + // the queueManager for processing. + q.queue.PushBack(task) + q.queueCond.L.Unlock() + + q.queueCond.Signal() + + return nil +} + +// queueManager processes all incoming backup requests that get added via +// QueueBackupTask. The manager will exit +// +// NOTE: This method MUST be run as a goroutine. +func (q *taskPipeline) queueManager() { + defer close(q.shutdown) + defer close(q.newBackupTasks) + + for { + q.queueCond.L.Lock() + for q.queue.Front() == nil { + q.queueCond.Wait() + + select { + case <-q.quit: + // Exit only after the queue has been fully drained. + if q.queue.Len() == 0 { + q.queueCond.L.Unlock() + log.Debugf("Revoked state pipeline flushed.") + return + } + + case <-q.forceQuit: + q.queueCond.L.Unlock() + log.Debugf("Revoked state pipeline force quit.") + return + + default: + } + } + + // Pop the first element from the queue. + e := q.queue.Front() + task := q.queue.Remove(e).(*backupTask) + q.queueCond.L.Unlock() + + select { + + // Backup task submitted to dispatcher. We don't select on quit to + // ensure that we still drain tasks while shutting down. + case q.newBackupTasks <- task: + + // Force quit, return immediately to allow the client to exit. + case <-q.forceQuit: + log.Debugf("Revoked state pipeline force quit.") + return + } + } +} + +// signalUntilShutdown strobes the queue's condition variable to ensure the +// queueManager reliably unblocks to check for the exit condition. +func (q *taskPipeline) signalUntilShutdown() { + for { + select { + case <-time.After(time.Millisecond): + q.queueCond.Signal() + case <-q.shutdown: + return + } + } +}