From af1dfe176e4ecaef342290d2bc21d56e063720b9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 8 Feb 2019 19:22:06 -0800 Subject: [PATCH] chanbackup: add new updateBackupFile method, write fresh backup on startup In this commit, we modify the sub-swapper to update the set of backups files time it's tarted. We do this to ensure that each time we start, we're up to sync with the current set of open channels. This also ensure that we'll create a new back up file if this is the first time we're starting up with this new feature. --- chanbackup/backupfile.go | 2 + chanbackup/pubsub.go | 82 +++++++++++++++++++++++---------------- chanbackup/pubsub_test.go | 22 ++++++++--- 3 files changed, 67 insertions(+), 39 deletions(-) diff --git a/chanbackup/backupfile.go b/chanbackup/backupfile.go index 0dbf9dc4..67e1d18f 100644 --- a/chanbackup/backupfile.go +++ b/chanbackup/backupfile.go @@ -76,6 +76,8 @@ func (b *MultiFile) UpdateAndSwap(newBackup PackedMulti) error { return ErrNoBackupFileExists } + log.Infof("Updating backup file at %v", b.fileName) + // If the old back up file still exists, then we'll delete it before // proceeding. if _, err := os.Stat(b.tempFileName); err == nil { diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index a4e41857..e4245eb9 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -2,6 +2,7 @@ package chanbackup import ( "bytes" + "fmt" "net" "sync" "sync/atomic" @@ -47,9 +48,9 @@ type ChannelEvent struct { // ChannelSubscription represents an intent to be notified of any updates to // the primary channel state. type ChannelSubscription struct { - // ChanUpdates is a read-only channel that will be sent upon once the - // primary channel state is updated. - ChanUpdates <-chan ChannelEvent + // ChanUpdates is a channel that will be sent upon once the primary + // channel state is updated. + ChanUpdates chan ChannelEvent // Cancel is a closure that allows the caller to cancel their // subscription and free up any resources allocated. @@ -160,6 +161,36 @@ func (s *SubSwapper) Stop() error { return nil } +// updateBackupFile updates the backup file in place given the current state of +// the SubSwapper. +func (s *SubSwapper) updateBackupFile() error { + // With our updated channel state obtained, we'll create a new multi + // from our series of singles. + var newMulti Multi + for _, backup := range s.backupState { + newMulti.StaticBackups = append( + newMulti.StaticBackups, backup, + ) + } + + // Now that our multi has been assembled, we'll attempt to pack + // (encrypt+encode) the new channel state to our target reader. + var b bytes.Buffer + err := newMulti.PackToWriter(&b, s.keyRing) + if err != nil { + return fmt.Errorf("unable to pack multi backup: %v", err) + } + + // Finally, we'll swap out the old backup for this new one in a single + // atomic step. + err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes())) + if err != nil { + return fmt.Errorf("unable to update multi backup: %v", err) + } + + return nil +} + // backupFileUpdater is the primary goroutine of the SubSwapper which is // responsible for listening for changes to the channel, and updating the // persistent multi backup state with a new packed multi of the latest channel @@ -172,6 +203,12 @@ func (s *SubSwapper) backupUpdater() { log.Debugf("SubSwapper's backupUpdater is active!") + // Before we enter our main loop, we'll update the on-disk state with + // the latest Single state, as nodes may have new advertised addresses. + if err := s.updateBackupFile(); err != nil { + log.Errorf("Unable to refresh backup file: %v", err) + } + for { select { // The channel state has been modified! We'll evaluate all @@ -183,7 +220,7 @@ func (s *SubSwapper) backupUpdater() { // For all new open channels, we'll create a new SCB // given the required information. for _, newChan := range chanUpdate.NewChans { - log.Debugf("Adding chanenl %v to backup state", + log.Debugf("Adding channel %v to backup state", newChan.FundingOutpoint) s.backupState[newChan.FundingOutpoint] = NewSingle( @@ -204,41 +241,20 @@ func (s *SubSwapper) backupUpdater() { newStateSize := len(s.backupState) - // With our updated channel state obtained, we'll - // create a new multi from our series of singles. - var newMulti Multi - for _, backup := range s.backupState { - newMulti.StaticBackups = append( - newMulti.StaticBackups, backup, - ) - } - - // Now that our multi has been assembled, we'll attempt - // to pack (encrypt+encode) the new channel state to - // our target reader. - var b bytes.Buffer - err := newMulti.PackToWriter(&b, s.keyRing) - if err != nil { - log.Errorf("unable to pack multi backup: %v", - err) - continue - } - log.Infof("Updating on-disk multi SCB backup: "+ "num_old_chans=%v, num_new_chans=%v", oldStateSize, newStateSize) - // Finally, we'll swap out the old backup for this new - // one in a single atomic step. - err = s.Swapper.UpdateAndSwap( - PackedMulti(b.Bytes()), - ) - if err != nil { - log.Errorf("unable to update multi "+ - "backup: %v", err) - continue + // With out new state constructed, we'll, atomically + // update the on-disk backup state. + if err := s.updateBackupFile(); err != nil { + log.Errorf("unable to update backup file: %v", + err) } + // TODO(roasbeef): refresh periodically on a time basis due to + // possible addr changes from node + // Exit at once if a quit signal is detected. case <-s.quit: return diff --git a/chanbackup/pubsub_test.go b/chanbackup/pubsub_test.go index f571c8b7..30bc689e 100644 --- a/chanbackup/pubsub_test.go +++ b/chanbackup/pubsub_test.go @@ -131,17 +131,23 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) { keyRing := &mockKeyRing{} - var ( - swapper mockSwapper - chanNotifier mockChannelNotifier - ) + var chanNotifier mockChannelNotifier - subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper) + swapper := newMockSwapper() + subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper) if err != nil { t.Fatalf("unable to init subSwapper: %v", err) } - subSwapper.Start() + if err := subSwapper.Start(); err != nil { + t.Fatalf("unable to start swapper: %v", err) + } + + // The swapper should write the initial channel state as soon as it's + // active. + backupSet := make(map[wire.OutPoint]Single) + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) + subSwapper.Start() subSwapper.Stop() @@ -188,6 +194,10 @@ func TestSubSwapperUpdater(t *testing.T) { } defer subSwapper.Stop() + // The swapper should write the initial channel state as soon as it's + // active. + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) + // Now that the sub-swapper is active, we'll notify to add a brand new // channel to the channel state. newChannel, err := genRandomOpenChannelShell()