From ab4a4b77f587f302892a3612789e0276a721e6c8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 13 Jun 2019 17:27:32 -0700 Subject: [PATCH] watchtower/wtclient/session_queue: unify logging --- watchtower/wtclient/client.go | 22 +++--- watchtower/wtclient/session_queue.go | 102 +++++++++++++++------------ watchtower/wtdb/client_session.go | 6 ++ 3 files changed, 71 insertions(+), 59 deletions(-) diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 8f0cbc9f..dc250980 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -626,9 +626,7 @@ func (c *TowerClient) backupDispatcher() { return } - log.Debugf("Processing backup task chanid=%s "+ - "commit-height=%d", task.id.ChanID, - task.id.CommitHeight) + log.Debugf("Processing %v", task.id) c.stats.taskReceived() c.processTask(task) @@ -659,8 +657,8 @@ func (c *TowerClient) processTask(task *backupTask) { // sessionQueue will be removed if accepting the task left the sessionQueue in // an exhausted state. func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { - log.Infof("Backup chanid=%s commit-height=%d accepted successfully", - task.id.ChanID, task.id.CommitHeight) + log.Infof("Queued %v successfully for session %v", + task.id, c.sessionQueue.ID()) c.stats.taskAccepted() @@ -701,16 +699,14 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { case reserveAvailable: c.stats.taskIneligible() - log.Infof("Backup chanid=%s commit-height=%d is ineligible", - task.id.ChanID, task.id.CommitHeight) + log.Infof("Ignoring ineligible %v", task.id) err := c.cfg.DB.MarkBackupIneligible( task.id.ChanID, task.id.CommitHeight, ) if err != nil { - log.Errorf("Unable to mark task chanid=%s "+ - "commit-height=%d ineligible: %v", - task.id.ChanID, task.id.CommitHeight, err) + log.Errorf("Unable to mark %v ineligible: %v", + task.id, err) // It is safe to not handle this error, even if we could // not persist the result. 
At worst, this task may be @@ -729,10 +725,8 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { case reserveExhausted: c.stats.sessionExhausted() - log.Debugf("Session %s exhausted, backup chanid=%s "+ - "commit-height=%d queued for next session", - c.sessionQueue.ID(), task.id.ChanID, - task.id.CommitHeight) + log.Debugf("Session %v exhausted, %s queued for next session", + c.sessionQueue.ID(), task.id) // Cache the task that we pulled off, so that we can process it // once a new session queue is available. diff --git a/watchtower/wtclient/session_queue.go b/watchtower/wtclient/session_queue.go index 39ab0a43..92d81ef4 100644 --- a/watchtower/wtclient/session_queue.go +++ b/watchtower/wtclient/session_queue.go @@ -156,7 +156,7 @@ func (q *sessionQueue) Start() { // will clear all pending tasks in the queue before returning to the caller. func (q *sessionQueue) Stop() { q.stopped.Do(func() { - log.Debugf("Stopping session queue %s", q.ID()) + log.Debugf("SessionQueue(%s) stopping ...", q.ID()) close(q.quit) q.signalUntilShutdown() @@ -168,7 +168,7 @@ func (q *sessionQueue) Stop() { default: } - log.Debugf("Session queue %s successfully stopped", q.ID()) + log.Debugf("SessionQueue(%s) stopped", q.ID()) }) } @@ -176,12 +176,12 @@ func (q *sessionQueue) Stop() { // he caller after all lingering goroutines have spun down. 
func (q *sessionQueue) ForceQuit() { q.forced.Do(func() { - log.Infof("Force quitting session queue %s", q.ID()) + log.Infof("SessionQueue(%s) force quitting...", q.ID()) close(q.forceQuit) q.signalUntilShutdown() - log.Infof("Session queue %s unclean shutdown complete", q.ID()) + log.Infof("SessionQueue(%s) force quit", q.ID()) }) } @@ -197,8 +197,15 @@ func (q *sessionQueue) ID() *wtdb.SessionID { func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { q.queueCond.L.Lock() + numPending := uint32(q.pendingQueue.Len()) + maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates + log.Debugf("SessionQueue(%x) deciding to accept %v seqnum=%d "+ + "pending=%d max-updates=%d", + q.ID(), task.id, q.seqNum, numPending, maxUpdates) + // Examine the current reserve status of the session queue. curStatus := q.reserveStatus() + switch curStatus { // The session queue is exhausted, and cannot accept the task because it @@ -218,9 +225,8 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody) if err != nil { q.queueCond.L.Unlock() - log.Debugf("SessionQueue %s rejected backup chanid=%s "+ - "commit-height=%d: %v", q.ID(), task.id.ChanID, - task.id.CommitHeight, err) + log.Debugf("SessionQueue(%s) rejected %v: %v ", + q.ID(), task.id, err) return curStatus, false } } @@ -288,8 +294,8 @@ func (q *sessionQueue) drainBackups() { // First, check that we are able to dial this session's tower. conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionPrivKey, q.towerAddr) if err != nil { - log.Errorf("Unable to dial watchtower at %v: %v", - q.towerAddr, err) + log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v", + q.ID(), q.towerAddr, err) q.increaseBackoff() select { @@ -308,9 +314,10 @@ func (q *sessionQueue) drainBackups() { // Generate the next state update to upload to the tower. 
This // method will first proceed in dequeueing committed updates // before attempting to dequeue any pending updates. - stateUpdate, isPending, err := q.nextStateUpdate() + stateUpdate, isPending, backupID, err := q.nextStateUpdate() if err != nil { - log.Errorf("Unable to get next state update: %v", err) + log.Errorf("SessionQueue(%s) unable to get next state "+ + "update: %v", q.ID(), err) return } @@ -319,7 +326,8 @@ func (q *sessionQueue) drainBackups() { conn, stateUpdate, q.localInit, sendInit, isPending, ) if err != nil { - log.Errorf("Unable to send state update: %v", err) + log.Errorf("SessionQueue(%s) unable to send state "+ + "update: %v", q.ID(), err) q.increaseBackoff() select { @@ -329,6 +337,9 @@ func (q *sessionQueue) drainBackups() { return } + log.Infof("SessionQueue(%s) uploaded %v seqnum=%d", + q.ID(), backupID, stateUpdate.SeqNum) + // If the last task was backed up successfully, we'll exit and // continue once more tasks are added to the queue. We'll also // clear any accumulated backoff as this batch was able to be @@ -357,7 +368,9 @@ func (q *sessionQueue) drainBackups() { // boolean value in the response is true if the state update is taken from the // pending queue, allowing the caller to remove the update from either the // commit or pending queue if the update is successfully acked. 
-func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { +func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, + wtdb.BackupID, error) { + var ( seqNum uint16 update wtdb.CommittedUpdate @@ -382,8 +395,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0 q.queueCond.L.Unlock() - log.Debugf("Reprocessing committed state update for "+ - "session=%s seqnum=%d", q.ID(), seqNum) + log.Debugf("SessionQueue(%s) reprocessing committed state "+ + "update for %v seqnum=%d", + q.ID(), update.BackupID, seqNum) // Otherwise, craft and commit the next update from the pending queue. default: @@ -407,8 +421,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer) if err != nil { // TODO(conner): mark will not send - return nil, false, fmt.Errorf("unable to craft "+ - "session payload: %v", err) + err := fmt.Errorf("unable to craft session payload: %v", + err) + return nil, false, wtdb.BackupID{}, err } // TODO(conner): special case other obscure errors @@ -421,8 +436,8 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { }, } - log.Debugf("Committing state update for session=%s seqnum=%d", - q.ID(), seqNum) + log.Debugf("SessionQueue(%s) committing state update "+ + "%v seqnum=%d", q.ID(), update.BackupID, seqNum) } // Before sending the task to the tower, commit the state update @@ -439,8 +454,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update) if err != nil { // TODO(conner): mark failed/reschedule - return nil, false, fmt.Errorf("unable to commit state update "+ - "for session=%s seqnum=%d: %v", q.ID(), seqNum, err) + err := fmt.Errorf("unable to commit state update for "+ + "%v seqnum=%d: %v", update.BackupID, seqNum, err) + return nil, false, 
wtdb.BackupID{}, err } stateUpdate := &wtwire.StateUpdate{ @@ -455,7 +471,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { stateUpdate.IsComplete = 1 } - return stateUpdate, isPending, nil + return stateUpdate, isPending, update.BackupID, nil } // sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes @@ -486,8 +502,8 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, remoteInit, ok := remoteMsg.(*wtwire.Init) if !ok { - return fmt.Errorf("watchtower responded with %T to "+ - "Init", remoteMsg) + return fmt.Errorf("watchtower %s responded with %T "+ + "to Init", q.towerAddr, remoteMsg) } // Validate Init. @@ -513,8 +529,8 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply) if !ok { - return fmt.Errorf("watchtower responded with %T to StateUpdate", - remoteMsg) + return fmt.Errorf("watchtower %s responded with %T to "+ + "StateUpdate", q.towerAddr, remoteMsg) } // Process the reply from the tower. @@ -527,10 +543,10 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, // TODO(conner): handle other error cases properly, ban towers, etc. 
default: err := fmt.Errorf("received error code %v in "+ - "StateUpdateReply from tower=%x session=%v", - stateUpdateReply.Code, - conn.RemotePub().SerializeCompressed(), q.ID()) - log.Warnf("Unable to upload state update: %v", err) + "StateUpdateReply for seqnum=%d", + stateUpdateReply.Code, stateUpdate.SeqNum) + log.Warnf("SessionQueue(%s) unable to upload state update to "+ + "tower=%s: %v", q.ID(), q.towerAddr, err) return err } @@ -539,28 +555,27 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer, switch { case err == wtdb.ErrUnallocatedLastApplied: // TODO(conner): borked watchtower - err = fmt.Errorf("unable to ack update=%d session=%s: %v", - stateUpdate.SeqNum, q.ID(), err) - log.Errorf("Failed to ack update: %v", err) + err = fmt.Errorf("unable to ack seqnum=%d: %v", + stateUpdate.SeqNum, err) + log.Errorf("SessionQueue(%s) failed to ack update: %v", q.ID(), err) return err case err == wtdb.ErrLastAppliedReversion: // TODO(conner): borked watchtower - err = fmt.Errorf("unable to ack update=%d session=%s: %v", - stateUpdate.SeqNum, q.ID(), err) - log.Errorf("Failed to ack update: %v", err) + err = fmt.Errorf("unable to ack seqnum=%d: %v", + stateUpdate.SeqNum, err) + log.Errorf("SessionQueue(%s) failed to ack update: %v", + q.ID(), err) return err case err != nil: - err = fmt.Errorf("unable to ack update=%d session=%s: %v", - stateUpdate.SeqNum, q.ID(), err) - log.Errorf("Failed to ack update: %v", err) + err = fmt.Errorf("unable to ack seqnum=%d: %v", + stateUpdate.SeqNum, err) + log.Errorf("SessionQueue(%s) failed to ack update: %v", + q.ID(), err) return err } - log.Infof("Removing update session=%s seqnum=%d is_pending=%v "+ - "from memory", q.ID(), stateUpdate.SeqNum, isPending) - q.queueCond.L.Lock() if isPending { // If a pending update was successfully sent, increment the @@ -591,9 +606,6 @@ func (q *sessionQueue) reserveStatus() reserveStatus { numPending := uint32(q.pendingQueue.Len()) maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates) 
- log.Debugf("SessionQueue %s reserveStatus seqnum=%d pending=%d "+ - "max-updates=%d", q.ID(), q.seqNum, numPending, maxUpdates) - if uint32(q.seqNum)+numPending < maxUpdates { return reserveAvailable } diff --git a/watchtower/wtdb/client_session.go b/watchtower/wtdb/client_session.go index 768fe109..e1fd564b 100644 --- a/watchtower/wtdb/client_session.go +++ b/watchtower/wtdb/client_session.go @@ -1,6 +1,7 @@ package wtdb import ( + "fmt" "io" "github.com/btcsuite/btcd/btcec" @@ -160,6 +161,11 @@ func (b *BackupID) Decode(r io.Reader) error { ) } +// String returns a human-readable encoding of a BackupID. +func (b BackupID) String() string { + return fmt.Sprintf("backup(%x, %d)", b.ChanID, b.CommitHeight) +} + // CommittedUpdate holds a state update sent by a client along with its // allocated sequence number and the exact remote commitment the encrypted // justice transaction can rectify.