diff --git a/chanbackup/backupfile.go b/chanbackup/backupfile.go index 0dbf9dc4..67e1d18f 100644 --- a/chanbackup/backupfile.go +++ b/chanbackup/backupfile.go @@ -76,6 +76,8 @@ func (b *MultiFile) UpdateAndSwap(newBackup PackedMulti) error { return ErrNoBackupFileExists } + log.Infof("Updating backup file at %v", b.fileName) + // If the old back up file still exists, then we'll delete it before // proceeding. if _, err := os.Stat(b.tempFileName); err == nil { diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index a4e41857..e4245eb9 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -2,6 +2,7 @@ package chanbackup import ( "bytes" + "fmt" "net" "sync" "sync/atomic" @@ -47,9 +48,9 @@ type ChannelEvent struct { // ChannelSubscription represents an intent to be notified of any updates to // the primary channel state. type ChannelSubscription struct { - // ChanUpdates is a read-only channel that will be sent upon once the - // primary channel state is updated. - ChanUpdates <-chan ChannelEvent + // ChanUpdates is a channel that will be sent upon once the primary + // channel state is updated. + ChanUpdates chan ChannelEvent // Cancel is a closure that allows the caller to cancel their // subscription and free up any resources allocated. @@ -160,6 +161,36 @@ func (s *SubSwapper) Stop() error { return nil } +// updateBackupFile updates the backup file in place given the current state of +// the SubSwapper. +func (s *SubSwapper) updateBackupFile() error { + // With our updated channel state obtained, we'll create a new multi + // from our series of singles. + var newMulti Multi + for _, backup := range s.backupState { + newMulti.StaticBackups = append( + newMulti.StaticBackups, backup, + ) + } + + // Now that our multi has been assembled, we'll attempt to pack + // (encrypt+encode) the new channel state to our target reader. + var b bytes.Buffer + err := newMulti.PackToWriter(&b, s.keyRing) + if err != nil { + return fmt.Errorf("unable to pack multi backup: %v", err) + } + + // Finally, we'll swap out the old backup for this new one in a single + // atomic step. + err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes())) + if err != nil { + return fmt.Errorf("unable to update multi backup: %v", err) + } + + return nil +} + // backupFileUpdater is the primary goroutine of the SubSwapper which is // responsible for listening for changes to the channel, and updating the // persistent multi backup state with a new packed multi of the latest channel @@ -172,6 +203,12 @@ func (s *SubSwapper) backupUpdater() { log.Debugf("SubSwapper's backupUpdater is active!") + // Before we enter our main loop, we'll update the on-disk state with + // the latest Single state, as nodes may have new advertised addresses. + if err := s.updateBackupFile(); err != nil { + log.Errorf("Unable to refresh backup file: %v", err) + } + for { select { // The channel state has been modified! We'll evaluate all @@ -183,7 +220,7 @@ func (s *SubSwapper) backupUpdater() { // For all new open channels, we'll create a new SCB // given the required information. for _, newChan := range chanUpdate.NewChans { - log.Debugf("Adding chanenl %v to backup state", + log.Debugf("Adding channel %v to backup state", newChan.FundingOutpoint) s.backupState[newChan.FundingOutpoint] = NewSingle( @@ -204,41 +241,20 @@ func (s *SubSwapper) backupUpdater() { newStateSize := len(s.backupState) - // With our updated channel state obtained, we'll - // create a new multi from our series of singles. - var newMulti Multi - for _, backup := range s.backupState { - newMulti.StaticBackups = append( - newMulti.StaticBackups, backup, - ) - } - - // Now that our multi has been assembled, we'll attempt - // to pack (encrypt+encode) the new channel state to - // our target reader. - var b bytes.Buffer - err := newMulti.PackToWriter(&b, s.keyRing) - if err != nil { - log.Errorf("unable to pack multi backup: %v", - err) - continue - } - log.Infof("Updating on-disk multi SCB backup: "+ "num_old_chans=%v, num_new_chans=%v", oldStateSize, newStateSize) - // Finally, we'll swap out the old backup for this new - // one in a single atomic step. - err = s.Swapper.UpdateAndSwap( - PackedMulti(b.Bytes()), - ) - if err != nil { - log.Errorf("unable to update multi "+ - "backup: %v", err) - continue + // With out new state constructed, we'll, atomically + // update the on-disk backup state. + if err := s.updateBackupFile(); err != nil { + log.Errorf("unable to update backup file: %v", + err) } + // TODO(roasbeef): refresh periodically on a time basis due to + // possible addr changes from node + // Exit at once if a quit signal is detected. case <-s.quit: return diff --git a/chanbackup/pubsub_test.go b/chanbackup/pubsub_test.go index f571c8b7..30bc689e 100644 --- a/chanbackup/pubsub_test.go +++ b/chanbackup/pubsub_test.go @@ -131,17 +131,23 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) { keyRing := &mockKeyRing{} - var ( - swapper mockSwapper - chanNotifier mockChannelNotifier - ) + var chanNotifier mockChannelNotifier - subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper) + swapper := newMockSwapper() + subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper) if err != nil { t.Fatalf("unable to init subSwapper: %v", err) } - subSwapper.Start() + if err := subSwapper.Start(); err != nil { + t.Fatalf("unable to start swapper: %v", err) + } + + // The swapper should write the initial channel state as soon as it's + // active. + backupSet := make(map[wire.OutPoint]Single) + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) + subSwapper.Start() subSwapper.Stop() @@ -188,6 +194,10 @@ func TestSubSwapperUpdater(t *testing.T) { } defer subSwapper.Stop() + // The swapper should write the initial channel state as soon as it's + // active. + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) + // Now that the sub-swapper is active, we'll notify to add a brand new // channel to the channel state. newChannel, err := genRandomOpenChannelShell()