diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index e484b9a6..e6a1fada 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -23,8 +23,14 @@ type ChannelNotifier struct { // PendingOpenChannelEvent represents a new event where a new channel has // entered a pending open state. type PendingOpenChannelEvent struct { - // ChannelPoint is the channelpoint for the new channel. + // ChannelPoint is the channel outpoint for the new channel. ChannelPoint *wire.OutPoint + + // PendingChannel is the channel configuration for the newly created + // channel. This might not have been persisted to the channel DB yet + // because we are still waiting for the final message from the remote + // peer. + PendingChannel *channeldb.OpenChannel } // OpenChannelEvent represents a new event where a channel goes from pending @@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) { return c.ntfnServer.Subscribe() } -// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a -// new channel is pending. -func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) { - event := PendingOpenChannelEvent{ChannelPoint: &chanPoint} +// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine +// that a new channel is pending. The pending channel is passed as a parameter +// instead of read from the database because it might not yet have been +// persisted to the DB because we still wait for the final message from the +// remote peer. +func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint, + pendingChan *channeldb.OpenChannel) { + + event := PendingOpenChannelEvent{ + ChannelPoint: &chanPoint, + PendingChannel: pendingChan, + } if err := c.ntfnServer.SendUpdate(event); err != nil { log.Warnf("Unable to send pending open channel update: %v", err) diff --git a/fundingmanager.go b/fundingmanager.go index 8484b945..2937a19c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -366,7 +366,7 @@ type fundingConfig struct { // NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels // enter a pending state. - NotifyPendingOpenChannelEvent func(wire.OutPoint) + NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel) } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -1697,7 +1697,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(fundingOut) + f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan) // At this point we have sent our last funding message to the // initiating peer before the funding transaction will be broadcast. @@ -1845,7 +1845,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { case resCtx.updates <- upd: // Inform the ChannelNotifier that the channel has entered // pending open state. - f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint) + f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan) case <-f.quit: return } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index c9f9a421..f1ff528d 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" @@ -48,6 +49,10 @@ const ( // testPollSleepMs is the number of milliseconds to sleep between // each attempt to access the database to check its state. testPollSleepMs = 500 + + // maxPending is the maximum number of channels we allow opening to the + // same peer in the max pending channels test. + maxPending = 4 ) var ( @@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, }, nil } +type mockChanEvent struct { + openEvent chan wire.OutPoint + pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent +} + +func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) { + m.openEvent <- outpoint +} + +func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint, + pendingChannel *channeldb.OpenChannel) { + + m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{ + ChannelPoint: &outpoint, + PendingChannel: pendingChannel, + } +} + type testNode struct { privKey *btcec.PrivateKey addr *lnwire.NetAddress @@ -147,6 +170,7 @@ type testNode struct { fundingMgr *fundingManager newChannels chan *newChannelMsg mockNotifier *mockNotifier + mockChanEvent *mockChanEvent testDir string shutdownChannel chan struct{} remoteFeatures []lnwire.FeatureBit @@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, bestHeight: fundingBroadcastHeight, } + // The mock channel event notifier will receive events for each pending + // open and open channel. Because some tests will create multiple + // channels in a row before advancing to the next step, these channels + // need to be buffered. + evt := &mockChanEvent{ + openEvent: make(chan wire.OutPoint, maxPending), + pendingOpenEvent: make( + chan channelnotifier.PendingOpenChannelEvent, maxPending, + ), + } + dbDir := filepath.Join(tempTestDir, "cdb") cdb, err := channeldb.Open(dbDir) if err != nil { @@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, ZombieSweeperInterval: 1 * time.Hour, ReservationTimeout: 1 * time.Nanosecond, MaxPendingChannels: DefaultMaxPendingChannels, - NotifyOpenChannelEvent: func(wire.OutPoint) {}, + NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent, OpenChannelPredicate: chainedAcceptor, - NotifyPendingOpenChannelEvent: func(wire.OutPoint) {}, + NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent, } for _, op := range options { @@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, publTxChan: publTxChan, fundingMgr: f, mockNotifier: chainNotifier, + mockChanEvent: evt, testDir: tempTestDir, shutdownChannel: shutdownChan, addr: addr, @@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, t.Fatalf("alice did not publish funding tx") } + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send pending channel event") + } + select { + case <-bob.mockChanEvent.pendingOpenEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send pending channel event") + } + // Finally, make sure neither have active reservation for the channel // now pending open in the database. assertNumPendingReservations(t, alice, bobPubKey, 0) @@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { t.Helper() + // Make sure the notification about the pending channel was sent out. + select { + case <-alice.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send open channel event") + } + select { + case <-bob.mockChanEvent.openEvent: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send open channel event") + } + assertDatabaseState(t, alice, fundingOutPoint, markedOpen) assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } @@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { func TestFundingManagerMaxPendingChannels(t *testing.T) { t.Parallel() - const maxPending = 4 - alice, bob := setupFundingManagers( t, func(cfg *fundingConfig) { cfg.MaxPendingChannels = maxPending