From 3b370fa08dac45b621e1f36e9b0a7882eaf8b7f3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 9 Dec 2018 19:09:28 -0800 Subject: [PATCH] chanbackup: add new sub-system for keeping the on disk channels.backup up to date In this commit, we introduce the chanbackup.SubSwapper interface. It takes a regular Swapper implementation (defined by the chanbackup.SubSwapper) interface along with a chanbackup.ChannelNotifier implementation. Given these two interfaces, we're able to be notified when a new channel is opened or closed, and then use the Swapper to atomically replace the on-disk channel back up. As a result, a Lightning daemon can ensure that they alwayts have a up to date channels.backup on disk that can safely be copied away by users and be used to restore channel funds in the event of partial/total data loss. --- chanbackup/pubsub.go | 247 ++++++++++++++++++++++++++++++++++++++ chanbackup/pubsub_test.go | 234 ++++++++++++++++++++++++++++++++++++ 2 files changed, 481 insertions(+) create mode 100644 chanbackup/pubsub.go create mode 100644 chanbackup/pubsub_test.go diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go new file mode 100644 index 00000000..a4e41857 --- /dev/null +++ b/chanbackup/pubsub.go @@ -0,0 +1,247 @@ +package chanbackup + +import ( + "bytes" + "net" + "sync" + "sync/atomic" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/keychain" +) + +// Swapper is an interface that allows the chanbackup.SubSwapper to update the +// main multi backup location once it learns of new channels or that prior +// channels have been closed. +type Swapper interface { + // UpdateAndSwap attempts to atomically update the main multi back up + // file location with the new fully packed multi-channel backup. + UpdateAndSwap(newBackup PackedMulti) error +} + +// ChannelWithAddrs bundles an open channel along with all the addresses for +// the channel peer. +// +// TODO(roasbeef): use channel shell instead? +type ChannelWithAddrs struct { + *channeldb.OpenChannel + + // Addrs is the set of addresses that we can use to reach the target + // peer. + Addrs []net.Addr +} + +// ChannelEvent packages a new update of new channels since subscription, and +// channels that have been opened since prior channel event. +type ChannelEvent struct { + // ClosedChans are the set of channels that have been closed since the + // last event. + ClosedChans []wire.OutPoint + + // NewChans is the set of channels that have been opened since the last + // event. + NewChans []ChannelWithAddrs +} + +// ChannelSubscription represents an intent to be notified of any updates to +// the primary channel state. +type ChannelSubscription struct { + // ChanUpdates is a read-only channel that will be sent upon once the + // primary channel state is updated. + ChanUpdates <-chan ChannelEvent + + // Cancel is a closure that allows the caller to cancel their + // subscription and free up any resources allocated. + Cancel func() +} + +// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to +// be notified of any changes to the primary channel state. +type ChannelNotifier interface { + // SubscribeChans requests a new channel subscription relative to the + // initial set of known channels. We use the knownChans as a + // synchronization point to ensure that the chanbackup.SubSwapper does + // not miss any channel open or close events in the period between when + // it's created, and when it requests the channel subscription. + SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error) +} + +// SubSwapper subscribes to new updates to the open channel state, and then +// swaps out the on-disk channel backup state in response. This sub-system +// that will ensure that the multi chan backup file on disk will always be +// updated with the latest channel back up state. We'll receive new +// opened/closed channels from the ChannelNotifier, then use the Swapper to +// update the file state on disk with the new set of open channels. This can +// be used to implement a system that always keeps the multi-chan backup file +// on disk in a consistent state for safety purposes. +// +// TODO(roasbeef): better name lol +type SubSwapper struct { + started uint32 + stopped uint32 + + // backupState are the set of SCBs for all open channels we know of. + backupState map[wire.OutPoint]Single + + // chanEvents is an active subscription to receive new channel state + // over. + chanEvents *ChannelSubscription + + // keyRing is the main key ring that will allow us to pack the new + // multi backup. + keyRing keychain.KeyRing + + Swapper + + quit chan struct{} + wg sync.WaitGroup +} + +// NewSubSwapper creates a new instance of the SubSwapper given the starting +// set of channels, and the required interfaces to be notified of new channel +// updates, pack a multi backup, and swap the current best backup from its +// storage location. +func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier, + keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) { + + // First, we'll subscribe to the latest set of channel updates given + // the set of channels we already know of. + knownChans := make(map[wire.OutPoint]struct{}) + for _, chanBackup := range startingChans { + knownChans[chanBackup.FundingOutpoint] = struct{}{} + } + chanEvents, err := chanNotifier.SubscribeChans(knownChans) + if err != nil { + return nil, err + } + + // Next, we'll construct our own backup state so we can add/remove + // channels that have been opened and closed. + backupState := make(map[wire.OutPoint]Single) + for _, chanBackup := range startingChans { + backupState[chanBackup.FundingOutpoint] = chanBackup + } + + return &SubSwapper{ + backupState: backupState, + chanEvents: chanEvents, + keyRing: keyRing, + Swapper: backupSwapper, + quit: make(chan struct{}), + }, nil +} + +// Start starts the chanbackup.SubSwapper. +func (s *SubSwapper) Start() error { + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return nil + } + + log.Infof("Starting chanbackup.SubSwapper") + + s.wg.Add(1) + go s.backupUpdater() + + return nil +} + +// Stop signals the SubSwapper to being a graceful shutdown. +func (s *SubSwapper) Stop() error { + if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { + return nil + } + + log.Infof("Stopping chanbackup.SubSwapper") + + close(s.quit) + s.wg.Wait() + + return nil +} + +// backupFileUpdater is the primary goroutine of the SubSwapper which is +// responsible for listening for changes to the channel, and updating the +// persistent multi backup state with a new packed multi of the latest channel +// state. +func (s *SubSwapper) backupUpdater() { + // Ensure that once we exit, we'll cancel our active channel + // subscription. + defer s.chanEvents.Cancel() + defer s.wg.Done() + + log.Debugf("SubSwapper's backupUpdater is active!") + + for { + select { + // The channel state has been modified! We'll evaluate all + // changes, and swap out the old packed multi with a new one + // with the latest channel state. + case chanUpdate := <-s.chanEvents.ChanUpdates: + oldStateSize := len(s.backupState) + + // For all new open channels, we'll create a new SCB + // given the required information. + for _, newChan := range chanUpdate.NewChans { + log.Debugf("Adding chanenl %v to backup state", + newChan.FundingOutpoint) + + s.backupState[newChan.FundingOutpoint] = NewSingle( + newChan.OpenChannel, newChan.Addrs, + ) + } + + // For all closed channels, we'll remove the prior + // backup state. + for _, closedChan := range chanUpdate.ClosedChans { + log.Debugf("Removing channel %v from backup "+ + "state", newLogClosure(func() string { + return closedChan.String() + })) + + delete(s.backupState, closedChan) + } + + newStateSize := len(s.backupState) + + // With our updated channel state obtained, we'll + // create a new multi from our series of singles. + var newMulti Multi + for _, backup := range s.backupState { + newMulti.StaticBackups = append( + newMulti.StaticBackups, backup, + ) + } + + // Now that our multi has been assembled, we'll attempt + // to pack (encrypt+encode) the new channel state to + // our target reader. + var b bytes.Buffer + err := newMulti.PackToWriter(&b, s.keyRing) + if err != nil { + log.Errorf("unable to pack multi backup: %v", + err) + continue + } + + log.Infof("Updating on-disk multi SCB backup: "+ + "num_old_chans=%v, num_new_chans=%v", + oldStateSize, newStateSize) + + // Finally, we'll swap out the old backup for this new + // one in a single atomic step. + err = s.Swapper.UpdateAndSwap( + PackedMulti(b.Bytes()), + ) + if err != nil { + log.Errorf("unable to update multi "+ + "backup: %v", err) + continue + } + + // Exit at once if a quit signal is detected. + case <-s.quit: + return + } + } +} diff --git a/chanbackup/pubsub_test.go b/chanbackup/pubsub_test.go new file mode 100644 index 00000000..f571c8b7 --- /dev/null +++ b/chanbackup/pubsub_test.go @@ -0,0 +1,234 @@ +package chanbackup + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/keychain" +) + +type mockSwapper struct { + fail bool + + swaps chan PackedMulti +} + +func newMockSwapper() *mockSwapper { + return &mockSwapper{ + swaps: make(chan PackedMulti), + } +} + +func (m *mockSwapper) UpdateAndSwap(newBackup PackedMulti) error { + if m.fail { + return fmt.Errorf("fail") + } + + m.swaps <- newBackup + + return nil +} + +type mockChannelNotifier struct { + fail bool + + chanEvents chan ChannelEvent +} + +func newMockChannelNotifier() *mockChannelNotifier { + return &mockChannelNotifier{ + chanEvents: make(chan ChannelEvent), + } +} + +func (m *mockChannelNotifier) SubscribeChans(chans map[wire.OutPoint]struct{}) ( + *ChannelSubscription, error) { + + if m.fail { + return nil, fmt.Errorf("fail") + } + + return &ChannelSubscription{ + ChanUpdates: m.chanEvents, + Cancel: func() { + }, + }, nil +} + +// TestNewSubSwapperSubscribeFail tests that if we're unable to obtain a +// channel subscription, then the entire sub-swapper will fail to start. +func TestNewSubSwapperSubscribeFail(t *testing.T) { + t.Parallel() + + keyRing := &mockKeyRing{} + + var swapper mockSwapper + chanNotifier := mockChannelNotifier{ + fail: true, + } + + _, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper) + if err == nil { + t.Fatalf("expected fail due to lack of subscription") + } +} + +func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper, + subSwapper *SubSwapper, keyRing keychain.KeyRing, + expectedChanSet map[wire.OutPoint]Single) { + + t.Helper() + + select { + case newPackedMulti := <-swapper.swaps: + // If we unpack the new multi, then we should find all the old + // channels, and also the new channel included and any deleted + // channel omitted.. + newMulti, err := newPackedMulti.Unpack(keyRing) + if err != nil { + t.Fatalf("unable to unpack multi: %v", err) + } + + // Ensure that once unpacked, the current backup has the + // expected number of Singles. + if len(newMulti.StaticBackups) != len(expectedChanSet) { + t.Fatalf("new backup wasn't included: expected %v "+ + "backups have %v", len(expectedChanSet), + len(newMulti.StaticBackups)) + } + + // We should also find all the old and new channels in this new + // backup. + for _, backup := range newMulti.StaticBackups { + _, ok := expectedChanSet[backup.FundingOutpoint] + if !ok { + t.Fatalf("didn't find backup in original set: %v", + backup.FundingOutpoint) + } + } + + // The internal state of the sub-swapper should also be one + // larger. + if !reflect.DeepEqual(expectedChanSet, subSwapper.backupState) { + t.Fatalf("backup set doesn't match: expected %v got %v", + spew.Sdump(expectedChanSet), + spew.Sdump(subSwapper.backupState)) + } + + case <-time.After(time.Second * 5): + t.Fatalf("update swapper didn't swap out multi") + } +} + +// TestSubSwapperIdempotentStartStop tests that calling the Start/Stop methods +// multiple time is permitted. +func TestSubSwapperIdempotentStartStop(t *testing.T) { + t.Parallel() + + keyRing := &mockKeyRing{} + + var ( + swapper mockSwapper + chanNotifier mockChannelNotifier + ) + + subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper) + if err != nil { + t.Fatalf("unable to init subSwapper: %v", err) + } + + subSwapper.Start() + subSwapper.Start() + + subSwapper.Stop() + subSwapper.Stop() +} + +// TestSubSwapperUpdater tests that the SubSwapper will properly swap out +// new/old channels within the channel set, and notify the swapper to update +// the master multi file backup. +func TestSubSwapperUpdater(t *testing.T) { + t.Parallel() + + keyRing := &mockKeyRing{} + chanNotifier := newMockChannelNotifier() + swapper := newMockSwapper() + + // First, we'll start out by creating a channels set for the initial + // set of channels known to the sub-swapper. + const numStartingChans = 3 + initialChanSet := make([]Single, 0, numStartingChans) + backupSet := make(map[wire.OutPoint]Single) + for i := 0; i < numStartingChans; i++ { + channel, err := genRandomOpenChannelShell() + if err != nil { + t.Fatalf("unable to make test chan: %v", err) + } + + single := NewSingle(channel, nil) + + backupSet[channel.FundingOutpoint] = single + initialChanSet = append(initialChanSet, single) + } + + // With our channel set created, we'll make a fresh sub swapper + // instance to begin our test. + subSwapper, err := NewSubSwapper( + initialChanSet, chanNotifier, keyRing, swapper, + ) + if err != nil { + t.Fatalf("unable to make swapper: %v", err) + } + if err := subSwapper.Start(); err != nil { + t.Fatalf("unable to start sub swapper: %v", err) + } + defer subSwapper.Stop() + + // Now that the sub-swapper is active, we'll notify to add a brand new + // channel to the channel state. + newChannel, err := genRandomOpenChannelShell() + if err != nil { + t.Fatalf("unable to create new chan: %v", err) + } + + // With the new channel created, we'll send a new update to the main + // goroutine telling it about this new channel. + select { + case chanNotifier.chanEvents <- ChannelEvent{ + NewChans: []ChannelWithAddrs{ + { + OpenChannel: newChannel, + }, + }, + }: + case <-time.After(time.Second * 5): + t.Fatalf("update swapper didn't read new channel: %v", err) + } + + backupSet[newChannel.FundingOutpoint] = NewSingle(newChannel, nil) + + // At this point, the sub-swapper should now have packed a new multi, + // and then sent it to the swapper so the back up can be updated. + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) + + // We'll now trigger an update to remove an existing channel. + chanToDelete := initialChanSet[0].FundingOutpoint + select { + case chanNotifier.chanEvents <- ChannelEvent{ + ClosedChans: []wire.OutPoint{chanToDelete}, + }: + + case <-time.After(time.Second * 5): + t.Fatalf("update swapper didn't read new channel: %v", err) + } + + delete(backupSet, chanToDelete) + + // Verify that the new set of backups, now has one less after the + // sub-swapper switches the new set with the old. + assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet) +}