From f00b4c5e960888686e6e6dd334da89716b489bf3 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Fri, 15 Mar 2019 02:32:27 -0700 Subject: [PATCH] watchtower/wtclient/client: hook up full client pipeline --- watchtower/wtclient/client.go | 804 ++++++++++++++++++++++++++++++++++ 1 file changed, 804 insertions(+) create mode 100644 watchtower/wtclient/client.go diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go new file mode 100644 index 00000000..9bd8e9e8 --- /dev/null +++ b/watchtower/wtclient/client.go @@ -0,0 +1,804 @@ +package wtclient + +import ( + "bytes" + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/watchtower/wtdb" + "github.com/lightningnetwork/lnd/watchtower/wtpolicy" + "github.com/lightningnetwork/lnd/watchtower/wtserver" + "github.com/lightningnetwork/lnd/watchtower/wtwire" +) + +const ( + // DefaultReadTimeout specifies the default duration we will wait during + // a read before breaking out of a blocking read. + DefaultReadTimeout = 15 * time.Second + + // DefaultWriteTimeout specifies the default duration we will wait during + // a write before breaking out of a blocking write. + DefaultWriteTimeout = 15 * time.Second + + // DefaultStatInterval specifies the default interval between logging + // metrics about the client's operation. + DefaultStatInterval = 30 * time.Second +) + +// Client is the primary interface used by the daemon to control a client's +// lifecycle and backup revoked states. +type Client interface { + // RegisterChannel persistently initializes any channel-dependent + // parameters within the client. This should be called during link + // startup to ensure that the client is able to support the link during + // operation. + RegisterChannel(lnwire.ChannelID) error + + // BackupState initiates a request to back up a particular revoked + // state. If the method returns nil, the backup is guaranteed to be + // successful unless the client is force quit, or the justice + // transaction would create dust outputs when trying to abide by the + // negotiated policy. + BackupState(*lnwire.ChannelID, *lnwallet.BreachRetribution) error + + // Start initializes the watchtower client, allowing it process requests + // to backup revoked channel states. + Start() error + + // Stop attempts a graceful shutdown of the watchtower client. In doing + // so, it will attempt to flush the pipeline and deliver any queued + // states to the tower before exiting. + Stop() error + + // ForceQuit will forcibly shutdown the watchtower client. Calling this + // may lead to queued states being dropped. + ForceQuit() +} + +// Config provides the TowerClient with access to the resources it requires to +// perform its duty. All nillable fields must be non-nil for the tower to be +// initialized properly. +type Config struct { + // Signer provides access to the wallet so that the client can sign + // justice transactions that spend from a remote party's commitment + // transaction. + Signer input.Signer + + // NewAddress generates a new on-chain sweep pkscript. + NewAddress func() ([]byte, error) + + // SecretKeyRing is used to derive the session keys used to communicate + // with the tower. The client only stores the KeyLocators internally so + // that we never store private keys on disk. + SecretKeyRing keychain.SecretKeyRing + + // Dial connects to an addr using the specified net and returns the + // connection object. + Dial Dial + + // AuthDialer establishes a brontide connection over an onion or clear + // network. + AuthDial AuthDialer + + // DB provides access to the client's stable storage medium. + DB DB + + // Policy is the session policy the client will propose when creating + // new sessions with the tower. If the policy differs from any active + // sessions recorded in the database, those sessions will be ignored and + // new sessions will be requested immediately. + Policy wtpolicy.Policy + + // PrivateTower is the net address of a private tower. The client will + // try to create all sessions with this tower. + PrivateTower *lnwire.NetAddress + + // ChainHash identifies the chain that the client is on and for which + // the tower must be watching to monitor for breaches. + ChainHash chainhash.Hash + + // ForceQuitDelay is the duration after attempting to shutdown that the + // client will automatically abort any pending backups if an unclean + // shutdown is detected. If the value is less than or equal to zero, a + // call to Stop may block indefinitely. The client can always be + // ForceQuit externally irrespective of the chosen parameter. + ForceQuitDelay time.Duration + + // ReadTimeout is the duration we will wait during a read before + // breaking out of a blocking read. If the value is less than or equal + // to zero, the default will be used instead. + ReadTimeout time.Duration + + // WriteTimeout is the duration we will wait during a write before + // breaking out of a blocking write. If the value is less than or equal + // to zero, the default will be used instead. + WriteTimeout time.Duration + + // MinBackoff defines the initial backoff applied to connections with + // watchtowers. Subsequent backoff durations will grow exponentially up + // until MaxBackoff. + MinBackoff time.Duration + + // MaxBackoff defines the maximum backoff applied to conenctions with + // watchtowers. If the exponential backoff produces a timeout greater + // than this value, the backoff will be clamped to MaxBackoff. + MaxBackoff time.Duration +} + +// TowerClient is a concrete implementation of the Client interface, offering a +// non-blocking, reliable subsystem for backing up revoked states to a specified +// private tower. +type TowerClient struct { + started sync.Once + stopped sync.Once + forced sync.Once + + cfg *Config + + pipeline *taskPipeline + + negotiator SessionNegotiator + candidateSessions map[wtdb.SessionID]*wtdb.ClientSession + activeSessions sessionQueueSet + + sessionQueue *sessionQueue + prevTask *backupTask + + sweepPkScriptMu sync.RWMutex + sweepPkScripts map[lnwire.ChannelID][]byte + + statTicker *time.Ticker + stats clientStats + + wg sync.WaitGroup + forceQuit chan struct{} +} + +// Compile-time constraint to ensure *TowerClient implements the Client +// interface. +var _ Client = (*TowerClient)(nil) + +// New initializes a new TowerClient from the provide Config. An error is +// returned if the client could not initialized. +func New(config *Config) (*TowerClient, error) { + // Copy the config to prevent side-effects from modifying both the + // internal and external version of the Config. + cfg := new(Config) + *cfg = *config + + // Set the read timeout to the default if none was provided. + if cfg.ReadTimeout <= 0 { + cfg.ReadTimeout = DefaultReadTimeout + } + + // Set the write timeout to the default if none was provided. + if cfg.WriteTimeout <= 0 { + cfg.WriteTimeout = DefaultWriteTimeout + } + + // Record the tower in our database, also loading any addresses + // previously associated with its public key. + tower, err := cfg.DB.CreateTower(cfg.PrivateTower) + if err != nil { + return nil, err + } + + log.Infof("Using private watchtower %s, offering policy %s", + cfg.PrivateTower, cfg.Policy) + + c := &TowerClient{ + cfg: cfg, + pipeline: newTaskPipeline(), + activeSessions: make(sessionQueueSet), + statTicker: time.NewTicker(DefaultStatInterval), + forceQuit: make(chan struct{}), + } + c.negotiator = newSessionNegotiator(&NegotiatorConfig{ + DB: cfg.DB, + Policy: cfg.Policy, + ChainHash: cfg.ChainHash, + SendMessage: c.sendMessage, + ReadMessage: c.readMessage, + Dial: c.dial, + Candidates: newTowerListIterator(tower), + MinBackoff: cfg.MinBackoff, + MaxBackoff: cfg.MaxBackoff, + }) + + // Next, load all active sessions from the db into the client. We will + // use any of these session if their policies match the current policy + // of the client, otherwise they will be ignored and new sessions will + // be requested. + c.candidateSessions, err = c.cfg.DB.ListClientSessions() + if err != nil { + return nil, err + } + + // Finally, load the sweep pkscripts that have been generated for all + // previously registered channels. + c.sweepPkScripts, err = c.cfg.DB.FetchChanPkScripts() + if err != nil { + return nil, err + } + + return c, nil +} + +// Start initializes the watchtower client by loading or negotiating an active +// session and then begins processing backup tasks from the request pipeline. +func (c *TowerClient) Start() error { + var err error + c.started.Do(func() { + log.Infof("Starting watchtower client") + + // First, restart a session queue for any sessions that have + // committed but unacked state updates. This ensures that these + // sessions will be able to flush the committed updates after a + // restart. + for _, session := range c.candidateSessions { + if len(session.CommittedUpdates) > 0 { + log.Infof("Starting session=%s to process "+ + "%d committed backups", session.ID, + len(session.CommittedUpdates)) + c.initActiveQueue(session) + } + } + + // Now start the session negotiator, which will allow us to + // request new session as soon as the backupDispatcher starts + // up. + err = c.negotiator.Start() + if err != nil { + return + } + + // Start the task pipeline to which new backup tasks will be + // submitted from active links. + c.pipeline.Start() + + c.wg.Add(1) + go c.backupDispatcher() + + log.Infof("Watchtower client started successfully") + }) + return err +} + +// Stop idempotently initiates a graceful shutdown of the watchtower client. +func (c *TowerClient) Stop() error { + c.stopped.Do(func() { + log.Debugf("Stopping watchtower client") + + // 1. Shutdown the backup queue, which will prevent any further + // updates from being accepted. In practice, the links should be + // shutdown before the client has been stopped, so all updates + // would have been added prior. + c.pipeline.Stop() + + // 2. To ensure we don't hang forever on shutdown due to + // unintended failures, we'll delay a call to force quit the + // pipeline if a ForceQuitDelay is specified. This will have no + // effect if the pipeline shuts down cleanly before the delay + // fires. + // + // For full safety, this can be set to 0 and wait out + // indefinitely. However for mobile clients which may have a + // limited amount of time to exit before the background process + // is killed, this offers a way to ensure the process + // terminates. + if c.cfg.ForceQuitDelay > 0 { + time.AfterFunc(c.cfg.ForceQuitDelay, c.ForceQuit) + } + + // 3. Once the backup queue has shutdown, wait for the main + // dispatcher to exit. The backup queue will signal it's + // completion to the dispatcher, which releases the wait group + // after all tasks have been assigned to session queues. + c.wg.Wait() + + // 4. Since all valid tasks have been assigned to session + // queues, we no longer need to negotiate sessions. + c.negotiator.Stop() + + log.Debugf("Waiting for active session queues to finish "+ + "draining, stats: %s", c.stats) + + // 5. Shutdown all active session queues in parallel. These will + // exit once all updates have been acked by the watchtower. + c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { + return s.Stop + }) + + // Skip log if force quitting. + select { + case <-c.forceQuit: + return + default: + } + + log.Debugf("Client successfully stopped, stats: %s", c.stats) + }) + return nil +} + +// ForceQuit idempotently initiates an unclean shutdown of the watchtower +// client. This should only be executed if Stop is unable to exit cleanly. +func (c *TowerClient) ForceQuit() { + c.forced.Do(func() { + log.Infof("Force quitting watchtower client") + + // Cancel log message from stop. + close(c.forceQuit) + + // 1. Shutdown the backup queue, which will prevent any further + // updates from being accepted. In practice, the links should be + // shutdown before the client has been stopped, so all updates + // would have been added prior. + c.pipeline.ForceQuit() + + // 2. Once the backup queue has shutdown, wait for the main + // dispatcher to exit. The backup queue will signal it's + // completion to the dispatcher, which releases the wait group + // after all tasks have been assigned to session queues. + c.wg.Wait() + + // 3. Since all valid tasks have been assigned to session + // queues, we no longer need to negotiate sessions. + c.negotiator.Stop() + + // 4. Force quit all active session queues in parallel. These + // will exit once all updates have been acked by the watchtower. + c.activeSessions.ApplyAndWait(func(s *sessionQueue) func() { + return s.ForceQuit + }) + + log.Infof("Watchtower client unclean shutdown complete, "+ + "stats: %s", c.stats) + }) +} + +// RegisterChannel persistently initializes any channel-dependent parameters +// within the client. This should be called during link startup to ensure that +// the client is able to support the link during operation. +func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error { + c.sweepPkScriptMu.Lock() + defer c.sweepPkScriptMu.Unlock() + + // If a pkscript for this channel already exists, the channel has been + // previously registered. + if _, ok := c.sweepPkScripts[chanID]; ok { + return nil + } + + // Otherwise, generate a new sweep pkscript used to sweep funds for this + // channel. + pkScript, err := c.cfg.NewAddress() + if err != nil { + return err + } + + // Persist the sweep pkscript so that restarts will not introduce + // address inflation when the channel is reregistered after a restart. + err = c.cfg.DB.AddChanPkScript(chanID, pkScript) + if err != nil { + return err + } + + // Finally, cache the pkscript in our in-memory cache to avoid db + // lookups for the remainder of the daemon's execution. + c.sweepPkScripts[chanID] = pkScript + + return nil +} + +// BackupState initiates a request to back up a particular revoked state. If the +// method returns nil, the backup is guaranteed to be successful unless the: +// - client is force quit, +// - justice transaction would create dust outputs when trying to abide by the +// negotiated policy, or +// - breached outputs contain too little value to sweep at the target sweep fee +// rate. +func (c *TowerClient) BackupState(chanID *lnwire.ChannelID, + breachInfo *lnwallet.BreachRetribution) error { + + // Retrieve the cached sweep pkscript used for this channel. + c.sweepPkScriptMu.RLock() + sweepPkScript, ok := c.sweepPkScripts[*chanID] + c.sweepPkScriptMu.RUnlock() + if !ok { + return ErrUnregisteredChannel + } + + task := newBackupTask(chanID, breachInfo, sweepPkScript) + + return c.pipeline.QueueBackupTask(task) +} + +// nextSessionQueue attempts to fetch an active session from our set of +// candidate sessions. Candidate sessions with a differing policy from the +// active client's advertised policy will be ignored, but may be resumed if the +// client is restarted with a matching policy. If no candidates were found, nil +// is returned to signal that we need to request a new policy. +func (c *TowerClient) nextSessionQueue() *sessionQueue { + // Select any candidate session at random, and remove it from the set of + // candidate sessions. + var candidateSession *wtdb.ClientSession + for id, sessionInfo := range c.candidateSessions { + delete(c.candidateSessions, id) + + // Skip any sessions with policies that don't match the current + // configuration. These can be used again if the client changes + // their configuration back. + if sessionInfo.Policy != c.cfg.Policy { + continue + } + + candidateSession = sessionInfo + break + } + + // If none of the sessions could be used or none were found, we'll + // return nil to signal that we need another session to be negotiated. + if candidateSession == nil { + return nil + } + + // Initialize the session queue and spin it up so it can begin handling + // updates. If the queue was already made active on startup, this will + // simply return the existing session queue from the set. + return c.getOrInitActiveQueue(candidateSession) +} + +// backupDispatcher processes events coming from the taskPipeline and is +// responsible for detecting when the client needs to renegotiate a session to +// fulfill continuing demand. The event loop exits after all tasks have been +// received from the upstream taskPipeline, or the taskPipeline is force quit. +// +// NOTE: This method MUST be run as a goroutine. +func (c *TowerClient) backupDispatcher() { + defer c.wg.Done() + + log.Tracef("Starting backup dispatcher") + defer log.Tracef("Stopping backup dispatcher") + + for { + switch { + + // No active session queue and no additional sessions. + case c.sessionQueue == nil && len(c.candidateSessions) == 0: + log.Infof("Requesting new session.") + + // Immediately request a new session. + c.negotiator.RequestSession() + + // Wait until we receive the newly negotiated session. + // All backups sent in the meantime are queued in the + // revoke queue, as we cannot process them. + select { + case session := <-c.negotiator.NewSessions(): + log.Infof("Acquired new session with id=%s", + session.ID) + c.candidateSessions[session.ID] = session + c.stats.sessionAcquired() + + case <-c.statTicker.C: + log.Infof("Client stats: %s", c.stats) + } + + // No active session queue but have additional sessions. + case c.sessionQueue == nil && len(c.candidateSessions) > 0: + // We've exhausted the prior session, we'll pop another + // from the remaining sessions and continue processing + // backup tasks. + c.sessionQueue = c.nextSessionQueue() + if c.sessionQueue != nil { + log.Debugf("Loaded next candidate session "+ + "queue id=%s", c.sessionQueue.ID()) + } + + // Have active session queue, process backups. + case c.sessionQueue != nil: + if c.prevTask != nil { + c.processTask(c.prevTask) + + // Continue to ensure the sessionQueue is + // properly initialized before attempting to + // process more tasks from the pipeline. + continue + } + + // Normal operation where new tasks are read from the + // pipeline. + select { + + // If any sessions are negotiated while we have an + // active session queue, queue them for future use. + // This shouldn't happen with the current design, so + // it doesn't hurt to select here just in case. In the + // future, we will likely allow more asynchrony so that + // we can request new sessions before the session is + // fully empty, which this case would handle. + case session := <-c.negotiator.NewSessions(): + log.Warnf("Acquired new session with id=%s", + "while processing tasks", session.ID) + c.candidateSessions[session.ID] = session + c.stats.sessionAcquired() + + case <-c.statTicker.C: + log.Infof("Client stats: %s", c.stats) + + // Process each backup task serially from the queue of + // revoked states. + case task, ok := <-c.pipeline.NewBackupTasks(): + // All backups in the pipeline have been + // processed, it is now safe to exit. + if !ok { + return + } + + log.Debugf("Processing backup task chanid=%s "+ + "commit-height=%d", task.id.ChanID, + task.id.CommitHeight) + + c.stats.taskReceived() + c.processTask(task) + } + } + } +} + +// processTask attempts to schedule the given backupTask on the active +// sessionQueue. The task will either be accepted or rejected, afterwhich the +// appropriate modifications to the client's state machine will be made. After +// every invocation of processTask, the caller should ensure that the +// sessionQueue hasn't been exhausted before proceeding to the next task. Tasks +// that are rejected because the active sessionQueue is full will be cached as +// the prevTask, and should be reprocessed after obtaining a new sessionQueue. +func (c *TowerClient) processTask(task *backupTask) { + status, accepted := c.sessionQueue.AcceptTask(task) + if accepted { + c.taskAccepted(task, status) + } else { + c.taskRejected(task, status) + } +} + +// taskAccepted processes the acceptance of a task by a sessionQueue depending +// on the state the sessionQueue is in *after* the task is added. The client's +// prevTask is always removed as a result of this call. The client's +// sessionQueue will be removed if accepting the task left the sessionQueue in +// an exhausted state. +func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) { + log.Infof("Backup chanid=%s commit-height=%d accepted successfully", + task.id.ChanID, task.id.CommitHeight) + + c.stats.taskAccepted() + + // If this task was accepted, we discard anything held in the prevTask. + // Either it was nil before, or is the task which was just accepted. + c.prevTask = nil + + switch newStatus { + + // The sessionQueue still has capacity after accepting this task. + case reserveAvailable: + + // The sessionQueue is full after accepting this task, so we will need + // to request a new one before proceeding. + case reserveExhausted: + c.stats.sessionExhausted() + + log.Debugf("Session %s exhausted", c.sessionQueue.ID()) + + // This task left the session exhausted, set it to nil and + // proceed to the next loop so we can consume another + // pre-negotiated session or request another. + c.sessionQueue = nil + } +} + +// taskRejected process the rejection of a task by a sessionQueue depending on +// the state the was in *before* the task was rejected. The client's prevTask +// will cache the task if the sessionQueue was exhausted before hand, and nil +// the sessionQueue to find a new session. If the sessionQueue was not +// exhausted, the client marks the task as ineligible, as this implies we +// couldn't construct a valid justice transaction given the session's policy. +func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) { + switch curStatus { + + // The sessionQueue has available capacity but the task was rejected, + // this indicates that the task was ineligible for backup. + case reserveAvailable: + c.stats.taskIneligible() + + log.Infof("Backup chanid=%s commit-height=%d is ineligible", + task.id.ChanID, task.id.CommitHeight) + + err := c.cfg.DB.MarkBackupIneligible( + task.id.ChanID, task.id.CommitHeight, + ) + if err != nil { + log.Errorf("Unable to mark task chanid=%s "+ + "commit-height=%d ineligible: %v", + task.id.ChanID, task.id.CommitHeight, err) + + // It is safe to not handle this error, even if we could + // not persist the result. At worst, this task may be + // reprocessed on a subsequent start up, and will either + // succeed do a change in session parameters or fail in + // the same manner. + } + + // If this task was rejected *and* the session had available + // capacity, we discard anything held in the prevTask. Either it + // was nil before, or is the task which was just rejected. + c.prevTask = nil + + // The sessionQueue rejected the task because it is full, we will stash + // this task and try to add it to the next available sessionQueue. + case reserveExhausted: + c.stats.sessionExhausted() + + log.Debugf("Session %s exhausted, backup chanid=%s "+ + "commit-height=%d queued for next session", + c.sessionQueue.ID(), task.id.ChanID, + task.id.CommitHeight) + + // Cache the task that we pulled off, so that we can process it + // once a new session queue is available. + c.sessionQueue = nil + c.prevTask = task + } +} + +// dial connects the peer at addr using privKey as our secret key for the +// connection. The connection will use the configured Net's resolver to resolve +// the address for either Tor or clear net connections. +func (c *TowerClient) dial(privKey *btcec.PrivateKey, + addr *lnwire.NetAddress) (wtserver.Peer, error) { + + return c.cfg.AuthDial(privKey, addr, c.cfg.Dial) +} + +// readMessage receives and parses the next message from the given Peer. An +// error is returned if a message is not received before the server's read +// timeout, the read off the wire failed, or the message could not be +// deserialized. +func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) { + // Set a read timeout to ensure we drop the connection if nothing is + // received in a timely manner. + err := peer.SetReadDeadline(time.Now().Add(c.cfg.ReadTimeout)) + if err != nil { + err = fmt.Errorf("unable to set read deadline: %v", err) + log.Errorf("Unable to read msg: %v", err) + return nil, err + } + + // Pull the next message off the wire, + rawMsg, err := peer.ReadNextMessage() + if err != nil { + err = fmt.Errorf("unable to read message: %v", err) + log.Errorf("Unable to read msg: %v", err) + return nil, err + } + + // Parse the received message according to the watchtower wire + // specification. + msgReader := bytes.NewReader(rawMsg) + msg, err := wtwire.ReadMessage(msgReader, 0) + if err != nil { + err = fmt.Errorf("unable to parse message: %v", err) + log.Errorf("Unable to read msg: %v", err) + return nil, err + } + + logMessage(peer, msg, true) + + return msg, nil +} + +// sendMessage sends a watchtower wire message to the target peer. +func (c *TowerClient) sendMessage(peer wtserver.Peer, msg wtwire.Message) error { + // Encode the next wire message into the buffer. + // TODO(conner): use buffer pool + var b bytes.Buffer + _, err := wtwire.WriteMessage(&b, msg, 0) + if err != nil { + err = fmt.Errorf("Unable to encode msg: %v", err) + log.Errorf("Unable to send msg: %v", err) + return err + } + + // Set the write deadline for the connection, ensuring we drop the + // connection if nothing is sent in a timely manner. + err = peer.SetWriteDeadline(time.Now().Add(c.cfg.WriteTimeout)) + if err != nil { + err = fmt.Errorf("unable to set write deadline: %v", err) + log.Errorf("Unable to send msg: %v", err) + return err + } + + logMessage(peer, msg, false) + + // Write out the full message to the remote peer. + _, err = peer.Write(b.Bytes()) + if err != nil { + log.Errorf("Unable to send msg: %v", err) + } + return err +} + +// newSessionQueue creates a sessionQueue from a ClientSession loaded from the +// database and supplying it with the resources needed by the client. +func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue { + return newSessionQueue(&sessionQueueConfig{ + ClientSession: s, + ChainHash: c.cfg.ChainHash, + Dial: c.dial, + ReadMessage: c.readMessage, + SendMessage: c.sendMessage, + Signer: c.cfg.Signer, + DB: c.cfg.DB, + MinBackoff: c.cfg.MinBackoff, + MaxBackoff: c.cfg.MaxBackoff, + }) +} + +// getOrInitActiveQueue checks the activeSessions set for a sessionQueue for the +// passed ClientSession. If it exists, the active sessionQueue is returned. +// Otherwise a new sessionQueue is initialized and added to the set. +func (c *TowerClient) getOrInitActiveQueue(s *wtdb.ClientSession) *sessionQueue { + if sq, ok := c.activeSessions[s.ID]; ok { + return sq + } + + return c.initActiveQueue(s) +} + +// initActiveQueue creates a new sessionQueue from the passed ClientSession, +// adds the sessionQueue to the activeSessions set, and starts the sessionQueue +// so that it can deliver any committed updates or begin accepting newly +// assigned tasks. +func (c *TowerClient) initActiveQueue(s *wtdb.ClientSession) *sessionQueue { + // Initialize the session queue, providing it with all of the resources + // it requires from the client instance. + sq := c.newSessionQueue(s) + + // Add the session queue as an active session so that we remember to + // stop it on shutdown. + c.activeSessions.Add(sq) + + // Start the queue so that it can be active in processing newly assigned + // tasks or to upload previously committed updates. + sq.Start() + + return sq +} + +// logMessage writes information about a message received from a remote peer, +// using directional prepositions to signal whether the message was sent or +// received. +func logMessage(peer wtserver.Peer, msg wtwire.Message, read bool) { + var action = "Received" + var preposition = "from" + if !read { + action = "Sending" + preposition = "to" + } + + summary := wtwire.MessageSummary(msg) + if len(summary) > 0 { + summary = "(" + summary + ")" + } + + log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary, + preposition, peer.RemotePub().SerializeCompressed(), + peer.RemoteAddr()) +}