watchtower/wtclient: extend TowerClient with CRUD operations for towers

In this commit, we extend the wtclient.Client interface with the
following methods:

  * AddTower
  * RemoveTower
  * RegisteredTowers
  * LookupTower
  * Stats

Care has been taken to ensure that any in-memory state updates are
_only_ performed after a successful database update.

These methods are currently unused, but they serve as a dependency for
the upcoming WatchtowerClient RPC subserver.
This commit is contained in:
Wilmer Paulino 2019-06-07 17:46:15 -07:00
parent 06d10d8100
commit c57128097e
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 353 additions and 9 deletions

@ -2,7 +2,9 @@ package wtclient
import (
"bytes"
"errors"
"fmt"
"net"
"sync"
"time"
@ -36,9 +38,48 @@ const (
DefaultForceQuitDelay = 10 * time.Second
)
// RegisteredTower encompasses information about a registered watchtower with
// the client.
type RegisteredTower struct {
*wtdb.Tower
// Sessions is the set of sessions corresponding to the watchtower.
Sessions map[wtdb.SessionID]*wtdb.ClientSession
// ActiveSessionCandidate determines whether the watchtower is currently
// being considered for new sessions.
ActiveSessionCandidate bool
}
// Client is the primary interface used by the daemon to control a client's
// lifecycle and backup revoked states.
type Client interface {
// AddTower adds a new watchtower reachable at the given address and
// considers it for new sessions. If the watchtower already exists, then
// any new addresses included will be considered when dialing it for
// session negotiations and backups.
AddTower(*lnwire.NetAddress) error
// RemoveTower removes a watchtower from being considered for future
// session negotiations and from being used for any subsequent backups
// until it's added again. If an address is provided, then this call
// only serves as a way of removing the address from the watchtower
// instead.
RemoveTower(*btcec.PublicKey, net.Addr) error
// RegisteredTowers retrieves the list of watchtowers registered with
// the client.
RegisteredTowers() ([]*RegisteredTower, error)
// LookupTower retrieves a registered watchtower through its public key.
LookupTower(*btcec.PublicKey) (*RegisteredTower, error)
// Stats returns the in-memory statistics of the client since startup.
Stats() ClientStats
// Policy returns the active client policy configuration.
Policy() wtpolicy.Policy
// RegisterChannel persistently initializes any channel-dependent
// parameters within the client. This should be called during link
// startup to ensure that the client is able to support the link during
@ -136,6 +177,39 @@ type Config struct {
MaxBackoff time.Duration
}
// newTowerMsg is an internal message we'll use within the TowerClient to signal
// that a new tower can be considered.
type newTowerMsg struct {
// addr is the tower's reachable address that we'll use to establish a
// connection with.
addr *lnwire.NetAddress
// errChan is the channel through which we'll send a response back to
// the caller when handling their request.
//
// NOTE: This channel must be buffered.
errChan chan error
}
// staleTowerMsg is an internal message we'll use within the TowerClient to
// signal that a tower should no longer be considered.
type staleTowerMsg struct {
// pubKey is the identifying public key of the watchtower.
pubKey *btcec.PublicKey
// addr is an optional field that when set signals that the address
// should be removed from the watchtower's set of addresses, indicating
// that it is stale. If it's not set, then the watchtower should be
// no longer be considered for new sessions.
addr net.Addr
// errChan is the channel through which we'll send a response back to
// the caller when handling their request.
//
// NOTE: This channel must be buffered.
errChan chan error
}
// TowerClient is a concrete implementation of the Client interface, offering a
// non-blocking, reliable subsystem for backing up revoked states to a specified
// private tower.
@ -161,7 +235,10 @@ type TowerClient struct {
chanCommitHeights map[lnwire.ChannelID]uint64
statTicker *time.Ticker
stats ClientStats
stats *ClientStats
newTowers chan *newTowerMsg
staleTowers chan *staleTowerMsg
wg sync.WaitGroup
forceQuit chan struct{}
@ -247,6 +324,9 @@ func New(config *Config) (*TowerClient, error) {
activeSessions: make(sessionQueueSet),
summaries: chanSummaries,
statTicker: time.NewTicker(DefaultStatInterval),
stats: new(ClientStats),
newTowers: make(chan *newTowerMsg),
staleTowers: make(chan *staleTowerMsg),
forceQuit: make(chan struct{}),
}
c.negotiator = newSessionNegotiator(&NegotiatorConfig{
@ -587,19 +667,38 @@ func (c *TowerClient) backupDispatcher() {
c.candidateSessions[session.ID] = session
c.stats.sessionAcquired()
// We'll continue to choose the newly negotiated
// session as our active session queue.
continue
case <-c.statTicker.C:
log.Infof("Client stats: %s", c.stats)
// Instead of looping, we'll jump back into the
// select case and await the delivery of the
// session to prevent us from re-requesting
// additional sessions.
goto awaitSession
// A new tower has been requested to be added. We'll
// update our persisted and in-memory state and consider
// its corresponding sessions, if any, as new
// candidates.
case msg := <-c.newTowers:
msg.errChan <- c.handleNewTower(msg)
// A tower has been requested to be removed. We'll
// immediately return an error as we want to avoid the
// possibility of a new session being negotiated with
// this request's tower.
case msg := <-c.staleTowers:
msg.errChan <- errors.New("removing towers " +
"is disallowed while a new session " +
"negotiation is in progress")
case <-c.forceQuit:
return
}
// Instead of looping, we'll jump back into the select
// case and await the delivery of the session to prevent
// us from re-requesting additional sessions.
goto awaitSession
// No active session queue but have additional sessions.
case c.sessionQueue == nil && len(c.candidateSessions) > 0:
// We've exhausted the prior session, we'll pop another
@ -634,7 +733,7 @@ func (c *TowerClient) backupDispatcher() {
// we can request new sessions before the session is
// fully empty, which this case would handle.
case session := <-c.negotiator.NewSessions():
log.Warnf("Acquired new session with id=%s",
log.Warnf("Acquired new session with id=%s "+
"while processing tasks", session.ID)
c.candidateSessions[session.ID] = session
c.stats.sessionAcquired()
@ -655,6 +754,20 @@ func (c *TowerClient) backupDispatcher() {
c.stats.taskReceived()
c.processTask(task)
// A new tower has been requested to be added. We'll
// update our persisted and in-memory state and consider
// its corresponding sessions, if any, as new
// candidates.
case msg := <-c.newTowers:
msg.errChan <- c.handleNewTower(msg)
// A tower has been removed, so we'll remove certain
// information that's persisted and also in our
// in-memory state depending on the request, and set any
// of its corresponding candidate sessions as inactive.
case msg := <-c.staleTowers:
msg.errChan <- c.handleStaleTower(msg)
}
}
}
@ -884,6 +997,207 @@ func (c *TowerClient) initActiveQueue(s *wtdb.ClientSession) *sessionQueue {
return sq
}
// AddTower adds a new watchtower reachable at the given address and considers
// it for new sessions. If the watchtower already exists, then any new addresses
// included will be considered when dialing it for session negotiations and
// backups.
func (c *TowerClient) AddTower(addr *lnwire.NetAddress) error {
errChan := make(chan error, 1)
select {
case c.newTowers <- &newTowerMsg{
addr: addr,
errChan: errChan,
}:
case <-c.pipeline.quit:
return ErrClientExiting
case <-c.pipeline.forceQuit:
return ErrClientExiting
}
select {
case err := <-errChan:
return err
case <-c.pipeline.quit:
return ErrClientExiting
case <-c.pipeline.forceQuit:
return ErrClientExiting
}
}
// handleNewTower handles a request for a new tower to be added. If the tower
// already exists, then its corresponding sessions, if any, will be set
// considered as candidates.
func (c *TowerClient) handleNewTower(msg *newTowerMsg) error {
// We'll start by updating our persisted state, followed by our
// in-memory state, with the new tower. This might not actually be a new
// tower, but it might include a new address at which it can be reached.
tower, err := c.cfg.DB.CreateTower(msg.addr)
if err != nil {
return err
}
c.candidateTowers.AddCandidate(tower)
// Include all of its corresponding sessions to our set of candidates.
sessions, err := c.cfg.DB.ListClientSessions(&tower.ID)
if err != nil {
return fmt.Errorf("unable to determine sessions for tower %x: "+
"%v", tower.IdentityKey.SerializeCompressed(), err)
}
for id, session := range sessions {
c.candidateSessions[id] = session
}
return nil
}
// RemoveTower removes a watchtower from being considered for future session
// negotiations and from being used for any subsequent backups until it's added
// again. If an address is provided, then this call only serves as a way of
// removing the address from the watchtower instead.
func (c *TowerClient) RemoveTower(pubKey *btcec.PublicKey, addr net.Addr) error {
errChan := make(chan error, 1)
select {
case c.staleTowers <- &staleTowerMsg{
pubKey: pubKey,
addr: addr,
errChan: errChan,
}:
case <-c.pipeline.quit:
return ErrClientExiting
case <-c.pipeline.forceQuit:
return ErrClientExiting
}
select {
case err := <-errChan:
return err
case <-c.pipeline.quit:
return ErrClientExiting
case <-c.pipeline.forceQuit:
return ErrClientExiting
}
}
// handleNewTower handles a request for an existing tower to be removed. If none
// of the tower's sessions have pending updates, then they will become inactive
// and removed as candidates. If the active session queue corresponds to any of
// these sessions, a new one will be negotiated.
func (c *TowerClient) handleStaleTower(msg *staleTowerMsg) error {
// We'll load the tower before potentially removing it in order to
// retrieve its ID within the database.
tower, err := c.cfg.DB.LoadTower(msg.pubKey)
if err != nil {
return err
}
// We'll update our persisted state, followed by our in-memory state,
// with the stale tower.
if err := c.cfg.DB.RemoveTower(msg.pubKey, msg.addr); err != nil {
return err
}
c.candidateTowers.RemoveCandidate(tower.ID, msg.addr)
// If an address was provided, then we're only meant to remove the
// address from the tower, so there's nothing left for us to do.
if msg.addr != nil {
return nil
}
// Otherwise, the tower should no longer be used for future session
// negotiations and backups.
pubKey := msg.pubKey.SerializeCompressed()
sessions, err := c.cfg.DB.ListClientSessions(&tower.ID)
if err != nil {
return fmt.Errorf("unable to retrieve sessions for tower %x: "+
"%v", pubKey, err)
}
for sessionID := range sessions {
delete(c.candidateSessions, sessionID)
}
// If our active session queue corresponds to the stale tower, we'll
// proceed to negotiate a new one.
if c.sessionQueue != nil {
activeTower := c.sessionQueue.towerAddr.IdentityKey.SerializeCompressed()
if bytes.Equal(pubKey, activeTower) {
c.sessionQueue = nil
}
}
return nil
}
// RegisteredTowers retrieves the list of watchtowers registered with the
// client.
func (c *TowerClient) RegisteredTowers() ([]*RegisteredTower, error) {
// Retrieve all of our towers along with all of our sessions.
towers, err := c.cfg.DB.ListTowers()
if err != nil {
return nil, err
}
clientSessions, err := c.cfg.DB.ListClientSessions(nil)
if err != nil {
return nil, err
}
// Construct a lookup map that coalesces all of the sessions for a
// specific watchtower.
towerSessions := make(
map[wtdb.TowerID]map[wtdb.SessionID]*wtdb.ClientSession,
)
for id, s := range clientSessions {
sessions, ok := towerSessions[s.TowerID]
if !ok {
sessions = make(map[wtdb.SessionID]*wtdb.ClientSession)
towerSessions[s.TowerID] = sessions
}
sessions[id] = s
}
registeredTowers := make([]*RegisteredTower, 0, len(towerSessions))
for _, tower := range towers {
isActive := c.candidateTowers.IsActive(tower.ID)
registeredTowers = append(registeredTowers, &RegisteredTower{
Tower: tower,
Sessions: towerSessions[tower.ID],
ActiveSessionCandidate: isActive,
})
}
return registeredTowers, nil
}
// LookupTower retrieves a registered watchtower through its public key.
func (c *TowerClient) LookupTower(pubKey *btcec.PublicKey) (*RegisteredTower, error) {
tower, err := c.cfg.DB.LoadTower(pubKey)
if err != nil {
return nil, err
}
towerSessions, err := c.cfg.DB.ListClientSessions(&tower.ID)
if err != nil {
return nil, err
}
return &RegisteredTower{
Tower: tower,
Sessions: towerSessions,
ActiveSessionCandidate: c.candidateTowers.IsActive(tower.ID),
}, nil
}
// Stats returns the in-memory statistics of the client since startup.
func (c *TowerClient) Stats() ClientStats {
return c.stats.Copy()
}
// Policy returns the active client policy configuration.
func (c *TowerClient) Policy() wtpolicy.Policy {
return c.cfg.Policy
}
// logMessage writes information about a message received from a remote peer,
// using directional prepositions to signal whether the message was sent or
// received.

@ -1,10 +1,15 @@
package wtclient
import "fmt"
import (
"fmt"
"sync"
)
// ClientStats is a collection of in-memory statistics of the actions the client
// has performed since its creation.
type ClientStats struct {
mu sync.Mutex
// NumTasksReceived is the total number of backups that are pending to
// be acknowledged by all active and exhausted watchtower sessions.
NumTasksReceived int
@ -29,12 +34,16 @@ type ClientStats struct {
// taskReceived increments the number to backup requests the client has received
// from active channels.
func (s *ClientStats) taskReceived() {
s.mu.Lock()
defer s.mu.Unlock()
s.NumTasksReceived++
}
// taskAccepted increments the number of tasks that have been assigned to active
// session queues, and are awaiting upload to a tower.
func (s *ClientStats) taskAccepted() {
s.mu.Lock()
defer s.mu.Unlock()
s.NumTasksAccepted++
}
@ -43,25 +52,46 @@ func (s *ClientStats) taskAccepted() {
// typically this means that the balance created dust outputs, so it may not be
// worth backing up at all.
func (s *ClientStats) taskIneligible() {
s.mu.Lock()
defer s.mu.Unlock()
s.NumTasksIneligible++
}
// sessionAcquired increments the number of sessions that have been successfully
// negotiated by the client during this execution.
func (s *ClientStats) sessionAcquired() {
s.mu.Lock()
defer s.mu.Unlock()
s.NumSessionsAcquired++
}
// sessionExhausted increments the number of session that have become full as a
// result of accepting backup tasks.
func (s *ClientStats) sessionExhausted() {
s.mu.Lock()
defer s.mu.Unlock()
s.NumSessionsExhausted++
}
// String returns a human readable summary of the client's metrics.
func (s ClientStats) String() string {
func (s *ClientStats) String() string {
s.mu.Lock()
defer s.mu.Unlock()
return fmt.Sprintf("tasks(received=%d accepted=%d ineligible=%d) "+
"sessions(acquired=%d exhausted=%d)", s.NumTasksReceived,
s.NumTasksAccepted, s.NumTasksIneligible, s.NumSessionsAcquired,
s.NumSessionsExhausted)
}
// Copy returns a copy of the current stats.
func (s *ClientStats) Copy() ClientStats {
s.mu.Lock()
defer s.mu.Unlock()
return ClientStats{
NumTasksReceived: s.NumTasksReceived,
NumTasksAccepted: s.NumTasksAccepted,
NumTasksIneligible: s.NumTasksIneligible,
NumSessionsAcquired: s.NumSessionsAcquired,
NumSessionsExhausted: s.NumSessionsExhausted,
}
}