package wtclient import ( "container/list" "sync" "time" "github.com/btcsuite/btclog" ) // taskPipeline implements a reliable, in-order queue that ensures its queue // fully drained before exiting. Stopping the taskPipeline prevents the pipeline // from accepting any further tasks, and will cause the pipeline to exit after // all updates have been delivered to the downstream receiver. If this process // hangs and is unable to make progress, users can optionally call ForceQuit to // abandon the reliable draining of the queue in order to permit shutdown. type taskPipeline struct { started sync.Once stopped sync.Once forced sync.Once log btclog.Logger queueMtx sync.Mutex queueCond *sync.Cond queue *list.List newBackupTasks chan *backupTask quit chan struct{} forceQuit chan struct{} shutdown chan struct{} } // newTaskPipeline initializes a new taskPipeline. func newTaskPipeline(log btclog.Logger) *taskPipeline { rq := &taskPipeline{ log: log, queue: list.New(), newBackupTasks: make(chan *backupTask), quit: make(chan struct{}), forceQuit: make(chan struct{}), shutdown: make(chan struct{}), } rq.queueCond = sync.NewCond(&rq.queueMtx) return rq } // Start spins up the taskPipeline, making it eligible to begin receiving backup // tasks and deliver them to the receiver of NewBackupTasks. func (q *taskPipeline) Start() { q.started.Do(func() { go q.queueManager() }) } // Stop begins a graceful shutdown of the taskPipeline. This method returns once // all backupTasks have been delivered via NewBackupTasks, or a ForceQuit causes // the delivery of pending tasks to be interrupted. func (q *taskPipeline) Stop() { q.stopped.Do(func() { q.log.Debugf("Stopping task pipeline") close(q.quit) q.signalUntilShutdown() // Skip log if we also force quit. select { case <-q.forceQuit: default: q.log.Debugf("Task pipeline stopped successfully") } }) } // ForceQuit signals the taskPipeline to immediately exit, dropping any // backupTasks that have not been delivered via NewBackupTasks. func (q *taskPipeline) ForceQuit() { q.forced.Do(func() { q.log.Infof("Force quitting task pipeline") close(q.forceQuit) q.signalUntilShutdown() q.log.Infof("Task pipeline unclean shutdown complete") }) } // NewBackupTasks returns a read-only channel for enqueue backupTasks. The // channel will be closed after a call to Stop and all pending tasks have been // delivered, or if a call to ForceQuit is called before the pending entries // have been drained. func (q *taskPipeline) NewBackupTasks() <-chan *backupTask { return q.newBackupTasks } // QueueBackupTask enqueues a backupTask for reliable delivery to the consumer // of NewBackupTasks. If the taskPipeline is shutting down, ErrClientExiting is // returned. Otherwise, if QueueBackupTask returns nil it is guaranteed to be // delivered via NewBackupTasks unless ForceQuit is called before completion. func (q *taskPipeline) QueueBackupTask(task *backupTask) error { q.queueCond.L.Lock() select { // Reject new tasks after quit has been signaled. case <-q.quit: q.queueCond.L.Unlock() return ErrClientExiting // Reject new tasks after force quit has been signaled. case <-q.forceQuit: q.queueCond.L.Unlock() return ErrClientExiting default: } // Queue the new task and signal the queue's condition variable to wake up // the queueManager for processing. q.queue.PushBack(task) q.queueCond.L.Unlock() q.queueCond.Signal() return nil } // queueManager processes all incoming backup requests that get added via // QueueBackupTask. The manager will exit // // NOTE: This method MUST be run as a goroutine. func (q *taskPipeline) queueManager() { defer close(q.shutdown) defer close(q.newBackupTasks) for { q.queueCond.L.Lock() for q.queue.Front() == nil { q.queueCond.Wait() select { case <-q.quit: // Exit only after the queue has been fully drained. if q.queue.Len() == 0 { q.queueCond.L.Unlock() q.log.Debugf("Revoked state pipeline flushed.") return } case <-q.forceQuit: q.queueCond.L.Unlock() q.log.Debugf("Revoked state pipeline force quit.") return default: } } // Pop the first element from the queue. e := q.queue.Front() task := q.queue.Remove(e).(*backupTask) q.queueCond.L.Unlock() select { // Backup task submitted to dispatcher. We don't select on quit to // ensure that we still drain tasks while shutting down. case q.newBackupTasks <- task: // Force quit, return immediately to allow the client to exit. case <-q.forceQuit: q.log.Debugf("Revoked state pipeline force quit.") return } } } // signalUntilShutdown strobes the queue's condition variable to ensure the // queueManager reliably unblocks to check for the exit condition. func (q *taskPipeline) signalUntilShutdown() { for { select { case <-time.After(time.Millisecond): q.queueCond.Signal() case <-q.shutdown: return } } }