From bf042f1271cd5d4a1e726f4544bb9c1360e84150 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 20 Jun 2019 18:16:26 -0700 Subject: [PATCH] watchtower/wtclient: filter non-target towers from candidates --- watchtower/wtclient/candidate_iterator.go | 15 +++++++++++++++ watchtower/wtclient/client.go | 13 ++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/watchtower/wtclient/candidate_iterator.go b/watchtower/wtclient/candidate_iterator.go index aafdee1a..5dffef55 100644 --- a/watchtower/wtclient/candidate_iterator.go +++ b/watchtower/wtclient/candidate_iterator.go @@ -18,6 +18,10 @@ type TowerCandidateIterator interface { // to return results in any particular order. If no more candidates are // available, ErrTowerCandidatesExhausted is returned. Next() (*wtdb.Tower, error) + + // TowerIDs returns the set of tower IDs contained in the iterator, + // which can be used to filter candidate sessions for the active tower. + TowerIDs() map[wtdb.TowerID]struct{} } // towerListIterator is a linked-list backed TowerCandidateIterator. @@ -58,6 +62,17 @@ func (t *towerListIterator) Reset() error { return nil } +// TowerIDs returns the set of tower IDs contained in the iterator, which can be +// used to filter candidate sessions for the active tower. +func (t *towerListIterator) TowerIDs() map[wtdb.TowerID]struct{} { + ids := make(map[wtdb.TowerID]struct{}) + for e := t.candidates.Front(); e != nil; e = e.Next() { + tower := e.Value.(*wtdb.Tower) + ids[tower.ID] = struct{}{} + } + return ids +} + // Next returns the next candidate tower. This iterator will always return // candidates in the order given when the iterator was instantiated. If no more // candidates are available, ErrTowerCandidatesExhausted is returned. diff --git a/watchtower/wtclient/client.go b/watchtower/wtclient/client.go index 2e270d5a..7e83cdd9 100644 --- a/watchtower/wtclient/client.go +++ b/watchtower/wtclient/client.go @@ -151,6 +151,7 @@ type TowerClient struct { negotiator SessionNegotiator candidateSessions map[wtdb.SessionID]*wtdb.ClientSession activeSessions sessionQueueSet + targetTowerIDs map[wtdb.TowerID]struct{} sessionQueue *sessionQueue prevTask *backupTask @@ -198,10 +199,14 @@ func New(config *Config) (*TowerClient, error) { log.Infof("Using private watchtower %s, offering policy %s", cfg.PrivateTower, cfg.Policy) + candidates := newTowerListIterator(tower) + targetTowerIDs := candidates.TowerIDs() + c := &TowerClient{ cfg: cfg, pipeline: newTaskPipeline(), activeSessions: make(sessionQueueSet), + targetTowerIDs: targetTowerIDs, statTicker: time.NewTicker(DefaultStatInterval), forceQuit: make(chan struct{}), } @@ -213,7 +218,7 @@ func New(config *Config) (*TowerClient, error) { SendMessage: c.sendMessage, ReadMessage: c.readMessage, Dial: c.dial, - Candidates: newTowerListIterator(tower), + Candidates: candidates, MinBackoff: cfg.MinBackoff, MaxBackoff: cfg.MaxBackoff, }) @@ -526,6 +531,12 @@ func (c *TowerClient) nextSessionQueue() *sessionQueue { continue } + // Skip any sessions that are still active, but are not for the + // users currently configured tower. + if _, ok := c.targetTowerIDs[sessionInfo.TowerID]; !ok { + continue + } + candidateSession = sessionInfo break }