chanbackup: add new sub-system for keeping the on disk channels.backup up to date
In this commit, we introduce the chanbackup.SubSwapper interface. It takes a regular Swapper implementation (defined by the chanbackup.SubSwapper) interface along with a chanbackup.ChannelNotifier implementation. Given these two interfaces, we're able to be notified when a new channel is opened or closed, and then use the Swapper to atomically replace the on-disk channel back up. As a result, a Lightning daemon can ensure that they alwayts have a up to date channels.backup on disk that can safely be copied away by users and be used to restore channel funds in the event of partial/total data loss.
This commit is contained in:
parent
60999df08f
commit
3b370fa08d
247
chanbackup/pubsub.go
Normal file
247
chanbackup/pubsub.go
Normal file
@ -0,0 +1,247 @@
|
||||
package chanbackup
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
)
|
||||
|
||||
// Swapper is an interface that allows the chanbackup.SubSwapper to update the
|
||||
// main multi backup location once it learns of new channels or that prior
|
||||
// channels have been closed.
|
||||
type Swapper interface {
|
||||
// UpdateAndSwap attempts to atomically update the main multi back up
|
||||
// file location with the new fully packed multi-channel backup.
|
||||
UpdateAndSwap(newBackup PackedMulti) error
|
||||
}
|
||||
|
||||
// ChannelWithAddrs bundles an open channel along with all the addresses for
|
||||
// the channel peer.
|
||||
//
|
||||
// TODO(roasbeef): use channel shell instead?
|
||||
type ChannelWithAddrs struct {
|
||||
*channeldb.OpenChannel
|
||||
|
||||
// Addrs is the set of addresses that we can use to reach the target
|
||||
// peer.
|
||||
Addrs []net.Addr
|
||||
}
|
||||
|
||||
// ChannelEvent packages a new update of new channels since subscription, and
|
||||
// channels that have been opened since prior channel event.
|
||||
type ChannelEvent struct {
|
||||
// ClosedChans are the set of channels that have been closed since the
|
||||
// last event.
|
||||
ClosedChans []wire.OutPoint
|
||||
|
||||
// NewChans is the set of channels that have been opened since the last
|
||||
// event.
|
||||
NewChans []ChannelWithAddrs
|
||||
}
|
||||
|
||||
// ChannelSubscription represents an intent to be notified of any updates to
|
||||
// the primary channel state.
|
||||
type ChannelSubscription struct {
|
||||
// ChanUpdates is a read-only channel that will be sent upon once the
|
||||
// primary channel state is updated.
|
||||
ChanUpdates <-chan ChannelEvent
|
||||
|
||||
// Cancel is a closure that allows the caller to cancel their
|
||||
// subscription and free up any resources allocated.
|
||||
Cancel func()
|
||||
}
|
||||
|
||||
// ChannelNotifier represents a system that allows the chanbackup.SubSwapper to
|
||||
// be notified of any changes to the primary channel state.
|
||||
type ChannelNotifier interface {
|
||||
// SubscribeChans requests a new channel subscription relative to the
|
||||
// initial set of known channels. We use the knownChans as a
|
||||
// synchronization point to ensure that the chanbackup.SubSwapper does
|
||||
// not miss any channel open or close events in the period between when
|
||||
// it's created, and when it requests the channel subscription.
|
||||
SubscribeChans(map[wire.OutPoint]struct{}) (*ChannelSubscription, error)
|
||||
}
|
||||
|
||||
// SubSwapper subscribes to new updates to the open channel state, and then
|
||||
// swaps out the on-disk channel backup state in response. This sub-system
|
||||
// that will ensure that the multi chan backup file on disk will always be
|
||||
// updated with the latest channel back up state. We'll receive new
|
||||
// opened/closed channels from the ChannelNotifier, then use the Swapper to
|
||||
// update the file state on disk with the new set of open channels. This can
|
||||
// be used to implement a system that always keeps the multi-chan backup file
|
||||
// on disk in a consistent state for safety purposes.
|
||||
//
|
||||
// TODO(roasbeef): better name lol
|
||||
type SubSwapper struct {
|
||||
started uint32
|
||||
stopped uint32
|
||||
|
||||
// backupState are the set of SCBs for all open channels we know of.
|
||||
backupState map[wire.OutPoint]Single
|
||||
|
||||
// chanEvents is an active subscription to receive new channel state
|
||||
// over.
|
||||
chanEvents *ChannelSubscription
|
||||
|
||||
// keyRing is the main key ring that will allow us to pack the new
|
||||
// multi backup.
|
||||
keyRing keychain.KeyRing
|
||||
|
||||
Swapper
|
||||
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewSubSwapper creates a new instance of the SubSwapper given the starting
|
||||
// set of channels, and the required interfaces to be notified of new channel
|
||||
// updates, pack a multi backup, and swap the current best backup from its
|
||||
// storage location.
|
||||
func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
|
||||
keyRing keychain.KeyRing, backupSwapper Swapper) (*SubSwapper, error) {
|
||||
|
||||
// First, we'll subscribe to the latest set of channel updates given
|
||||
// the set of channels we already know of.
|
||||
knownChans := make(map[wire.OutPoint]struct{})
|
||||
for _, chanBackup := range startingChans {
|
||||
knownChans[chanBackup.FundingOutpoint] = struct{}{}
|
||||
}
|
||||
chanEvents, err := chanNotifier.SubscribeChans(knownChans)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Next, we'll construct our own backup state so we can add/remove
|
||||
// channels that have been opened and closed.
|
||||
backupState := make(map[wire.OutPoint]Single)
|
||||
for _, chanBackup := range startingChans {
|
||||
backupState[chanBackup.FundingOutpoint] = chanBackup
|
||||
}
|
||||
|
||||
return &SubSwapper{
|
||||
backupState: backupState,
|
||||
chanEvents: chanEvents,
|
||||
keyRing: keyRing,
|
||||
Swapper: backupSwapper,
|
||||
quit: make(chan struct{}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Start starts the chanbackup.SubSwapper.
|
||||
func (s *SubSwapper) Start() error {
|
||||
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Infof("Starting chanbackup.SubSwapper")
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.backupUpdater()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop signals the SubSwapper to being a graceful shutdown.
|
||||
func (s *SubSwapper) Stop() error {
|
||||
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Infof("Stopping chanbackup.SubSwapper")
|
||||
|
||||
close(s.quit)
|
||||
s.wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// backupFileUpdater is the primary goroutine of the SubSwapper which is
|
||||
// responsible for listening for changes to the channel, and updating the
|
||||
// persistent multi backup state with a new packed multi of the latest channel
|
||||
// state.
|
||||
func (s *SubSwapper) backupUpdater() {
|
||||
// Ensure that once we exit, we'll cancel our active channel
|
||||
// subscription.
|
||||
defer s.chanEvents.Cancel()
|
||||
defer s.wg.Done()
|
||||
|
||||
log.Debugf("SubSwapper's backupUpdater is active!")
|
||||
|
||||
for {
|
||||
select {
|
||||
// The channel state has been modified! We'll evaluate all
|
||||
// changes, and swap out the old packed multi with a new one
|
||||
// with the latest channel state.
|
||||
case chanUpdate := <-s.chanEvents.ChanUpdates:
|
||||
oldStateSize := len(s.backupState)
|
||||
|
||||
// For all new open channels, we'll create a new SCB
|
||||
// given the required information.
|
||||
for _, newChan := range chanUpdate.NewChans {
|
||||
log.Debugf("Adding chanenl %v to backup state",
|
||||
newChan.FundingOutpoint)
|
||||
|
||||
s.backupState[newChan.FundingOutpoint] = NewSingle(
|
||||
newChan.OpenChannel, newChan.Addrs,
|
||||
)
|
||||
}
|
||||
|
||||
// For all closed channels, we'll remove the prior
|
||||
// backup state.
|
||||
for _, closedChan := range chanUpdate.ClosedChans {
|
||||
log.Debugf("Removing channel %v from backup "+
|
||||
"state", newLogClosure(func() string {
|
||||
return closedChan.String()
|
||||
}))
|
||||
|
||||
delete(s.backupState, closedChan)
|
||||
}
|
||||
|
||||
newStateSize := len(s.backupState)
|
||||
|
||||
// With our updated channel state obtained, we'll
|
||||
// create a new multi from our series of singles.
|
||||
var newMulti Multi
|
||||
for _, backup := range s.backupState {
|
||||
newMulti.StaticBackups = append(
|
||||
newMulti.StaticBackups, backup,
|
||||
)
|
||||
}
|
||||
|
||||
// Now that our multi has been assembled, we'll attempt
|
||||
// to pack (encrypt+encode) the new channel state to
|
||||
// our target reader.
|
||||
var b bytes.Buffer
|
||||
err := newMulti.PackToWriter(&b, s.keyRing)
|
||||
if err != nil {
|
||||
log.Errorf("unable to pack multi backup: %v",
|
||||
err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("Updating on-disk multi SCB backup: "+
|
||||
"num_old_chans=%v, num_new_chans=%v",
|
||||
oldStateSize, newStateSize)
|
||||
|
||||
// Finally, we'll swap out the old backup for this new
|
||||
// one in a single atomic step.
|
||||
err = s.Swapper.UpdateAndSwap(
|
||||
PackedMulti(b.Bytes()),
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("unable to update multi "+
|
||||
"backup: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Exit at once if a quit signal is detected.
|
||||
case <-s.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
234
chanbackup/pubsub_test.go
Normal file
234
chanbackup/pubsub_test.go
Normal file
@ -0,0 +1,234 @@
|
||||
package chanbackup
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/keychain"
|
||||
)
|
||||
|
||||
type mockSwapper struct {
|
||||
fail bool
|
||||
|
||||
swaps chan PackedMulti
|
||||
}
|
||||
|
||||
func newMockSwapper() *mockSwapper {
|
||||
return &mockSwapper{
|
||||
swaps: make(chan PackedMulti),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockSwapper) UpdateAndSwap(newBackup PackedMulti) error {
|
||||
if m.fail {
|
||||
return fmt.Errorf("fail")
|
||||
}
|
||||
|
||||
m.swaps <- newBackup
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type mockChannelNotifier struct {
|
||||
fail bool
|
||||
|
||||
chanEvents chan ChannelEvent
|
||||
}
|
||||
|
||||
func newMockChannelNotifier() *mockChannelNotifier {
|
||||
return &mockChannelNotifier{
|
||||
chanEvents: make(chan ChannelEvent),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mockChannelNotifier) SubscribeChans(chans map[wire.OutPoint]struct{}) (
|
||||
*ChannelSubscription, error) {
|
||||
|
||||
if m.fail {
|
||||
return nil, fmt.Errorf("fail")
|
||||
}
|
||||
|
||||
return &ChannelSubscription{
|
||||
ChanUpdates: m.chanEvents,
|
||||
Cancel: func() {
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TestNewSubSwapperSubscribeFail tests that if we're unable to obtain a
|
||||
// channel subscription, then the entire sub-swapper will fail to start.
|
||||
func TestNewSubSwapperSubscribeFail(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
keyRing := &mockKeyRing{}
|
||||
|
||||
var swapper mockSwapper
|
||||
chanNotifier := mockChannelNotifier{
|
||||
fail: true,
|
||||
}
|
||||
|
||||
_, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper)
|
||||
if err == nil {
|
||||
t.Fatalf("expected fail due to lack of subscription")
|
||||
}
|
||||
}
|
||||
|
||||
func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper,
|
||||
subSwapper *SubSwapper, keyRing keychain.KeyRing,
|
||||
expectedChanSet map[wire.OutPoint]Single) {
|
||||
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case newPackedMulti := <-swapper.swaps:
|
||||
// If we unpack the new multi, then we should find all the old
|
||||
// channels, and also the new channel included and any deleted
|
||||
// channel omitted..
|
||||
newMulti, err := newPackedMulti.Unpack(keyRing)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to unpack multi: %v", err)
|
||||
}
|
||||
|
||||
// Ensure that once unpacked, the current backup has the
|
||||
// expected number of Singles.
|
||||
if len(newMulti.StaticBackups) != len(expectedChanSet) {
|
||||
t.Fatalf("new backup wasn't included: expected %v "+
|
||||
"backups have %v", len(expectedChanSet),
|
||||
len(newMulti.StaticBackups))
|
||||
}
|
||||
|
||||
// We should also find all the old and new channels in this new
|
||||
// backup.
|
||||
for _, backup := range newMulti.StaticBackups {
|
||||
_, ok := expectedChanSet[backup.FundingOutpoint]
|
||||
if !ok {
|
||||
t.Fatalf("didn't find backup in original set: %v",
|
||||
backup.FundingOutpoint)
|
||||
}
|
||||
}
|
||||
|
||||
// The internal state of the sub-swapper should also be one
|
||||
// larger.
|
||||
if !reflect.DeepEqual(expectedChanSet, subSwapper.backupState) {
|
||||
t.Fatalf("backup set doesn't match: expected %v got %v",
|
||||
spew.Sdump(expectedChanSet),
|
||||
spew.Sdump(subSwapper.backupState))
|
||||
}
|
||||
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("update swapper didn't swap out multi")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSubSwapperIdempotentStartStop tests that calling the Start/Stop methods
|
||||
// multiple time is permitted.
|
||||
func TestSubSwapperIdempotentStartStop(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
keyRing := &mockKeyRing{}
|
||||
|
||||
var (
|
||||
swapper mockSwapper
|
||||
chanNotifier mockChannelNotifier
|
||||
)
|
||||
|
||||
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, &swapper)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to init subSwapper: %v", err)
|
||||
}
|
||||
|
||||
subSwapper.Start()
|
||||
subSwapper.Start()
|
||||
|
||||
subSwapper.Stop()
|
||||
subSwapper.Stop()
|
||||
}
|
||||
|
||||
// TestSubSwapperUpdater tests that the SubSwapper will properly swap out
|
||||
// new/old channels within the channel set, and notify the swapper to update
|
||||
// the master multi file backup.
|
||||
func TestSubSwapperUpdater(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
keyRing := &mockKeyRing{}
|
||||
chanNotifier := newMockChannelNotifier()
|
||||
swapper := newMockSwapper()
|
||||
|
||||
// First, we'll start out by creating a channels set for the initial
|
||||
// set of channels known to the sub-swapper.
|
||||
const numStartingChans = 3
|
||||
initialChanSet := make([]Single, 0, numStartingChans)
|
||||
backupSet := make(map[wire.OutPoint]Single)
|
||||
for i := 0; i < numStartingChans; i++ {
|
||||
channel, err := genRandomOpenChannelShell()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to make test chan: %v", err)
|
||||
}
|
||||
|
||||
single := NewSingle(channel, nil)
|
||||
|
||||
backupSet[channel.FundingOutpoint] = single
|
||||
initialChanSet = append(initialChanSet, single)
|
||||
}
|
||||
|
||||
// With our channel set created, we'll make a fresh sub swapper
|
||||
// instance to begin our test.
|
||||
subSwapper, err := NewSubSwapper(
|
||||
initialChanSet, chanNotifier, keyRing, swapper,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to make swapper: %v", err)
|
||||
}
|
||||
if err := subSwapper.Start(); err != nil {
|
||||
t.Fatalf("unable to start sub swapper: %v", err)
|
||||
}
|
||||
defer subSwapper.Stop()
|
||||
|
||||
// Now that the sub-swapper is active, we'll notify to add a brand new
|
||||
// channel to the channel state.
|
||||
newChannel, err := genRandomOpenChannelShell()
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create new chan: %v", err)
|
||||
}
|
||||
|
||||
// With the new channel created, we'll send a new update to the main
|
||||
// goroutine telling it about this new channel.
|
||||
select {
|
||||
case chanNotifier.chanEvents <- ChannelEvent{
|
||||
NewChans: []ChannelWithAddrs{
|
||||
{
|
||||
OpenChannel: newChannel,
|
||||
},
|
||||
},
|
||||
}:
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("update swapper didn't read new channel: %v", err)
|
||||
}
|
||||
|
||||
backupSet[newChannel.FundingOutpoint] = NewSingle(newChannel, nil)
|
||||
|
||||
// At this point, the sub-swapper should now have packed a new multi,
|
||||
// and then sent it to the swapper so the back up can be updated.
|
||||
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
|
||||
|
||||
// We'll now trigger an update to remove an existing channel.
|
||||
chanToDelete := initialChanSet[0].FundingOutpoint
|
||||
select {
|
||||
case chanNotifier.chanEvents <- ChannelEvent{
|
||||
ClosedChans: []wire.OutPoint{chanToDelete},
|
||||
}:
|
||||
|
||||
case <-time.After(time.Second * 5):
|
||||
t.Fatalf("update swapper didn't read new channel: %v", err)
|
||||
}
|
||||
|
||||
delete(backupSet, chanToDelete)
|
||||
|
||||
// Verify that the new set of backups, now has one less after the
|
||||
// sub-swapper switches the new set with the old.
|
||||
assertExpectedBackupSwap(t, swapper, subSwapper, keyRing, backupSet)
|
||||
}
|
Loading…
Reference in New Issue
Block a user