From 182835d504d1202048e55bf32d54cc4914c12ca2 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 10 Feb 2020 11:02:14 +0100 Subject: [PATCH 1/5] fundingmgr+channelnotifier: add channel config to pending chan ntfn To be able to write a new channel backup file for pending channels, we need to include the channel configuration in the pending channel notification event. --- channelnotifier/channelnotifier.go | 24 ++++++++--- fundingmanager.go | 6 +-- fundingmanager_test.go | 66 ++++++++++++++++++++++++++++-- 3 files changed, 84 insertions(+), 12 deletions(-) diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index e484b9a6..e6a1fada 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -23,8 +23,14 @@ type ChannelNotifier struct { // PendingOpenChannelEvent represents a new event where a new channel has // entered a pending open state. type PendingOpenChannelEvent struct { - // ChannelPoint is the channelpoint for the new channel. + // ChannelPoint is the channel outpoint for the new channel. ChannelPoint *wire.OutPoint + + // PendingChannel is the channel configuration for the newly created + // channel. This might not have been persisted to the channel DB yet + // because we are still waiting for the final message from the remote + // peer. + PendingChannel *channeldb.OpenChannel } // OpenChannelEvent represents a new event where a channel goes from pending @@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) { return c.ntfnServer.Subscribe() } -// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a -// new channel is pending. -func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) { - event := PendingOpenChannelEvent{ChannelPoint: &chanPoint} +// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine +// that a new channel is pending. The pending channel is passed as a parameter +// instead of read from the database because it might not yet have been +// persisted to the DB because we still wait for the final message from the +// remote peer. +func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint, + pendingChan *channeldb.OpenChannel) { + + event := PendingOpenChannelEvent{ + ChannelPoint: &chanPoint, + PendingChannel: pendingChan, + } if err := c.ntfnServer.SendUpdate(event); err != nil { log.Warnf("Unable to send pending open channel update: %v", err) diff --git a/fundingmanager.go b/fundingmanager.go index 8484b945..2937a19c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -366,7 +366,7 @@ type fundingConfig struct { // NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels // enter a pending state. - NotifyPendingOpenChannelEvent func(wire.OutPoint) + NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel) } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -1697,7 +1697,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(fundingOut) + f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan) // At this point we have sent our last funding message to the // initiating peer before the funding transaction will be broadcast. @@ -1845,7 +1845,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { case resCtx.updates <- upd: // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint) + f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan) case <-f.quit: return } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index c9f9a421..f1ff528d 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" @@ -48,6 +49,10 @@ const ( // testPollSleepMs is the number of milliseconds to sleep between // each attempt to access the database to check its state. testPollSleepMs = 500 + + // maxPending is the maximum number of channels we allow opening to the + // same peer in the max pending channels test. + maxPending = 4 ) var ( @@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, }, nil } +type mockChanEvent struct { + openEvent chan wire.OutPoint + pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent +} + +func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) { + m.openEvent <- outpoint +} + +func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint, + pendingChannel *channeldb.OpenChannel) { + + m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{ + ChannelPoint: &outpoint, + PendingChannel: pendingChannel, + } +} + type testNode struct { privKey *btcec.PrivateKey addr *lnwire.NetAddress @@ -147,6 +170,7 @@ type testNode struct { fundingMgr *fundingManager newChannels chan *newChannelMsg mockNotifier *mockNotifier + mockChanEvent *mockChanEvent testDir string shutdownChannel chan struct{} remoteFeatures []lnwire.FeatureBit @@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, bestHeight: fundingBroadcastHeight, } + // The mock channel event notifier will receive events for each pending + // open and open channel. Because some tests will create multiple + // channels in a row before advancing to the next step, these channels + // need to be buffered. + evt := &mockChanEvent{ + openEvent: make(chan wire.OutPoint, maxPending), + pendingOpenEvent: make( + chan channelnotifier.PendingOpenChannelEvent, maxPending, + ), + } + dbDir := filepath.Join(tempTestDir, "cdb") cdb, err := channeldb.Open(dbDir) if err != nil { @@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, ZombieSweeperInterval: 1 * time.Hour, ReservationTimeout: 1 * time.Nanosecond, MaxPendingChannels: DefaultMaxPendingChannels, - NotifyOpenChannelEvent: func(wire.OutPoint) {}, + NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent, OpenChannelPredicate: chainedAcceptor, - NotifyPendingOpenChannelEvent: func(wire.OutPoint) {}, + NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent, } for _, op := range options { @@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, publTxChan: publTxChan, fundingMgr: f, mockNotifier: chainNotifier, + mockChanEvent: evt, testDir: tempTestDir, shutdownChannel: shutdownChan, addr: addr, @@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, t.Fatalf("alice did not publish funding tx") } + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send pending channel event") + } + select { + case <-bob.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send pending channel event") + } + // Finally, make sure neither have active reservation for the channel // now pending open in the database. assertNumPendingReservations(t, alice, bobPubKey, 0) @@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { t.Helper() + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send open channel event") + } + select { + case <-bob.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send open channel event") + } + assertDatabaseState(t, alice, fundingOutPoint, markedOpen) assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } @@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { func TestFundingManagerMaxPendingChannels(t *testing.T) { t.Parallel() - const maxPending = 4 - alice, bob := setupFundingManagers( t, func(cfg *fundingConfig) { cfg.MaxPendingChannels = maxPending From d8c62c37a85a2b9e366ce9f9e940537e805f0bcf Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 10 Feb 2020 11:37:44 +0100 Subject: [PATCH 2/5] channel_notifier: update backup on pending chan To fix the discrepancy between getting the channel backups via RPC where all pending channels are included, we also update the channel.backup file on disk whenever we get a pending channel event notification. --- channel_notifier.go | 67 ++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 28 deletions(-) diff --git a/channel_notifier.go b/channel_notifier.go index 36a2224a..960ba0d9 100644 --- a/channel_notifier.go +++ b/channel_notifier.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" ) @@ -51,6 +52,35 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{ quit := make(chan struct{}) chanUpdates := make(chan chanbackup.ChannelEvent, 1) + // sendChanOpenUpdate is a closure that sends a ChannelEvent to the + // chanUpdates channel to inform subscribers about new pending or + // confirmed channels. + sendChanOpenUpdate := func(newOrPendingChan *channeldb.OpenChannel) { + nodeAddrs, err := c.addrs.AddrsForNode( + newOrPendingChan.IdentityPub, + ) + if err != nil { + pub := newOrPendingChan.IdentityPub + ltndLog.Errorf("unable to fetch addrs for %x: %v", + pub.SerializeCompressed(), err) + } + + chanEvent := chanbackup.ChannelEvent{ + NewChans: []chanbackup.ChannelWithAddrs{ + { + OpenChannel: newOrPendingChan, + Addrs: nodeAddrs, + }, + }, + } + + select { + case chanUpdates <- chanEvent: + case <-quit: + return + } + } + // In order to adhere to the interface, we'll proxy the events from the // channel notifier to the sub-swapper in a format it understands. go func() { @@ -74,37 +104,18 @@ func (c *channelNotifier) SubscribeChans(startingChans map[wire.OutPoint]struct{ // TODO(roasbeef): batch dispatch ntnfs switch event := e.(type) { + // A new channel has been opened and is still + // pending. We can still create a backup, even + // if the final channel ID is not yet available. + case channelnotifier.PendingOpenChannelEvent: + pendingChan := event.PendingChannel + sendChanOpenUpdate(pendingChan) - // A new channel has been opened, we'll obtain - // the node address, then send to the + // A new channel has been confirmed, we'll + // obtain the node address, then send to the // sub-swapper. case channelnotifier.OpenChannelEvent: - nodeAddrs, err := c.addrs.AddrsForNode( - event.Channel.IdentityPub, - ) - if err != nil { - pub := event.Channel.IdentityPub - ltndLog.Errorf("unable to "+ - "fetch addrs for %x: %v", - pub.SerializeCompressed(), - err) - } - - channel := event.Channel - chanEvent := chanbackup.ChannelEvent{ - NewChans: []chanbackup.ChannelWithAddrs{ - { - OpenChannel: channel, - Addrs: nodeAddrs, - }, - }, - } - - select { - case chanUpdates <- chanEvent: - case <-quit: - return - } + sendChanOpenUpdate(event.Channel) // An existing channel has been closed, we'll // send only the chanPoint of the closed From 8a2c02f8eadb2a392853ca67162a23f9684d8cc3 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Mon, 10 Feb 2020 17:04:25 +0100 Subject: [PATCH 3/5] itest: test unconfirmed channel backup file --- lntest/itest/lnd_test.go | 63 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index c18b60d9..dfe5ee10 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -13987,6 +13987,21 @@ func testChanRestoreScenario(t *harnessTest, net *lntest.NetworkHarness, t.Fatalf("couldn't open pending channel: %v", err) } + // Give the pubsub some time to update the channel backup. + err = wait.NoError(func() error { + fi, err := os.Stat(dave.ChanBackupPath()) + if err != nil { + return err + } + if fi.Size() <= chanbackup.NilMultiSizePacked { + return fmt.Errorf("backup file empty") + } + return nil + }, defaultTimeout) + if err != nil { + t.Fatalf("channel backup not updated in time: %v", err) + } + default: ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout) chanPoint := openChannelAndAssert( @@ -14203,7 +14218,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { mnemonic []string) (nodeRestorer, error) { // Read the entire Multi backup stored within - // this node's chaannels.backup file. + // this node's channels.backup file. multi, err := ioutil.ReadFile(backupFilePath) if err != nil { return nil, err @@ -14312,7 +14327,7 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { mnemonic []string) (nodeRestorer, error) { // Read the entire Multi backup stored within - // this node's chaannels.backup file. + // this node's channels.backup file. multi, err := ioutil.ReadFile(backupFilePath) if err != nil { return nil, err @@ -14367,10 +14382,48 @@ func testChannelBackupRestore(net *lntest.NetworkHarness, t *harnessTest) { }, }, - // Create a backup from an unconfirmed channel and make sure - // recovery works as well. + // Use the channel backup file that contains an unconfirmed + // channel and make sure recovery works as well. { - name: "restore unconfirmed channel", + name: "restore unconfirmed channel file", + channelsUpdated: false, + initiator: true, + private: false, + unconfirmed: true, + restoreMethod: func(oldNode *lntest.HarnessNode, + backupFilePath string, + mnemonic []string) (nodeRestorer, error) { + + // Read the entire Multi backup stored within + // this node's channels.backup file. + multi, err := ioutil.ReadFile(backupFilePath) + if err != nil { + return nil, err + } + + // Let's assume time passes, the channel + // confirms in the meantime but for some reason + // the backup we made while it was still + // unconfirmed is the only backup we have. We + // should still be able to restore it. To + // simulate time passing, we mine some blocks + // to get the channel confirmed _after_ we saved + // the backup. + mineBlocks(t, net, 6, 1) + + // In our nodeRestorer function, we'll restore + // the node from seed, then manually recover + // the channel backup. + return chanRestoreViaRPC( + net, password, mnemonic, multi, + ) + }, + }, + + // Create a backup using RPC that contains an unconfirmed + // channel and make sure recovery works as well. + { + name: "restore unconfirmed channel RPC", channelsUpdated: false, initiator: true, private: false, From 4e0c276154024aae36d5bdf2b3ece6ce45fc9783 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Wed, 12 Feb 2020 13:41:48 +0100 Subject: [PATCH 4/5] rpcserver: don't skip pending channels in backup subscription The synchronous call to get all channel backups also include channels that are pending at the moment of the call. A previous commit added pending channels to the file based backup as well. So this is the last backup method that needs to be adjusted to also contain unconfirmed channels. --- lntest/itest/lnd_test.go | 6 +++--- rpcserver.go | 9 +++++---- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index dfe5ee10..69de3914 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -13675,9 +13675,9 @@ func testChannelBackupUpdates(net *lntest.NetworkHarness, t *harnessTest) { } } - // As these two channels were just open, we should've got two - // notifications for channel backups. - assertBackupNtfns(2) + // As these two channels were just opened, we should've got two times + // the pending and open notifications for channel backups. + assertBackupNtfns(2 * 2) // The on disk file should also exactly match the latest backup that we // have. diff --git a/rpcserver.go b/rpcserver.go index f4e13cd6..d7a27d90 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5605,7 +5605,7 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error { // First, we'll subscribe to the primary channel notifier so we can - // obtain events for new opened/closed channels. + // obtain events for new pending/opened/closed channels. chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents() if err != nil { return err @@ -5622,9 +5622,10 @@ func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription switch e.(type) { // We only care about new/closed channels, so we'll - // skip any events for pending/active/inactive channels. - case channelnotifier.PendingOpenChannelEvent: - continue + // skip any events for active/inactive channels. + // To make the subscription behave the same way as the + // synchronous call and the file based backup, we also + // include pending channels in the update. case channelnotifier.ActiveChannelEvent: continue case channelnotifier.InactiveChannelEvent: From ab024b98eed9cced304d9b3e87544bf746fc63a9 Mon Sep 17 00:00:00 2001 From: Oliver Gugger Date: Wed, 4 Mar 2020 10:57:06 +0100 Subject: [PATCH 5/5] rpcserver+itest: remove channel from backup when abandoning it --- lntest/itest/lnd_test.go | 19 +++++++++++++++++++ rpcserver.go | 4 ++++ 2 files changed, 23 insertions(+) diff --git a/lntest/itest/lnd_test.go b/lntest/itest/lnd_test.go index 69de3914..31142497 100644 --- a/lntest/itest/lnd_test.go +++ b/lntest/itest/lnd_test.go @@ -13303,6 +13303,15 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to find channel") } + // To make sure the channel is removed from the backup file as well when + // being abandoned, grab a backup snapshot so we can compare it with the + // later state. + bkupBefore, err := ioutil.ReadFile(net.Alice.ChanBackupPath()) + if err != nil { + t.Fatalf("could not get channel backup before abandoning "+ + "channel: %v", err) + } + // Send request to abandon channel. abandonChannelRequest := &lnrpc.AbandonChannelRequest{ ChannelPoint: chanPoint, @@ -13373,6 +13382,16 @@ func testAbandonChannel(net *lntest.NetworkHarness, t *harnessTest) { "graph!") } + // Make sure the channel is no longer in the channel backup list. + bkupAfter, err := ioutil.ReadFile(net.Alice.ChanBackupPath()) + if err != nil { + t.Fatalf("could not get channel backup before abandoning "+ + "channel: %v", err) + } + if len(bkupAfter) >= len(bkupBefore) { + t.Fatalf("channel wasn't removed from channel backup file") + } + // Calling AbandonChannel again, should result in no new errors, as the // channel has already been removed. ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) diff --git a/rpcserver.go b/rpcserver.go index d7a27d90..1759dd08 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2254,6 +2254,10 @@ func (r *rpcServer) AbandonChannel(ctx context.Context, return nil, err } + // Finally, notify the backup listeners that the channel can be removed + // from any channel backups. + r.server.channelNotifier.NotifyClosedChannelEvent(*chanPoint) + return &lnrpc.AbandonChannelResponse{}, nil }