watchtower/wtclient/session_queue: batch upload state updates
This commit is contained in:
parent
65d09fca64
commit
aa2b21117c
688
watchtower/wtclient/session_queue.go
Normal file
688
watchtower/wtclient/session_queue.go
Normal file
@ -0,0 +1,688 @@
|
||||
package wtclient
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/btcec"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/lightningnetwork/lnd/input"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/watchtower/wtdb"
|
||||
"github.com/lightningnetwork/lnd/watchtower/wtserver"
|
||||
"github.com/lightningnetwork/lnd/watchtower/wtwire"
|
||||
)
|
||||
|
||||
// retryInterval is the default duration we will wait between attempting to
|
||||
// connect back out to a tower if the prior state update failed.
|
||||
const retryInterval = 2 * time.Second
|
||||
|
||||
// reserveStatus is an enum that signals how full a particular session is.
|
||||
type reserveStatus uint8
|
||||
|
||||
const (
|
||||
// reserveAvailable indicates that the session has space for at least
|
||||
// one more backup.
|
||||
reserveAvailable reserveStatus = iota
|
||||
|
||||
// reserveExhausted indicates that all slots in the session have been
|
||||
// allocated.
|
||||
reserveExhausted
|
||||
)
|
||||
|
||||
// sessionQueueConfig bundles the resources required by the sessionQueue to
|
||||
// perform its duties. All entries MUST be non-nil.
|
||||
type sessionQueueConfig struct {
|
||||
// ClientSession provides access to the negotiated session parameters
|
||||
// and updating its persistent storage.
|
||||
ClientSession *wtdb.ClientSession
|
||||
|
||||
// ChainHash identifies the chain for which the session's justice
|
||||
// transactions are targeted.
|
||||
ChainHash chainhash.Hash
|
||||
|
||||
// Dial allows the client to dial the tower using it's public key and
|
||||
// net address.
|
||||
Dial func(*btcec.PrivateKey,
|
||||
*lnwire.NetAddress) (wtserver.Peer, error)
|
||||
|
||||
// SendMessage encodes, encrypts, and writes a message to the given peer.
|
||||
SendMessage func(wtserver.Peer, wtwire.Message) error
|
||||
|
||||
// ReadMessage receives, decypts, and decodes a message from the given
|
||||
// peer.
|
||||
ReadMessage func(wtserver.Peer) (wtwire.Message, error)
|
||||
|
||||
// Signer facilitates signing of inputs, used to construct the witnesses
|
||||
// for justice transaction inputs.
|
||||
Signer input.Signer
|
||||
|
||||
// DB provides access to the client's stable storage.
|
||||
DB DB
|
||||
|
||||
// MinBackoff defines the initial backoff applied by the session
|
||||
// queue before reconnecting to the tower after a failed or partially
|
||||
// successful batch is sent. Subsequent backoff durations will grow
|
||||
// exponentially up until MaxBackoff.
|
||||
MinBackoff time.Duration
|
||||
|
||||
// MaxBackoff defines the maximum backoff applied by the session
|
||||
// queue before reconnecting to the tower after a failed or partially
|
||||
// successful batch is sent. If the exponential backoff produces a
|
||||
// timeout greater than this value, the backoff duration will be clamped
|
||||
// to MaxBackoff.
|
||||
MaxBackoff time.Duration
|
||||
}
|
||||
|
||||
// sessionQueue implements a reliable queue that will encrypt and send accepted
|
||||
// backups to the watchtower specified in the config's ClientSession. Calling
|
||||
// Quit will attempt to perform a clean shutdown by receiving an ACK from the
|
||||
// tower for all pending backups before exiting. The clean shutdown can be
|
||||
// aborted by using ForceQuit, which will attempt to shutdown the queue
|
||||
// immediately.
|
||||
type sessionQueue struct {
|
||||
started sync.Once
|
||||
stopped sync.Once
|
||||
forced sync.Once
|
||||
|
||||
cfg *sessionQueueConfig
|
||||
|
||||
commitQueue *list.List
|
||||
pendingQueue *list.List
|
||||
queueMtx sync.Mutex
|
||||
queueCond *sync.Cond
|
||||
|
||||
localInit *wtwire.Init
|
||||
towerAddr *lnwire.NetAddress
|
||||
|
||||
seqNum uint16
|
||||
|
||||
retryBackoff time.Duration
|
||||
|
||||
quit chan struct{}
|
||||
forceQuit chan struct{}
|
||||
shutdown chan struct{}
|
||||
}
|
||||
|
||||
// newSessionQueue intiializes a fresh sessionQueue.
|
||||
func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
|
||||
localInit := wtwire.NewInitMessage(
|
||||
lnwire.NewRawFeatureVector(wtwire.WtSessionsRequired),
|
||||
cfg.ChainHash,
|
||||
)
|
||||
|
||||
towerAddr := &lnwire.NetAddress{
|
||||
IdentityKey: cfg.ClientSession.Tower.IdentityKey,
|
||||
Address: cfg.ClientSession.Tower.Addresses[0],
|
||||
}
|
||||
|
||||
sq := &sessionQueue{
|
||||
cfg: cfg,
|
||||
commitQueue: list.New(),
|
||||
pendingQueue: list.New(),
|
||||
localInit: localInit,
|
||||
towerAddr: towerAddr,
|
||||
seqNum: cfg.ClientSession.SeqNum,
|
||||
retryBackoff: cfg.MinBackoff,
|
||||
quit: make(chan struct{}),
|
||||
forceQuit: make(chan struct{}),
|
||||
shutdown: make(chan struct{}),
|
||||
}
|
||||
sq.queueCond = sync.NewCond(&sq.queueMtx)
|
||||
|
||||
sq.restoreCommittedUpdates()
|
||||
|
||||
return sq
|
||||
}
|
||||
|
||||
// Start idempotently starts the sessionQueue so that it can begin accepting
|
||||
// backups.
|
||||
func (q *sessionQueue) Start() {
|
||||
q.started.Do(func() {
|
||||
// TODO(conner): load prior committed state updates from disk an
|
||||
// populate in queue.
|
||||
|
||||
go q.sessionManager()
|
||||
})
|
||||
}
|
||||
|
||||
// Stop idempotently stops the sessionQueue by initiating a clean shutdown that
|
||||
// will clear all pending tasks in the queue before returning to the caller.
|
||||
func (q *sessionQueue) Stop() {
|
||||
q.stopped.Do(func() {
|
||||
log.Debugf("Stopping session queue %s", q.ID())
|
||||
|
||||
close(q.quit)
|
||||
q.signalUntilShutdown()
|
||||
|
||||
// Skip log if we also force quit.
|
||||
select {
|
||||
case <-q.forceQuit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
log.Debugf("Session queue %s successfully stopped", q.ID())
|
||||
})
|
||||
}
|
||||
|
||||
// ForceQuit idempotently aborts any clean shutdown in progress and returns to
|
||||
// he caller after all lingering goroutines have spun down.
|
||||
func (q *sessionQueue) ForceQuit() {
|
||||
q.forced.Do(func() {
|
||||
log.Infof("Force quitting session queue %s", q.ID())
|
||||
|
||||
close(q.forceQuit)
|
||||
q.signalUntilShutdown()
|
||||
|
||||
log.Infof("Session queue %s unclean shutdown complete", q.ID())
|
||||
})
|
||||
}
|
||||
|
||||
// ID returns the wtdb.SessionID for the queue, which can be used to uniquely
|
||||
// identify this a particular queue.
|
||||
func (q *sessionQueue) ID() *wtdb.SessionID {
|
||||
return &q.cfg.ClientSession.ID
|
||||
}
|
||||
|
||||
// AcceptTask attempts to queue a backupTask for delivery to the sessionQueue's
|
||||
// tower. The session will only be accepted if the queue is not already
|
||||
// exhausted and the task is successfully bound to the ClientSession.
|
||||
func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
|
||||
q.queueCond.L.Lock()
|
||||
|
||||
// Examine the current reserve status of the session queue.
|
||||
curStatus := q.reserveStatus()
|
||||
switch curStatus {
|
||||
|
||||
// The session queue is exhausted, and cannot accept the task because it
|
||||
// is full. Reject the task such that it can be tried against a
|
||||
// different session.
|
||||
case reserveExhausted:
|
||||
q.queueCond.L.Unlock()
|
||||
return curStatus, false
|
||||
|
||||
// The session queue is not exhausted. Compute the sweep and reward
|
||||
// outputs as a function of the session parameters. If the outputs are
|
||||
// dusty or uneconomical to backup, the task is rejected and will not be
|
||||
// tried again.
|
||||
//
|
||||
// TODO(conner): queue backups and retry with different session params.
|
||||
case reserveAvailable:
|
||||
err := task.bindSession(q.cfg.ClientSession)
|
||||
if err != nil {
|
||||
q.queueCond.L.Unlock()
|
||||
log.Debugf("SessionQueue %s rejected backup chanid=%s "+
|
||||
"commit-height=%d: %v", q.ID(), task.id.ChanID,
|
||||
task.id.CommitHeight, err)
|
||||
return curStatus, false
|
||||
}
|
||||
}
|
||||
|
||||
// The sweep and reward outputs satisfy the session's policy, queue the
|
||||
// task for final signing and delivery.
|
||||
q.pendingQueue.PushBack(task)
|
||||
|
||||
// Finally, compute the session's *new* reserve status. This will be
|
||||
// used by the client to determine if it can continue using this session
|
||||
// queue, or if it should negotiate a new one.
|
||||
newStatus := q.reserveStatus()
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
q.queueCond.Signal()
|
||||
|
||||
return newStatus, true
|
||||
}
|
||||
|
||||
// updateWithSeqNum stores a CommittedUpdate with its assigned sequence number.
|
||||
// This allows committed updates to be sorted after a restart, and added to the
|
||||
// commitQueue in the proper order for delivery.
|
||||
type updateWithSeqNum struct {
|
||||
seqNum uint16
|
||||
update *wtdb.CommittedUpdate
|
||||
}
|
||||
|
||||
// restoreCommittedUpdates processes any CommittedUpdates loaded on startup by
|
||||
// sorting them in ascending order of sequence numbers and adding them to the
|
||||
// commitQueue. These will be sent before any pending updates are processed.
|
||||
func (q *sessionQueue) restoreCommittedUpdates() {
|
||||
committedUpdates := q.cfg.ClientSession.CommittedUpdates
|
||||
|
||||
// Construct and unordered slice of all committed updates with their
|
||||
// assigned sequence numbers.
|
||||
sortedUpdates := make([]updateWithSeqNum, 0, len(committedUpdates))
|
||||
for seqNum, update := range committedUpdates {
|
||||
sortedUpdates = append(sortedUpdates, updateWithSeqNum{
|
||||
seqNum: seqNum,
|
||||
update: update,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort the resulting slice by increasing sequence number.
|
||||
sort.Slice(sortedUpdates, func(i, j int) bool {
|
||||
return sortedUpdates[i].seqNum < sortedUpdates[j].seqNum
|
||||
})
|
||||
|
||||
// Finally, add the sorted, committed updates to he commitQueue. These
|
||||
// updates will be prioritized before any new tasks are assigned to the
|
||||
// sessionQueue. The queue will begin uploading any tasks in the
|
||||
// commitQueue as soon as it is started, e.g. during client
|
||||
// initialization when detecting that this session has unacked updates.
|
||||
for _, update := range sortedUpdates {
|
||||
q.commitQueue.PushBack(update)
|
||||
}
|
||||
}
|
||||
|
||||
// sessionManager is the primary event loop for the sessionQueue, and is
|
||||
// responsible for encrypting and sending accepted tasks to the tower.
|
||||
func (q *sessionQueue) sessionManager() {
|
||||
defer close(q.shutdown)
|
||||
|
||||
for {
|
||||
q.queueCond.L.Lock()
|
||||
for q.commitQueue.Len() == 0 &&
|
||||
q.pendingQueue.Len() == 0 {
|
||||
|
||||
q.queueCond.Wait()
|
||||
|
||||
select {
|
||||
case <-q.quit:
|
||||
if q.commitQueue.Len() == 0 &&
|
||||
q.pendingQueue.Len() == 0 {
|
||||
q.queueCond.L.Unlock()
|
||||
return
|
||||
}
|
||||
case <-q.forceQuit:
|
||||
q.queueCond.L.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
// Exit immediately if a force quit has been requested. If the
|
||||
// either of the queues still has state updates to send to the
|
||||
// tower, we may never exit in the above case if we are unable
|
||||
// to reach the tower for some reason.
|
||||
select {
|
||||
case <-q.forceQuit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Initiate a new connection to the watchtower and attempt to
|
||||
// drain all pending tasks.
|
||||
q.drainBackups()
|
||||
}
|
||||
}
|
||||
|
||||
// drainBackups attempts to send all pending updates in the queue to the tower.
|
||||
func (q *sessionQueue) drainBackups() {
|
||||
// First, check that we are able to dial this session's tower.
|
||||
conn, err := q.cfg.Dial(q.cfg.ClientSession.SessionPrivKey, q.towerAddr)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to dial watchtower at %v: %v",
|
||||
q.towerAddr, err)
|
||||
|
||||
q.increaseBackoff()
|
||||
select {
|
||||
case <-time.After(q.retryBackoff):
|
||||
case <-q.forceQuit:
|
||||
}
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Begin draining the queue of pending state updates. Before the first
|
||||
// update is sent, we will precede it with an Init message. If the first
|
||||
// is successful, subsequent updates can be streamed without sending an
|
||||
// Init.
|
||||
for sendInit := true; ; sendInit = false {
|
||||
// Generate the next state update to upload to the tower. This
|
||||
// method will first proceed in dequeueing committed updates
|
||||
// before attempting to dequeue any pending updates.
|
||||
stateUpdate, isPending, err := q.nextStateUpdate()
|
||||
if err != nil {
|
||||
log.Errorf("Unable to get next state update: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Now, send the state update to the tower and wait for a reply.
|
||||
err = q.sendStateUpdate(
|
||||
conn, stateUpdate, q.localInit, sendInit, isPending,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to send state update: %v", err)
|
||||
|
||||
q.increaseBackoff()
|
||||
select {
|
||||
case <-time.After(q.retryBackoff):
|
||||
case <-q.forceQuit:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// If the last task was backed up successfully, we'll exit and
|
||||
// continue once more tasks are added to the queue. We'll also
|
||||
// clear any accumulated backoff as this batch was able to be
|
||||
// sent reliably.
|
||||
if stateUpdate.IsComplete == 1 {
|
||||
q.resetBackoff()
|
||||
return
|
||||
}
|
||||
|
||||
// Always apply a small delay between sends, which makes the
|
||||
// unit tests more reliable. If we were requested to back off,
|
||||
// when we will do so.
|
||||
select {
|
||||
case <-time.After(time.Millisecond):
|
||||
case <-q.forceQuit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nextStateUpdate returns the next wtwire.StateUpdate to upload to the tower.
|
||||
// If any committed updates are present, this method will reconstruct the state
|
||||
// update from the committed update using the current last applied value found
|
||||
// in the database. Otherwise, it will select the next pending update, craft the
|
||||
// payload, and commit an update before returning the state update to send. The
|
||||
// boolean value in the response is true if the state update is taken from the
|
||||
// pending queue, allowing the caller to remove the update from either the
|
||||
// commit or pending queue if the update is successfully acked.
|
||||
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
var (
|
||||
seqNum uint16
|
||||
update *wtdb.CommittedUpdate
|
||||
isLast bool
|
||||
isPending bool
|
||||
)
|
||||
|
||||
q.queueCond.L.Lock()
|
||||
switch {
|
||||
|
||||
// If the commit queue is non-empty, parse the next committed update.
|
||||
case q.commitQueue.Len() > 0:
|
||||
next := q.commitQueue.Front()
|
||||
updateWithSeq := next.Value.(updateWithSeqNum)
|
||||
|
||||
seqNum = updateWithSeq.seqNum
|
||||
update = updateWithSeq.update
|
||||
|
||||
// If this is the last item in the commit queue and no items
|
||||
// exist in the pending queue, we will use the IsComplete flag
|
||||
// in the StateUpdate to signal that the tower can release the
|
||||
// connection after replying to free up resources.
|
||||
isLast = q.commitQueue.Len() == 1 && q.pendingQueue.Len() == 0
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
log.Debugf("Reprocessing committed state update for "+
|
||||
"session=%s seqnum=%d", q.ID(), seqNum)
|
||||
|
||||
// Otherwise, craft and commit the next update from the pending queue.
|
||||
default:
|
||||
isPending = true
|
||||
|
||||
// Determine the current sequence number to apply for this
|
||||
// pending update.
|
||||
seqNum = q.seqNum + 1
|
||||
|
||||
// Obtain the next task from the queue.
|
||||
next := q.pendingQueue.Front()
|
||||
task := next.Value.(*backupTask)
|
||||
|
||||
// If this is the last item in the pending queue, we will use
|
||||
// the IsComplete flag in the StateUpdate to signal that the
|
||||
// tower can release the connection after replying to free up
|
||||
// resources.
|
||||
isLast = q.pendingQueue.Len() == 1
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
hint, encBlob, err := task.craftSessionPayload(q.cfg.Signer)
|
||||
if err != nil {
|
||||
// TODO(conner): mark will not send
|
||||
return nil, false, fmt.Errorf("unable to craft "+
|
||||
"session payload: %v", err)
|
||||
}
|
||||
// TODO(conner): special case other obscure errors
|
||||
|
||||
update = &wtdb.CommittedUpdate{
|
||||
BackupID: task.id,
|
||||
Hint: hint,
|
||||
EncryptedBlob: encBlob,
|
||||
}
|
||||
|
||||
log.Debugf("Committing state update for session=%s seqnum=%d",
|
||||
q.ID(), seqNum)
|
||||
}
|
||||
|
||||
// Before sending the task to the tower, commit the state update
|
||||
// to disk using the assigned sequence number. If this task has already
|
||||
// been committed, the call will succeed and only be used for the
|
||||
// purpose of obtaining the last applied value to send to the tower.
|
||||
//
|
||||
// This step ensures that if we crash before receiving an ack that we
|
||||
// will retransmit the same update. If the tower successfully received
|
||||
// the update from before, it will reply with an ACK regardless of what
|
||||
// we send the next time. This step ensures that if we reliably send the
|
||||
// same update for a given sequence number, to prevent us from thinking
|
||||
// we backed up a state when we instead backed up another.
|
||||
lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), seqNum, update)
|
||||
if err != nil {
|
||||
// TODO(conner): mark failed/reschedule
|
||||
return nil, false, fmt.Errorf("unable to commit state update "+
|
||||
"for session=%s seqnum=%d: %v", q.ID(), seqNum, err)
|
||||
}
|
||||
|
||||
stateUpdate := &wtwire.StateUpdate{
|
||||
SeqNum: seqNum,
|
||||
LastApplied: lastApplied,
|
||||
Hint: update.Hint,
|
||||
EncryptedBlob: update.EncryptedBlob,
|
||||
}
|
||||
|
||||
// Set the IsComplete flag if this is the last queued item.
|
||||
if isLast {
|
||||
stateUpdate.IsComplete = 1
|
||||
}
|
||||
|
||||
return stateUpdate, isPending, nil
|
||||
}
|
||||
|
||||
// sendStateUpdate sends a wtwire.StateUpdate to the watchtower and processes
|
||||
// the ACK before returning. If sendInit is true, this method will first send
|
||||
// the localInit message and verify that the tower supports our required feature
|
||||
// bits. And error is returned if any part of the send fails. The boolean return
|
||||
// variable indicates whether or not we should back off before attempting to
|
||||
// send the next state update.
|
||||
func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
|
||||
stateUpdate *wtwire.StateUpdate, localInit *wtwire.Init,
|
||||
sendInit, isPending bool) error {
|
||||
|
||||
// If this is the first message being sent to the tower, we must send an
|
||||
// Init message to establish that server supports the features we
|
||||
// require.
|
||||
if sendInit {
|
||||
// Send Init to tower.
|
||||
err := q.cfg.SendMessage(conn, q.localInit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Receive Init from tower.
|
||||
remoteMsg, err := q.cfg.ReadMessage(conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remoteInit, ok := remoteMsg.(*wtwire.Init)
|
||||
if !ok {
|
||||
return fmt.Errorf("watchtower responded with %T to "+
|
||||
"Init", remoteMsg)
|
||||
}
|
||||
|
||||
// Validate Init.
|
||||
err = q.localInit.CheckRemoteInit(
|
||||
remoteInit, wtwire.FeatureNames,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Send StateUpdate to tower.
|
||||
err := q.cfg.SendMessage(conn, stateUpdate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Receive StateUpdate from tower.
|
||||
remoteMsg, err := q.cfg.ReadMessage(conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stateUpdateReply, ok := remoteMsg.(*wtwire.StateUpdateReply)
|
||||
if !ok {
|
||||
return fmt.Errorf("watchtower responded with %T to StateUpdate",
|
||||
remoteMsg)
|
||||
}
|
||||
|
||||
// Process the reply from the tower.
|
||||
switch stateUpdateReply.Code {
|
||||
|
||||
// The tower reported a successful update, validate the response and
|
||||
// record the last applied returned.
|
||||
case wtwire.CodeOK:
|
||||
|
||||
// TODO(conner): handle other error cases properly, ban towers, etc.
|
||||
default:
|
||||
err := fmt.Errorf("received error code %s in "+
|
||||
"StateUpdateReply from tower=%x session=%s",
|
||||
stateUpdateReply.Code,
|
||||
conn.RemotePub().SerializeCompressed(), q.ID())
|
||||
log.Warnf("Unable to upload state update: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
lastApplied := stateUpdateReply.LastApplied
|
||||
err = q.cfg.DB.AckUpdate(q.ID(), stateUpdate.SeqNum, lastApplied)
|
||||
switch {
|
||||
case err == wtdb.ErrUnallocatedLastApplied:
|
||||
// TODO(conner): borked watchtower
|
||||
err = fmt.Errorf("unable to ack update=%d session=%s: %v",
|
||||
stateUpdate.SeqNum, q.ID(), err)
|
||||
log.Errorf("Failed to ack update: %v", err)
|
||||
return err
|
||||
|
||||
case err == wtdb.ErrLastAppliedReversion:
|
||||
// TODO(conner): borked watchtower
|
||||
err = fmt.Errorf("unable to ack update=%d session=%s: %v",
|
||||
stateUpdate.SeqNum, q.ID(), err)
|
||||
log.Errorf("Failed to ack update: %v", err)
|
||||
return err
|
||||
|
||||
case err != nil:
|
||||
err = fmt.Errorf("unable to ack update=%d session=%s: %v",
|
||||
stateUpdate.SeqNum, q.ID(), err)
|
||||
log.Errorf("Failed to ack update: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Removing update session=%s seqnum=%d is_pending=%v "+
|
||||
"from memory", q.ID(), stateUpdate.SeqNum, isPending)
|
||||
|
||||
q.queueCond.L.Lock()
|
||||
if isPending {
|
||||
// If a pending update was successfully sent, increment the
|
||||
// sequence number and remove the item from the queue. This
|
||||
// ensures the total number of backups in the session remains
|
||||
// unchanged, which maintains the external view of the session's
|
||||
// reserve status.
|
||||
q.seqNum++
|
||||
q.pendingQueue.Remove(q.pendingQueue.Front())
|
||||
} else {
|
||||
// Otherwise, simply remove the update from the committed queue.
|
||||
// This has no effect on the queues reserve status since the
|
||||
// update had already been committed.
|
||||
q.commitQueue.Remove(q.commitQueue.Front())
|
||||
}
|
||||
q.queueCond.L.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reserveStatus returns a reserveStatus indicating whether or not the
|
||||
// sessionQueue can accept another task. reserveAvailable is returned when a
|
||||
// task can be accepted, and reserveExhausted is returned if the all slots in
|
||||
// the session have been allocated.
|
||||
//
|
||||
// NOTE: This method MUST be called with queueCond's exclusive lock held.
|
||||
func (q *sessionQueue) reserveStatus() reserveStatus {
|
||||
numPending := uint32(q.pendingQueue.Len())
|
||||
maxUpdates := uint32(q.cfg.ClientSession.Policy.MaxUpdates)
|
||||
|
||||
log.Debugf("SessionQueue %s reserveStatus seqnum=%d pending=%d "+
|
||||
"max-updates=%d", q.ID(), q.seqNum, numPending, maxUpdates)
|
||||
|
||||
if uint32(q.seqNum)+numPending < maxUpdates {
|
||||
return reserveAvailable
|
||||
}
|
||||
|
||||
return reserveExhausted
|
||||
|
||||
}
|
||||
|
||||
// resetBackoff returns the connection backoff the minimum configured backoff.
|
||||
func (q *sessionQueue) resetBackoff() {
|
||||
q.retryBackoff = q.cfg.MinBackoff
|
||||
}
|
||||
|
||||
// increaseBackoff doubles the current connection backoff, clamping to the
|
||||
// configured maximum backoff if it would exceed the limit.
|
||||
func (q *sessionQueue) increaseBackoff() {
|
||||
q.retryBackoff *= 2
|
||||
if q.retryBackoff > q.cfg.MaxBackoff {
|
||||
q.retryBackoff = q.cfg.MaxBackoff
|
||||
}
|
||||
}
|
||||
|
||||
// signalUntilShutdown strobes the sessionQueue's condition variable until the
|
||||
// main event loop exits.
|
||||
func (q *sessionQueue) signalUntilShutdown() {
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Millisecond):
|
||||
q.queueCond.Signal()
|
||||
case <-q.shutdown:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sessionQueueSet maintains a mapping of SessionIDs to their corresponding
|
||||
// sessionQueue.
|
||||
type sessionQueueSet map[wtdb.SessionID]*sessionQueue
|
||||
|
||||
// Add inserts a sessionQueue into the sessionQueueSet.
|
||||
func (s *sessionQueueSet) Add(sessionQueue *sessionQueue) {
|
||||
(*s)[*sessionQueue.ID()] = sessionQueue
|
||||
}
|
||||
|
||||
// ApplyAndWait executes the nil-adic function returned from getApply for each
|
||||
// sessionQueue in the set in parallel, then waits for all of them to finish
|
||||
// before returning to the caller.
|
||||
func (s *sessionQueueSet) ApplyAndWait(getApply func(*sessionQueue) func()) {
|
||||
var wg sync.WaitGroup
|
||||
for _, sessionq := range *s {
|
||||
wg.Add(1)
|
||||
go func(sq *sessionQueue) {
|
||||
defer wg.Done()
|
||||
getApply(sq)()
|
||||
}(sessionq)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue
Block a user