lnd_test: alter basic channel creation test to test chan subscription.

Because the integration tests are already long-running, it is preferable to
add testing for the RPC channel update subscription to an existing test rather
than adding additional tests.
This commit is contained in:
Valentine Wallace 2018-11-01 20:20:21 -07:00 committed by Valentine Wallace
parent ff0339a186
commit b826101aae

View File

@ -5694,15 +5694,137 @@ func testInvoiceSubscriptions(net *lntest.NetworkHarness, t *harnessTest) {
closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false)
}
// testBasicChannelCreation test multiple channel opening and closing.
func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
// channelSubscription houses the proxied update and error chans for a node's
// channel subscriptions.
type channelSubscription struct {
updateChan chan *lnrpc.ChannelEventUpdate
errChan chan error
quit chan struct{}
}
// subscribeChannelNotifications subscribes to channel updates and launches a
// goroutine that forwards these to the returned channel.
func subscribeChannelNotifications(ctxb context.Context, t *harnessTest,
node *lntest.HarnessNode) channelSubscription {
// We'll first start by establishing a notification client which will
// send us notifications upon channels becoming active, inactive or
// closed.
req := &lnrpc.ChannelEventSubscription{}
ctx, cancelFunc := context.WithCancel(ctxb)
chanUpdateClient, err := node.SubscribeChannelEvents(ctx, req)
if err != nil {
t.Fatalf("unable to create channel update client: %v", err)
}
// We'll launch a goroutine that will be responsible for proxying all
// notifications recv'd from the client into the channel below.
errChan := make(chan error, 1)
quit := make(chan struct{})
chanUpdates := make(chan *lnrpc.ChannelEventUpdate, 20)
go func() {
defer cancelFunc()
for {
select {
case <-quit:
return
default:
chanUpdate, err := chanUpdateClient.Recv()
select {
case <-quit:
return
default:
}
if err == io.EOF {
return
} else if err != nil {
select {
case errChan <- err:
case <-quit:
}
return
}
select {
case chanUpdates <- chanUpdate:
case <-quit:
return
}
}
}
}()
return channelSubscription{
updateChan: chanUpdates,
errChan: errChan,
quit: quit,
}
}
// verifyCloseUpdate is used to verify that a closed channel update is of the
// expected type.
func verifyCloseUpdate(chanUpdate *lnrpc.ChannelEventUpdate,
force bool, forceType lnrpc.ChannelCloseSummary_ClosureType) error {
// We should receive one inactive and one closed notification
// for each channel.
switch update := chanUpdate.Channel.(type) {
case *lnrpc.ChannelEventUpdate_InactiveChannel:
if chanUpdate.Type != lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL {
return fmt.Errorf("update type mismatch: expected %v, got %v",
lnrpc.ChannelEventUpdate_INACTIVE_CHANNEL,
chanUpdate.Type)
}
case *lnrpc.ChannelEventUpdate_ClosedChannel:
if chanUpdate.Type !=
lnrpc.ChannelEventUpdate_CLOSED_CHANNEL {
return fmt.Errorf("update type mismatch: expected %v, got %v",
lnrpc.ChannelEventUpdate_CLOSED_CHANNEL,
chanUpdate.Type)
}
switch force {
case true:
if update.ClosedChannel.CloseType != forceType {
return fmt.Errorf("channel closure type mismatch: "+
"expected %v, got %v",
forceType,
update.ClosedChannel.CloseType)
}
case false:
if update.ClosedChannel.CloseType !=
lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE {
return fmt.Errorf("channel closure type "+
"mismatch: expected %v, got %v",
lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE,
update.ClosedChannel.CloseType)
}
}
default:
return fmt.Errorf("channel update channel of wrong type, "+
"expected closed channel, got %T",
update)
}
return nil
}
// testBasicChannelCreationAndUpdates tests multiple channel opening and closing,
// and ensures that if a node is subscribed to channel updates they will be
// received correctly for both cooperative and force closed channels.
func testBasicChannelCreationAndUpdates(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
const (
numChannels = 2
amount = maxBtcFundingAmount
)
// Let Bob subscribe to channel notifications.
bobChanSub := subscribeChannelNotifications(ctxb, t, net.Bob)
defer close(bobChanSub.quit)
// Open the channel between Alice and Bob, asserting that the
// channel has been properly open on-chain.
chanPoints := make([]*lnrpc.ChannelPoint, numChannels)
@ -5716,11 +5838,89 @@ func testBasicChannelCreation(net *lntest.NetworkHarness, t *harnessTest) {
)
}
// Close the channel between Alice and Bob, asserting that the
// channel has been properly closed on-chain.
for _, chanPoint := range chanPoints {
ctxt, _ := context.WithTimeout(ctxb, channelCloseTimeout)
closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false)
// Since each of the channels just became open, Bob should we receive an
// open and an active notification for each channel.
var numChannelUpds int
for numChannelUpds < 2*numChannels {
select {
case update := <-bobChanSub.updateChan:
switch update.Type {
case lnrpc.ChannelEventUpdate_ACTIVE_CHANNEL:
case lnrpc.ChannelEventUpdate_OPEN_CHANNEL:
default:
t.Fatalf("update type mismatch: expected open or active "+
"channel notification, got: %v", update.Type)
}
numChannelUpds++
case <-time.After(time.Second * 10):
t.Fatalf("timeout waiting for channel notifications, "+
"only received %d/%d chanupds", numChannelUpds,
numChannels)
}
}
// Subscribe Alice to channel updates so we can test that both remote
// and local force close notifications are received correctly.
aliceChanSub := subscribeChannelNotifications(ctxb, t, net.Alice)
defer close(aliceChanSub.quit)
// Close the channel between Alice and Bob, asserting that the channel
// has been properly closed on-chain.
for i, chanPoint := range chanPoints {
ctx, _ := context.WithTimeout(context.Background(), defaultTimeout)
// Force close half of the channels.
force := i%2 == 0
closeChannelAndAssert(ctx, t, net, net.Alice, chanPoint, force)
if force {
cleanupForceClose(t, net, net.Alice, chanPoint)
}
}
// verifyCloseUpdatesReceived is used to verify that Alice and Bob
// receive the correct channel updates in order.
verifyCloseUpdatesReceived := func(sub channelSubscription,
forceType lnrpc.ChannelCloseSummary_ClosureType) error {
// Ensure one inactive and one closed notification is received for each
// closed channel.
numChannelUpds := 0
for numChannelUpds < 2*numChannels {
// Every other channel should be force closed.
force := (numChannelUpds/2)%2 == 0
select {
case chanUpdate := <-sub.updateChan:
err := verifyCloseUpdate(chanUpdate, force, forceType)
if err != nil {
return err
}
numChannelUpds++
case err := <-sub.errChan:
return err
case <-time.After(time.Second * 10):
return fmt.Errorf("timeout waiting for channel "+
"notifications, only received %d/%d "+
"chanupds", numChannelUpds, 2*numChannels)
}
}
return nil
}
// Verify Bob receives all closed channel notifications. He should
// receive a remote force close notification for force closed channels.
if err := verifyCloseUpdatesReceived(bobChanSub,
lnrpc.ChannelCloseSummary_REMOTE_FORCE_CLOSE); err != nil {
t.Fatalf("errored verifying close updates: %v", err)
}
// Verify Alice receives all closed channel notifications. She should
// receive a remote force close notification for force closed channels.
if err := verifyCloseUpdatesReceived(aliceChanSub,
lnrpc.ChannelCloseSummary_LOCAL_FORCE_CLOSE); err != nil {
t.Fatalf("errored verifying close updates: %v", err)
}
}
@ -12943,8 +13143,8 @@ var testsCases = []*testCase{
test: testMultiHopOverPrivateChannels,
},
{
name: "multiple channel creation",
test: testBasicChannelCreation,
name: "multiple channel creation and update subscription",
test: testBasicChannelCreationAndUpdates,
},
{
name: "invoice update subscription",