diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 36f36d19..94ea800b 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -10,6 +10,8 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" @@ -231,6 +233,8 @@ type TowerClient struct { cfg *Config + log btclog.Logger + pipeline *taskPipeline negotiator SessionNegotiator @@ -277,6 +281,12 @@ func New(config *Config) (*TowerClient, error) { cfg.WriteTimeout = DefaultWriteTimeout } + prefix := "(legacy)" + if cfg.Policy.IsAnchorChannel() { + prefix = "(anchor)" + } + plog := build.NewPrefixLog(prefix, log) + // Next, load all candidate sessions and towers from the database into // the client. We will use any of these session if their policies match // the current policy of the client, otherwise they will be ignored and @@ -292,7 +302,7 @@ func New(config *Config) (*TowerClient, error) { var candidateTowers []*wtdb.Tower for _, s := range candidateSessions { - log.Infof("Using private watchtower %s, offering policy %s", + plog.Infof("Using private watchtower %s, offering policy %s", s.Tower, cfg.Policy) candidateTowers = append(candidateTowers, s.Tower) } @@ -306,6 +316,7 @@ func New(config *Config) (*TowerClient, error) { c := &TowerClient{ cfg: cfg, + log: plog, pipeline: newTaskPipeline(), candidateTowers: newTowerListIterator(candidateTowers...), candidateSessions: candidateSessions, @@ -427,7 +438,7 @@ func (c *TowerClient) buildHighestCommitHeights() { func (c *TowerClient) Start() error { var err error c.started.Do(func() { - log.Infof("Starting watchtower client") + c.log.Infof("Starting watchtower client") // First, restart a session queue for any sessions that have // committed but unacked state updates. This ensures that these @@ -435,7 +446,7 @@ func (c *TowerClient) Start() error { // restart. for _, session := range c.candidateSessions { if len(session.CommittedUpdates) > 0 { - log.Infof("Starting session=%s to process "+ + c.log.Infof("Starting session=%s to process "+ "%d committed backups", session.ID, len(session.CommittedUpdates)) c.initActiveQueue(session) @@ -465,7 +476,7 @@ func (c *TowerClient) Start() error { // Stop idempotently initiates a graceful shutdown of the watchtower client. func (c *TowerClient) Stop() error { c.stopped.Do(func() { - log.Debugf("Stopping watchtower client") + c.log.Debugf("Stopping watchtower client") // 1. Shutdown the backup queue, which will prevent any further // updates from being accepted. In practice, the links should be @@ -498,7 +509,7 @@ func (c *TowerClient) Stop() error { // queues, we no longer need to negotiate sessions. c.negotiator.Stop() - log.Debugf("Waiting for active session queues to finish "+ + c.log.Debugf("Waiting for active session queues to finish "+ "draining, stats: %s", c.stats) // 5. Shutdown all active session queues in parallel. These will @@ -514,7 +525,7 @@ func (c *TowerClient) Stop() error { default: } - log.Debugf("Client successfully stopped, stats: %s", c.stats) + c.log.Debugf("Client successfully stopped, stats: %s", c.stats) }) return nil } @@ -523,7 +534,7 @@ func (c *TowerClient) Stop() error { // client. This should only be executed if Stop is unable to exit cleanly. func (c *TowerClient) ForceQuit() { c.forced.Do(func() { - log.Infof("Force quitting watchtower client") + c.log.Infof("Force quitting watchtower client") // 1. Shutdown the backup queue, which will prevent any further // updates from being accepted. In practice, the links should be @@ -548,7 +559,7 @@ func (c *TowerClient) ForceQuit() { return s.ForceQuit }) - log.Infof("Watchtower client unclean shutdown complete, "+ + c.log.Infof("Watchtower client unclean shutdown complete, "+ "stats: %s", c.stats) }) } @@ -612,7 +623,7 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, height, ok := c.chanCommitHeights[*chanID] if ok && breachInfo.RevokedStateNum <= height { c.backupMu.Unlock() - log.Debugf("Ignoring duplicate backup for chanid=%v at height=%d", + c.log.Debugf("Ignoring duplicate backup for chanid=%v at height=%d", chanID, breachInfo.RevokedStateNum) return nil } @@ -675,15 +686,15 @@ func (c *TowerClient) nextSessionQueue() *sessionQueue { func (c *TowerClient) backupDispatcher() { defer c.wg.Done() - log.Tracef("Starting backup dispatcher") - defer log.Tracef("Stopping backup dispatcher") + c.log.Tracef("Starting backup dispatcher") + defer c.log.Tracef("Stopping backup dispatcher") for { switch { // No active session queue and no additional sessions. case c.sessionQueue == nil && len(c.candidateSessions) == 0: - log.Infof("Requesting new session.") + c.log.Infof("Requesting new session.") // Immediately request a new session. c.negotiator.RequestSession() @@ -694,7 +705,7 @@ func (c *TowerClient) backupDispatcher() { awaitSession: select { case session := <-c.negotiator.NewSessions(): - log.Infof("Acquired new session with id=%s", + c.log.Infof("Acquired new session with id=%s", session.ID) c.candidateSessions[session.ID] = session c.stats.sessionAcquired() @@ -704,7 +715,7 @@ func (c *TowerClient) backupDispatcher() { continue case <-c.statTicker.C: - log.Infof("Client stats: %s", c.stats) + c.log.Infof("Client stats: %s", c.stats) // A new tower has been requested to be added. We'll // update our persisted and in-memory state and consider @@ -738,7 +749,7 @@ func (c *TowerClient) backupDispatcher() { // backup tasks. c.sessionQueue = c.nextSessionQueue() if c.sessionQueue != nil { - log.Debugf("Loaded next candidate session "+ + c.log.Debugf("Loaded next candidate session "+ "queue id=%s", c.sessionQueue.ID()) } @@ -765,13 +776,13 @@ func (c *TowerClient) backupDispatcher() { // we can request new sessions before the session is // fully empty, which this case would handle. case session := <-c.negotiator.NewSessions(): - log.Warnf("Acquired new session with id=%s "+ + c.log.Warnf("Acquired new session with id=%s "+ "while processing tasks", session.ID) c.candidateSessions[session.ID] = session c.stats.sessionAcquired() case <-c.statTicker.C: - log.Infof("Client stats: %s", c.stats) + c.log.Infof("Client stats: %s", c.stats) // Process each backup task serially from the queue of // revoked states. @@ -782,7 +793,7 @@ func (c *TowerClient) backupDispatcher() { return } - log.Debugf("Processing %v", task.id) + c.log.Debugf("Processing %v", task.id) c.stats.taskReceived() c.processTask(task) @@ -827,7 +838,7 @@ func (c *TowerClient) processTask(task *backupTask) { // sessionQueue will be removed if accepting the task left the sessionQueue in // an exhausted state. func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { - log.Infof("Queued %v successfully for session %v", + c.log.Infof("Queued %v successfully for session %v", task.id, c.sessionQueue.ID()) c.stats.taskAccepted() @@ -846,7 +857,7 @@ func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { case reserveExhausted: c.stats.sessionExhausted() - log.Debugf("Session %s exhausted", c.sessionQueue.ID()) + c.log.Debugf("Session %s exhausted", c.sessionQueue.ID()) // This task left the session exhausted, set it to nil and // proceed to the next loop so we can consume another @@ -869,13 +880,13 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { case reserveAvailable: c.stats.taskIneligible() - log.Infof("Ignoring ineligible %v", task.id) + c.log.Infof("Ignoring ineligible %v", task.id) err := c.cfg.DB.MarkBackupIneligible( task.id.ChanID, task.id.CommitHeight, ) if err != nil { - log.Errorf("Unable to mark %v ineligible: %v", + c.log.Errorf("Unable to mark %v ineligible: %v", task.id, err) // It is safe to not handle this error, even if we could @@ -895,7 +906,7 @@ func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { case reserveExhausted: c.stats.sessionExhausted() - log.Debugf("Session %v exhausted, %v queued for next session", + c.log.Debugf("Session %v exhausted, %v queued for next session", c.sessionQueue.ID(), task.id) // Cache the task that we pulled off, so that we can process it @@ -924,7 +935,7 @@ func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout)) if err != nil { err = fmt.Errorf("unable to set read deadline: %v", err) - log.Errorf("Unable to read msg: %v", err) + c.log.Errorf("Unable to read msg: %v", err) return nil, err } @@ -932,7 +943,7 @@ func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { rawMsg, err := peer.ReadNextMessage() if err != nil { err = fmt.Errorf("unable to read message: %v", err) - log.Errorf("Unable to read msg: %v", err) + c.log.Errorf("Unable to read msg: %v", err) return nil, err } @@ -942,11 +953,11 @@ func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { msg, err := wtwire.ReadMessage(msgReader, 0) if err != nil { err = fmt.Errorf("unable to parse message: %v", err) - log.Errorf("Unable to read msg: %v", err) + c.log.Errorf("Unable to read msg: %v", err) return nil, err } - logMessage(peer, msg, true) + c.logMessage(peer, msg, true) return msg, nil } @@ -959,7 +970,7 @@ func (c *TowerClient) sendMessage(peer wtserver.Peer, msg wtwire.Message) error _, err := wtwire.WriteMessage(&b, msg, 0) if err != nil { err = fmt.Errorf("Unable to encode msg: %v", err) - log.Errorf("Unable to send msg: %v", err) + c.log.Errorf("Unable to send msg: %v", err) return err } @@ -968,16 +979,16 @@ func (c *TowerClient) sendMessage(peer wtserver.Peer, msg wtwire.Message) error err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout)) if err != nil { err = fmt.Errorf("unable to set write deadline: %v", err) - log.Errorf("Unable to send msg: %v", err) + c.log.Errorf("Unable to send msg: %v", err) return err } - logMessage(peer, msg, false) + c.logMessage(peer, msg, false) // Write out the full message to the remote peer. _, err = peer.Write(b.Bytes()) if err != nil { - log.Errorf("Unable to send msg: %v", err) + c.log.Errorf("Unable to send msg: %v", err) } return err } @@ -1240,7 +1251,9 @@ func (c *TowerClient) Policy() wtpolicy.Policy { // logMessage writes information about a message received from a remote peer, // using directional prepositions to signal whether the message was sent or // received. -func logMessage(peer wtserver.Peer, msg wtwire.Message, read bool) { +func (c *TowerClient) logMessage( + peer wtserver.Peer, msg wtwire.Message, read bool) { + var action = "Received" var preposition = "from" if !read { @@ -1253,7 +1266,7 @@ func logMessage(peer wtserver.Peer, msg wtwire.Message, read bool) { summary = "(" + summary + ")" } - log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary, + c.log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary, preposition, peer.RemotePub().SerializeCompressed(), peer.RemoteAddr()) }