diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 239b29b3..f81284d5 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -317,7 +317,7 @@ func New(config *Config) (*TowerClient, error) { c := &TowerClient{ cfg: cfg, log: plog, - pipeline: newTaskPipeline(), + pipeline: newTaskPipeline(plog), candidateTowers: newTowerListIterator(candidateTowers...), candidateSessions: candidateSessions, activeSessions: make(sessionQueueSet), @@ -339,6 +339,7 @@ func New(config *Config) (*TowerClient, error) { Candidates: c.candidateTowers, MinBackoff: cfg.MinBackoff, MaxBackoff: cfg.MaxBackoff, + Log: plog, }) // Reconstruct the highest commit height processed for each channel @@ -468,7 +469,7 @@ func (c *TowerClient) Start() error { c.wg.Add(1) go c.backupDispatcher() - log.Infof("Watchtower client started successfully") + c.log.Infof("Watchtower client started successfully") }) return err } @@ -1006,6 +1007,7 @@ func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue { DB: c.cfg.DB, MinBackoff: c.cfg.MinBackoff, MaxBackoff: c.cfg.MaxBackoff, + Log: c.log, }) } diff --git a/watchtower/wtclient/session_negotiator.go b/watchtower/wtclient/session_negotiator.go index fe85edb0..efddfa99 100644 --- a/watchtower/wtclient/session_negotiator.go +++ b/watchtower/wtclient/session_negotiator.go @@ -6,6 +6,7 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/watchtower/blob" @@ -85,6 +86,10 @@ type NegotiatorConfig struct { // exponential backoff produces a timeout greater than this value, the // backoff duration will be clamped to MaxBackoff. MaxBackoff time.Duration + + // Log specifies the desired log output, which should be prefixed by the + // client type, e.g. anchor or legacy. + Log btclog.Logger } // sessionNegotiator is concrete SessionNegotiator that is able to request new @@ -97,6 +102,7 @@ type sessionNegotiator struct { localInit *wtwire.Init cfg *NegotiatorConfig + log btclog.Logger dispatcher chan struct{} newSessions chan *wtdb.ClientSession @@ -130,6 +136,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator { return &sessionNegotiator{ cfg: cfg, + log: cfg.Log, localInit: localInit, dispatcher: make(chan struct{}, 1), newSessions: make(chan *wtdb.ClientSession), @@ -141,7 +148,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator { // Start safely starts up the sessionNegotiator. func (n *sessionNegotiator) Start() error { n.started.Do(func() { - log.Debugf("Starting session negotiator") + n.log.Debugf("Starting session negotiator") n.wg.Add(1) go n.negotiationDispatcher() @@ -153,7 +160,7 @@ func (n *sessionNegotiator) Start() error { // Stop safely shutsdown the sessionNegotiator. func (n *sessionNegotiator) Stop() error { n.stopped.Do(func() { - log.Debugf("Stopping session negotiator") + n.log.Debugf("Stopping session negotiator") close(n.quit) n.wg.Wait() @@ -191,7 +198,7 @@ func (n *sessionNegotiator) negotiationDispatcher() { pendingNegotiations++ if pendingNegotiations > 1 { - log.Debugf("Already negotiating session, " + + n.log.Debugf("Already negotiating session, " + "waiting for existing negotiation to " + "complete") continue @@ -199,7 +206,7 @@ func (n *sessionNegotiator) negotiationDispatcher() { // TODO(conner): consider reusing good towers - log.Debugf("Dispatching session negotiation") + n.log.Debugf("Dispatching session negotiation") n.wg.Add(1) go n.negotiate() @@ -213,7 +220,7 @@ func (n *sessionNegotiator) negotiationDispatcher() { } if pendingNegotiations > 0 { - log.Debugf("Dispatching pending session " + + n.log.Debugf("Dispatching pending session " + "negotiation") n.wg.Add(1) @@ -278,7 +285,7 @@ retryWithBackoff: // We've run out of addresses, update our backoff. updateBackoff() - log.Debugf("Unable to get new tower candidate, "+ + n.log.Debugf("Unable to get new tower candidate, "+ "retrying after %v -- reason: %v", backoff, err) // Only reset the iterator once we've exhausted all @@ -292,7 +299,7 @@ retryWithBackoff: } towerPub := tower.IdentityKey.SerializeCompressed() - log.Debugf("Attempting session negotiation with tower=%x", + n.log.Debugf("Attempting session negotiation with tower=%x", towerPub) // Before proceeding, we will reserve a session key index to use @@ -302,7 +309,7 @@ retryWithBackoff: tower.ID, n.cfg.Policy.BlobType, ) if err != nil { - log.Debugf("Unable to reserve session key index "+ + n.log.Debugf("Unable to reserve session key index "+ "for tower=%x: %v", towerPub, err) continue } @@ -314,7 +321,7 @@ retryWithBackoff: // An unexpected error occurred, updpate our backoff. updateBackoff() - log.Debugf("Session negotiation with tower=%x "+ + n.log.Debugf("Session negotiation with tower=%x "+ "failed, trying again -- reason: %v", tower.IdentityKey.SerializeCompressed(), err) @@ -360,7 +367,7 @@ func (n *sessionNegotiator) createSession(tower *wtdb.Tower, fallthrough case err != nil: - log.Debugf("Request for session negotiation with "+ + n.log.Debugf("Request for session negotiation with "+ "tower=%s failed, trying again -- reason: "+ "%v", lnAddr, err) continue @@ -467,7 +474,7 @@ func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH, err) } - log.Debugf("New session negotiated with %s, policy: %s", + n.log.Debugf("New session negotiated with %s, policy: %s", lnAddr, clientSession.Policy) // We have a newly negotiated session, return it to the diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index ffc61892..8b2a9ad5 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -7,6 +7,7 @@ import ( "time" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btclog" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwire" @@ -70,6 +71,10 @@ type sessionQueueConfig struct { // timeout greater than this value, the backoff duration will be clamped // to MaxBackoff. MaxBackoff time.Duration + + // Log specifies the desired log output, which should be prefixed by the + // client type, e.g. anchor or legacy. + Log btclog.Logger } // sessionQueue implements a reliable queue that will encrypt and send accepted @@ -84,6 +89,7 @@ type sessionQueue struct { forced sync.Once cfg *sessionQueueConfig + log btclog.Logger commitQueue *list.List pendingQueue *list.List @@ -116,6 +122,7 @@ func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue { sq := &sessionQueue{ cfg: cfg, + log: cfg.Log, commitQueue: list.New(), pendingQueue: list.New(), localInit: localInit, @@ -149,7 +156,7 @@ func (q *sessionQueue) Start() { // will clear all pending tasks in the queue before returning to the caller. func (q *sessionQueue) Stop() { q.stopped.Do(func() { - log.Debugf("SessionQueue(%s) stopping ...", q.ID()) + q.log.Debugf("SessionQueue(%s) stopping ...", q.ID()) close(q.quit) q.signalUntilShutdown() @@ -161,7 +168,7 @@ func (q *sessionQueue) Stop() { default: } - log.Debugf("SessionQueue(%s) stopped", q.ID()) + q.log.Debugf("SessionQueue(%s) stopped", q.ID()) }) } @@ -169,12 +176,12 @@ func (q *sessionQueue) Stop() { // he caller after all lingering goroutines have spun down. func (q *sessionQueue) ForceQuit() { q.forced.Do(func() { - log.Infof("SessionQueue(%s) force quitting...", q.ID()) + q.log.Infof("SessionQueue(%s) force quitting...", q.ID()) close(q.forceQuit) q.signalUntilShutdown() - log.Infof("SessionQueue(%s) force quit", q.ID()) + q.log.Infof("SessionQueue(%s) force quit", q.ID()) }) } @@ -192,7 +199,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { numPending := uint32(q.pendingQueue.Len()) maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates - log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+ + q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+ "pending=%d max-updates=%d", q.ID(), task.id, q.seqNum, numPending, maxUpdates) @@ -218,7 +225,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody) if err != nil { q.queueCond.L.Unlock() - log.Debugf("SessionQueue(%s) rejected %v: %v ", + q.log.Debugf("SessionQueue(%s) rejected %v: %v ", q.ID(), task.id, err) return curStatus, false } @@ -287,7 +294,7 @@ func (q *sessionQueue) drainBackups() { // First, check that we are able to dial this session's tower. conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr) if err != nil { - log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v", + q.log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v", q.ID(), q.towerAddr, err) q.increaseBackoff() @@ -309,7 +316,7 @@ func (q *sessionQueue) drainBackups() { // before attempting to dequeue any pending updates. stateUpdate, isPending, backupID, err := q.nextStateUpdate() if err != nil { - log.Errorf("SessionQueue(%v) unable to get next state "+ + q.log.Errorf("SessionQueue(%v) unable to get next state "+ "update: %v", q.ID(), err) return } @@ -319,7 +326,7 @@ func (q *sessionQueue) drainBackups() { conn, stateUpdate, q.localInit, sendInit, isPending, ) if err != nil { - log.Errorf("SessionQueue(%s) unable to send state "+ + q.log.Errorf("SessionQueue(%s) unable to send state "+ "update: %v", q.ID(), err) q.increaseBackoff() @@ -330,7 +337,7 @@ func (q *sessionQueue) drainBackups() { return } - log.Infof("SessionQueue(%s) uploaded %v seqnum=%d", + q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d", q.ID(), backupID, stateUpdate.SeqNum) // If the last task was backed up successfully, we'll exit and @@ -388,7 +395,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0 q.queueCond.L.Unlock() - log.Debugf("SessionQueue(%s) reprocessing committed state "+ + q.log.Debugf("SessionQueue(%s) reprocessing committed state "+ "update for %v seqnum=%d", q.ID(), update.BackupID, seqNum) @@ -429,7 +436,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, }, } - log.Debugf("SessionQueue(%s) committing state update "+ + q.log.Debugf("SessionQueue(%s) committing state update "+ "%v seqnum=%d", q.ID(), update.BackupID, seqNum) } @@ -538,7 +545,7 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, err := fmt.Errorf("received error code %v in "+ "StateUpdateReply for seqnum=%d", stateUpdateReply.Code, stateUpdate.SeqNum) - log.Warnf("SessionQueue(%s) unable to upload state update to "+ + q.log.Warnf("SessionQueue(%s) unable to upload state update to "+ "tower=%s: %v", q.ID(), q.towerAddr, err) return err } @@ -550,21 +557,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, // TODO(conner): borked watchtower err = fmt.Errorf("unable to ack seqnum=%d: %v", stateUpdate.SeqNum, err) - log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err) + q.log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err) return err case err == wtdb.ErrLastAppliedReversion: // TODO(conner): borked watchtower err = fmt.Errorf("unable to ack seqnum=%d: %v", stateUpdate.SeqNum, err) - log.Errorf("SessionQueue(%s) failed to ack update: %v", + q.log.Errorf("SessionQueue(%s) failed to ack update: %v", q.ID(), err) return err case err != nil: err = fmt.Errorf("unable to ack seqnum=%d: %v", stateUpdate.SeqNum, err) - log.Errorf("SessionQueue(%s) failed to ack update: %v", + q.log.Errorf("SessionQueue(%s) failed to ack update: %v", q.ID(), err) return err } diff --git a/watchtower/wtclient/task_pipeline.go b/watchtower/wtclient/task_pipeline.go index 076c93f6..d1dc62ff 100644 --- a/watchtower/wtclient/task_pipeline.go +++ b/watchtower/wtclient/task_pipeline.go @@ -4,6 +4,8 @@ import ( "container/list" "sync" "time" + + "github.com/btcsuite/btclog" ) // taskPipeline implements a reliable, in-order queue that ensures its queue @@ -17,6 +19,8 @@ type taskPipeline struct { stopped sync.Once forced sync.Once + log btclog.Logger + queueMtx sync.Mutex queueCond *sync.Cond queue *list.List @@ -29,8 +33,9 @@ type taskPipeline struct { } // newTaskPipeline initializes a new taskPipeline. -func newTaskPipeline() *taskPipeline { +func newTaskPipeline(log btclog.Logger) *taskPipeline { rq := &taskPipeline{ + log: log, queue: list.New(), newBackupTasks: make(chan *backupTask), quit: make(chan struct{}), @@ -55,7 +60,7 @@ func (q *taskPipeline) Start() { // the delivery of pending tasks to be interrupted. func (q *taskPipeline) Stop() { q.stopped.Do(func() { - log.Debugf("Stopping task pipeline") + q.log.Debugf("Stopping task pipeline") close(q.quit) q.signalUntilShutdown() @@ -64,7 +69,7 @@ func (q *taskPipeline) Stop() { select { case <-q.forceQuit: default: - log.Debugf("Task pipeline stopped successfully") + q.log.Debugf("Task pipeline stopped successfully") } }) } @@ -73,12 +78,12 @@ func (q *taskPipeline) Stop() { // backupTasks that have not been delivered via NewBackupTasks. func (q *taskPipeline) ForceQuit() { q.forced.Do(func() { - log.Infof("Force quitting task pipeline") + q.log.Infof("Force quitting task pipeline") close(q.forceQuit) q.signalUntilShutdown() - log.Infof("Task pipeline unclean shutdown complete") + q.log.Infof("Task pipeline unclean shutdown complete") }) } @@ -139,13 +144,13 @@ func (q *taskPipeline) queueManager() { // Exit only after the queue has been fully drained. if q.queue.Len() == 0 { q.queueCond.L.Unlock() - log.Debugf("Revoked state pipeline flushed.") + q.log.Debugf("Revoked state pipeline flushed.") return } case <-q.forceQuit: q.queueCond.L.Unlock() - log.Debugf("Revoked state pipeline force quit.") + q.log.Debugf("Revoked state pipeline force quit.") return default: @@ -165,7 +170,7 @@ func (q *taskPipeline) queueManager() { // Force quit, return immediately to allow the client to exit. case <-q.forceQuit: - log.Debugf("Revoked state pipeline force quit.") + q.log.Debugf("Revoked state pipeline force quit.") return } }