lnd_test: prevent calling Fatal in goroutine

This commit prevents an error that I've seen on travis,
wherein the test fails because a call to Fatal happens
after the test finishes. The root cause is that we call
Fatal in a goroutine that is reading from the subscribe
graph rpc call.

To fix this, we now pass an err chan back into the main
test context, where we can receive any errors and fail
the test if one comes through.
This commit is contained in:
Conner Fromknecht 2018-08-22 22:16:16 -07:00
parent 2f1b024679
commit c569c40cef
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -798,8 +798,7 @@ type expectedChanUpdate struct {
// waitForChannelUpdate waits for a node to receive the expected channel // waitForChannelUpdate waits for a node to receive the expected channel
// updates. // updates.
func waitForChannelUpdate(t *harnessTest, func waitForChannelUpdate(t *harnessTest, subscription graphSubscription,
graphUpdates chan *lnrpc.GraphTopologyUpdate,
expUpdates []expectedChanUpdate) { expUpdates []expectedChanUpdate) {
// Create an array indicating which expected channel updates we have // Create an array indicating which expected channel updates we have
@ -808,7 +807,7 @@ func waitForChannelUpdate(t *harnessTest,
out: out:
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-subscription.updateChan:
for _, update := range graphUpdate.ChannelUpdates { for _, update := range graphUpdate.ChannelUpdates {
// For each expected update, check if it matches // For each expected update, check if it matches
@ -857,6 +856,8 @@ out:
break break
} }
} }
case err := <-subscription.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
t.Fatalf("did not receive channel update") t.Fatalf("did not receive channel update")
} }
@ -948,10 +949,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Launch notification clients for all nodes, such that we can // Launch notification clients for all nodes, such that we can
// get notified when they discover new channels and updates in the // get notified when they discover new channels and updates in the
// graph. // graph.
aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice)
defer close(aQuit) defer close(aliceSub.quit)
bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob) bobSub := subscribeGraphNotifications(t, ctxb, net.Bob)
defer close(bQuit) defer close(bobSub.quit)
chanAmt := maxBtcFundingAmount chanAmt := maxBtcFundingAmount
pushAmt := chanAmt / 2 pushAmt := chanAmt / 2
@ -968,7 +969,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// We add all the nodes' update channels to a slice, such that we can // We add all the nodes' update channels to a slice, such that we can
// make sure they all receive the expected updates. // make sure they all receive the expected updates.
nodeUpdates := []chan *lnrpc.GraphTopologyUpdate{aliceUpdates, bobUpdates} graphSubs := []graphSubscription{aliceSub, bobSub}
nodes := []*lntest.HarnessNode{net.Alice, net.Bob} nodes := []*lntest.HarnessNode{net.Alice, net.Bob}
// Alice and Bob should see each other's ChannelUpdates, advertising the // Alice and Bob should see each other's ChannelUpdates, advertising the
@ -980,9 +981,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MinHtlc: defaultMinHtlc, MinHtlc: defaultMinHtlc,
} }
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Bob.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint},
@ -1019,10 +1020,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Clean up carol's node when the test finishes. // Clean up carol's node when the test finishes.
defer shutdownAndAssert(net, t, carol) defer shutdownAndAssert(net, t, carol)
carolUpdates, cQuit := subscribeGraphNotifications(t, ctxb, carol) carolSub := subscribeGraphNotifications(t, ctxb, carol)
defer close(cQuit) defer close(carolSub.quit)
nodeUpdates = append(nodeUpdates, carolUpdates) graphSubs = append(graphSubs, carolSub)
nodes = append(nodes, carol) nodes = append(nodes, carol)
// Send some coins to Carol that can be used for channel funding. // Send some coins to Carol that can be used for channel funding.
@ -1065,9 +1066,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MinHtlc: defaultMinHtlc, MinHtlc: defaultMinHtlc,
} }
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2}, {net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2},
{carol.PubKeyStr, expectedPolicyCarol, chanPoint2}, {carol.PubKeyStr, expectedPolicyCarol, chanPoint2},
@ -1251,9 +1252,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
} }
// Wait for all nodes to have seen the policy update done by Bob. // Wait for all nodes to have seen the policy update done by Bob.
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint},
}, },
@ -1340,9 +1341,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Wait for all nodes to have seen the policy updates for both of // Wait for all nodes to have seen the policy updates for both of
// Alice's channels. // Alice's channels.
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Alice.PubKeyStr, expectedPolicy, chanPoint3}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
@ -3297,12 +3298,11 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
// Wait for listener node to receive the channel update from node. // Wait for listener node to receive the channel update from node.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
listenerUpdates, aQuit := subscribeGraphNotifications(t, ctxt, graphSub := subscribeGraphNotifications(t, ctxt, listenerNode)
listenerNode) defer close(graphSub.quit)
defer close(aQuit)
waitForChannelUpdate( waitForChannelUpdate(
t, listenerUpdates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{node.PubKeyStr, expectedPolicy, chanPoint}, {node.PubKeyStr, expectedPolicy, chanPoint},
}, },
@ -7322,10 +7322,19 @@ out:
} }
} }
// graphSubscription houses the proxied update and error chans for a node's
// graph subscriptions.
type graphSubscription struct {
updateChan chan *lnrpc.GraphTopologyUpdate
errChan chan error
quit chan struct{}
}
// subscribeGraphNotifications subscribes to channel graph updates and launches // subscribeGraphNotifications subscribes to channel graph updates and launches
// a goroutine that forwards these to the returned channel. // a goroutine that forwards these to the returned channel.
func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
node *lntest.HarnessNode) (chan *lnrpc.GraphTopologyUpdate, chan struct{}) { node *lntest.HarnessNode) graphSubscription {
// We'll first start by establishing a notification client which will // We'll first start by establishing a notification client which will
// send us notifications upon detected changes in the channel graph. // send us notifications upon detected changes in the channel graph.
req := &lnrpc.GraphTopologySubscription{} req := &lnrpc.GraphTopologySubscription{}
@ -7337,6 +7346,7 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
// We'll launch a goroutine that will be responsible for proxying all // We'll launch a goroutine that will be responsible for proxying all
// notifications recv'd from the client into the channel below. // notifications recv'd from the client into the channel below.
errChan := make(chan error, 1)
quit := make(chan struct{}) quit := make(chan struct{})
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20)
go func() { go func() {
@ -7357,8 +7367,11 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
if err == io.EOF { if err == io.EOF {
return return
} else if err != nil { } else if err != nil {
t.Fatalf("unable to recv graph update: %v", select {
err) case errChan <- err:
case <-quit:
}
return
} }
select { select {
@ -7369,7 +7382,12 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
} }
} }
}() }()
return graphUpdates, quit
return graphSubscription{
updateChan: graphUpdates,
errChan: errChan,
quit: quit,
}
} }
func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) {
@ -7378,7 +7396,10 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
ctxb := context.Background() ctxb := context.Background()
// Let Alice subscribe to graph notifications. // Let Alice subscribe to graph notifications.
graphUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) graphSub := subscribeGraphNotifications(
t, ctxb, net.Alice,
)
defer close(graphSub.quit)
// Open a new channel between Alice and Bob. // Open a new channel between Alice and Bob.
ctxt, _ := context.WithTimeout(ctxb, timeout) ctxt, _ := context.WithTimeout(ctxb, timeout)
@ -7397,7 +7418,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
select { select {
// Ensure that a new update for both created edges is properly // Ensure that a new update for both created edges is properly
// dispatched to our registered client. // dispatched to our registered client.
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.ChannelUpdates) > 0 { if len(graphUpdate.ChannelUpdates) > 0 {
chanUpdate := graphUpdate.ChannelUpdates[0] chanUpdate := graphUpdate.ChannelUpdates[0]
@ -7432,6 +7453,8 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
nodeUpdate.IdentityKey) nodeUpdate.IdentityKey)
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("timeout waiting for graph notification %v", i) t.Fatalf("timeout waiting for graph notification %v", i)
} }
@ -7452,7 +7475,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
out: out:
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.ClosedChans) != 1 { if len(graphUpdate.ClosedChans) != 1 {
continue continue
} }
@ -7485,6 +7508,9 @@ out:
} }
break out break out
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("notification for channel closure not " + t.Fatalf("notification for channel closure not " +
"sent") "sent")
@ -7531,7 +7557,7 @@ out:
// Bob's new node announcement, and the channel between Bob and Carol. // Bob's new node announcement, and the channel between Bob and Carol.
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.NodeUpdates) > 0 { if len(graphUpdate.NodeUpdates) > 0 {
nodeUpdate := graphUpdate.NodeUpdates[0] nodeUpdate := graphUpdate.NodeUpdates[0]
switch nodeUpdate.IdentityKey { switch nodeUpdate.IdentityKey {
@ -7565,6 +7591,8 @@ out:
chanUpdate.ConnectingNode) chanUpdate.ConnectingNode)
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("timeout waiting for graph notification %v", i) t.Fatalf("timeout waiting for graph notification %v", i)
} }
@ -7573,8 +7601,6 @@ out:
// Close the channel between Bob and Carol. // Close the channel between Bob and Carol.
ctxt, _ = context.WithTimeout(context.Background(), timeout) ctxt, _ = context.WithTimeout(context.Background(), timeout)
closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false)
close(quit)
} }
// testNodeAnnouncement ensures that when a node is started with one or more // testNodeAnnouncement ensures that when a node is started with one or more
@ -7582,8 +7608,8 @@ out:
// announced to the network and reported in the network graph. // announced to the network and reported in the network graph.
func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background() ctxb := context.Background()
aliceUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice)
defer close(quit) defer close(aliceSub.quit)
advertisedAddrs := []string{ advertisedAddrs := []string{
"192.168.1.1:8333", "192.168.1.1:8333",
@ -7636,12 +7662,12 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
} }
} }
waitForAddrsInUpdate := func(graphUpdates <-chan *lnrpc.GraphTopologyUpdate, waitForAddrsInUpdate := func(graphSub graphSubscription,
nodePubKey string, targetAddrs ...string) { nodePubKey string, targetAddrs ...string) {
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
for _, update := range graphUpdate.NodeUpdates { for _, update := range graphUpdate.NodeUpdates {
if update.IdentityKey == nodePubKey { if update.IdentityKey == nodePubKey {
assertAddrs( assertAddrs(
@ -7651,13 +7677,17 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
return return
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
t.Fatalf("did not receive node ann update") t.Fatalf("did not receive node ann update")
} }
} }
} }
waitForAddrsInUpdate(aliceUpdates, dave.PubKeyStr, advertisedAddrs...) waitForAddrsInUpdate(
aliceSub, dave.PubKeyStr, advertisedAddrs...,
)
// Close the channel between Bob and Dave. // Close the channel between Bob and Dave.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
@ -11560,11 +11590,11 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) {
// Wait for Alice to receive the channel update from Carol. // Wait for Alice to receive the channel update from Carol.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxt, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxt, net.Alice)
defer close(aQuit) defer close(aliceSub.quit)
waitForChannelUpdate( waitForChannelUpdate(
t, aliceUpdates, t, aliceSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{carol.PubKeyStr, expectedPolicy, chanPointCarolDave}, {carol.PubKeyStr, expectedPolicy, chanPointCarolDave},
}, },
@ -11774,8 +11804,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to connect bob to dave: %v", err) t.Fatalf("unable to connect bob to dave: %v", err)
} }
daveUpdates, quit := subscribeGraphNotifications(t, ctxb, dave) daveSub := subscribeGraphNotifications(t, ctxb, dave)
defer close(quit) defer close(daveSub.quit)
// We should expect to see a channel update with the default routing // We should expect to see a channel update with the default routing
// policy, except that it should indicate the channel is disabled. // policy, except that it should indicate the channel is disabled.
@ -11794,7 +11824,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to suspend carol: %v", err) t.Fatalf("unable to suspend carol: %v", err)
} }
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },
@ -11808,7 +11838,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
expectedPolicy.Disabled = false expectedPolicy.Disabled = false
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },
@ -11832,7 +11862,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
// receive an update marking each as disabled. // receive an update marking each as disabled.
expectedPolicy.Disabled = true expectedPolicy.Disabled = true
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob},
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol},
@ -11854,7 +11884,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
} }
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },