chanbackup: add new updateBackupFile method, write fresh backup on startup

In this commit, we modify the sub-swapper to update the set of backup
files each time it's started. We do this to ensure that each time we start,
we're in sync with the current set of open channels. This also ensures
that we'll create a new backup file if this is the first time we're
starting up with this new feature.
This commit is contained in:
Olaoluwa Osuntokun 2019-02-08 19:22:06 -08:00
parent 0b8131c3be
commit af1dfe176e
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2
3 changed files with 67 additions and 39 deletions

@ -76,6 +76,8 @@ func (b *MultiFile) UpdateAndSwap(newBackup PackedMulti) error {
return ErrNoBackupFileExists return ErrNoBackupFileExists
} }
log.Infof("Updating backup file at %v", b.fileName)
// If the old back up file still exists, then we'll delete it before // If the old back up file still exists, then we'll delete it before
// proceeding. // proceeding.
if _, err := os.Stat(b.tempFileName); err == nil { if _, err := os.Stat(b.tempFileName); err == nil {

@ -2,6 +2,7 @@ package chanbackup
import ( import (
"bytes" "bytes"
"fmt"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -47,9 +48,9 @@ type ChannelEvent struct {
// ChannelSubscription represents an intent to be notified of any updates to // ChannelSubscription represents an intent to be notified of any updates to
// the primary channel state. // the primary channel state.
type ChannelSubscription struct { type ChannelSubscription struct {
// ChanUpdates is a read-only channel that will be sent upon once the // ChanUpdates is a channel that will be sent upon once the primary
// primary channel state is updated. // channel state is updated.
ChanUpdates <-chan ChannelEvent ChanUpdates chan ChannelEvent
// Cancel is a closure that allows the caller to cancel their // Cancel is a closure that allows the caller to cancel their
// subscription and free up any resources allocated. // subscription and free up any resources allocated.
@ -160,6 +161,36 @@ func (s *SubSwapper) Stop() error {
return nil return nil
} }
// updateBackupFile packs the SubSwapper's current in-memory channel backup
// state and atomically swaps it into the on-disk backup file.
func (s *SubSwapper) updateBackupFile() error {
	// Assemble a fresh Multi from every single-channel backup we're
	// currently tracking.
	var multi Multi
	for _, single := range s.backupState {
		multi.StaticBackups = append(multi.StaticBackups, single)
	}

	// Pack (encrypt+encode) the assembled multi into an in-memory buffer.
	var packed bytes.Buffer
	if err := multi.PackToWriter(&packed, s.keyRing); err != nil {
		return fmt.Errorf("unable to pack multi backup: %v", err)
	}

	// Hand the packed bytes to the swapper, which replaces the old
	// backup file on disk in a single atomic step.
	if err := s.Swapper.UpdateAndSwap(PackedMulti(packed.Bytes())); err != nil {
		return fmt.Errorf("unable to update multi backup: %v", err)
	}

	return nil
}
// backupFileUpdater is the primary goroutine of the SubSwapper which is // backupFileUpdater is the primary goroutine of the SubSwapper which is
// responsible for listening for changes to the channel, and updating the // responsible for listening for changes to the channel, and updating the
// persistent multi backup state with a new packed multi of the latest channel // persistent multi backup state with a new packed multi of the latest channel
@ -172,6 +203,12 @@ func (s *SubSwapper) backupUpdater() {
log.Debugf("SubSwapper's backupUpdater is active!") log.Debugf("SubSwapper's backupUpdater is active!")
// Before we enter our main loop, we'll update the on-disk state with
// the latest Single state, as nodes may have new advertised addresses.
if err := s.updateBackupFile(); err != nil {
log.Errorf("Unable to refresh backup file: %v", err)
}
for { for {
select { select {
// The channel state has been modified! We'll evaluate all // The channel state has been modified! We'll evaluate all
@ -183,7 +220,7 @@ func (s *SubSwapper) backupUpdater() {
// For all new open channels, we'll create a new SCB // For all new open channels, we'll create a new SCB
// given the required information. // given the required information.
for _, newChan := range chanUpdate.NewChans { for _, newChan := range chanUpdate.NewChans {
log.Debugf("Adding chanenl %v to backup state", log.Debugf("Adding channel %v to backup state",
newChan.FundingOutpoint) newChan.FundingOutpoint)
s.backupState[newChan.FundingOutpoint] = NewSingle( s.backupState[newChan.FundingOutpoint] = NewSingle(
@ -204,41 +241,20 @@ func (s *SubSwapper) backupUpdater() {
newStateSize := len(s.backupState) newStateSize := len(s.backupState)
// With our updated channel state obtained, we'll
// create a new multi from our series of singles.
var newMulti Multi
for _, backup := range s.backupState {
newMulti.StaticBackups = append(
newMulti.StaticBackups, backup,
)
}
// Now that our multi has been assembled, we'll attempt
// to pack (encrypt+encode) the new channel state to
// our target reader.
var b bytes.Buffer
err := newMulti.PackToWriter(&b, s.keyRing)
if err != nil {
log.Errorf("unable to pack multi backup: %v",
err)
continue
}
log.Infof("Updating on-disk multi SCB backup: "+ log.Infof("Updating on-disk multi SCB backup: "+
"num_old_chans=%v, num_new_chans=%v", "num_old_chans=%v, num_new_chans=%v",
oldStateSize, newStateSize) oldStateSize, newStateSize)
// Finally, we'll swap out the old backup for this new // With our new state constructed, we'll atomically
// one in a single atomic step. // update the on-disk backup state.
err = s.Swapper.UpdateAndSwap( if err := s.updateBackupFile(); err != nil {
PackedMulti(b.Bytes()), log.Errorf("unable to update backup file: %v",
) err)
if err != nil {
log.Errorf("unable to update multi "+
"backup: %v", err)
continue
} }
// TODO(roasbeef): refresh periodically on a time basis due to
// possible addr changes from node
// Exit at once if a quit signal is detected. // Exit at once if a quit signal is detected.
case <-s.quit: case <-s.quit:
return return

@ -131,17 +131,23 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) {
keyRing := &mockKeyRing{} keyRing := &mockKeyRing{}
var ( var chanNotifier mockChannelNotifier
swapper mockSwapper
chanNotifier mockChannelNotifier
)
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper) swapper := newMockSwapper()
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper)
if err != nil { if err != nil {
t.Fatalf("unable to init subSwapper: %v", err) t.Fatalf("unable to init subSwapper: %v", err)
} }
subSwapper.Start() if err := subSwapper.Start(); err != nil {
t.Fatalf("unable to start swapper: %v", err)
}
// The swapper should write the initial channel state as soon as it's
// active.
backupSet := make(map[wire.OutPoint]Single)
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
subSwapper.Start() subSwapper.Start()
subSwapper.Stop() subSwapper.Stop()
@ -188,6 +194,10 @@ func TestSubSwapperUpdater(t *testing.T) {
} }
defer subSwapper.Stop() defer subSwapper.Stop()
// The swapper should write the initial channel state as soon as it's
// active.
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
// Now that the sub-swapper is active, we'll notify to add a brand new // Now that the sub-swapper is active, we'll notify to add a brand new
// channel to the channel state. // channel to the channel state.
newChannel, err := genRandomOpenChannelShell() newChannel, err := genRandomOpenChannelShell()