watchtower/wtdb: return sorted ClientSession.CommittedUpdates
This commit replaces the map-based CommittedUpdates field with a slice. When reading from disk, these will already be sorted by bbolt, so the client restore the updates as presented without needing to sort them first. Since the key in the map variant was the sequence number, we refactor the CommittedUpdate struct to have a sequence number and an embedded CommittedUpdateBody (which is equivalent to the old CommittedUpdate). The database is then expected to populate the sequence number from the key on disk. Since the sequence number is now directly integrated in the CommittedUpdate struct, this allow allows us to remove the now redundant seqNum argument from CommitUpdate.
This commit is contained in:
parent
3509c0c991
commit
5ad9530502
@ -61,7 +61,7 @@ type DB interface {
|
||||
// hasn't been ACK'd by the tower. The sequence number of the update
|
||||
// should be exactly one greater than the existing entry, and less that
|
||||
// or equal to the session's MaxUpdates.
|
||||
CommitUpdate(id *wtdb.SessionID, seqNum uint16,
|
||||
CommitUpdate(id *wtdb.SessionID,
|
||||
update *wtdb.CommittedUpdate) (uint16, error)
|
||||
|
||||
// AckUpdate records an acknowledgment from the watchtower that the
|
||||
|
@ -3,7 +3,6 @@ package wtclient
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -133,7 +132,11 @@ func newSessionQueue(cfg *sessionQueueConfig) *sessionQueue {
|
||||
}
|
||||
sq.queueCond = sync.NewCond(&sq.queueMtx)
|
||||
|
||||
sq.restoreCommittedUpdates()
|
||||
// The database should return them in sorted order, and session queue's
|
||||
// sequence number will be equal to that of the last committed update.
|
||||
for _, update := range sq.cfg.ClientSession.CommittedUpdates {
|
||||
sq.commitQueue.PushBack(update)
|
||||
}
|
||||
|
||||
return sq
|
||||
}
|
||||
@ -237,45 +240,6 @@ func (q *sessionQueue) AcceptTask(task *backupTask) (reserveStatus, bool) {
|
||||
return newStatus, true
|
||||
}
|
||||
|
||||
// updateWithSeqNum stores a CommittedUpdate with its assigned sequence number.
|
||||
// This allows committed updates to be sorted after a restart, and added to the
|
||||
// commitQueue in the proper order for delivery.
|
||||
type updateWithSeqNum struct {
|
||||
seqNum uint16
|
||||
update *wtdb.CommittedUpdate
|
||||
}
|
||||
|
||||
// restoreCommittedUpdates processes any CommittedUpdates loaded on startup by
|
||||
// sorting them in ascending order of sequence numbers and adding them to the
|
||||
// commitQueue. These will be sent before any pending updates are processed.
|
||||
func (q *sessionQueue) restoreCommittedUpdates() {
|
||||
committedUpdates := q.cfg.ClientSession.CommittedUpdates
|
||||
|
||||
// Construct and unordered slice of all committed updates with their
|
||||
// assigned sequence numbers.
|
||||
sortedUpdates := make([]updateWithSeqNum, 0, len(committedUpdates))
|
||||
for seqNum, update := range committedUpdates {
|
||||
sortedUpdates = append(sortedUpdates, updateWithSeqNum{
|
||||
seqNum: seqNum,
|
||||
update: update,
|
||||
})
|
||||
}
|
||||
|
||||
// Sort the resulting slice by increasing sequence number.
|
||||
sort.Slice(sortedUpdates, func(i, j int) bool {
|
||||
return sortedUpdates[i].seqNum < sortedUpdates[j].seqNum
|
||||
})
|
||||
|
||||
// Finally, add the sorted, committed updates to he commitQueue. These
|
||||
// updates will be prioritized before any new tasks are assigned to the
|
||||
// sessionQueue. The queue will begin uploading any tasks in the
|
||||
// commitQueue as soon as it is started, e.g. during client
|
||||
// initialization when detecting that this session has unacked updates.
|
||||
for _, update := range sortedUpdates {
|
||||
q.commitQueue.PushBack(update)
|
||||
}
|
||||
}
|
||||
|
||||
// sessionManager is the primary event loop for the sessionQueue, and is
|
||||
// responsible for encrypting and sending accepted tasks to the tower.
|
||||
func (q *sessionQueue) sessionManager() {
|
||||
@ -396,7 +360,7 @@ func (q *sessionQueue) drainBackups() {
|
||||
func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
var (
|
||||
seqNum uint16
|
||||
update *wtdb.CommittedUpdate
|
||||
update wtdb.CommittedUpdate
|
||||
isLast bool
|
||||
isPending bool
|
||||
)
|
||||
@ -407,10 +371,9 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
// If the commit queue is non-empty, parse the next committed update.
|
||||
case q.commitQueue.Len() > 0:
|
||||
next := q.commitQueue.Front()
|
||||
updateWithSeq := next.Value.(updateWithSeqNum)
|
||||
|
||||
seqNum = updateWithSeq.seqNum
|
||||
update = updateWithSeq.update
|
||||
update = next.Value.(wtdb.CommittedUpdate)
|
||||
seqNum = update.SeqNum
|
||||
|
||||
// If this is the last item in the commit queue and no items
|
||||
// exist in the pending queue, we will use the IsComplete flag
|
||||
@ -449,10 +412,13 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
}
|
||||
// TODO(conner): special case other obscure errors
|
||||
|
||||
update = &wtdb.CommittedUpdate{
|
||||
BackupID: task.id,
|
||||
Hint: hint,
|
||||
EncryptedBlob: encBlob,
|
||||
update = wtdb.CommittedUpdate{
|
||||
SeqNum: seqNum,
|
||||
CommittedUpdateBody: wtdb.CommittedUpdateBody{
|
||||
BackupID: task.id,
|
||||
Hint: hint,
|
||||
EncryptedBlob: encBlob,
|
||||
},
|
||||
}
|
||||
|
||||
log.Debugf("Committing state update for session=%s seqnum=%d",
|
||||
@ -470,7 +436,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
// we send the next time. This step ensures that if we reliably send the
|
||||
// same update for a given sequence number, to prevent us from thinking
|
||||
// we backed up a state when we instead backed up another.
|
||||
lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), seqNum, update)
|
||||
lastApplied, err := q.cfg.DB.CommitUpdate(q.ID(), &update)
|
||||
if err != nil {
|
||||
// TODO(conner): mark failed/reschedule
|
||||
return nil, false, fmt.Errorf("unable to commit state update "+
|
||||
@ -478,7 +444,7 @@ func (q *sessionQueue) nextStateUpdate() (*wtwire.StateUpdate, bool, error) {
|
||||
}
|
||||
|
||||
stateUpdate := &wtwire.StateUpdate{
|
||||
SeqNum: seqNum,
|
||||
SeqNum: update.SeqNum,
|
||||
LastApplied: lastApplied,
|
||||
Hint: update.Hint,
|
||||
EncryptedBlob: update.EncryptedBlob,
|
||||
|
@ -86,10 +86,10 @@ type ClientSession struct {
|
||||
// specifies a reward output.
|
||||
RewardPkScript []byte
|
||||
|
||||
// CommittedUpdates is a map from allocated sequence numbers to unacked
|
||||
// updates. These updates can be resent after a restart if the update
|
||||
// failed to send or receive an acknowledgment.
|
||||
CommittedUpdates map[uint16]*CommittedUpdate
|
||||
// CommittedUpdates is a sorted list of unacked updates. These updates
|
||||
// can be resent after a restart if the updates failed to send or
|
||||
// receive an acknowledgment.
|
||||
CommittedUpdates []CommittedUpdate
|
||||
|
||||
// AckedUpdates is a map from sequence number to backup id to record
|
||||
// which revoked states were uploaded via this session.
|
||||
@ -107,8 +107,21 @@ type BackupID struct {
|
||||
}
|
||||
|
||||
// CommittedUpdate holds a state update sent by a client along with its
|
||||
// SessionID.
|
||||
// allocated sequence number and the exact remote commitment the encrypted
|
||||
// justice transaction can rectify.
|
||||
type CommittedUpdate struct {
|
||||
// SeqNum is the unique sequence number allocated by the session to this
|
||||
// update.
|
||||
SeqNum uint16
|
||||
|
||||
CommittedUpdateBody
|
||||
}
|
||||
|
||||
// CommittedUpdateBody represents the primary components of a CommittedUpdate.
|
||||
// On disk, this is stored under the sequence number, which acts as its key.
|
||||
type CommittedUpdateBody struct {
|
||||
// BackupID identifies the breached commitment that the encrypted blob
|
||||
// can spend from.
|
||||
BackupID BackupID
|
||||
|
||||
// Hint is the 16-byte prefix of the revoked commitment transaction ID.
|
||||
|
@ -129,7 +129,7 @@ func (m *ClientDB) CreateClientSession(session *wtdb.ClientSession) error {
|
||||
SeqNum: session.SeqNum,
|
||||
TowerLastApplied: session.TowerLastApplied,
|
||||
RewardPkScript: cloneBytes(session.RewardPkScript),
|
||||
CommittedUpdates: make(map[uint16]*wtdb.CommittedUpdate),
|
||||
CommittedUpdates: make([]wtdb.CommittedUpdate, 0),
|
||||
AckedUpdates: make(map[uint16]wtdb.BackupID),
|
||||
}
|
||||
|
||||
@ -159,7 +159,7 @@ func (m *ClientDB) NextSessionKeyIndex(towerID wtdb.TowerID) (uint32, error) {
|
||||
|
||||
// CommitUpdate persists the CommittedUpdate provided in the slot for (session,
|
||||
// seqNum). This allows the client to retransmit this update on startup.
|
||||
func (m *ClientDB) CommitUpdate(id *wtdb.SessionID, seqNum uint16,
|
||||
func (m *ClientDB) CommitUpdate(id *wtdb.SessionID,
|
||||
update *wtdb.CommittedUpdate) (uint16, error) {
|
||||
|
||||
m.mu.Lock()
|
||||
@ -172,25 +172,26 @@ func (m *ClientDB) CommitUpdate(id *wtdb.SessionID, seqNum uint16,
|
||||
}
|
||||
|
||||
// Check if an update has already been committed for this state.
|
||||
dbUpdate, ok := session.CommittedUpdates[seqNum]
|
||||
if ok {
|
||||
// If the breach hint matches, we'll just return the last
|
||||
// applied value so the client can retransmit.
|
||||
if dbUpdate.Hint == update.Hint {
|
||||
return session.TowerLastApplied, nil
|
||||
}
|
||||
for _, dbUpdate := range session.CommittedUpdates {
|
||||
if dbUpdate.SeqNum == update.SeqNum {
|
||||
// If the breach hint matches, we'll just return the
|
||||
// last applied value so the client can retransmit.
|
||||
if dbUpdate.Hint == update.Hint {
|
||||
return session.TowerLastApplied, nil
|
||||
}
|
||||
|
||||
// Otherwise, fail since the breach hint doesn't match.
|
||||
return 0, wtdb.ErrUpdateAlreadyCommitted
|
||||
// Otherwise, fail since the breach hint doesn't match.
|
||||
return 0, wtdb.ErrUpdateAlreadyCommitted
|
||||
}
|
||||
}
|
||||
|
||||
// Sequence number must increment.
|
||||
if seqNum != session.SeqNum+1 {
|
||||
if update.SeqNum != session.SeqNum+1 {
|
||||
return 0, wtdb.ErrCommitUnorderedUpdate
|
||||
}
|
||||
|
||||
// Save the update and increment the sequence number.
|
||||
session.CommittedUpdates[seqNum] = update
|
||||
session.CommittedUpdates = append(session.CommittedUpdates, *update)
|
||||
session.SeqNum++
|
||||
|
||||
return session.TowerLastApplied, nil
|
||||
@ -209,13 +210,6 @@ func (m *ClientDB) AckUpdate(id *wtdb.SessionID, seqNum, lastApplied uint16) err
|
||||
return wtdb.ErrClientSessionNotFound
|
||||
}
|
||||
|
||||
// Retrieve the committed update, failing if none is found. We should
|
||||
// only receive acks for state updates that we send.
|
||||
update, ok := session.CommittedUpdates[seqNum]
|
||||
if !ok {
|
||||
return wtdb.ErrCommittedUpdateNotFound
|
||||
}
|
||||
|
||||
// Ensure the returned last applied value does not exceed the highest
|
||||
// allocated sequence number.
|
||||
if lastApplied > session.SeqNum {
|
||||
@ -228,14 +222,28 @@ func (m *ClientDB) AckUpdate(id *wtdb.SessionID, seqNum, lastApplied uint16) err
|
||||
return wtdb.ErrLastAppliedReversion
|
||||
}
|
||||
|
||||
// Finally, remove the committed update from disk and mark the update as
|
||||
// acked. The tower last applied value is also recorded to send along
|
||||
// with the next update.
|
||||
delete(session.CommittedUpdates, seqNum)
|
||||
session.AckedUpdates[seqNum] = update.BackupID
|
||||
session.TowerLastApplied = lastApplied
|
||||
// Retrieve the committed update, failing if none is found. We should
|
||||
// only receive acks for state updates that we send.
|
||||
updates := session.CommittedUpdates
|
||||
for i, update := range updates {
|
||||
if update.SeqNum != seqNum {
|
||||
continue
|
||||
}
|
||||
|
||||
return nil
|
||||
// Remove the committed update from disk and mark the update as
|
||||
// acked. The tower last applied value is also recorded to send
|
||||
// along with the next update.
|
||||
copy(updates[:i], updates[i+1:])
|
||||
updates[len(updates)-1] = wtdb.CommittedUpdate{}
|
||||
session.CommittedUpdates = updates[:len(updates)-1]
|
||||
|
||||
session.AckedUpdates[seqNum] = update.BackupID
|
||||
session.TowerLastApplied = lastApplied
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return wtdb.ErrCommittedUpdateNotFound
|
||||
}
|
||||
|
||||
// FetchChanPkScripts returns the set of sweep pkscripts known for all channels.
|
||||
|
Loading…
Reference in New Issue
Block a user