diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index b9331820..87d61299 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -18,12 +18,14 @@ type Swapper interface { // UpdateAndSwap attempts to atomically update the main multi back up // file location with the new fully packed multi-channel backup. UpdateAndSwap(newBackup PackedMulti) error + + // ExtractMulti attempts to obtain and decode the current SCB instance + // stored by the Swapper instance. + ExtractMulti(keychain keychain.KeyRing) (*Multi, error) } // ChannelWithAddrs bundles an open channel along with all the addresses for // the channel peer. -// -// TODO(roasbeef): use channel shell instead? type ChannelWithAddrs struct { *channeldb.OpenChannel @@ -75,8 +77,6 @@ type ChannelNotifier interface { // update the file state on disk with the new set of open channels. This can // be used to implement a system that always keeps the multi-chan backup file // on disk in a consistent state for safety purposes. -// -// TODO(roasbeef): better name lol type SubSwapper struct { started sync.Once stopped sync.Once @@ -155,12 +155,50 @@ func (s *SubSwapper) Stop() error { } // updateBackupFile updates the backup file in place given the current state of -// the SubSwapper. -func (s *SubSwapper) updateBackupFile() error { +// the SubSwapper. We accept the set of channels that were closed between this +// update and the last to make sure we leave them out of our backup set union. +func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error { + // Before we pack the new set of SCBs, we'll first decode what we + // already have on-disk, to make sure we can decode it (proper seed) + // and that we're able to combine it with our new data. + diskMulti, err := s.Swapper.ExtractMulti(s.keyRing) + if err != nil { + return fmt.Errorf("unable to extract on disk encrypted "+ + "SCB: %v", err) + } + + // Now that we have channels stored on-disk, we'll create a new set of + // the combined old and new channels to make sure we retain what's + // already on-disk. + // + // NOTE: The ordering of this operations means that our in-memory + // structure will replace what we read from disk. + combinedBackup := make(map[wire.OutPoint]Single) + if diskMulti != nil { + for _, diskChannel := range diskMulti.StaticBackups { + chanPoint := diskChannel.FundingOutpoint + combinedBackup[chanPoint] = diskChannel + } + } + for _, memChannel := range s.backupState { + chanPoint := memChannel.FundingOutpoint + if _, ok := combinedBackup[chanPoint]; ok { + log.Warnf("Replacing disk backup for ChannelPoint(%v) "+ + "w/ newer version", chanPoint) + } + + combinedBackup[chanPoint] = memChannel + } + + // Remove the set of closed channels from the final set of backups. + for _, closedChan := range closedChans { + delete(combinedBackup, closedChan) + } + // With our updated channel state obtained, we'll create a new multi // from our series of singles. var newMulti Multi - for _, backup := range s.backupState { + for _, backup := range combinedBackup { newMulti.StaticBackups = append( newMulti.StaticBackups, backup, ) @@ -169,13 +207,14 @@ func (s *SubSwapper) updateBackupFile() error { // Now that our multi has been assembled, we'll attempt to pack // (encrypt+encode) the new channel state to our target reader. var b bytes.Buffer - err := newMulti.PackToWriter(&b, s.keyRing) + err = newMulti.PackToWriter(&b, s.keyRing) if err != nil { return fmt.Errorf("unable to pack multi backup: %v", err) } // Finally, we'll swap out the old backup for this new one in a single - // atomic step. + // atomic step, combining the file already on-disk with this set of new + // channels. err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes())) if err != nil { return fmt.Errorf("unable to update multi backup: %v", err) @@ -198,6 +237,8 @@ func (s *SubSwapper) backupUpdater() { // Before we enter our main loop, we'll update the on-disk state with // the latest Single state, as nodes may have new advertised addresses. + // + // TODO(roasbeef): do in Start() so we don't start with an invalid SCB? if err := s.updateBackupFile(); err != nil { log.Errorf("Unable to refresh backup file: %v", err) } @@ -223,6 +264,9 @@ func (s *SubSwapper) backupUpdater() { // For all closed channels, we'll remove the prior // backup state. + closedChans := make( + []wire.OutPoint, 0, len(chanUpdate.ClosedChans), + ) for i, closedChan := range chanUpdate.ClosedChans { log.Debugf("Removing channel %v from backup "+ "state", newLogClosure(func() string { @@ -230,6 +274,8 @@ func (s *SubSwapper) backupUpdater() { })) delete(s.backupState, closedChan) + + closedChans = append(closedChans, closedChan) } newStateSize := len(s.backupState) @@ -240,7 +286,7 @@ func (s *SubSwapper) backupUpdater() { // With out new state constructed, we'll, atomically // update the on-disk backup state. - if err := s.updateBackupFile(); err != nil { + if err := s.updateBackupFile(closedChans...); err != nil { log.Errorf("unable to update backup file: %v", err) } diff --git a/chanbackup/pubsub_test.go b/chanbackup/pubsub_test.go index 30bc689e..8f4b73c6 100644 --- a/chanbackup/pubsub_test.go +++ b/chanbackup/pubsub_test.go @@ -2,12 +2,10 @@ package chanbackup import ( "fmt" - "reflect" "testing" "time" "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/keychain" ) @@ -15,11 +13,17 @@ type mockSwapper struct { fail bool swaps chan PackedMulti + + swapState *Multi + + keyChain keychain.KeyRing } -func newMockSwapper() *mockSwapper { +func newMockSwapper(keychain keychain.KeyRing) *mockSwapper { return &mockSwapper{ - swaps: make(chan PackedMulti), + swaps: make(chan PackedMulti), + keyChain: keychain, + swapState: &Multi{}, } } @@ -28,11 +32,22 @@ func (m *mockSwapper) UpdateAndSwap(newBackup PackedMulti) error { return fmt.Errorf("fail") } + swapState, err := newBackup.Unpack(m.keyChain) + if err != nil { + return fmt.Errorf("unable to decode on disk swaps: %v", err) + } + + m.swapState = swapState + m.swaps <- newBackup return nil } +func (m *mockSwapper) ExtractMulti(keychain keychain.KeyRing) (*Multi, error) { + return m.swapState, nil +} + type mockChannelNotifier struct { fail bool @@ -87,7 +102,7 @@ func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper, case newPackedMulti := <-swapper.swaps: // If we unpack the new multi, then we should find all the old // channels, and also the new channel included and any deleted - // channel omitted.. + // channel omitted. newMulti, err := newPackedMulti.Unpack(keyRing) if err != nil { t.Fatalf("unable to unpack multi: %v", err) @@ -111,12 +126,19 @@ func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper, } } - // The internal state of the sub-swapper should also be one - // larger. - if !reflect.DeepEqual(expectedChanSet, subSwapper.backupState) { - t.Fatalf("backup set doesn't match: expected %v got %v", - spew.Sdump(expectedChanSet), - spew.Sdump(subSwapper.backupState)) + // The same applies for our in-memory state, but it's also + // possible for there to be items in the on-disk state that we + // don't know of explicit. + newChans := make(map[wire.OutPoint]Single) + for _, newChan := range newMulti.StaticBackups { + newChans[newChan.FundingOutpoint] = newChan + } + for _, backup := range subSwapper.backupState { + _, ok := newChans[backup.FundingOutpoint] + if !ok { + t.Fatalf("didn't find backup in original set: %v", + backup.FundingOutpoint) + } } case <-time.After(time.Second * 5): @@ -133,7 +155,7 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) { var chanNotifier mockChannelNotifier - swapper := newMockSwapper() + swapper := newMockSwapper(keyRing) subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper) if err != nil { t.Fatalf("unable to init subSwapper: %v", err) @@ -162,7 +184,7 @@ func TestSubSwapperUpdater(t *testing.T) { keyRing := &mockKeyRing{} chanNotifier := newMockChannelNotifier() - swapper := newMockSwapper() + swapper := newMockSwapper(keyRing) // First, we'll start out by creating a channels set for the initial // set of channels known to the sub-swapper. @@ -181,6 +203,24 @@ func TestSubSwapperUpdater(t *testing.T) { initialChanSet = append(initialChanSet, single) } + // We'll also generate two additional channels which will already be + // present on disk. However, these will at first only be known by the + // on disk backup (the backup set). + const numDiskChans = 2 + for i := 0; i < numDiskChans; i++ { + channel, err := genRandomOpenChannelShell() + if err != nil { + t.Fatalf("unable to make test chan: %v", err) + } + + single := NewSingle(channel, nil) + + backupSet[channel.FundingOutpoint] = single + swapper.swapState.StaticBackups = append( + swapper.swapState.StaticBackups, single, + ) + } + // With our channel set created, we'll make a fresh sub swapper // instance to begin our test. subSwapper, err := NewSubSwapper(