watchtower/wtclient/session_queue: unify logging

This commit is contained in:
Conner Fromknecht 2019-06-13 17:27:32 -07:00
parent 3b51906a78
commit ab4a4b77f5
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
3 changed files with 71 additions and 59 deletions

@ -626,9 +626,7 @@ func (c *TowerClient) backupDispatcher() {
return return
} }
log.Debugf("Processing backup task chanid=%s "+ log.Debugf("Processing %v", task.id)
"commit-height=%d", task.id.ChanID,
task.id.CommitHeight)
c.stats.taskReceived() c.stats.taskReceived()
c.processTask(task) c.processTask(task)
@ -659,8 +657,8 @@ func (c *TowerClient) processTask(task *backupTask) {
// sessionQueue will be removed if accepting the task left the sessionQueue in // sessionQueue will be removed if accepting the task left the sessionQueue in
// an exhausted state. // an exhausted state.
func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) {
log.Infof("Backup chanid=%s commit-height=%d accepted successfully", log.Infof("Queued %v successfully for session %v",
task.id.ChanID, task.id.CommitHeight) task.id, c.sessionQueue.ID())
c.stats.taskAccepted() c.stats.taskAccepted()
@ -701,16 +699,14 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
case reserveAvailable: case reserveAvailable:
c.stats.taskIneligible() c.stats.taskIneligible()
log.Infof("Backup chanid=%s commit-height=%d is ineligible", log.Infof("Ignoring ineligible %v", task.id)
task.id.ChanID, task.id.CommitHeight)
err := c.cfg.DB.MarkBackupIneligible( err := c.cfg.DB.MarkBackupIneligible(
task.id.ChanID, task.id.CommitHeight, task.id.ChanID, task.id.CommitHeight,
) )
if err != nil { if err != nil {
log.Errorf("Unable to mark task chanid=%s "+ log.Errorf("Unable to mark %v ineligible: %v",
"commit-height=%d ineligible: %v", task.id, err)
task.id.ChanID, task.id.CommitHeight, err)
// It is safe to not handle this error, even if we could // It is safe to not handle this error, even if we could
// not persist the result. At worst, this task may be // not persist the result. At worst, this task may be
@ -729,10 +725,8 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
case reserveExhausted: case reserveExhausted:
c.stats.sessionExhausted() c.stats.sessionExhausted()
log.Debugf("Session %s exhausted, backup chanid=%s "+ log.Debugf("Session %v exhausted, %s queued for next session",
"commit-height=%d queued for next session", c.sessionQueue.ID(), task.id)
c.sessionQueue.ID(), task.id.ChanID,
task.id.CommitHeight)
// Cache the task that we pulled off, so that we can process it // Cache the task that we pulled off, so that we can process it
// once a new session queue is available. // once a new session queue is available.

@ -156,7 +156,7 @@ func (q *sessionQueue) Start() {
// will clear all pending tasks in the queue before returning to the caller. // will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() { func (q *sessionQueue) Stop() {
q.stopped.Do(func() { q.stopped.Do(func() {
log.Debugf("Stopping session queue %s", q.ID()) log.Debugf("SessionQueue(%s) stopping ...", q.ID())
close(q.quit) close(q.quit)
q.signalUntilShutdown() q.signalUntilShutdown()
@ -168,7 +168,7 @@ func (q *sessionQueue) Stop() {
default: default:
} }
log.Debugf("Session queue %s successfully stopped", q.ID()) log.Debugf("SessionQueue(%s) stopped", q.ID())
}) })
} }
@ -176,12 +176,12 @@ func (q *sessionQueue) Stop() {
// he caller after all lingering goroutines have spun down. // he caller after all lingering goroutines have spun down.
func (q *sessionQueue) ForceQuit() { func (q *sessionQueue) ForceQuit() {
q.forced.Do(func() { q.forced.Do(func() {
log.Infof("Force quitting session queue %s", q.ID()) log.Infof("SessionQueue(%s) force quitting...", q.ID())
close(q.forceQuit) close(q.forceQuit)
q.signalUntilShutdown() q.signalUntilShutdown()
log.Infof("Session queue %s unclean shutdown complete", q.ID()) log.Infof("SessionQueue(%s) force quit", q.ID())
}) })
} }
@ -197,8 +197,15 @@ func (q *sessionQueue) ID() *wtdb.SessionID {
func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) { func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
q.queueCond.L.Lock() q.queueCond.L.Lock()
numPending := uint32(q.pendingQueue.Len())
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
log.Debugf("SessionQueue(%x) deciding to accept %v seqnum=%d "+
"pending=%d max-updates=%d",
q.ID(), task.id, q.seqNum, numPending, maxUpdates)
// Examine the current reserve status of the session queue. // Examine the current reserve status of the session queue.
curStatus := q.reserveStatus() curStatus := q.reserveStatus()
switch curStatus { switch curStatus {
// The session queue is exhausted, and cannot accept the task because it // The session queue is exhausted, and cannot accept the task because it
@ -218,9 +225,8 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody) err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody)
if err != nil { if err != nil {
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("SessionQueue %s rejected backup chanid=%s "+ log.Debugf("SessionQueue(%s) rejected %v: %v ",
"commit-height=%d: %v", q.ID(), task.id.ChanID, q.ID(), task.id, err)
task.id.CommitHeight, err)
return curStatus, false return curStatus, false
} }
} }
@ -288,8 +294,8 @@ func (q *sessionQueue) drainBackups() {
// First, check that we are able to dial this session's tower. // First, check that we are able to dial this session's tower.
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionPrivKey, q.towerAddr) conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionPrivKey, q.towerAddr)
if err != nil { if err != nil {
log.Errorf("Unable to dial watchtower at %v: %v", log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
q.towerAddr, err) q.ID(), q.towerAddr, err)
q.increaseBackoff() q.increaseBackoff()
select { select {
@ -308,9 +314,10 @@ func (q *sessionQueue) drainBackups() {
// Generate the next state update to upload to the tower. This // Generate the next state update to upload to the tower. This
// method will first proceed in dequeueing committed updates // method will first proceed in dequeueing committed updates
// before attempting to dequeue any pending updates. // before attempting to dequeue any pending updates.
stateUpdate, isPending, err := q.nextStateUpdate() stateUpdate, isPending, backupID, err := q.nextStateUpdate()
if err != nil { if err != nil {
log.Errorf("Unable to get next state update: %v", err) log.Errorf("SessionQueue(%s) unable to get next state "+
"update: %v", err)
return return
} }
@ -319,7 +326,8 @@ func (q *sessionQueue) drainBackups() {
conn, stateUpdate, q.localInit, sendInit, isPending, conn, stateUpdate, q.localInit, sendInit, isPending,
) )
if err != nil { if err != nil {
log.Errorf("Unable to send state update: %v", err) log.Errorf("SessionQueue(%s) unable to send state "+
"update: %v", q.ID(), err)
q.increaseBackoff() q.increaseBackoff()
select { select {
@ -329,6 +337,9 @@ func (q *sessionQueue) drainBackups() {
return return
} }
log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
q.ID(), backupID, stateUpdate.SeqNum)
// If the last task was backed up successfully, we'll exit and // If the last task was backed up successfully, we'll exit and
// continue once more tasks are added to the queue. We'll also // continue once more tasks are added to the queue. We'll also
// clear any accumulated backoff as this batch was able to be // clear any accumulated backoff as this batch was able to be
@ -357,7 +368,9 @@ func (q *sessionQueue) drainBackups() {
// boolean value in the response is true if the state update is taken from the // boolean value in the response is true if the state update is taken from the
// pending queue, allowing the caller to remove the update from either the // pending queue, allowing the caller to remove the update from either the
// commit or pending queue if the update is successfully acked. // commit or pending queue if the update is successfully acked.
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) { func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
wtdb.BackupID, error) {
var ( var (
seqNum uint16 seqNum uint16
update wtdb.CommittedUpdate update wtdb.CommittedUpdate
@ -382,8 +395,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0 isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("Reprocessing committed state update for "+ log.Debugf("SessionQueue(%s) reprocessing committed state "+
"session=%s seqnum=%d", q.ID(), seqNum) "update for %v seqnum=%d",
q.ID(), update.BackupID, seqNum)
// Otherwise, craft and commit the next update from the pending queue. // Otherwise, craft and commit the next update from the pending queue.
default: default:
@ -407,8 +421,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer) hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
if err != nil { if err != nil {
// TODO(conner): mark will not send // TODO(conner): mark will not send
return nil, false, fmt.Errorf("unable to craft "+ err := fmt.Errorf("unable to craft session payload: %v",
"session payload: %v", err) err)
return nil, false, wtdb.BackupID{}, err
} }
// TODO(conner): special case other obscure errors // TODO(conner): special case other obscure errors
@ -421,8 +436,8 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
}, },
} }
log.Debugf("Committing state update for session=%s seqnum=%d", log.Debugf("SessionQueue(%s) committing state update "+
q.ID(), seqNum) "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
} }
// Before sending the task to the tower, commit the state update // Before sending the task to the tower, commit the state update
@ -439,8 +454,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update) lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
if err != nil { if err != nil {
// TODO(conner): mark failed/reschedule // TODO(conner): mark failed/reschedule
return nil, false, fmt.Errorf("unable to commit state update "+ err := fmt.Errorf("unable to commit state update for "+
"for session=%s seqnum=%d: %v", q.ID(), seqNum, err) "%v seqnum=%d: %v", update.BackupID, seqNum, err)
return nil, false, wtdb.BackupID{}, err
} }
stateUpdate := &wtwire.StateUpdate{ stateUpdate := &wtwire.StateUpdate{
@ -455,7 +471,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
stateUpdate.IsComplete = 1 stateUpdate.IsComplete = 1
} }
return stateUpdate, isPending, nil return stateUpdate, isPending, update.BackupID, nil
} }
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes // sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
@ -486,8 +502,8 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
remoteInit, ok := remoteMsg.(*wtwire.Init) remoteInit, ok := remoteMsg.(*wtwire.Init)
if !ok { if !ok {
return fmt.Errorf("watchtower responded with %T to "+ return fmt.Errorf("watchtower %s responded with %T "+
"Init", remoteMsg) "to Init", q.towerAddr, remoteMsg)
} }
// Validate Init. // Validate Init.
@ -513,8 +529,8 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply) stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
if !ok { if !ok {
return fmt.Errorf("watchtower responded with %T to StateUpdate", return fmt.Errorf("watchtower %s responded with %T to "+
remoteMsg) "StateUpdate", q.towerAddr, remoteMsg)
} }
// Process the reply from the tower. // Process the reply from the tower.
@ -527,10 +543,10 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
// TODO(conner): handle other error cases properly, ban towers, etc. // TODO(conner): handle other error cases properly, ban towers, etc.
default: default:
err := fmt.Errorf("received error code %v in "+ err := fmt.Errorf("received error code %v in "+
"StateUpdateReply from tower=%x session=%v", "StateUpdateReply for seqnum=%d",
stateUpdateReply.Code, stateUpdateReply.Code, stateUpdate.SeqNum)
conn.RemotePub().SerializeCompressed(), q.ID()) log.Warnf("SessionQueue(%s) unable to upload state update to "+
log.Warnf("Unable to upload state update: %v", err) "tower=%s: %v", q.ID(), q.towerAddr, err)
return err return err
} }
@ -539,28 +555,27 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
switch { switch {
case err == wtdb.ErrUnallocatedLastApplied: case err == wtdb.ErrUnallocatedLastApplied:
// TODO(conner): borked watchtower // TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack update=%d session=%s: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, q.ID(), err) stateUpdate.SeqNum, err)
log.Errorf("Failed to ack update: %v", err) log.Errorf("SessionQueue(%s) failed to ack update: %v", err)
return err return err
case err == wtdb.ErrLastAppliedReversion: case err == wtdb.ErrLastAppliedReversion:
// TODO(conner): borked watchtower // TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack update=%d session=%s: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, q.ID(), err) stateUpdate.SeqNum, err)
log.Errorf("Failed to ack update: %v", err) log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err return err
case err != nil: case err != nil:
err = fmt.Errorf("unable to ack update=%d session=%s: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, q.ID(), err) stateUpdate.SeqNum, err)
log.Errorf("Failed to ack update: %v", err) log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err)
return err return err
} }
log.Infof("Removing update session=%s seqnum=%d is_pending=%v "+
"from memory", q.ID(), stateUpdate.SeqNum, isPending)
q.queueCond.L.Lock() q.queueCond.L.Lock()
if isPending { if isPending {
// If a pending update was successfully sent, increment the // If a pending update was successfully sent, increment the
@ -591,9 +606,6 @@ func (q *sessionQueue) reserveStatus() reserveStatus {
numPending := uint32(q.pendingQueue.Len()) numPending := uint32(q.pendingQueue.Len())
maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates) maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
log.Debugf("SessionQueue %s reserveStatus seqnum=%d pending=%d "+
"max-updates=%d", q.ID(), q.seqNum, numPending, maxUpdates)
if uint32(q.seqNum)+numPending < maxUpdates { if uint32(q.seqNum)+numPending < maxUpdates {
return reserveAvailable return reserveAvailable
} }

@ -1,6 +1,7 @@
package wtdb package wtdb
import ( import (
"fmt"
"io" "io"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
@ -160,6 +161,11 @@ func (b *BackupID) Decode(r io.Reader) error {
) )
} }
// String returns a human-readable encoding of a BackupID.
//
// The method uses a value receiver so that it belongs to the method set of
// both BackupID and *BackupID. fmt only invokes String on the operand exactly
// as it was passed, so a pointer-receiver String would be skipped whenever a
// BackupID is formatted by value (e.g. with %v or %s), and the raw struct
// fields would be printed instead of this encoding.
//
// NOTE(review): if the type of ChanID itself implements fmt.Stringer, the %x
// verb formats the output of that String method rather than the raw bytes --
// confirm that this is the intended rendering.
func (b BackupID) String() string {
	return fmt.Sprintf("backup(%x, %d)", b.ChanID, b.CommitHeight)
}
// CommittedUpdate holds a state update sent by a client along with its // CommittedUpdate holds a state update sent by a client along with its
// allocated sequence number and the exact remote commitment the encrypted // allocated sequence number and the exact remote commitment the encrypted
// justice transaction can rectify. // justice transaction can rectify.