wtclient/seseion_queue: add prefix logging
This commit is contained in:
parent
de09b8d9bd
commit
1d3535582b
@ -1007,6 +1007,7 @@ func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue {
|
||||
DB: c.cfg.DB,
|
||||
MinBackoff: c.cfg.MinBackoff,
|
||||
MaxBackoff: c.cfg.MaxBackoff,
|
||||
Log: c.log,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
@ -70,6 +71,10 @@ type sessionQueueConfig struct {
|
||||
// timeout greater than this value, the backoff duration will be clamped
|
||||
// to MaxBackoff.
|
||||
MaxBackoff time.Duration
|
||||
|
||||
// Log specifies the desired log output, which should be prefixed by the
|
||||
// client type, e.g. anchor or legacy.
|
||||
Log btclog.Logger
|
||||
}
|
||||
|
||||
// sessionQueue implements a reliable queue that will encrypt and send accepted
|
||||
@ -84,6 +89,7 @@ type sessionQueue struct {
|
||||
forced sync.Once
|
||||
|
||||
cfg *sessionQueueConfig
|
||||
log btclog.Logger
|
||||
|
||||
commitQueue *list.List
|
||||
pendingQueue *list.List
|
||||
@ -116,6 +122,7 @@ func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
|
||||
|
||||
sq := &sessionQueue{
|
||||
cfg: cfg,
|
||||
log: cfg.Log,
|
||||
commitQueue: list.New(),
|
||||
pendingQueue: list.New(),
|
||||
localInit: localInit,
|
||||
@ -149,7 +156,7 @@ func (q *sessionQueue) Start() {
|
||||
// will clear all pending tasks in the queue before returning to the caller.
|
||||
func (q *sessionQueue) Stop() {
|
||||
q.stopped.Do(func() {
|
||||
log.Debugf("SessionQueue(%s) stopping ...", q.ID())
|
||||
q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
|
||||
|
||||
close(q.quit)
|
||||
q.signalUntilShutdown()
|
||||
@ -161,7 +168,7 @@ func (q *sessionQueue) Stop() {
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debugf("SessionQueue(%s) stopped", q.ID())
|
||||
q.log.Debugf("SessionQueue(%s) stopped", q.ID())
|
||||
})
|
||||
}
|
||||
|
||||
@ -169,12 +176,12 @@ func (q *sessionQueue) Stop() {
|
||||
// he caller after all lingering goroutines have spun down.
|
||||
func (q *sessionQueue) ForceQuit() {
|
||||
q.forced.Do(func() {
|
||||
log.Infof("SessionQueue(%s) force quitting...", q.ID())
|
||||
q.log.Infof("SessionQueue(%s) force quitting...", q.ID())
|
||||
|
||||
close(q.forceQuit)
|
||||
q.signalUntilShutdown()
|
||||
|
||||
log.Infof("SessionQueue(%s) force quit", q.ID())
|
||||
q.log.Infof("SessionQueue(%s) force quit", q.ID())
|
||||
})
|
||||
}
|
||||
|
||||
@ -192,7 +199,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
|
||||
|
||||
numPending := uint32(q.pendingQueue.Len())
|
||||
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
|
||||
log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
|
||||
q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
|
||||
"pending=%d max-updates=%d",
|
||||
q.ID(), task.id, q.seqNum, numPending, maxUpdates)
|
||||
|
||||
@ -218,7 +225,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
|
||||
err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody)
|
||||
if err != nil {
|
||||
q.queueCond.L.Unlock()
|
||||
log.Debugf("SessionQueue(%s) rejected %v: %v ",
|
||||
q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
|
||||
q.ID(), task.id, err)
|
||||
return curStatus, false
|
||||
}
|
||||
@ -287,7 +294,7 @@ func (q *sessionQueue) drainBackups() {
|
||||
// First, check that we are able to dial this session's tower.
|
||||
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr)
|
||||
if err != nil {
|
||||
log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
|
||||
q.log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
|
||||
q.ID(), q.towerAddr, err)
|
||||
|
||||
q.increaseBackoff()
|
||||
@ -309,7 +316,7 @@ func (q *sessionQueue) drainBackups() {
|
||||
// before attempting to dequeue any pending updates.
|
||||
stateUpdate, isPending, backupID, err := q.nextStateUpdate()
|
||||
if err != nil {
|
||||
log.Errorf("SessionQueue(%v) unable to get next state "+
|
||||
q.log.Errorf("SessionQueue(%v) unable to get next state "+
|
||||
"update: %v", q.ID(), err)
|
||||
return
|
||||
}
|
||||
@ -319,7 +326,7 @@ func (q *sessionQueue) drainBackups() {
|
||||
conn, stateUpdate, q.localInit, sendInit, isPending,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("SessionQueue(%s) unable to send state "+
|
||||
q.log.Errorf("SessionQueue(%s) unable to send state "+
|
||||
"update: %v", q.ID(), err)
|
||||
|
||||
q.increaseBackoff()
|
||||
@ -330,7 +337,7 @@ func (q *sessionQueue) drainBackups() {
|
||||
return
|
||||
}
|
||||
|
||||
log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
|
||||
q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
|
||||
q.ID(), backupID, stateUpdate.SeqNum)
|
||||
|
||||
// If the last task was backed up successfully, we'll exit and
|
||||
@ -388,7 +395,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
|
||||
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
log.Debugf("SessionQueue(%s) reprocessing committed state "+
|
||||
q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
|
||||
"update for %v seqnum=%d",
|
||||
q.ID(), update.BackupID, seqNum)
|
||||
|
||||
@ -429,7 +436,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
|
||||
},
|
||||
}
|
||||
|
||||
log.Debugf("SessionQueue(%s) committing state update "+
|
||||
q.log.Debugf("SessionQueue(%s) committing state update "+
|
||||
"%v seqnum=%d", q.ID(), update.BackupID, seqNum)
|
||||
}
|
||||
|
||||
@ -538,7 +545,7 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
|
||||
err := fmt.Errorf("received error code %v in "+
|
||||
"StateUpdateReply for seqnum=%d",
|
||||
stateUpdateReply.Code, stateUpdate.SeqNum)
|
||||
log.Warnf("SessionQueue(%s) unable to upload state update to "+
|
||||
q.log.Warnf("SessionQueue(%s) unable to upload state update to "+
|
||||
"tower=%s: %v", q.ID(), q.towerAddr, err)
|
||||
return err
|
||||
}
|
||||
@ -550,21 +557,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
|
||||
// TODO(conner): borked watchtower
|
||||
err = fmt.Errorf("unable to ack seqnum=%d: %v",
|
||||
stateUpdate.SeqNum, err)
|
||||
log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
|
||||
q.log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
|
||||
return err
|
||||
|
||||
case err == wtdb.ErrLastAppliedReversion:
|
||||
// TODO(conner): borked watchtower
|
||||
err = fmt.Errorf("unable to ack seqnum=%d: %v",
|
||||
stateUpdate.SeqNum, err)
|
||||
log.Errorf("SessionQueue(%s) failed to ack update: %v",
|
||||
q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
|
||||
q.ID(), err)
|
||||
return err
|
||||
|
||||
case err != nil:
|
||||
err = fmt.Errorf("unable to ack seqnum=%d: %v",
|
||||
stateUpdate.SeqNum, err)
|
||||
log.Errorf("SessionQueue(%s) failed to ack update: %v",
|
||||
q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
|
||||
q.ID(), err)
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user