Merge pull request #4885 from cfromknecht/wtwr-logging

watchtower/wtclient: add prefix logging
This commit is contained in:
Olaoluwa Osuntokun 2021-01-05 11:28:55 -08:00 committed by GitHub
commit fbb3e400b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 58 additions and 37 deletions

@ -317,7 +317,7 @@ func New(config *Config) (*TowerClient, error) {
c := &TowerClient{ c := &TowerClient{
cfg: cfg, cfg: cfg,
log: plog, log: plog,
pipeline: newTaskPipeline(), pipeline: newTaskPipeline(plog),
candidateTowers: newTowerListIterator(candidateTowers...), candidateTowers: newTowerListIterator(candidateTowers...),
candidateSessions: candidateSessions, candidateSessions: candidateSessions,
activeSessions: make(sessionQueueSet), activeSessions: make(sessionQueueSet),
@ -339,6 +339,7 @@ func New(config *Config) (*TowerClient, error) {
Candidates: c.candidateTowers, Candidates: c.candidateTowers,
MinBackoff: cfg.MinBackoff, MinBackoff: cfg.MinBackoff,
MaxBackoff: cfg.MaxBackoff, MaxBackoff: cfg.MaxBackoff,
Log: plog,
}) })
// Reconstruct the highest commit height processed for each channel // Reconstruct the highest commit height processed for each channel
@ -468,7 +469,7 @@ func (c *TowerClient) Start() error {
c.wg.Add(1) c.wg.Add(1)
go c.backupDispatcher() go c.backupDispatcher()
log.Infof("Watchtower client started successfully") c.log.Infof("Watchtower client started successfully")
}) })
return err return err
} }
@ -1006,6 +1007,7 @@ func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue {
DB: c.cfg.DB, DB: c.cfg.DB,
MinBackoff: c.cfg.MinBackoff, MinBackoff: c.cfg.MinBackoff,
MaxBackoff: c.cfg.MaxBackoff, MaxBackoff: c.cfg.MaxBackoff,
Log: c.log,
}) })
} }

