fundingmgr+channelnotifier: add channel config to pending chan ntfn

To be able to write a new channel backup file for pending channels,
we need to include the channel configuration in the pending channel
notification event.
This commit is contained in:
Oliver Gugger 2020-02-10 11:02:14 +01:00
parent 2cd26d7556
commit 182835d504
No known key found for this signature in database
GPG Key ID: 8E4256593F177720
3 changed files with 84 additions and 12 deletions

@ -23,8 +23,14 @@ type ChannelNotifier struct {
// PendingOpenChannelEvent represents a new event where a new channel has
// entered a pending open state.
type PendingOpenChannelEvent struct {
// ChannelPoint is the channelpoint for the new channel.
// ChannelPoint is the channel outpoint for the new channel.
ChannelPoint *wire.OutPoint
// PendingChannel is the channel configuration for the newly created
// channel. This might not have been persisted to the channel DB yet
// because we are still waiting for the final message from the remote
// peer.
PendingChannel *channeldb.OpenChannel
}
// OpenChannelEvent represents a new event where a channel goes from pending
@ -89,10 +95,18 @@ func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine that a
// new channel is pending.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint) {
event := PendingOpenChannelEvent{ChannelPoint: &chanPoint}
// NotifyPendingOpenChannelEvent notifies the channelEventNotifier goroutine
// that a new channel is pending. The pending channel is passed as a parameter
// instead of read from the database because it might not yet have been
// persisted to the DB because we still wait for the final message from the
// remote peer.
func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
pendingChan *channeldb.OpenChannel) {
event := PendingOpenChannelEvent{
ChannelPoint: &chanPoint,
PendingChannel: pendingChan,
}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send pending open channel update: %v", err)

@ -366,7 +366,7 @@ type fundingConfig struct {
// NotifyPendingOpenChannelEvent informs the ChannelNotifier when channels
// enter a pending state.
NotifyPendingOpenChannelEvent func(wire.OutPoint)
NotifyPendingOpenChannelEvent func(wire.OutPoint, *channeldb.OpenChannel)
}
// fundingManager acts as an orchestrator/bridge between the wallet's
@ -1697,7 +1697,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(fundingOut)
f.cfg.NotifyPendingOpenChannelEvent(fundingOut, completeChan)
// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
@ -1845,7 +1845,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
case resCtx.updates <- upd:
// Inform the ChannelNotifier that the channel has entered
// pending open state.
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint)
f.cfg.NotifyPendingOpenChannelEvent(*fundingPoint, completeChan)
case <-f.quit:
return
}

@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
@ -48,6 +49,10 @@ const (
// testPollSleepMs is the number of milliseconds to sleep between
// each attempt to access the database to check its state.
testPollSleepMs = 500
// maxPending is the maximum number of channels we allow opening to the
// same peer in the max pending channels test.
maxPending = 4
)
var (
@ -138,6 +143,24 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
}, nil
}
type mockChanEvent struct {
openEvent chan wire.OutPoint
pendingOpenEvent chan channelnotifier.PendingOpenChannelEvent
}
func (m *mockChanEvent) NotifyOpenChannelEvent(outpoint wire.OutPoint) {
m.openEvent <- outpoint
}
func (m *mockChanEvent) NotifyPendingOpenChannelEvent(outpoint wire.OutPoint,
pendingChannel *channeldb.OpenChannel) {
m.pendingOpenEvent <- channelnotifier.PendingOpenChannelEvent{
ChannelPoint: &outpoint,
PendingChannel: pendingChannel,
}
}
type testNode struct {
privKey *btcec.PrivateKey
addr *lnwire.NetAddress
@ -147,6 +170,7 @@ type testNode struct {
fundingMgr *fundingManager
newChannels chan *newChannelMsg
mockNotifier *mockNotifier
mockChanEvent *mockChanEvent
testDir string
shutdownChannel chan struct{}
remoteFeatures []lnwire.FeatureBit
@ -274,6 +298,17 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
bestHeight: fundingBroadcastHeight,
}
// The mock channel event notifier will receive events for each pending
// open and open channel. Because some tests will create multiple
// channels in a row before advancing to the next step, these channels
// need to be buffered.
evt := &mockChanEvent{
openEvent: make(chan wire.OutPoint, maxPending),
pendingOpenEvent: make(
chan channelnotifier.PendingOpenChannelEvent, maxPending,
),
}
dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)
if err != nil {
@ -379,9 +414,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
ZombieSweeperInterval: 1 * time.Hour,
ReservationTimeout: 1 * time.Nanosecond,
MaxPendingChannels: DefaultMaxPendingChannels,
NotifyOpenChannelEvent: func(wire.OutPoint) {},
NotifyOpenChannelEvent: evt.NotifyOpenChannelEvent,
OpenChannelPredicate: chainedAcceptor,
NotifyPendingOpenChannelEvent: func(wire.OutPoint) {},
NotifyPendingOpenChannelEvent: evt.NotifyPendingOpenChannelEvent,
}
for _, op := range options {
@ -404,6 +439,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan: publTxChan,
fundingMgr: f,
mockNotifier: chainNotifier,
mockChanEvent: evt,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
addr: addr,
@ -685,6 +721,18 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
t.Fatalf("alice did not publish funding tx")
}
// Make sure the notification about the pending channel was sent out.
select {
case <-alice.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send pending channel event")
}
select {
case <-bob.mockChanEvent.pendingOpenEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send pending channel event")
}
// Finally, make sure neither have active reservation for the channel
// now pending open in the database.
assertNumPendingReservations(t, alice, bobPubKey, 0)
@ -867,6 +915,18 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
// Make sure the notification about the pending channel was sent out.
select {
case <-alice.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send open channel event")
}
select {
case <-bob.mockChanEvent.openEvent:
case <-time.After(time.Second * 5):
t.Fatalf("bob did not send open channel event")
}
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
}
@ -2558,8 +2618,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
func TestFundingManagerMaxPendingChannels(t *testing.T) {
t.Parallel()
const maxPending = 4
alice, bob := setupFundingManagers(
t, func(cfg *fundingConfig) {
cfg.MaxPendingChannels = maxPending