Merge pull request #3993 from guggero/unconfirmed-chanbackup

chanbackup: update on-disk backup file with unconfirmed channels
Olaoluwa Osuntokun 2020-03-09 17:18:23 -07:00 committed by GitHub
commit cbef26b9f6
6 changed files with 212 additions and 52 deletions

@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chanbackup"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
)
@ -51,6 +52,35 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
quit := make(chan struct{})
chanUpdates := make(chan chanbackup.ChannelEvent, 1)
// sendChanOpenUpdate is a closure that sends a ChannelEvent to the
// chanUpdates channel to inform subscribers about new pending or
// confirmed channels.
sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) {
nodeAddrs, err := c.addrs.AddrsForNode(
newOrPendingChan.IdentityPub,
)
if err != nil {
pub := newOrPendingChan.IdentityPub
ltndLog.Errorf("unable to fetch addrs for %x: %v",
pub.SerializeCompressed(), err)
}
chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: newOrPendingChan,
Addrs: nodeAddrs,
},
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
}
// In order to adhere to the interface, we'll proxy the events from the
// channel notifier to the sub-swapper in a format it understands.
go func() {
@ -74,37 +104,18 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{
// TODO(roasbeef): batch dispatch ntfns
switch event := e.(type) {
// A new channel has been opened and is still
// pending. We can still create a backup, even
// if the final channel ID is not yet available.
case channelnotifier.PendingOpenChannelEvent:
pendingChan := event.PendingChannel
sendChanOpenUpdate(pendingChan)
// A new channel has been opened, we'll obtain
// the node address, then send to the
// A new channel has been confirmed, we'll
// obtain the node address, then send to the
// sub-swapper.
case channelnotifier.OpenChannelEvent:
nodeAddrs, err := c.addrs.AddrsForNode(
event.Channel.IdentityPub,
)
if err != nil {
pub := event.Channel.IdentityPub
ltndLog.Errorf("unable to "+
"fetch addrs for %x: %v",
pub.SerializeCompressed(),
err)
}
channel := event.Channel
chanEvent := chanbackup.ChannelEvent{
NewChans: []chanbackup.ChannelWithAddrs{
{
OpenChannel: channel,
Addrs: nodeAddrs,
},
},
}
select {
case chanUpdates <- chanEvent:
case <-quit:
return
}
sendChanOpenUpdate(event.Channel)
// An existing channel has been closed, we'll
// send only the chanPoint of the closed

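The hunk above replaces the inline event construction with a shared sendChanOpenUpdate closure, so pending and confirmed channels flow through the same dispatch path. The following is a minimal, self-contained sketch of that pattern; openChannel and channelEvent are simplified stand-ins for channeldb.OpenChannel and chanbackup.ChannelEvent, not the real lnd types.

package main

import "fmt"

// Simplified stand-ins for the lnd types used in the diff above.
type openChannel struct{ nodeID string }

type channelEvent struct{ newChans []openChannel }

func main() {
    quit := make(chan struct{})
    chanUpdates := make(chan channelEvent, 1)

    // sendChanOpenUpdate mirrors the closure in the hunk: it packages a
    // pending or confirmed channel into an event and forwards it, bailing
    // out if the subscription is shut down.
    sendChanOpenUpdate := func(c openChannel) {
        evt := channelEvent{newChans: []openChannel{c}}
        select {
        case chanUpdates <- evt:
        case <-quit:
        }
    }

    // Pending-open and confirmed-open notifications now share this path.
    sendChanOpenUpdate(openChannel{nodeID: "peer-A"})
    fmt.Println(<-chanUpdates)
    close(quit)
}

Selecting on both the update channel and quit keeps the sender from leaking if the subscription is torn down while a notification is in flight.
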
@ -23,8 +23,14 @@ type ChannelNotifier struct {
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channelpoint for the new channel.
// ChannelPoint is the channel outpoint for the new channel.
ChannelPoint *wire.OutPoint
// PendingChannel is the channel configuration for the newly created
// channel. This might not have been persisted to the channel DB yet
// because we are still waiting for the final message from the remote
// peer.
PendingChannel *channeldb.OpenChannel
}
// OpenChannelEvent represents a new event where a channel goes from pending
@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a
// new channel is pending.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) {
event := PendingOpenChannelEvent{ChannelPoint: &chanPoint}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine
// that a new channel is pending. The pending channel is passed as a parameter
// instead of being read from the database because it might not have been
// persisted to the DB yet, as we are still waiting for the final message
// from the remote peer.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
pendingChan *channeldb.OpenChannel) {
event := PendingOpenChannelEvent{
ChannelPoint: &chanPoint,
PendingChannel: pendingChan,
}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)

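With this change, PendingOpenChannelEvent carries the channel state itself rather than only the outpoint, so subscribers can act before anything is persisted to the channel DB. The sketch below shows the kind of type switch a subscriber would run on incoming events; all types are hypothetical, simplified stand-ins for the channelnotifier and channeldb types.

package main

import "fmt"

// Hypothetical, simplified event shapes mirroring the diff above.
type outPoint struct {
    txid  string
    index uint32
}

type openChannel struct{ shortChanID uint64 }

type pendingOpenChannelEvent struct {
    channelPoint   *outPoint
    pendingChannel *openChannel
}

type openChannelEvent struct{ channel *openChannel }

// handleEvent is the sort of dispatch a backup subscriber performs on each
// update it receives.
func handleEvent(e interface{}) {
    switch evt := e.(type) {
    case pendingOpenChannelEvent:
        // The channel rides along with the event, so a backup can be
        // written before the funding transaction confirms.
        fmt.Println("pending open:", evt.channelPoint.txid)
    case openChannelEvent:
        fmt.Println("confirmed open:", evt.channel.shortChanID)
    }
}

func main() {
    handleEvent(pendingOpenChannelEvent{
        channelPoint:   &outPoint{txid: "aa", index: 0},
        pendingChannel: &openChannel{},
    })
    handleEvent(openChannelEvent{channel: &openChannel{shortChanID: 1}})
}
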
@ -366,7 +366,7 @@ type fundingConfig struct {
// NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels
// enter a pending state.
NotifyPendingOpenChannelEvent func(wire.OutPoint)
NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel)
}
// fundingManager acts as an orchestrator/bridge between the wallet's
@ -1705,7 +1705,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(fundingOut)
f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan)
// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
@ -1853,7 +1853,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
case resCtx.updates <- upd:
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint)
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan)
case <-f.quit:
return
}

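The fundingmanager.go change is purely a signature extension: both call sites already hold the reservation's completed channel in scope and now forward it to the notifier. A rough sketch of that callback wiring, using stand-in types in place of wire.OutPoint and channeldb.OpenChannel:

package main

import "fmt"

// Stand-in types; the real callback takes a wire.OutPoint and a
// *channeldb.OpenChannel.
type outPoint struct{ index uint32 }

type openChannel struct{ capacity int64 }

type fundingConfig struct {
    // The callback now receives the full channel state in addition to
    // the outpoint, matching the signature change in the diff.
    notifyPendingOpenChannelEvent func(outPoint, *openChannel)
}

func main() {
    cfg := fundingConfig{
        notifyPendingOpenChannelEvent: func(op outPoint, c *openChannel) {
            fmt.Printf("pending channel at output %d, capacity %d\n",
                op.index, c.capacity)
        },
    }

    // handleFundingCreated and handleFundingSigned both pass the channel
    // they just persisted straight through.
    completeChan := &openChannel{capacity: 100000}
    cfg.notifyPendingOpenChannelEvent(outPoint{index: 0}, completeChan)
}
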
@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
@ -48,6 +49,10 @@ const (
// testPollSleepMs is the number of milliseconds to sleep between
// each attempt to access the database to check its state.
testPollSleepMs = 500
// maxPending is the maximum number of channels we allow opening to the
// same peer in the max pending channels test.
maxPending = 4
)
var (
@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
}, nil
}
type mockChanEvent struct {
openEvent chan wire.OutPoint
pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent
}
func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) {
m.openEvent <- outpoint
}
func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint,
pendingChannel *channeldb.OpenChannel) {
m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{
ChannelPoint: &outpoint,
PendingChannel: pendingChannel,
}
}
type testNode struct {
privKey *btcec.PrivateKey
addr *lnwire.NetAddress
@ -147,6 +170,7 @@ type testNode struct {
fundingMgr *fundingManager
newChannels chan *newChannelMsg
mockNotifier *mockNotifier
mockChanEvent *mockChanEvent
testDir string
shutdownChannel chan struct{}
remoteFeatures []lnwire.FeatureBit
@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
bestHeight: fundingBroadcastHeight,
}
// The mock channel event notifier will receive events for each pending
// open and open channel. Because some tests will create multiple
// channels in a row before advancing to the next step, these event
// channels need to be buffered.
evt := &mockChanEvent{
openEvent: make(chan wire.OutPoint, maxPending),
pendingOpenEvent: make(
chan channelnotifier.PendingOpenChannelEvent, maxPending,
),
}
dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
if err != nil {
@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent,
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: func(wire.OutPoint) {},
NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent,
}
for _, op := range options {
@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan: publTxChan,
fundingMgr: f,
mockNotifier: chainNotifier,
mockChanEvent: evt,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
addr: addr,
@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
t.Fatalf("alice did not publish funding tx")
}
// Make sure the notification about the pending channel was sent out.
select {
case <-alice.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send pending channel event")
}
select {
case <-bob.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send pending channel event")
}
// Finally, make sure neither have active reservation for the channel
// now pending open in the database.
assertNumPendingReservations(t, alice, bobPubKey, 0)
@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
// Make sure the notification about the open channel was sent out.
select {
case <-alice.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send open channel event")
}
select {
case <-bob.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send open channel event")
}
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
}
@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
func TestFundingManagerMaxPendingChannels(t *testing.T) {
t.Parallel()
const maxPending = 4
alice, bob := setupFundingManagers(
t, func(cfg *fundingConfig) {
cfg.MaxPendingChannels = maxPending

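The test changes rest on two ideas: a mock notifier that records events on buffered Go channels (sized maxPending so the code under test never blocks), and select-with-timeout assertions that fail fast if an event never arrives. A pared-down sketch of both, with hypothetical names and a plain string standing in for the outpoint:

package example

import (
    "testing"
    "time"
)

// mockChanEvent is a trimmed-down version of the mock in the diff: events
// are pushed into a buffered channel so several channel openings can be
// queued before the test drains them.
type mockChanEvent struct {
    pendingOpenEvent chan string
}

func (m *mockChanEvent) notifyPendingOpen(chanPoint string) {
    m.pendingOpenEvent <- chanPoint
}

// TestPendingEventIsSent mirrors the select-with-timeout assertions added
// to fundChannel and assertMarkedOpen.
func TestPendingEventIsSent(t *testing.T) {
    evt := &mockChanEvent{
        // Buffered so the notifier never blocks the code under test.
        pendingOpenEvent: make(chan string, 4),
    }

    evt.notifyPendingOpen("txid:0")

    select {
    case <-evt.pendingOpenEvent:
    case <-time.After(5 * time.Second):
        t.Fatalf("node did not send pending channel event")
    }
}
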
@ -13318,6 +13318,15 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to find channel")
}
// To make sure the channel is removed from the backup file as well when
// being abandoned, grab a backup snapshot so we can compare it with the
// later state.
bkupBefore, err := ioutil.ReadFile(net.Alice.ChanBackupPath())
if err != nil {
t.Fatalf("could not get channel backup before abandoning "+
"channel: %v", err)
}
// Send request to abandon channel.
abandonChannelRequest := &lnrpc.AbandonChannelRequest{
ChannelPoint: chanPoint,
@ -13388,6 +13397,16 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) {
"graph!")
}
// Make sure the channel is no longer in the channel backup list.
bkupAfter, err := ioutil.ReadFile(net.Alice.ChanBackupPath())
if err != nil {
t.Fatalf("could not get channel backup before abandoning "+
"channel: %v", err)
}
if len(bkupAfter) >= len(bkupBefore) {
t.Fatalf("channel wasn't removed from channel backup file")
}
// Calling AbandonChannel again, should result in no new errors, as the
// channel has already been removed.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
@ -13690,9 +13709,9 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) {
}
}
// As these two channels were just open, we should've got two
// notifications for channel backups.
assertBackupNtfns(2)
// As these two channels were just opened, we should've gotten both a
// pending and an open backup notification for each of them.
assertBackupNtfns(2 * 2)
// The on disk file should also exactly match the latest backup that we
// have.
@ -14002,6 +14021,21 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness,
t.Fatalf("couldn't open pending channel: %v", err)
}
// Give the pubsub some time to update the channel backup.
err = wait.NoError(func() error {
fi, err := os.Stat(dave.ChanBackupPath())
if err != nil {
return err
}
if fi.Size() <= chanbackup.NilMultiSizePacked {
return fmt.Errorf("backup file empty")
}
return nil
}, defaultTimeout)
if err != nil {
t.Fatalf("channel backup not updated in time: %v", err)
}
default:
ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout)
chanPoint := openChannelAndAssert(
@ -14218,7 +14252,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) {
mnemonic []string) (nodeRestorer, error) {
// Read the entire Multi backup stored within
// this node's chaannels.backup file.
// this node's channels.backup file.
multi, err := ioutil.ReadFile(backupFilePath)
if err != nil {
return nil, err
@ -14327,7 +14361,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) {
mnemonic []string) (nodeRestorer, error) {
// Read the entire Multi backup stored within
// this node's chaannels.backup file.
// this node's channels.backup file.
multi, err := ioutil.ReadFile(backupFilePath)
if err != nil {
return nil, err
@ -14382,10 +14416,48 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) {
},
},
// Create a backup from an unconfirmed channel and make sure
// recovery works as well.
// Use the channel backup file that contains an unconfirmed
// channel and make sure recovery works as well.
{
name: "restore unconfirmed channel",
name: "restore unconfirmed channel file",
channelsUpdated: false,
initiator: true,
private: false,
unconfirmed: true,
restoreMethod: func(oldNode *lntest.HarnessNode,
backupFilePath string,
mnemonic []string) (nodeRestorer, error) {
// Read the entire Multi backup stored within
// this node's channels.backup file.
multi, err := ioutil.ReadFile(backupFilePath)
if err != nil {
return nil, err
}
// Let's assume time passes and the channel
// confirms in the meantime, but for some reason
// the backup we made while it was still
// unconfirmed is the only backup we have. We
// should still be able to restore it. To
// simulate time passing, we mine some blocks
// to get the channel confirmed _after_ we saved
// the backup.
mineBlocks(t, net, 6, 1)
// In our nodeRestorer function, we'll restore
// the node from seed, then manually recover
// the channel backup.
return chanRestoreViaRPC(
net, password, mnemonic, multi,
)
},
},
// Create a backup using RPC that contains an unconfirmed
// channel and make sure recovery works as well.
{
name: "restore unconfirmed channel RPC",
channelsUpdated: false,
initiator: true,
private: false,

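Because the sub-swapper rewrites channels.backup asynchronously, the integration test polls the file instead of reading it right after the pending channel is opened. A generic sketch of that polling idea, assuming an illustrative path and size threshold rather than lnd's wait.NoError helper and chanbackup.NilMultiSizePacked constant:

package main

import (
    "fmt"
    "os"
    "time"
)

// waitForBackupFile polls until the file at path grows beyond minSize or
// the timeout expires, mirroring the wait-based check in the restore test.
func waitForBackupFile(path string, minSize int64, timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        fi, err := os.Stat(path)
        if err == nil && fi.Size() > minSize {
            return nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return fmt.Errorf("backup file %s not updated within %v", path, timeout)
}

func main() {
    // Illustrative usage; the path and threshold are assumptions.
    if err := waitForBackupFile("channels.backup", 0, 2*time.Second); err != nil {
        fmt.Println(err)
    }
}
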
@ -2253,6 +2253,10 @@ func (r *rpcServer) AbandonChannel(ctx context.Context,
return nil, err
}
// Finally, notify the backup listeners that the channel can be removed
// from any channel backups.
r.server.channelNotifier.NotifyClosedChannelEvent(*chanPoint)
return &lnrpc.AbandonChannelResponse{}, nil
}
@ -5711,7 +5715,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error {
// First, we'll subscribe to the primary channel notifier so we can
// obtain events for new opened/closed channels.
// obtain events for new pending/opened/closed channels.
chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents()
if err != nil {
return err
@ -5728,9 +5732,10 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription
switch e.(type) {
// We only care about new/closed channels, so we'll
// skip any events for pending/active/inactive channels.
case channelnotifier.PendingOpenChannelEvent:
continue
// skip any events for active/inactive channels.
// To make the subscription behave the same way as the
// synchronous call and the file-based backup, we also
// include pending channels in the update.
case channelnotifier.ActiveChannelEvent:
continue
case channelnotifier.InactiveChannelEvent:
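The rpcserver.go hunk changes which events trigger a fresh backup snapshot on the streaming RPC: pending, open and closed channels all do now, while active/inactive transitions are still skipped. A compact sketch of that filter, using hypothetical event types in place of the channelnotifier ones:

package main

import "fmt"

// Hypothetical stand-ins for the channelnotifier event types.
type (
    pendingOpenChannelEvent struct{}
    openChannelEvent        struct{}
    closedChannelEvent      struct{}
    activeChannelEvent      struct{}
    inactiveChannelEvent    struct{}
)

// shouldTriggerBackup mirrors the updated filter in SubscribeChannelBackups:
// only active/inactive transitions are ignored.
func shouldTriggerBackup(e interface{}) bool {
    switch e.(type) {
    case activeChannelEvent, inactiveChannelEvent:
        return false
    default:
        return true
    }
}

func main() {
    events := []interface{}{
        pendingOpenChannelEvent{},
        activeChannelEvent{},
        openChannelEvent{},
        closedChannelEvent{},
    }
    for _, e := range events {
        fmt.Printf("%T -> backup: %v\n", e, shouldTriggerBackup(e))
    }
}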