watchtower/wtclient: dedup backups across restarts

Now that the committed and acked updates are persisted across restarts,
we will use them to filter out duplicate commit heights presented to
the client.
Conner Fromknecht 2019-05-23 20:49:18 -07:00
parent 3be651b0b3
commit 9157c88f93
2 changed files with 113 additions and 6 deletions
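In outline: because committed and acked updates now survive restarts, the client can rebuild a per-channel "highest commit height seen" map at startup and consult it before queueing new backups. Below is a rough, self-contained sketch of that rebuild step, using simplified hypothetical types (backupID, session) rather than lnd's actual wtdb records:

	package main

	import "fmt"

	// backupID is a simplified stand-in for wtdb's BackupID: it names a
	// channel and the revoked commitment height being backed up.
	type backupID struct {
		chanID string
		height uint64
	}

	// session is a toy version of a client session loaded from disk,
	// carrying the updates that were committed or acked before a restart.
	type session struct {
		committed []backupID
		acked     []backupID
	}

	// rebuildHeights folds all persisted updates into the highest commit
	// height seen per channel, the same shape as the
	// buildHighestCommitHeights method added in the diff below.
	func rebuildHeights(sessions []session) map[string]uint64 {
		heights := make(map[string]uint64)
		record := func(bid backupID) {
			if h, ok := heights[bid.chanID]; !ok || bid.height > h {
				heights[bid.chanID] = bid.height
			}
		}
		for _, s := range sessions {
			for _, bid := range s.committed {
				record(bid)
			}
			for _, bid := range s.acked {
				record(bid)
			}
		}
		return heights
	}

	func main() {
		sessions := []session{
			{committed: []backupID{{"chan-0", 3}}, acked: []backupID{{"chan-0", 2}}},
			{acked: []backupID{{"chan-1", 7}}},
		}
		fmt.Println(rebuildHeights(sessions)) // map[chan-0:3 chan-1:7]
	}

The real buildHighestCommitHeights below performs the same fold over CommittedUpdates and AckedUpdates, keyed by lnwire.ChannelID.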

watchtower/wtclient/client.go

@@ -150,8 +150,9 @@ type TowerClient struct {
 	sessionQueue *sessionQueue
 	prevTask     *backupTask
 
-	summaryMu sync.RWMutex
-	summaries wtdb.ChannelSummaries
+	backupMu          sync.Mutex
+	summaries         wtdb.ChannelSummaries
+	chanCommitHeights map[lnwire.ChannelID]uint64
 
 	statTicker *time.Ticker
 	stats      clientStats
@@ -243,6 +244,10 @@ func New(config *Config) (*TowerClient, error) {
 		s.SessionPrivKey = sessionPriv
 	}
 
+	// Reconstruct the highest commit height processed for each channel
+	// under the client's current policy.
+	c.buildHighestCommitHeights()
+
 	// Finally, load the sweep pkscripts that have been generated for all
 	// previously registered channels.
 	c.summaries, err = c.cfg.DB.FetchChanSummaries()
@@ -253,6 +258,44 @@ func New(config *Config) (*TowerClient, error) {
 	return c, nil
 }
 
+// buildHighestCommitHeights inspects the full set of candidate client sessions
+// loaded from disk, and determines the highest known commit height for each
+// channel. This allows the client to reject backups that it has already
+// processed for its active policy.
+func (c *TowerClient) buildHighestCommitHeights() {
+	chanCommitHeights := make(map[lnwire.ChannelID]uint64)
+	for _, s := range c.candidateSessions {
+		// We only want to consider updates that have been
+		// accepted under an identical policy to the client's
+		// current policy.
+		if s.Policy != c.cfg.Policy {
+			continue
+		}
+
+		// Take the highest commit height found in the session's
+		// committed updates.
+		for _, committedUpdate := range s.CommittedUpdates {
+			bid := committedUpdate.BackupID
+
+			height, ok := chanCommitHeights[bid.ChanID]
+			if !ok || bid.CommitHeight > height {
+				chanCommitHeights[bid.ChanID] = bid.CommitHeight
+			}
+		}
+
+		// Take the highest commit height found in the session's acked
+		// updates.
+		for _, bid := range s.AckedUpdates {
+			height, ok := chanCommitHeights[bid.ChanID]
+			if !ok || bid.CommitHeight > height {
+				chanCommitHeights[bid.ChanID] = bid.CommitHeight
+			}
+		}
+	}
+
+	c.chanCommitHeights = chanCommitHeights
+}
+
 // Start initializes the watchtower client by loading or negotiating an active
 // session and then begins processing backup tasks from the request pipeline.
 func (c *TowerClient) Start() error {
@@ -388,8 +431,8 @@ func (c *TowerClient) ForceQuit() {
 // within the client. This should be called during link startup to ensure that
 // the client is able to support the link during operation.
 func (c *TowerClient) RegisterChannel(chanID lnwire.ChannelID) error {
-	c.summaryMu.Lock()
-	defer c.summaryMu.Unlock()
+	c.backupMu.Lock()
+	defer c.backupMu.Unlock()
 
 	// If a pkscript for this channel already exists, the channel has been
 	// previously registered.
@@ -431,13 +474,28 @@ func (c *TowerClient) BackupState(chanID *lnwire.ChannelID,
 	breachInfo *lnwallet.BreachRetribution) error {
 
 	// Retrieve the cached sweep pkscript used for this channel.
-	c.summaryMu.RLock()
+	c.backupMu.Lock()
 	summary, ok := c.summaries[*chanID]
-	c.summaryMu.RUnlock()
 	if !ok {
+		c.backupMu.Unlock()
 		return ErrUnregisteredChannel
 	}
 
+	// Ignore backups that have already been presented to the client.
+	height, ok := c.chanCommitHeights[*chanID]
+	if ok && breachInfo.RevokedStateNum <= height {
+		c.backupMu.Unlock()
+		log.Debugf("Ignoring duplicate backup for chanid=%v at height=%d",
+			chanID, breachInfo.RevokedStateNum)
+		return nil
+	}
+
+	// This backup has a higher commit height than any known backup for this
+	// channel. We'll update our tip so that we won't accept it again if the
+	// link flaps.
+	c.chanCommitHeights[*chanID] = breachInfo.RevokedStateNum
+	c.backupMu.Unlock()
+
 	task := newBackupTask(chanID, breachInfo, summary.SweepPkScript)
 
 	return c.pipeline.QueueBackupTask(task)
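Taken alone, the guard above makes the accepted commit height per channel monotonic. A minimal standalone sketch of that check, with hypothetical names rather than lnd's API:

	package main

	import (
		"fmt"
		"sync"
	)

	// heightFilter is a toy stand-in for the client's per-channel dedup
	// state: it remembers the highest commit height seen for each channel.
	type heightFilter struct {
		mu      sync.Mutex
		heights map[string]uint64
	}

	// shouldBackup reports whether a state at the given height is new. It
	// returns false for heights at or below the recorded tip, and advances
	// the tip otherwise, mirroring the RevokedStateNum check above.
	func (f *heightFilter) shouldBackup(chanID string, height uint64) bool {
		f.mu.Lock()
		defer f.mu.Unlock()

		if tip, ok := f.heights[chanID]; ok && height <= tip {
			return false // Duplicate or stale state; skip it.
		}
		f.heights[chanID] = height
		return true
	}

	func main() {
		f := &heightFilter{heights: make(map[string]uint64)}
		fmt.Println(f.shouldBackup("chan-0", 1)) // true
		fmt.Println(f.shouldBackup("chan-0", 1)) // false: height already seen
		fmt.Println(f.shouldBackup("chan-0", 2)) // true: height advanced
	}

Because BackupState now mutates this shared map rather than only reading the summaries, a read lock no longer suffices, which is why the diff replaces the summaryMu RWMutex with the backupMu Mutex.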

watchtower/wtclient/client_test.go

@@ -1246,6 +1246,55 @@ var clientTests = []clientTest{
 			h.assertUpdatesForPolicy(hints, h.clientCfg.Policy)
 		},
 	},
+	{
+		// Asserts that the client will deduplicate backups presented by
+		// a channel both in memory and after a restart. The client
+		// should only accept backups with a commit height greater than
+		// any already processed for a given policy.
+		name: "dedup backups",
+		cfg: harnessCfg{
+			localBalance:  localBalance,
+			remoteBalance: remoteBalance,
+			policy: wtpolicy.Policy{
+				BlobType:     blob.TypeDefault,
+				MaxUpdates:   5,
+				SweepFeeRate: 1,
+			},
+		},
+		fn: func(h *testHarness) {
+			const (
+				numUpdates = 10
+				chanID     = 0
+			)
+
+			// Generate the retributions that will be backed up.
+			hints := h.advanceChannelN(chanID, numUpdates)
+
+			// Queue the first half of the retributions twice; the
+			// second batch should be entirely deduped by the
+			// client's in-memory tracking.
+			h.backupStates(chanID, 0, numUpdates/2, nil)
+			h.backupStates(chanID, 0, numUpdates/2, nil)
+
+			// Wait for the first half of the updates to be
+			// populated in the server's database.
+			h.waitServerUpdates(hints[:len(hints)/2], 5*time.Second)
+
+			// Restart the client, so we can ensure the deduping is
+			// maintained across restarts.
+			h.client.Stop()
+			h.startClient()
+			defer h.client.ForceQuit()
+
+			// Try to back up the full range of retributions. Only
+			// the second half should actually be sent.
+			h.backupStates(chanID, 0, numUpdates, nil)
+
+			// Wait for all of the updates to be populated in the
+			// server's database.
+			h.waitServerUpdates(hints, 5*time.Second)
+		},
+	},
 }
 
 // TestClient executes the client test suite, asserting the ability to backup