diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 239b29b3..6d6efe7b 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -317,7 +317,7 @@ func New(config *Config) (*TowerClient, error) { c := &TowerClient{ cfg: cfg, log: plog, - pipeline: newTaskPipeline(), + pipeline: newTaskPipeline(plog), candidateTowers: newTowerListIterator(candidateTowers...), candidateSessions: candidateSessions, activeSessions: make(sessionQueueSet), diff --git a/watchtower/wtclient/task_pipeline.go b/watchtower/wtclient/task_pipeline.go index 076c93f6..d1dc62ff 100644 --- a/watchtower/wtclient/task_pipeline.go +++ b/watchtower/wtclient/task_pipeline.go @@ -4,6 +4,8 @@ import ( "container/list" "sync" "time" + + "github.com/btcsuite/btclog" ) // taskPipeline implements a reliable, in-order queue that ensures its queue @@ -17,6 +19,8 @@ type taskPipeline struct { stopped sync.Once forced sync.Once + log btclog.Logger + queueMtx sync.Mutex queueCond *sync.Cond queue *list.List @@ -29,8 +33,9 @@ type taskPipeline struct { } // newTaskPipeline initializes a new taskPipeline. -func newTaskPipeline() *taskPipeline { +func newTaskPipeline(log btclog.Logger) *taskPipeline { rq := &taskPipeline{ + log: log, queue: list.New(), newBackupTasks: make(chan *backupTask), quit: make(chan struct{}), @@ -55,7 +60,7 @@ func (q *taskPipeline) Start() { // the delivery of pending tasks to be interrupted. func (q *taskPipeline) Stop() { q.stopped.Do(func() { - log.Debugf("Stopping task pipeline") + q.log.Debugf("Stopping task pipeline") close(q.quit) q.signalUntilShutdown() @@ -64,7 +69,7 @@ func (q *taskPipeline) Stop() { select { case <-q.forceQuit: default: - log.Debugf("Task pipeline stopped successfully") + q.log.Debugf("Task pipeline stopped successfully") } }) } @@ -73,12 +78,12 @@ func (q *taskPipeline) Stop() { // backupTasks that have not been delivered via NewBackupTasks. func (q *taskPipeline) ForceQuit() { q.forced.Do(func() { - log.Infof("Force quitting task pipeline") + q.log.Infof("Force quitting task pipeline") close(q.forceQuit) q.signalUntilShutdown() - log.Infof("Task pipeline unclean shutdown complete") + q.log.Infof("Task pipeline unclean shutdown complete") }) } @@ -139,13 +144,13 @@ func (q *taskPipeline) queueManager() { // Exit only after the queue has been fully drained. if q.queue.Len() == 0 { q.queueCond.L.Unlock() - log.Debugf("Revoked state pipeline flushed.") + q.log.Debugf("Revoked state pipeline flushed.") return } case <-q.forceQuit: q.queueCond.L.Unlock() - log.Debugf("Revoked state pipeline force quit.") + q.log.Debugf("Revoked state pipeline force quit.") return default: @@ -165,7 +170,7 @@ func (q *taskPipeline) queueManager() { // Force quit, return immediately to allow the client to exit. case <-q.forceQuit: - log.Debugf("Revoked state pipeline force quit.") + q.log.Debugf("Revoked state pipeline force quit.") return } }