wtclient/task_pipeline: add prefix logging

This commit is contained in:
Conner Fromknecht 2020-12-26 17:35:21 -08:00
parent faa399f942
commit 505901d7b8
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 14 additions and 9 deletions

@ -317,7 +317,7 @@ func New(config *Config) (*TowerClient, error) {
c := &TowerClient{ c := &TowerClient{
cfg: cfg, cfg: cfg,
log: plog, log: plog,
pipeline: newTaskPipeline(), pipeline: newTaskPipeline(plog),
candidateTowers: newTowerListIterator(candidateTowers...), candidateTowers: newTowerListIterator(candidateTowers...),
candidateSessions: candidateSessions, candidateSessions: candidateSessions,
activeSessions: make(sessionQueueSet), activeSessions: make(sessionQueueSet),

@ -4,6 +4,8 @@ import (
"container/list" "container/list"
"sync" "sync"
"time" "time"
"github.com/btcsuite/btclog"
) )
// taskPipeline implements a reliable, in-order queue that ensures its queue // taskPipeline implements a reliable, in-order queue that ensures its queue
@ -17,6 +19,8 @@ type taskPipeline struct {
stopped sync.Once stopped sync.Once
forced sync.Once forced sync.Once
log btclog.Logger
queueMtx sync.Mutex queueMtx sync.Mutex
queueCond *sync.Cond queueCond *sync.Cond
queue *list.List queue *list.List
@ -29,8 +33,9 @@ type taskPipeline struct {
} }
// newTaskPipeline initializes a new taskPipeline. // newTaskPipeline initializes a new taskPipeline.
func newTaskPipeline() *taskPipeline { func newTaskPipeline(log btclog.Logger) *taskPipeline {
rq := &taskPipeline{ rq := &taskPipeline{
log: log,
queue: list.New(), queue: list.New(),
newBackupTasks: make(chan *backupTask), newBackupTasks: make(chan *backupTask),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -55,7 +60,7 @@ func (q *taskPipeline) Start() {
// the delivery of pending tasks to be interrupted. // the delivery of pending tasks to be interrupted.
func (q *taskPipeline) Stop() { func (q *taskPipeline) Stop() {
q.stopped.Do(func() { q.stopped.Do(func() {
log.Debugf("Stopping task pipeline") q.log.Debugf("Stopping task pipeline")
close(q.quit) close(q.quit)
q.signalUntilShutdown() q.signalUntilShutdown()
@ -64,7 +69,7 @@ func (q *taskPipeline) Stop() {
select { select {
case <-q.forceQuit: case <-q.forceQuit:
default: default:
log.Debugf("Task pipeline stopped successfully") q.log.Debugf("Task pipeline stopped successfully")
} }
}) })
} }
@ -73,12 +78,12 @@ func (q *taskPipeline) Stop() {
// backupTasks that have not been delivered via NewBackupTasks. // backupTasks that have not been delivered via NewBackupTasks.
func (q *taskPipeline) ForceQuit() { func (q *taskPipeline) ForceQuit() {
q.forced.Do(func() { q.forced.Do(func() {
log.Infof("Force quitting task pipeline") q.log.Infof("Force quitting task pipeline")
close(q.forceQuit) close(q.forceQuit)
q.signalUntilShutdown() q.signalUntilShutdown()
log.Infof("Task pipeline unclean shutdown complete") q.log.Infof("Task pipeline unclean shutdown complete")
}) })
} }
@ -139,13 +144,13 @@ func (q *taskPipeline) queueManager() {
// Exit only after the queue has been fully drained. // Exit only after the queue has been fully drained.
if q.queue.Len() == 0 { if q.queue.Len() == 0 {
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("Revoked state pipeline flushed.") q.log.Debugf("Revoked state pipeline flushed.")
return return
} }
case <-q.forceQuit: case <-q.forceQuit:
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("Revoked state pipeline force quit.") q.log.Debugf("Revoked state pipeline force quit.")
return return
default: default:
@ -165,7 +170,7 @@ func (q *taskPipeline) queueManager() {
// Force quit, return immediately to allow the client to exit. // Force quit, return immediately to allow the client to exit.
case <-q.forceQuit: case <-q.forceQuit:
log.Debugf("Revoked state pipeline force quit.") q.log.Debugf("Revoked state pipeline force quit.")
return return
} }
} }