From c569c40cef99cd70aa98ab2aabcb0485903a4695 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 22 Aug 2018 22:16:16 -0700 Subject: [PATCH] lnd_test: prevent calling Fatal in goroutine This commit prevents an error that I've seen on travis, wherein the test fails because a call to Fatal happens after the test finishes. The root cause is that we call Fatal in a goroutine that is reading from the subscribe graph rpc call. To fix this, we now pass an err chan back into the main test context, where we can receive any errors and fail the test if one comes through. --- lnd_test.go | 124 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 77 insertions(+), 47 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 11b99934..2ee3e87d 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -798,8 +798,7 @@ type expectedChanUpdate struct { // waitForChannelUpdate waits for a node to receive the expected channel // updates. -func waitForChannelUpdate(t *harnessTest, - graphUpdates chan *lnrpc.GraphTopologyUpdate, +func waitForChannelUpdate(t *harnessTest, subscription graphSubscription, expUpdates []expectedChanUpdate) { // Create an array indicating which expected channel updates we have @@ -808,7 +807,7 @@ func waitForChannelUpdate(t *harnessTest, out: for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-subscription.updateChan: for _, update := range graphUpdate.ChannelUpdates { // For each expected update, check if it matches @@ -857,6 +856,8 @@ out: break } } + case err := <-subscription.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(20 * time.Second): t.Fatalf("did not receive channel update") } @@ -948,10 +949,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Launch notification clients for all nodes, such that we can // get notified when they discover new channels and updates in the // graph. - aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice) - defer close(aQuit) - bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob) - defer close(bQuit) + aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice) + defer close(aliceSub.quit) + bobSub := subscribeGraphNotifications(t, ctxb, net.Bob) + defer close(bobSub.quit) chanAmt := maxBtcFundingAmount pushAmt := chanAmt / 2 @@ -968,7 +969,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // We add all the nodes' update channels to a slice, such that we can // make sure they all receive the expected updates. - nodeUpdates := []chan *lnrpc.GraphTopologyUpdate{aliceUpdates, bobUpdates} + graphSubs := []graphSubscription{aliceSub, bobSub} nodes := []*lntest.HarnessNode{net.Alice, net.Bob} // Alice and Bob should see each other's ChannelUpdates, advertising the @@ -980,9 +981,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MinHtlc: defaultMinHtlc, } - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, @@ -1019,10 +1020,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Clean up carol's node when the test finishes. defer shutdownAndAssert(net, t, carol) - carolUpdates, cQuit := subscribeGraphNotifications(t, ctxb, carol) - defer close(cQuit) + carolSub := subscribeGraphNotifications(t, ctxb, carol) + defer close(carolSub.quit) - nodeUpdates = append(nodeUpdates, carolUpdates) + graphSubs = append(graphSubs, carolSub) nodes = append(nodes, carol) // Send some coins to Carol that can be used for channel funding. @@ -1065,9 +1066,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MinHtlc: defaultMinHtlc, } - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2}, {carol.PubKeyStr, expectedPolicyCarol, chanPoint2}, @@ -1251,9 +1252,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { } // Wait for all nodes to have seen the policy update done by Bob. - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, }, @@ -1340,9 +1341,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Wait for all nodes to have seen the policy updates for both of // Alice's channels. - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint3}, @@ -3297,12 +3298,11 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode, // Wait for listener node to receive the channel update from node. ctxt, _ = context.WithTimeout(ctxb, timeout) - listenerUpdates, aQuit := subscribeGraphNotifications(t, ctxt, - listenerNode) - defer close(aQuit) + graphSub := subscribeGraphNotifications(t, ctxt, listenerNode) + defer close(graphSub.quit) waitForChannelUpdate( - t, listenerUpdates, + t, graphSub, []expectedChanUpdate{ {node.PubKeyStr, expectedPolicy, chanPoint}, }, @@ -7322,10 +7322,19 @@ out: } } +// graphSubscription houses the proxied update and error chans for a node's +// graph subscriptions. +type graphSubscription struct { + updateChan chan *lnrpc.GraphTopologyUpdate + errChan chan error + quit chan struct{} +} + // subscribeGraphNotifications subscribes to channel graph updates and launches // a goroutine that forwards these to the returned channel. func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, - node *lntest.HarnessNode) (chan *lnrpc.GraphTopologyUpdate, chan struct{}) { + node *lntest.HarnessNode) graphSubscription { + // We'll first start by establishing a notification client which will // send us notifications upon detected changes in the channel graph. req := &lnrpc.GraphTopologySubscription{} @@ -7337,6 +7346,7 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, // We'll launch a goroutine that will be responsible for proxying all // notifications recv'd from the client into the channel below. + errChan := make(chan error, 1) quit := make(chan struct{}) graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) go func() { @@ -7357,8 +7367,11 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, if err == io.EOF { return } else if err != nil { - t.Fatalf("unable to recv graph update: %v", - err) + select { + case errChan <- err: + case <-quit: + } + return } select { @@ -7369,7 +7382,12 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, } } }() - return graphUpdates, quit + + return graphSubscription{ + updateChan: graphUpdates, + errChan: errChan, + quit: quit, + } } func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { @@ -7378,7 +7396,10 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) ctxb := context.Background() // Let Alice subscribe to graph notifications. - graphUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) + graphSub := subscribeGraphNotifications( + t, ctxb, net.Alice, + ) + defer close(graphSub.quit) // Open a new channel between Alice and Bob. ctxt, _ := context.WithTimeout(ctxb, timeout) @@ -7397,7 +7418,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) select { // Ensure that a new update for both created edges is properly // dispatched to our registered client. - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.ChannelUpdates) > 0 { chanUpdate := graphUpdate.ChannelUpdates[0] @@ -7432,6 +7453,8 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) nodeUpdate.IdentityKey) } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("timeout waiting for graph notification %v", i) } @@ -7452,7 +7475,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) out: for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.ClosedChans) != 1 { continue } @@ -7485,6 +7508,9 @@ out: } break out + + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("notification for channel closure not " + "sent") @@ -7531,7 +7557,7 @@ out: // Bob's new node announcement, and the channel between Bob and Carol. for i := 0; i < 3; i++ { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.NodeUpdates) > 0 { nodeUpdate := graphUpdate.NodeUpdates[0] switch nodeUpdate.IdentityKey { @@ -7565,6 +7591,8 @@ out: chanUpdate.ConnectingNode) } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("timeout waiting for graph notification %v", i) } @@ -7573,8 +7601,6 @@ out: // Close the channel between Bob and Carol. ctxt, _ = context.WithTimeout(context.Background(), timeout) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) - - close(quit) } // testNodeAnnouncement ensures that when a node is started with one or more @@ -7582,8 +7608,8 @@ out: // announced to the network and reported in the network graph. func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() - aliceUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) - defer close(quit) + aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice) + defer close(aliceSub.quit) advertisedAddrs := []string{ "192.168.1.1:8333", @@ -7636,12 +7662,12 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { } } - waitForAddrsInUpdate := func(graphUpdates <-chan *lnrpc.GraphTopologyUpdate, + waitForAddrsInUpdate := func(graphSub graphSubscription, nodePubKey string, targetAddrs ...string) { for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: for _, update := range graphUpdate.NodeUpdates { if update.IdentityKey == nodePubKey { assertAddrs( @@ -7651,13 +7677,17 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { return } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(20 * time.Second): t.Fatalf("did not receive node ann update") } } } - waitForAddrsInUpdate(aliceUpdates, dave.PubKeyStr, advertisedAddrs...) + waitForAddrsInUpdate( + aliceSub, dave.PubKeyStr, advertisedAddrs..., + ) // Close the channel between Bob and Dave. ctxt, _ = context.WithTimeout(ctxb, timeout) @@ -11560,11 +11590,11 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) { // Wait for Alice to receive the channel update from Carol. ctxt, _ = context.WithTimeout(ctxb, timeout) - aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxt, net.Alice) - defer close(aQuit) + aliceSub := subscribeGraphNotifications(t, ctxt, net.Alice) + defer close(aliceSub.quit) waitForChannelUpdate( - t, aliceUpdates, + t, aliceSub, []expectedChanUpdate{ {carol.PubKeyStr, expectedPolicy, chanPointCarolDave}, }, @@ -11774,8 +11804,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to connect bob to dave: %v", err) } - daveUpdates, quit := subscribeGraphNotifications(t, ctxb, dave) - defer close(quit) + daveSub := subscribeGraphNotifications(t, ctxb, dave) + defer close(daveSub.quit) // We should expect to see a channel update with the default routing // policy, except that it should indicate the channel is disabled. @@ -11794,7 +11824,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to suspend carol: %v", err) } waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, }, @@ -11808,7 +11838,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { expectedPolicy.Disabled = false waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, }, @@ -11832,7 +11862,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { // receive an update marking each as disabled. expectedPolicy.Disabled = true waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, @@ -11854,7 +11884,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { } waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, },