@ -6,6 +6,7 @@ import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/blob" "github.com/lightningnetwork/lnd/watchtower/blob"
@ -85,6 +86,10 @@ type NegotiatorConfig struct {
// exponential backoff produces a timeout greater than this value, the // exponential backoff produces a timeout greater than this value, the
// backoff duration will be clamped to MaxBackoff. // backoff duration will be clamped to MaxBackoff.
MaxBackoff time.Duration MaxBackoff time.Duration
// Log specifies the desired log output, which should be prefixed by the
// client type, e.g. anchor or legacy.
Log btclog.Logger
} }
// sessionNegotiator is concrete SessionNegotiator that is able to request new // sessionNegotiator is concrete SessionNegotiator that is able to request new
@ -97,6 +102,7 @@ type sessionNegotiator struct {
localInit *wtwire.Init localInit *wtwire.Init
cfg *NegotiatorConfig cfg *NegotiatorConfig
log btclog.Logger
dispatcher chan struct{} dispatcher chan struct{}
newSessions chan *wtdb.ClientSession newSessions chan *wtdb.ClientSession
@ -130,6 +136,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {
return &sessionNegotiator{ return &sessionNegotiator{
cfg: cfg, cfg: cfg,
log: cfg.Log,
localInit: localInit, localInit: localInit,
dispatcher: make(chan struct{}, 1), dispatcher: make(chan struct{}, 1),
newSessions: make(chan *wtdb.ClientSession), newSessions: make(chan *wtdb.ClientSession),
@ -141,7 +148,7 @@ func newSessionNegotiator(cfg *NegotiatorConfig) *sessionNegotiator {
// Start safely starts up the sessionNegotiator. // Start safely starts up the sessionNegotiator.
func (n *sessionNegotiator) Start() error { func (n *sessionNegotiator) Start() error {
n.started.Do(func() { n.started.Do(func() {
log.Debugf("Starting session negotiator") n.log.Debugf("Starting session negotiator")
n.wg.Add(1) n.wg.Add(1)
go n.negotiationDispatcher() go n.negotiationDispatcher()
@ -153,7 +160,7 @@ func (n *sessionNegotiator) Start() error {
// Stop safely shutsdown the sessionNegotiator. // Stop safely shutsdown the sessionNegotiator.
func (n *sessionNegotiator) Stop() error { func (n *sessionNegotiator) Stop() error {
n.stopped.Do(func() { n.stopped.Do(func() {
log.Debugf("Stopping session negotiator") n.log.Debugf("Stopping session negotiator")
close(n.quit) close(n.quit)
n.wg.Wait() n.wg.Wait()
@ -191,7 +198,7 @@ func (n *sessionNegotiator) negotiationDispatcher() {
pendingNegotiations++ pendingNegotiations++
if pendingNegotiations > 1 { if pendingNegotiations > 1 {
log.Debugf("Already negotiating session, " + n.log.Debugf("Already negotiating session, " +
"waiting for existing negotiation to " + "waiting for existing negotiation to " +
"complete") "complete")
continue continue
@ -199,7 +206,7 @@ func (n *sessionNegotiator) negotiationDispatcher() {
// TODO(conner): consider reusing good towers // TODO(conner): consider reusing good towers
log.Debugf("Dispatching session negotiation") n.log.Debugf("Dispatching session negotiation")
n.wg.Add(1) n.wg.Add(1)
go n.negotiate() go n.negotiate()
@ -213,7 +220,7 @@ func (n *sessionNegotiator) negotiationDispatcher() {
} }
if pendingNegotiations > 0 { if pendingNegotiations > 0 {
log.Debugf("Dispatching pending session " + n.log.Debugf("Dispatching pending session " +
"negotiation") "negotiation")
n.wg.Add(1) n.wg.Add(1)
@ -278,7 +285,7 @@ retryWithBackoff:
// We've run out of addresses, update our backoff. // We've run out of addresses, update our backoff.
updateBackoff() updateBackoff()
log.Debugf("Unable to get new tower candidate, "+ n.log.Debugf("Unable to get new tower candidate, "+
"retrying after %v -- reason: %v", backoff, err) "retrying after %v -- reason: %v", backoff, err)
// Only reset the iterator once we've exhausted all // Only reset the iterator once we've exhausted all
@ -292,7 +299,7 @@ retryWithBackoff:
} }
towerPub := tower.IdentityKey.SerializeCompressed() towerPub := tower.IdentityKey.SerializeCompressed()
log.Debugf("Attempting session negotiation with tower=%x", n.log.Debugf("Attempting session negotiation with tower=%x",
towerPub) towerPub)
// Before proceeding, we will reserve a session key index to use // Before proceeding, we will reserve a session key index to use
@ -302,7 +309,7 @@ retryWithBackoff:
tower.ID, n.cfg.Policy.BlobType, tower.ID, n.cfg.Policy.BlobType,
) )
if err != nil { if err != nil {
log.Debugf("Unable to reserve session key index "+ n.log.Debugf("Unable to reserve session key index "+
"for tower=%x: %v", towerPub, err) "for tower=%x: %v", towerPub, err)
continue continue
} }
@ -314,7 +321,7 @@ retryWithBackoff:
// An unexpected error occurred, updpate our backoff. // An unexpected error occurred, updpate our backoff.
updateBackoff() updateBackoff()
log.Debugf("Session negotiation with tower=%x "+ n.log.Debugf("Session negotiation with tower=%x "+
"failed, trying again -- reason: %v", "failed, trying again -- reason: %v",
tower.IdentityKey.SerializeCompressed(), err) tower.IdentityKey.SerializeCompressed(), err)
@ -360,7 +367,7 @@ func (n *sessionNegotiator) createSession(tower *wtdb.Tower,
fallthrough fallthrough
case err != nil: case err != nil:
log.Debugf("Request for session negotiation with "+ n.log.Debugf("Request for session negotiation with "+
"tower=%s failed, trying again -- reason: "+ "tower=%s failed, trying again -- reason: "+
"%v", lnAddr, err) "%v", lnAddr, err)
continue continue
@ -467,7 +474,7 @@ func (n *sessionNegotiator) tryAddress(sessionKey keychain.SingleKeyECDH,
err) err)
} }
log.Debugf("New session negotiated with %s, policy: %s", n.log.Debugf("New session negotiated with %s, policy: %s",
lnAddr, clientSession.Policy) lnAddr, clientSession.Policy)
// We have a newly negotiated session, return it to the // We have a newly negotiated session, return it to the

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -70,6 +71,10 @@ type sessionQueueConfig struct {
// timeout greater than this value, the backoff duration will be clamped // timeout greater than this value, the backoff duration will be clamped
// to MaxBackoff. // to MaxBackoff.
MaxBackoff time.Duration MaxBackoff time.Duration
// Log specifies the desired log output, which should be prefixed by the
// client type, e.g. anchor or legacy.
Log btclog.Logger
} }
// sessionQueue implements a reliable queue that will encrypt and send accepted // sessionQueue implements a reliable queue that will encrypt and send accepted
@ -84,6 +89,7 @@ type sessionQueue struct {
forced sync.Once forced sync.Once
cfg *sessionQueueConfig cfg *sessionQueueConfig
log btclog.Logger
commitQueue *list.List commitQueue *list.List
pendingQueue *list.List pendingQueue *list.List
@ -116,6 +122,7 @@ func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
sq := &sessionQueue{ sq := &sessionQueue{
cfg: cfg, cfg: cfg,
log: cfg.Log,
commitQueue: list.New(), commitQueue: list.New(),
pendingQueue: list.New(), pendingQueue: list.New(),
localInit: localInit, localInit: localInit,
@ -149,7 +156,7 @@ func (q *sessionQueue) Start() {
// will clear all pending tasks in the queue before returning to the caller. // will clear all pending tasks in the queue before returning to the caller.
func (q *sessionQueue) Stop() { func (q *sessionQueue) Stop() {
q.stopped.Do(func() { q.stopped.Do(func() {
log.Debugf("SessionQueue(%s) stopping ...", q.ID()) q.log.Debugf("SessionQueue(%s) stopping ...", q.ID())
close(q.quit) close(q.quit)
q.signalUntilShutdown() q.signalUntilShutdown()
@ -161,7 +168,7 @@ func (q *sessionQueue) Stop() {
default: default:
} }
log.Debugf("SessionQueue(%s) stopped", q.ID()) q.log.Debugf("SessionQueue(%s) stopped", q.ID())
}) })
} }
@ -169,12 +176,12 @@ func (q *sessionQueue) Stop() {
// he caller after all lingering goroutines have spun down. // he caller after all lingering goroutines have spun down.
func (q *sessionQueue) ForceQuit() { func (q *sessionQueue) ForceQuit() {
q.forced.Do(func() { q.forced.Do(func() {
log.Infof("SessionQueue(%s) force quitting...", q.ID()) q.log.Infof("SessionQueue(%s) force quitting...", q.ID())
close(q.forceQuit) close(q.forceQuit)
q.signalUntilShutdown() q.signalUntilShutdown()
log.Infof("SessionQueue(%s) force quit", q.ID()) q.log.Infof("SessionQueue(%s) force quit", q.ID())
}) })
} }
@ -192,7 +199,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
numPending := uint32(q.pendingQueue.Len()) numPending := uint32(q.pendingQueue.Len())
maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates maxUpdates := q.cfg.ClientSession.Policy.MaxUpdates
log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+ q.log.Debugf("SessionQueue(%s) deciding to accept %v seqnum=%d "+
"pending=%d max-updates=%d", "pending=%d max-updates=%d",
q.ID(), task.id, q.seqNum, numPending, maxUpdates) q.ID(), task.id, q.seqNum, numPending, maxUpdates)
@ -218,7 +225,7 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody) err := task.bindSession(&q.cfg.ClientSession.ClientSessionBody)
if err != nil { if err != nil {
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("SessionQueue(%s) rejected %v: %v ", q.log.Debugf("SessionQueue(%s) rejected %v: %v ",
q.ID(), task.id, err) q.ID(), task.id, err)
return curStatus, false return curStatus, false
} }
@ -287,7 +294,7 @@ func (q *sessionQueue) drainBackups() {
// First, check that we are able to dial this session's tower. // First, check that we are able to dial this session's tower.
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr) conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionKeyECDH, q.towerAddr)
if err != nil { if err != nil {
log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v", q.log.Errorf("SessionQueue(%s) unable to dial tower at %v: %v",
q.ID(), q.towerAddr, err) q.ID(), q.towerAddr, err)
q.increaseBackoff() q.increaseBackoff()
@ -309,7 +316,7 @@ func (q *sessionQueue) drainBackups() {
// before attempting to dequeue any pending updates. // before attempting to dequeue any pending updates.
stateUpdate, isPending, backupID, err := q.nextStateUpdate() stateUpdate, isPending, backupID, err := q.nextStateUpdate()
if err != nil { if err != nil {
log.Errorf("SessionQueue(%v) unable to get next state "+ q.log.Errorf("SessionQueue(%v) unable to get next state "+
"update: %v", q.ID(), err) "update: %v", q.ID(), err)
return return
} }
@ -319,7 +326,7 @@ func (q *sessionQueue) drainBackups() {
conn, stateUpdate, q.localInit, sendInit, isPending, conn, stateUpdate, q.localInit, sendInit, isPending,
) )
if err != nil { if err != nil {
log.Errorf("SessionQueue(%s) unable to send state "+ q.log.Errorf("SessionQueue(%s) unable to send state "+
"update: %v", q.ID(), err) "update: %v", q.ID(), err)
q.increaseBackoff() q.increaseBackoff()
@ -330,7 +337,7 @@ func (q *sessionQueue) drainBackups() {
return return
} }
log.Infof("SessionQueue(%s) uploaded %v seqnum=%d", q.log.Infof("SessionQueue(%s) uploaded %v seqnum=%d",
q.ID(), backupID, stateUpdate.SeqNum) q.ID(), backupID, stateUpdate.SeqNum)
// If the last task was backed up successfully, we'll exit and // If the last task was backed up successfully, we'll exit and
@ -388,7 +395,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0 isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("SessionQueue(%s) reprocessing committed state "+ q.log.Debugf("SessionQueue(%s) reprocessing committed state "+
"update for %v seqnum=%d", "update for %v seqnum=%d",
q.ID(), update.BackupID, seqNum) q.ID(), update.BackupID, seqNum)
@ -429,7 +436,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool,
}, },
} }
log.Debugf("SessionQueue(%s) committing state update "+ q.log.Debugf("SessionQueue(%s) committing state update "+
"%v seqnum=%d", q.ID(), update.BackupID, seqNum) "%v seqnum=%d", q.ID(), update.BackupID, seqNum)
} }
@ -538,7 +545,7 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
err := fmt.Errorf("received error code %v in "+ err := fmt.Errorf("received error code %v in "+
"StateUpdateReply for seqnum=%d", "StateUpdateReply for seqnum=%d",
stateUpdateReply.Code, stateUpdate.SeqNum) stateUpdateReply.Code, stateUpdate.SeqNum)
log.Warnf("SessionQueue(%s) unable to upload state update to "+ q.log.Warnf("SessionQueue(%s) unable to upload state update to "+
"tower=%s: %v", q.ID(), q.towerAddr, err) "tower=%s: %v", q.ID(), q.towerAddr, err)
return err return err
} }
@ -550,21 +557,21 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
// TODO(conner): borked watchtower // TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err) stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err) q.log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
return err return err
case err == wtdb.ErrLastAppliedReversion: case err == wtdb.ErrLastAppliedReversion:
// TODO(conner): borked watchtower // TODO(conner): borked watchtower
err = fmt.Errorf("unable to ack seqnum=%d: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err) stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v", q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err) q.ID(), err)
return err return err
case err != nil: case err != nil:
err = fmt.Errorf("unable to ack seqnum=%d: %v", err = fmt.Errorf("unable to ack seqnum=%d: %v",
stateUpdate.SeqNum, err) stateUpdate.SeqNum, err)
log.Errorf("SessionQueue(%s) failed to ack update: %v", q.log.Errorf("SessionQueue(%s) failed to ack update: %v",
q.ID(), err) q.ID(), err)
return err return err
} }

@ -4,6 +4,8 @@ import (
"container/list" "container/list"
"sync" "sync"
"time" "time"
"github.com/btcsuite/btclog"
) )
// taskPipeline implements a reliable, in-order queue that ensures its queue // taskPipeline implements a reliable, in-order queue that ensures its queue
@ -17,6 +19,8 @@ type taskPipeline struct {
stopped sync.Once stopped sync.Once
forced sync.Once forced sync.Once
log btclog.Logger
queueMtx sync.Mutex queueMtx sync.Mutex
queueCond *sync.Cond queueCond *sync.Cond
queue *list.List queue *list.List
@ -29,8 +33,9 @@ type taskPipeline struct {
} }
// newTaskPipeline initializes a new taskPipeline. // newTaskPipeline initializes a new taskPipeline.
func newTaskPipeline() *taskPipeline { func newTaskPipeline(log btclog.Logger) *taskPipeline {
rq := &taskPipeline{ rq := &taskPipeline{
log: log,
queue: list.New(), queue: list.New(),
newBackupTasks: make(chan *backupTask), newBackupTasks: make(chan *backupTask),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -55,7 +60,7 @@ func (q *taskPipeline) Start() {
// the delivery of pending tasks to be interrupted. // the delivery of pending tasks to be interrupted.
func (q *taskPipeline) Stop() { func (q *taskPipeline) Stop() {
q.stopped.Do(func() { q.stopped.Do(func() {
log.Debugf("Stopping task pipeline") q.log.Debugf("Stopping task pipeline")
close(q.quit) close(q.quit)
q.signalUntilShutdown() q.signalUntilShutdown()
@ -64,7 +69,7 @@ func (q *taskPipeline) Stop() {
select { select {
case <-q.forceQuit: case <-q.forceQuit:
default: default:
log.Debugf("Task pipeline stopped successfully") q.log.Debugf("Task pipeline stopped successfully")
} }
}) })
} }
@ -73,12 +78,12 @@ func (q *taskPipeline) Stop() {
// backupTasks that have not been delivered via NewBackupTasks. // backupTasks that have not been delivered via NewBackupTasks.
func (q *taskPipeline) ForceQuit() { func (q *taskPipeline) ForceQuit() {
q.forced.Do(func() { q.forced.Do(func() {
log.Infof("Force quitting task pipeline") q.log.Infof("Force quitting task pipeline")
close(q.forceQuit) close(q.forceQuit)
q.signalUntilShutdown() q.signalUntilShutdown()
log.Infof("Task pipeline unclean shutdown complete") q.log.Infof("Task pipeline unclean shutdown complete")
}) })
} }
@ -139,13 +144,13 @@ func (q *taskPipeline) queueManager() {
// Exit only after the queue has been fully drained. // Exit only after the queue has been fully drained.
if q.queue.Len() == 0 { if q.queue.Len() == 0 {
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("Revoked state pipeline flushed.") q.log.Debugf("Revoked state pipeline flushed.")
return return
} }
case <-q.forceQuit: case <-q.forceQuit:
q.queueCond.L.Unlock() q.queueCond.L.Unlock()
log.Debugf("Revoked state pipeline force quit.") q.log.Debugf("Revoked state pipeline force quit.")
return return
default: default:
@ -165,7 +170,7 @@ func (q *taskPipeline) queueManager() {
// Force quit, return immediately to allow the client to exit. // Force quit, return immediately to allow the client to exit.
case <-q.forceQuit: case <-q.forceQuit:
log.Debugf("Revoked state pipeline force quit.") q.log.Debugf("Revoked state pipeline force quit.")
return return
} }
} }