watchtower/wtclient: filter non-target towers from candidates

This commit is contained in:
Conner Fromknecht 2019-06-20 18:16:26 -07:00
parent 0506b1e587
commit bf042f1271
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 27 additions and 1 deletions

@ -18,6 +18,10 @@ type TowerCandidateIterator interface {
// to return results in any particular order. If no more candidates are // to return results in any particular order. If no more candidates are
// available, ErrTowerCandidatesExhausted is returned. // available, ErrTowerCandidatesExhausted is returned.
Next() (*wtdb.Tower, error) Next() (*wtdb.Tower, error)
// TowerIDs returns the set of tower IDs contained in the iterator,
// which can be used to filter candidate sessions for the active tower.
TowerIDs() map[wtdb.TowerID]struct{}
} }
// towerListIterator is a linked-list backed TowerCandidateIterator. // towerListIterator is a linked-list backed TowerCandidateIterator.
@ -58,6 +62,17 @@ func (t *towerListIterator) Reset() error {
return nil return nil
} }
// TowerIDs returns the set of tower IDs contained in the iterator, which can be
// used to filter candidate sessions for the active tower.
func (t *towerListIterator) TowerIDs() map[wtdb.TowerID]struct{} {
ids := make(map[wtdb.TowerID]struct{})
for e := t.candidates.Front(); e != nil; e = e.Next() {
tower := e.Value.(*wtdb.Tower)
ids[tower.ID] = struct{}{}
}
return ids
}
// Next returns the next candidate tower. This iterator will always return // Next returns the next candidate tower. This iterator will always return
// candidates in the order given when the iterator was instantiated. If no more // candidates in the order given when the iterator was instantiated. If no more
// candidates are available, ErrTowerCandidatesExhausted is returned. // candidates are available, ErrTowerCandidatesExhausted is returned.

@ -151,6 +151,7 @@ type TowerClient struct {
negotiator SessionNegotiator negotiator SessionNegotiator
candidateSessions map[wtdb.SessionID]*wtdb.ClientSession candidateSessions map[wtdb.SessionID]*wtdb.ClientSession
activeSessions sessionQueueSet activeSessions sessionQueueSet
targetTowerIDs map[wtdb.TowerID]struct{}
sessionQueue *sessionQueue sessionQueue *sessionQueue
prevTask *backupTask prevTask *backupTask
@ -198,10 +199,14 @@ func New(config *Config) (*TowerClient, error) {
log.Infof("Using private watchtower %s, offering policy %s", log.Infof("Using private watchtower %s, offering policy %s",
cfg.PrivateTower, cfg.Policy) cfg.PrivateTower, cfg.Policy)
candidates := newTowerListIterator(tower)
targetTowerIDs := candidates.TowerIDs()
c := &TowerClient{ c := &TowerClient{
cfg: cfg, cfg: cfg,
pipeline: newTaskPipeline(), pipeline: newTaskPipeline(),
activeSessions: make(sessionQueueSet), activeSessions: make(sessionQueueSet),
targetTowerIDs: targetTowerIDs,
statTicker: time.NewTicker(DefaultStatInterval), statTicker: time.NewTicker(DefaultStatInterval),
forceQuit: make(chan struct{}), forceQuit: make(chan struct{}),
} }
@ -213,7 +218,7 @@ func New(config *Config) (*TowerClient, error) {
SendMessage: c.sendMessage, SendMessage: c.sendMessage,
ReadMessage: c.readMessage, ReadMessage: c.readMessage,
Dial: c.dial, Dial: c.dial,
Candidates: newTowerListIterator(tower), Candidates: candidates,
MinBackoff: cfg.MinBackoff, MinBackoff: cfg.MinBackoff,
MaxBackoff: cfg.MaxBackoff, MaxBackoff: cfg.MaxBackoff,
}) })
@ -526,6 +531,12 @@ func (c *TowerClient) nextSessionQueue() *sessionQueue {
continue continue
} }
// Skip any sessions that are still active, but are not for the
// users currently configured tower.
if _, ok := c.targetTowerIDs[sessionInfo.TowerID]; !ok {
continue
}
candidateSession = sessionInfo candidateSession = sessionInfo
break break
} }