lnd.xprv/watchtower/wtclient/client.go
Wilmer Paulino 56d66c80a1
watchtower: extend client db to filter sessions for a specific tower
This currently takes O(N) time as there does not exist an index of
active client sessions for each watchtower within the client's database.
This index is likely to be added in the future.
2019-07-30 15:13:22 -07:00

908 lines
30 KiB
Go

package wtclient
import (
"bytes"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/watchtower/wtdb"
"github.com/lightningnetwork/lnd/watchtower/wtpolicy"
"github.com/lightningnetwork/lnd/watchtower/wtserver"
"github.com/lightningnetwork/lnd/watchtower/wtwire"
)
const (
	// DefaultReadTimeout is the fallback duration a blocking read is
	// allowed to wait before it is abandoned.
	DefaultReadTimeout = 15 * time.Second

	// DefaultWriteTimeout is the fallback duration a blocking write is
	// allowed to wait before it is abandoned.
	DefaultWriteTimeout = 15 * time.Second

	// DefaultStatInterval is the default period between log lines that
	// report metrics about the client's operation.
	DefaultStatInterval = time.Minute

	// DefaultForceQuitDelay is the default grace period after which the
	// client should give up on pending updates or session negotiations
	// and terminate.
	DefaultForceQuitDelay = 10 * time.Second
)
// Client is the interface through which the daemon drives a watchtower
// client's lifecycle and hands it revoked states to back up.
type Client interface {
	// RegisterChannel persistently sets up the channel-dependent
	// parameters the client needs. It should be invoked while a link is
	// starting so that the client can serve that link once it is running.
	RegisterChannel(lnwire.ChannelID) error

	// BackupState requests that a specific revoked state be backed up. A
	// nil return guarantees the backup will succeed unless the client is
	// force quit, or the justice transaction would produce dust outputs
	// under the negotiated policy.
	BackupState(*lnwire.ChannelID, *lnwallet.BreachRetribution) error

	// Start brings the watchtower client up, allowing it to process
	// requests to back up revoked channel states.
	Start() error

	// Stop attempts a graceful shutdown: the client tries to flush its
	// pipeline and deliver every queued state to the tower before
	// exiting.
	Stop() error

	// ForceQuit shuts the watchtower client down forcibly. Queued states
	// may be dropped as a result.
	ForceQuit()
}
// Config bundles the resources a TowerClient depends on. Every nillable field
// must be populated for the client to be initialized properly.
type Config struct {
	// Signer gives the client access to the wallet so it can sign justice
	// transactions spending from a remote party's commitment transaction.
	Signer input.Signer

	// NewAddress produces a fresh on-chain sweep pkscript.
	NewAddress func() ([]byte, error)

	// SecretKeyRing derives the session keys used when talking to the
	// tower. Only KeyLocators are stored by the client, so private keys
	// never need to be written to disk.
	SecretKeyRing SecretKeyRing

	// Dial connects to an address on the given network and returns the
	// resulting connection.
	Dial Dial

	// AuthDial establishes a brontide connection over an onion or clear
	// network.
	AuthDial AuthDialer

	// DB is the client's handle to stable storage.
	DB DB

	// Policy is the session policy proposed when new sessions are created
	// with the tower. Active sessions in the database whose policy differs
	// are ignored, and new sessions are requested immediately instead.
	Policy wtpolicy.Policy

	// PrivateTower is the network address of a private tower. The client
	// attempts to create all of its sessions with this tower.
	PrivateTower *lnwire.NetAddress

	// ChainHash names the chain the client operates on, which the tower
	// must also be watching in order to detect breaches.
	ChainHash chainhash.Hash

	// ForceQuitDelay is how long after a shutdown is requested the client
	// waits before automatically aborting pending backups when the
	// shutdown is not clean. With a value less than or equal to zero, Stop
	// may block indefinitely. Regardless of this setting, ForceQuit can
	// always be invoked externally.
	ForceQuitDelay time.Duration

	// ReadTimeout bounds how long a blocking read may wait. Values less
	// than or equal to zero are replaced by the default.
	ReadTimeout time.Duration

	// WriteTimeout bounds how long a blocking write may wait. Values less
	// than or equal to zero are replaced by the default.
	WriteTimeout time.Duration

	// MinBackoff is the first backoff applied to watchtower connections.
	// Later backoffs grow exponentially until they reach MaxBackoff.
	MinBackoff time.Duration

	// MaxBackoff caps the backoff applied to watchtower connections. An
	// exponential backoff that would exceed this value is clamped to it.
	MaxBackoff time.Duration
}
// TowerClient is the concrete Client implementation. It provides a
// non-blocking, reliable subsystem that backs up revoked states to a specified
// private tower.
type TowerClient struct {
	// started, stopped and forced make Start, Stop and ForceQuit run their
	// bodies at most once each.
	started sync.Once
	stopped sync.Once
	forced  sync.Once

	// cfg is the client's private copy of the Config passed to New.
	cfg *Config

	// pipeline carries backup tasks from BackupState to the dispatcher.
	pipeline *taskPipeline

	// negotiator is asked for new sessions when no candidate is left.
	negotiator SessionNegotiator

	// candidateSessions holds sessions loaded from the database or newly
	// negotiated that have not yet been consumed by nextSessionQueue.
	candidateSessions map[wtdb.SessionID]*wtdb.ClientSession

	// activeSessions is the set of session queues that have been started
	// and therefore must be stopped during shutdown.
	activeSessions sessionQueueSet

	// targetTowerIDs is the set of tower IDs whose sessions are eligible
	// to be selected as the next session queue.
	targetTowerIDs map[wtdb.TowerID]struct{}

	// sessionQueue is the queue currently receiving tasks from the
	// dispatcher, or nil when a new one must be selected or negotiated.
	sessionQueue *sessionQueue

	// prevTask caches a task rejected by an exhausted session queue so it
	// can be retried on the next queue.
	prevTask *backupTask

	// backupMu is held by RegisterChannel and BackupState while they
	// access summaries and chanCommitHeights.
	backupMu          sync.Mutex
	summaries         wtdb.ChannelSummaries
	chanCommitHeights map[lnwire.ChannelID]uint64

	// statTicker triggers the periodic logging of stats.
	statTicker *time.Ticker
	stats      ClientStats

	// wg tracks the backupDispatcher goroutine.
	wg sync.WaitGroup

	// forceQuit is closed by ForceQuit to signal an unclean shutdown.
	forceQuit chan struct{}
}
// Compile-time assertion that *TowerClient satisfies the Client interface; the
// build fails if a required method is missing or has the wrong signature.
var _ Client = (*TowerClient)(nil)
// New builds a TowerClient from the provided Config. An error is returned if
// the client could not be initialized, which happens when any of the database
// lookups or the session key derivation fails.
func New(config *Config) (*TowerClient, error) {
	// Work on a private copy so that later changes to the caller's Config
	// and to the client's Config cannot affect one another.
	cfgCopy := *config
	cfg := &cfgCopy

	// Fall back to the default timeouts when none were supplied.
	if cfg.ReadTimeout <= 0 {
		cfg.ReadTimeout = DefaultReadTimeout
	}
	if cfg.WriteTimeout <= 0 {
		cfg.WriteTimeout = DefaultWriteTimeout
	}

	// Record the tower in the database, which also loads any addresses
	// previously associated with its public key.
	tower, err := cfg.DB.CreateTower(cfg.PrivateTower)
	if err != nil {
		return nil, err
	}

	log.Infof("Using private watchtower %s, offering policy %s",
		cfg.PrivateTower, cfg.Policy)

	candidates := newTowerListIterator(tower)
	targetTowerIDs := candidates.TowerIDs()

	// Load every client session from the database. Sessions whose policy
	// matches the client's current policy may be reused; the rest are
	// ignored and new sessions are requested instead.
	sessions, err := cfg.DB.ListClientSessions(nil)
	if err != nil {
		return nil, err
	}

	// Attach to each session the tower it belongs to and a freshly
	// re-derived session key. Deriving the key here means private keys
	// never have to be stored on disk.
	for _, session := range sessions {
		sessionTower, err := cfg.DB.LoadTower(session.TowerID)
		if err != nil {
			return nil, err
		}

		sessionKey, err := DeriveSessionKey(
			cfg.SecretKeyRing, session.KeyIndex,
		)
		if err != nil {
			return nil, err
		}

		session.Tower = sessionTower
		session.SessionPrivKey = sessionKey
	}

	// Fetch the sweep pkscripts generated for all previously registered
	// channels.
	chanSummaries, err := cfg.DB.FetchChanSummaries()
	if err != nil {
		return nil, err
	}

	client := &TowerClient{
		cfg:               cfg,
		pipeline:          newTaskPipeline(),
		candidateSessions: sessions,
		activeSessions:    make(sessionQueueSet),
		targetTowerIDs:    targetTowerIDs,
		summaries:         chanSummaries,
		statTicker:        time.NewTicker(DefaultStatInterval),
		forceQuit:         make(chan struct{}),
	}

	// The negotiator is wired up after the client exists because it uses
	// the client's own dial/read/send methods.
	negotiatorCfg := &NegotiatorConfig{
		DB:            cfg.DB,
		SecretKeyRing: cfg.SecretKeyRing,
		Policy:        cfg.Policy,
		ChainHash:     cfg.ChainHash,
		SendMessage:   client.sendMessage,
		ReadMessage:   client.readMessage,
		Dial:          client.dial,
		Candidates:    candidates,
		MinBackoff:    cfg.MinBackoff,
		MaxBackoff:    cfg.MaxBackoff,
	}
	client.negotiator = newSessionNegotiator(negotiatorCfg)

	// Rebuild the highest commit height seen for each channel under the
	// client's current policy.
	client.buildHighestCommitHeights()

	return client, nil
}
// buildHighestCommitHeights walks all candidate client sessions and records,
// per channel, the highest commit height found among their committed and acked
// updates. Only sessions whose policy equals the client's current policy are
// considered. The result lets the client reject backups it has already
// processed under its active policy.
func (c *TowerClient) buildHighestCommitHeights() {
	heights := make(map[lnwire.ChannelID]uint64)

	// recordHeight raises the stored tip for a channel when the given
	// commit height is the first seen or exceeds the current tip.
	recordHeight := func(chanID lnwire.ChannelID, commitHeight uint64) {
		if tip, known := heights[chanID]; known && commitHeight <= tip {
			return
		}
		heights[chanID] = commitHeight
	}

	for _, session := range c.candidateSessions {
		// Updates accepted under a different policy do not count
		// towards the current policy's tips.
		if session.Policy != c.cfg.Policy {
			continue
		}

		// Updates that were committed to the session.
		for _, update := range session.CommittedUpdates {
			backupID := update.BackupID
			recordHeight(backupID.ChanID, backupID.CommitHeight)
		}

		// Updates that were acked for the session.
		for _, backupID := range session.AckedUpdates {
			recordHeight(backupID.ChanID, backupID.CommitHeight)
		}
	}

	c.chanCommitHeights = heights
}
// Start brings the watchtower client up: it resumes session queues that still
// hold committed updates, starts the session negotiator and the task pipeline,
// and launches the backup dispatcher. The body runs at most once; the error
// from starting the negotiator, if any, is returned by the call that ran it.
func (c *TowerClient) Start() error {
	var startErr error
	c.started.Do(func() {
		log.Infof("Starting watchtower client")

		// Spin up a session queue for every session that still has
		// committed updates, so those updates can be flushed after a
		// restart.
		for _, session := range c.candidateSessions {
			numCommitted := len(session.CommittedUpdates)
			if numCommitted == 0 {
				continue
			}

			log.Infof("Starting session=%s to process "+
				"%d committed backups", session.ID,
				numCommitted)

			c.initActiveQueue(session)
		}

		// Bring up the negotiator next, so a new session can be
		// requested as soon as the backupDispatcher is running.
		if startErr = c.negotiator.Start(); startErr != nil {
			return
		}

		// Start the pipeline that receives new backup tasks, then the
		// dispatcher that consumes them.
		c.pipeline.Start()

		c.wg.Add(1)
		go c.backupDispatcher()

		log.Infof("Watchtower client started successfully")
	})

	return startErr
}
// Stop idempotently performs a graceful shutdown of the watchtower client. It
// always returns nil.
func (c *TowerClient) Stop() error {
	c.stopped.Do(func() {
		log.Debugf("Stopping watchtower client")

		// 1. Stop the backup queue so that no further updates are
		// accepted. Links are expected to be shut down before the
		// client, so all their updates should already be queued.
		c.pipeline.Stop()

		// 2. Guard against hanging forever: when a ForceQuitDelay is
		// configured, schedule a ForceQuit to fire after that delay.
		//
		// A delay of 0 waits indefinitely, which is the safest choice.
		// Mobile clients, which may be killed shortly after moving to
		// the background, can use a positive delay to make sure the
		// process terminates.
		if delay := c.cfg.ForceQuitDelay; delay > 0 {
			time.AfterFunc(delay, c.ForceQuit)
		}

		// 3. Wait for the dispatcher, which exits once the backup
		// queue signals completion and all tasks have been handed to
		// session queues.
		c.wg.Wait()

		// 4. No more tasks will need a session, so the negotiator is
		// no longer required.
		c.negotiator.Stop()

		log.Debugf("Waiting for active session queues to finish "+
			"draining, stats: %s", c.stats)

		// 5. Stop all active session queues in parallel and wait for
		// them to finish.
		c.activeSessions.ApplyAndWait(func(sq *sessionQueue) func() {
			return sq.Stop
		})

		// Only report a successful stop if no force quit happened.
		select {
		case <-c.forceQuit:
		default:
			log.Debugf("Client successfully stopped, stats: %s",
				c.stats)
		}
	})

	return nil
}
// ForceQuit idempotently performs an unclean shutdown of the watchtower
// client. It is meant for situations in which Stop cannot exit cleanly.
func (c *TowerClient) ForceQuit() {
	c.forced.Do(func() {
		log.Infof("Force quitting watchtower client")

		// 1. Force quit the backup queue so that no further updates
		// are accepted.
		c.pipeline.ForceQuit()

		// 2. Signal the force quit to the dispatcher, then wait for
		// it to exit.
		close(c.forceQuit)
		c.wg.Wait()

		// 3. The dispatcher is gone, so no more sessions will be
		// requested and the negotiator can be stopped.
		c.negotiator.Stop()

		// 4. Force quit all active session queues in parallel and
		// wait for them to finish.
		c.activeSessions.ApplyAndWait(func(sq *sessionQueue) func() {
			return sq.ForceQuit
		})

		log.Infof("Watchtower client unclean shutdown complete, "+
			"stats: %s", c.stats)
	})
}
// RegisterChannel persistently sets up the channel-dependent parameters the
// client needs. It should be invoked while a link is starting so the client
// can serve that link once it is running. Registering an already known
// channel is a no-op.
func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {
	c.backupMu.Lock()
	defer c.backupMu.Unlock()

	// A cached summary means this channel was registered before.
	if _, registered := c.summaries[chanID]; registered {
		return nil
	}

	// First registration: create the sweep pkscript that will be used to
	// sweep funds for this channel.
	sweepScript, err := c.cfg.NewAddress()
	if err != nil {
		return err
	}

	// Store the pkscript in the database so that re-registering the
	// channel after a restart does not generate yet another address.
	err = c.cfg.DB.RegisterChannel(chanID, sweepScript)
	if err != nil {
		return err
	}

	// Keep the pkscript in memory as well, so later lookups during this
	// run do not have to hit the database.
	c.summaries[chanID] = wtdb.ClientChanSummary{
		SweepPkScript: sweepScript,
	}

	return nil
}
// BackupState requests that a specific revoked state be backed up. A nil
// return guarantees the backup will succeed unless the:
//   - client is force quit,
//   - justice transaction would create dust outputs when trying to abide by
//     the negotiated policy, or
//   - breached outputs contain too little value to sweep at the target sweep
//     fee rate.
//
// ErrUnregisteredChannel is returned if the channel was never registered.
// States at or below the highest height already seen for the channel are
// treated as duplicates and silently accepted.
func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
	breachInfo *lnwallet.BreachRetribution) error {

	c.backupMu.Lock()

	// The channel must have been registered, since its cached summary
	// carries the sweep pkscript for the task.
	chanSummary, registered := c.summaries[*chanID]
	if !registered {
		c.backupMu.Unlock()
		return ErrUnregisteredChannel
	}

	// Drop any state the client has been shown before.
	stateNum := breachInfo.RevokedStateNum
	if tip, seen := c.chanCommitHeights[*chanID]; seen && stateNum <= tip {
		c.backupMu.Unlock()
		log.Debugf("Ignoring duplicate backup for chanid=%v at height=%d",
			chanID, stateNum)
		return nil
	}

	// This is the highest state seen for the channel so far. Advance the
	// tip so that it is not accepted a second time if the link flaps.
	c.chanCommitHeights[*chanID] = stateNum
	c.backupMu.Unlock()

	return c.pipeline.QueueBackupTask(
		newBackupTask(chanID, breachInfo, chanSummary.SweepPkScript),
	)
}
// nextSessionQueue picks a usable session out of the candidate set and returns
// its session queue. Every candidate that is inspected is removed from the
// set, whether or not it is chosen. Candidates whose TxPolicy differs from the
// client's, or whose tower is not one of the target towers, are skipped; they
// may be used again if the client is restarted with a matching configuration.
// nil is returned when no candidate qualifies, signalling that a new session
// must be negotiated.
func (c *TowerClient) nextSessionQueue() *sessionQueue {
	// Map iteration order is unspecified, so this effectively selects an
	// arbitrary candidate.
	for id, candidate := range c.candidateSessions {
		delete(c.candidateSessions, id)

		// A differing TxPolicy would produce justice transactions
		// other than those requested.
		if candidate.Policy.TxPolicy != c.cfg.Policy.TxPolicy {
			continue
		}

		// The session may still be active, but it belongs to a tower
		// other than the currently configured one.
		if _, targeted := c.targetTowerIDs[candidate.TowerID]; !targeted {
			continue
		}

		// Start a queue for the session so it can handle updates. If
		// one was already activated on startup, that existing queue is
		// returned instead.
		return c.getOrInitActiveQueue(candidate)
	}

	// Either there were no candidates or none of them was usable.
	return nil
}
// backupDispatcher processes events coming from the taskPipeline and is
// responsible for detecting when the client needs to renegotiate a session to
// fulfill continuing demand. The event loop exits after all tasks have been
// received from the upstream taskPipeline, or when the client is force quit
// while waiting for a new session.
//
// Each iteration falls into one of three states:
//   - no active session queue and no candidates: request a session and block
//     until it arrives (or the client is force quit),
//   - no active session queue but candidates exist: pop the next candidate,
//   - an active session queue exists: retry the cached prevTask if there is
//     one, otherwise read the next event from the pipeline.
//
// NOTE: This method MUST be run as a goroutine.
func (c *TowerClient) backupDispatcher() {
	defer c.wg.Done()

	log.Tracef("Starting backup dispatcher")
	defer log.Tracef("Stopping backup dispatcher")

	for {
		switch {

		// No active session queue and no additional sessions.
		case c.sessionQueue == nil && len(c.candidateSessions) == 0:
			log.Infof("Requesting new session.")

			// Immediately request a new session.
			c.negotiator.RequestSession()

			// Wait until we receive the newly negotiated session.
			// No backup tasks are read from the pipeline while we
			// wait, as we cannot process them without a session.
		awaitSession:
			select {
			case session := <-c.negotiator.NewSessions():
				log.Infof("Acquired new session with id=%s",
					session.ID)
				c.candidateSessions[session.ID] = session
				c.stats.sessionAcquired()

			case <-c.statTicker.C:
				log.Infof("Client stats: %s", c.stats)

				// Instead of looping, we'll jump back into the
				// select case and await the delivery of the
				// session to prevent us from re-requesting
				// additional sessions.
				goto awaitSession

			case <-c.forceQuit:
				return
			}

		// No active session queue but have additional sessions.
		case c.sessionQueue == nil && len(c.candidateSessions) > 0:
			// We've exhausted the prior session, we'll pop another
			// from the remaining sessions and continue processing
			// backup tasks.
			c.sessionQueue = c.nextSessionQueue()
			if c.sessionQueue != nil {
				log.Debugf("Loaded next candidate session "+
					"queue id=%s", c.sessionQueue.ID())
			}

		// Have active session queue, process backups.
		case c.sessionQueue != nil:
			if c.prevTask != nil {
				c.processTask(c.prevTask)

				// Continue to ensure the sessionQueue is
				// properly initialized before attempting to
				// process more tasks from the pipeline.
				continue
			}

			// Normal operation where new tasks are read from the
			// pipeline.
			select {

			// If any sessions are negotiated while we have an
			// active session queue, queue them for future use.
			// This shouldn't happen with the current design, so
			// it doesn't hurt to select here just in case. In the
			// future, we will likely allow more asynchrony so that
			// we can request new sessions before the session is
			// fully empty, which this case would handle.
			case session := <-c.negotiator.NewSessions():
				// The two literals are concatenated so that
				// both belong to the format string and
				// session.ID is the argument bound to %s.
				log.Warnf("Acquired new session with id=%s "+
					"while processing tasks", session.ID)
				c.candidateSessions[session.ID] = session
				c.stats.sessionAcquired()

			case <-c.statTicker.C:
				log.Infof("Client stats: %s", c.stats)

			// Process each backup task serially from the queue of
			// revoked states.
			case task, ok := <-c.pipeline.NewBackupTasks():
				// All backups in the pipeline have been
				// processed, it is now safe to exit.
				if !ok {
					return
				}

				log.Debugf("Processing %v", task.id)

				c.stats.taskReceived()
				c.processTask(task)
			}
		}
	}
}
// processTask offers the given backupTask to the active sessionQueue and then
// updates the client's state machine according to whether the queue accepted
// or rejected it. After each call the caller should check that the
// sessionQueue has not been exhausted before moving on to the next task. A
// task rejected because the sessionQueue is full is cached as prevTask and
// should be retried once a new sessionQueue has been obtained.
func (c *TowerClient) processTask(task *backupTask) {
	status, accepted := c.sessionQueue.AcceptTask(task)
	if !accepted {
		c.taskRejected(task, status)
		return
	}

	c.taskAccepted(task, status)
}
// taskAccepted handles a task that the sessionQueue accepted, acting on the
// state the sessionQueue is in *after* the task was added. The client's
// prevTask is always cleared. If accepting the task left the sessionQueue
// exhausted, the client's sessionQueue is cleared as well.
func (c *TowerClient) taskAccepted(task *backupTask, newStatus reserveStatus) {
	log.Infof("Queued %v successfully for session %v",
		task.id, c.sessionQueue.ID())

	c.stats.taskAccepted()

	// Whatever prevTask held is no longer needed: it was either nil
	// already or it is the task that was just accepted.
	c.prevTask = nil

	// Nothing more to do unless this task used up the queue's capacity.
	if newStatus != reserveExhausted {
		return
	}

	c.stats.sessionExhausted()

	log.Debugf("Session %s exhausted", c.sessionQueue.ID())

	// Clear the exhausted queue so that the dispatcher's next iteration
	// either consumes another pre-negotiated session or requests a new
	// one.
	c.sessionQueue = nil
}
// taskRejected handles a task that the sessionQueue rejected, acting on the
// state the sessionQueue was in *before* the rejection. If the sessionQueue
// was exhausted, the task is cached as prevTask and the sessionQueue is
// cleared so that a new session is found. If the sessionQueue still had
// capacity, the task is marked ineligible, since that implies no valid justice
// transaction could be built under the session's policy.
func (c *TowerClient) taskRejected(task *backupTask, curStatus reserveStatus) {
	// The queue was full: stash the task and retry it on the next
	// available sessionQueue.
	if curStatus == reserveExhausted {
		c.stats.sessionExhausted()

		log.Debugf("Session %v exhausted, %v queued for next session",
			c.sessionQueue.ID(), task.id)

		c.sessionQueue = nil
		c.prevTask = task
		return
	}

	// Any status other than "available" requires no action.
	if curStatus != reserveAvailable {
		return
	}

	// The queue had room yet still refused the task, so the task itself
	// is ineligible for backup.
	c.stats.taskIneligible()

	log.Infof("Ignoring ineligible %v", task.id)

	err := c.cfg.DB.MarkBackupIneligible(
		task.id.ChanID, task.id.CommitHeight,
	)
	if err != nil {
		log.Errorf("Unable to mark %v ineligible: %v",
			task.id, err)

		// Not handling this error is safe even though the result was
		// not persisted. In the worst case the task is processed again
		// on a later startup, where it either succeeds due to changed
		// session parameters or fails the same way.
	}

	// Whatever prevTask held is no longer needed: it was either nil
	// already or it is the task that was just rejected.
	c.prevTask = nil
}
// dial connects to the peer at addr, using privKey as our secret key for the
// connection. The work is delegated to the configured AuthDial function, which
// is given the configured Dial function to establish the underlying
// connection.
func (c *TowerClient) dial(privKey *btcec.PrivateKey,
	addr *lnwire.NetAddress) (wtserver.Peer, error) {

	netDial := c.cfg.Dial

	return c.cfg.AuthDial(privKey, addr, netDial)
}
// readMessage receives and parses the next message from the given Peer. An
// error is returned if the read deadline cannot be set, the read off the wire
// fails (including when nothing arrives before the configured read timeout),
// or the message cannot be deserialized. Every failure is logged before it is
// returned.
func (c *TowerClient) readMessage(peer wtserver.Peer) (wtwire.Message, error) {
	// fail logs the already-wrapped error and hands it back to the caller.
	fail := func(err error) (wtwire.Message, error) {
		log.Errorf("Unable to read msg: %v", err)
		return nil, err
	}

	// Arm a read deadline so that a silent peer causes the read to give
	// up in a timely manner.
	deadline := time.Now().Add(c.cfg.ReadTimeout)
	if err := peer.SetReadDeadline(deadline); err != nil {
		return fail(fmt.Errorf("unable to set read deadline: %v", err))
	}

	// Pull the next raw message off the wire.
	rawMsg, err := peer.ReadNextMessage()
	if err != nil {
		return fail(fmt.Errorf("unable to read message: %v", err))
	}

	// Decode the raw bytes according to the watchtower wire
	// specification.
	msg, err := wtwire.ReadMessage(bytes.NewReader(rawMsg), 0)
	if err != nil {
		return fail(fmt.Errorf("unable to parse message: %v", err))
	}

	logMessage(peer, msg, true)

	return msg, nil
}
// sendMessage encodes a watchtower wire message and writes it to the target
// peer. An error is returned if the message cannot be encoded, the write
// deadline cannot be set, or the write itself fails. Every failure is logged
// before it is returned.
func (c *TowerClient) sendMessage(peer wtserver.Peer, msg wtwire.Message) error {
	// Serialize the message into an in-memory buffer first.
	// TODO(conner): use buffer pool
	var encoded bytes.Buffer
	if _, err := wtwire.WriteMessage(&encoded, msg, 0); err != nil {
		err = fmt.Errorf("Unable to encode msg: %v", err)
		log.Errorf("Unable to send msg: %v", err)
		return err
	}

	// Arm a write deadline so that a stalled connection causes the write
	// to give up in a timely manner.
	deadline := time.Now().Add(c.cfg.WriteTimeout)
	if err := peer.SetWriteDeadline(deadline); err != nil {
		err = fmt.Errorf("unable to set write deadline: %v", err)
		log.Errorf("Unable to send msg: %v", err)
		return err
	}

	logMessage(peer, msg, false)

	// Write the full encoded message out to the remote peer.
	if _, err := peer.Write(encoded.Bytes()); err != nil {
		log.Errorf("Unable to send msg: %v", err)
		return err
	}

	return nil
}
// newSessionQueue builds a sessionQueue for the given ClientSession, handing
// it the client's chain hash, signer, database, backoff bounds, and the
// client's own dial/read/send methods.
func (c *TowerClient) newSessionQueue(s *wtdb.ClientSession) *sessionQueue {
	queueCfg := &sessionQueueConfig{
		ClientSession: s,
		ChainHash:     c.cfg.ChainHash,
		Dial:          c.dial,
		ReadMessage:   c.readMessage,
		SendMessage:   c.sendMessage,
		Signer:        c.cfg.Signer,
		DB:            c.cfg.DB,
		MinBackoff:    c.cfg.MinBackoff,
		MaxBackoff:    c.cfg.MaxBackoff,
	}

	return newSessionQueue(queueCfg)
}
// getOrInitActiveQueue returns the sessionQueue registered in activeSessions
// under the ID of the passed ClientSession. When no such queue exists, a new
// one is initialized, added to the set, and returned.
func (c *TowerClient) getOrInitActiveQueue(s *wtdb.ClientSession) *sessionQueue {
	queue, active := c.activeSessions[s.ID]
	if !active {
		queue = c.initActiveQueue(s)
	}

	return queue
}
// initActiveQueue builds a sessionQueue for the passed ClientSession,
// registers it in the activeSessions set, and starts it so that it can deliver
// committed updates or accept newly assigned tasks.
func (c *TowerClient) initActiveQueue(s *wtdb.ClientSession) *sessionQueue {
	// Create the queue with the resources it needs from this client.
	queue := c.newSessionQueue(s)

	// Track the queue as active so that it gets stopped on shutdown.
	c.activeSessions.Add(queue)

	// Start the queue so it can process newly assigned tasks or upload
	// previously committed updates.
	queue.Start()

	return queue
}
// logMessage emits a debug log line describing a message exchanged with a
// remote peer. The read flag selects the wording: "Received ... from" when
// true, "Sending ... to" when false. A non-empty message summary is included
// in parentheses.
func logMessage(peer wtserver.Peer, msg wtwire.Message, read bool) {
	action, preposition := "Sending", "to"
	if read {
		action, preposition = "Received", "from"
	}

	summary := wtwire.MessageSummary(msg)
	if summary != "" {
		summary = "(" + summary + ")"
	}

	remotePub := peer.RemotePub().SerializeCompressed()

	log.Debugf("%s %s%v %s %x@%s", action, msg.MsgType(), summary,
		preposition, remotePub, peer.RemoteAddr())
}