chanbackup: always combine new backup data w/ on-disk state
In this commit, we fix a bug that could possibly cause a user's on disk back up file to be wiped out, if they ever started _another_ lnd node with different channel state. To remedy this, before we swap out the channel state with what's on disk, we'll first read out the contents of the on-disk SCB file and _combine_ that with what we have in memory. Fixes #4377
This commit is contained in:
parent
87880c0d56
commit
fc65c9b2cc
@ -18,12 +18,14 @@ type Swapper interface {
|
|||||||
// UpdateAndSwap attempts to atomically update the main multi back up
|
// UpdateAndSwap attempts to atomically update the main multi back up
|
||||||
// file location with the new fully packed multi-channel backup.
|
// file location with the new fully packed multi-channel backup.
|
||||||
UpdateAndSwap(newBackup PackedMulti) error
|
UpdateAndSwap(newBackup PackedMulti) error
|
||||||
|
|
||||||
|
// ExtractMulti attempts to obtain and decode the current SCB instance
|
||||||
|
// stored by the Swapper instance.
|
||||||
|
ExtractMulti(keychain keychain.KeyRing) (*Multi, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChannelWithAddrs bundles an open channel along with all the addresses for
|
// ChannelWithAddrs bundles an open channel along with all the addresses for
|
||||||
// the channel peer.
|
// the channel peer.
|
||||||
//
|
|
||||||
// TODO(roasbeef): use channel shell instead?
|
|
||||||
type ChannelWithAddrs struct {
|
type ChannelWithAddrs struct {
|
||||||
*channeldb.OpenChannel
|
*channeldb.OpenChannel
|
||||||
|
|
||||||
@ -75,8 +77,6 @@ type ChannelNotifier interface {
|
|||||||
// update the file state on disk with the new set of open channels. This can
|
// update the file state on disk with the new set of open channels. This can
|
||||||
// be used to implement a system that always keeps the multi-chan backup file
|
// be used to implement a system that always keeps the multi-chan backup file
|
||||||
// on disk in a consistent state for safety purposes.
|
// on disk in a consistent state for safety purposes.
|
||||||
//
|
|
||||||
// TODO(roasbeef): better name lol
|
|
||||||
type SubSwapper struct {
|
type SubSwapper struct {
|
||||||
started sync.Once
|
started sync.Once
|
||||||
stopped sync.Once
|
stopped sync.Once
|
||||||
@ -155,12 +155,50 @@ func (s *SubSwapper) Stop() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// updateBackupFile updates the backup file in place given the current state of
|
// updateBackupFile updates the backup file in place given the current state of
|
||||||
// the SubSwapper.
|
// the SubSwapper. We accept the set of channels that were closed between this
|
||||||
func (s *SubSwapper) updateBackupFile() error {
|
// update and the last to make sure we leave them out of our backup set union.
|
||||||
|
func (s *SubSwapper) updateBackupFile(closedChans ...wire.OutPoint) error {
|
||||||
|
// Before we pack the new set of SCBs, we'll first decode what we
|
||||||
|
// already have on-disk, to make sure we can decode it (proper seed)
|
||||||
|
// and that we're able to combine it with our new data.
|
||||||
|
diskMulti, err := s.Swapper.ExtractMulti(s.keyRing)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to extract on disk encrypted "+
|
||||||
|
"SCB: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we have channels stored on-disk, we'll create a new set of
|
||||||
|
// the combined old and new channels to make sure we retain what's
|
||||||
|
// already on-disk.
|
||||||
|
//
|
||||||
|
// NOTE: The ordering of this operations means that our in-memory
|
||||||
|
// structure will replace what we read from disk.
|
||||||
|
combinedBackup := make(map[wire.OutPoint]Single)
|
||||||
|
if diskMulti != nil {
|
||||||
|
for _, diskChannel := range diskMulti.StaticBackups {
|
||||||
|
chanPoint := diskChannel.FundingOutpoint
|
||||||
|
combinedBackup[chanPoint] = diskChannel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, memChannel := range s.backupState {
|
||||||
|
chanPoint := memChannel.FundingOutpoint
|
||||||
|
if _, ok := combinedBackup[chanPoint]; ok {
|
||||||
|
log.Warnf("Replacing disk backup for ChannelPoint(%v) "+
|
||||||
|
"w/ newer version", chanPoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
combinedBackup[chanPoint] = memChannel
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the set of closed channels from the final set of backups.
|
||||||
|
for _, closedChan := range closedChans {
|
||||||
|
delete(combinedBackup, closedChan)
|
||||||
|
}
|
||||||
|
|
||||||
// With our updated channel state obtained, we'll create a new multi
|
// With our updated channel state obtained, we'll create a new multi
|
||||||
// from our series of singles.
|
// from our series of singles.
|
||||||
var newMulti Multi
|
var newMulti Multi
|
||||||
for _, backup := range s.backupState {
|
for _, backup := range combinedBackup {
|
||||||
newMulti.StaticBackups = append(
|
newMulti.StaticBackups = append(
|
||||||
newMulti.StaticBackups, backup,
|
newMulti.StaticBackups, backup,
|
||||||
)
|
)
|
||||||
@ -169,13 +207,14 @@ func (s *SubSwapper) updateBackupFile() error {
|
|||||||
// Now that our multi has been assembled, we'll attempt to pack
|
// Now that our multi has been assembled, we'll attempt to pack
|
||||||
// (encrypt+encode) the new channel state to our target reader.
|
// (encrypt+encode) the new channel state to our target reader.
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
err := newMulti.PackToWriter(&b, s.keyRing)
|
err = newMulti.PackToWriter(&b, s.keyRing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to pack multi backup: %v", err)
|
return fmt.Errorf("unable to pack multi backup: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, we'll swap out the old backup for this new one in a single
|
// Finally, we'll swap out the old backup for this new one in a single
|
||||||
// atomic step.
|
// atomic step, combining the file already on-disk with this set of new
|
||||||
|
// channels.
|
||||||
err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
|
err = s.Swapper.UpdateAndSwap(PackedMulti(b.Bytes()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to update multi backup: %v", err)
|
return fmt.Errorf("unable to update multi backup: %v", err)
|
||||||
@ -198,6 +237,8 @@ func (s *SubSwapper) backupUpdater() {
|
|||||||
|
|
||||||
// Before we enter our main loop, we'll update the on-disk state with
|
// Before we enter our main loop, we'll update the on-disk state with
|
||||||
// the latest Single state, as nodes may have new advertised addresses.
|
// the latest Single state, as nodes may have new advertised addresses.
|
||||||
|
//
|
||||||
|
// TODO(roasbeef): do in Start() so we don't start with an invalid SCB?
|
||||||
if err := s.updateBackupFile(); err != nil {
|
if err := s.updateBackupFile(); err != nil {
|
||||||
log.Errorf("Unable to refresh backup file: %v", err)
|
log.Errorf("Unable to refresh backup file: %v", err)
|
||||||
}
|
}
|
||||||
@ -223,6 +264,9 @@ func (s *SubSwapper) backupUpdater() {
|
|||||||
|
|
||||||
// For all closed channels, we'll remove the prior
|
// For all closed channels, we'll remove the prior
|
||||||
// backup state.
|
// backup state.
|
||||||
|
closedChans := make(
|
||||||
|
[]wire.OutPoint, 0, len(chanUpdate.ClosedChans),
|
||||||
|
)
|
||||||
for i, closedChan := range chanUpdate.ClosedChans {
|
for i, closedChan := range chanUpdate.ClosedChans {
|
||||||
log.Debugf("Removing channel %v from backup "+
|
log.Debugf("Removing channel %v from backup "+
|
||||||
"state", newLogClosure(func() string {
|
"state", newLogClosure(func() string {
|
||||||
@ -230,6 +274,8 @@ func (s *SubSwapper) backupUpdater() {
|
|||||||
}))
|
}))
|
||||||
|
|
||||||
delete(s.backupState, closedChan)
|
delete(s.backupState, closedChan)
|
||||||
|
|
||||||
|
closedChans = append(closedChans, closedChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
newStateSize := len(s.backupState)
|
newStateSize := len(s.backupState)
|
||||||
@ -240,7 +286,7 @@ func (s *SubSwapper) backupUpdater() {
|
|||||||
|
|
||||||
// With out new state constructed, we'll, atomically
|
// With out new state constructed, we'll, atomically
|
||||||
// update the on-disk backup state.
|
// update the on-disk backup state.
|
||||||
if err := s.updateBackupFile(); err != nil {
|
if err := s.updateBackupFile(closedChans...); err != nil {
|
||||||
log.Errorf("unable to update backup file: %v",
|
log.Errorf("unable to update backup file: %v",
|
||||||
err)
|
err)
|
||||||
}
|
}
|
||||||
|
@ -2,12 +2,10 @@ package chanbackup
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/davecgh/go-spew/spew"
|
|
||||||
"github.com/lightningnetwork/lnd/keychain"
|
"github.com/lightningnetwork/lnd/keychain"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -15,11 +13,17 @@ type mockSwapper struct {
|
|||||||
fail bool
|
fail bool
|
||||||
|
|
||||||
swaps chan PackedMulti
|
swaps chan PackedMulti
|
||||||
|
|
||||||
|
swapState *Multi
|
||||||
|
|
||||||
|
keyChain keychain.KeyRing
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMockSwapper() *mockSwapper {
|
func newMockSwapper(keychain keychain.KeyRing) *mockSwapper {
|
||||||
return &mockSwapper{
|
return &mockSwapper{
|
||||||
swaps: make(chan PackedMulti),
|
swaps: make(chan PackedMulti),
|
||||||
|
keyChain: keychain,
|
||||||
|
swapState: &Multi{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,11 +32,22 @@ func (m *mockSwapper) UpdateAndSwap(newBackup PackedMulti) error {
|
|||||||
return fmt.Errorf("fail")
|
return fmt.Errorf("fail")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
swapState, err := newBackup.Unpack(m.keyChain)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to decode on disk swaps: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.swapState = swapState
|
||||||
|
|
||||||
m.swaps <- newBackup
|
m.swaps <- newBackup
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockSwapper) ExtractMulti(keychain keychain.KeyRing) (*Multi, error) {
|
||||||
|
return m.swapState, nil
|
||||||
|
}
|
||||||
|
|
||||||
type mockChannelNotifier struct {
|
type mockChannelNotifier struct {
|
||||||
fail bool
|
fail bool
|
||||||
|
|
||||||
@ -87,7 +102,7 @@ func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper,
|
|||||||
case newPackedMulti := <-swapper.swaps:
|
case newPackedMulti := <-swapper.swaps:
|
||||||
// If we unpack the new multi, then we should find all the old
|
// If we unpack the new multi, then we should find all the old
|
||||||
// channels, and also the new channel included and any deleted
|
// channels, and also the new channel included and any deleted
|
||||||
// channel omitted..
|
// channel omitted.
|
||||||
newMulti, err := newPackedMulti.Unpack(keyRing)
|
newMulti, err := newPackedMulti.Unpack(keyRing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to unpack multi: %v", err)
|
t.Fatalf("unable to unpack multi: %v", err)
|
||||||
@ -111,12 +126,19 @@ func assertExpectedBackupSwap(t *testing.T, swapper *mockSwapper,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The internal state of the sub-swapper should also be one
|
// The same applies for our in-memory state, but it's also
|
||||||
// larger.
|
// possible for there to be items in the on-disk state that we
|
||||||
if !reflect.DeepEqual(expectedChanSet, subSwapper.backupState) {
|
// don't know of explicit.
|
||||||
t.Fatalf("backup set doesn't match: expected %v got %v",
|
newChans := make(map[wire.OutPoint]Single)
|
||||||
spew.Sdump(expectedChanSet),
|
for _, newChan := range newMulti.StaticBackups {
|
||||||
spew.Sdump(subSwapper.backupState))
|
newChans[newChan.FundingOutpoint] = newChan
|
||||||
|
}
|
||||||
|
for _, backup := range subSwapper.backupState {
|
||||||
|
_, ok := newChans[backup.FundingOutpoint]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("didn't find backup in original set: %v",
|
||||||
|
backup.FundingOutpoint)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(time.Second * 5):
|
case <-time.After(time.Second * 5):
|
||||||
@ -133,7 +155,7 @@ func TestSubSwapperIdempotentStartStop(t *testing.T) {
|
|||||||
|
|
||||||
var chanNotifier mockChannelNotifier
|
var chanNotifier mockChannelNotifier
|
||||||
|
|
||||||
swapper := newMockSwapper()
|
swapper := newMockSwapper(keyRing)
|
||||||
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper)
|
subSwapper, err := NewSubSwapper(nil, &chanNotifier, keyRing, swapper)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to init subSwapper: %v", err)
|
t.Fatalf("unable to init subSwapper: %v", err)
|
||||||
@ -162,7 +184,7 @@ func TestSubSwapperUpdater(t *testing.T) {
|
|||||||
|
|
||||||
keyRing := &mockKeyRing{}
|
keyRing := &mockKeyRing{}
|
||||||
chanNotifier := newMockChannelNotifier()
|
chanNotifier := newMockChannelNotifier()
|
||||||
swapper := newMockSwapper()
|
swapper := newMockSwapper(keyRing)
|
||||||
|
|
||||||
// First, we'll start out by creating a channels set for the initial
|
// First, we'll start out by creating a channels set for the initial
|
||||||
// set of channels known to the sub-swapper.
|
// set of channels known to the sub-swapper.
|
||||||
@ -181,6 +203,24 @@ func TestSubSwapperUpdater(t *testing.T) {
|
|||||||
initialChanSet = append(initialChanSet, single)
|
initialChanSet = append(initialChanSet, single)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// We'll also generate two additional channels which will already be
|
||||||
|
// present on disk. However, these will at first only be known by the
|
||||||
|
// on disk backup (the backup set).
|
||||||
|
const numDiskChans = 2
|
||||||
|
for i := 0; i < numDiskChans; i++ {
|
||||||
|
channel, err := genRandomOpenChannelShell()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to make test chan: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
single := NewSingle(channel, nil)
|
||||||
|
|
||||||
|
backupSet[channel.FundingOutpoint] = single
|
||||||
|
swapper.swapState.StaticBackups = append(
|
||||||
|
swapper.swapState.StaticBackups, single,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// With our channel set created, we'll make a fresh sub swapper
|
// With our channel set created, we'll make a fresh sub swapper
|
||||||
// instance to begin our test.
|
// instance to begin our test.
|
||||||
subSwapper, err := NewSubSwapper(
|
subSwapper, err := NewSubSwapper(
|
||||||
|
Loading…
Reference in New Issue
Block a user