Merge pull request #1770 from cfromknecht/prevent-goroutine-fail

lnd_test: Prevent calling Fatal in goroutine
This commit is contained in:
Olaoluwa Osuntokun 2018-09-03 19:17:00 -07:00 committed by GitHub
commit 32b0f3ff95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -798,8 +798,7 @@ type expectedChanUpdate struct {
// waitForChannelUpdate waits for a node to receive the expected channel // waitForChannelUpdate waits for a node to receive the expected channel
// updates. // updates.
func waitForChannelUpdate(t *harnessTest, func waitForChannelUpdate(t *harnessTest, subscription graphSubscription,
graphUpdates chan *lnrpc.GraphTopologyUpdate,
expUpdates []expectedChanUpdate) { expUpdates []expectedChanUpdate) {
// Create an array indicating which expected channel updates we have // Create an array indicating which expected channel updates we have
@ -808,7 +807,7 @@ func waitForChannelUpdate(t *harnessTest,
out: out:
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-subscription.updateChan:
for _, update := range graphUpdate.ChannelUpdates { for _, update := range graphUpdate.ChannelUpdates {
// For each expected update, check if it matches // For each expected update, check if it matches
@ -857,6 +856,8 @@ out:
break break
} }
} }
case err := <-subscription.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
t.Fatalf("did not receive channel update") t.Fatalf("did not receive channel update")
} }
@ -948,10 +949,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Launch notification clients for all nodes, such that we can // Launch notification clients for all nodes, such that we can
// get notified when they discover new channels and updates in the // get notified when they discover new channels and updates in the
// graph. // graph.
aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice)
defer close(aQuit) defer close(aliceSub.quit)
bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob) bobSub := subscribeGraphNotifications(t, ctxb, net.Bob)
defer close(bQuit) defer close(bobSub.quit)
chanAmt := maxBtcFundingAmount chanAmt := maxBtcFundingAmount
pushAmt := chanAmt / 2 pushAmt := chanAmt / 2
@ -968,7 +969,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// We add all the nodes' update channels to a slice, such that we can // We add all the nodes' update channels to a slice, such that we can
// make sure they all receive the expected updates. // make sure they all receive the expected updates.
nodeUpdates := []chan *lnrpc.GraphTopologyUpdate{aliceUpdates, bobUpdates} graphSubs := []graphSubscription{aliceSub, bobSub}
nodes := []*lntest.HarnessNode{net.Alice, net.Bob} nodes := []*lntest.HarnessNode{net.Alice, net.Bob}
// Alice and Bob should see each other's ChannelUpdates, advertising the // Alice and Bob should see each other's ChannelUpdates, advertising the
@ -980,9 +981,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MinHtlc: defaultMinHtlc, MinHtlc: defaultMinHtlc,
} }
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Bob.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint},
@ -1019,10 +1020,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Clean up carol's node when the test finishes. // Clean up carol's node when the test finishes.
defer shutdownAndAssert(net, t, carol) defer shutdownAndAssert(net, t, carol)
carolUpdates, cQuit := subscribeGraphNotifications(t, ctxb, carol) carolSub := subscribeGraphNotifications(t, ctxb, carol)
defer close(cQuit) defer close(carolSub.quit)
nodeUpdates = append(nodeUpdates, carolUpdates) graphSubs = append(graphSubs, carolSub)
nodes = append(nodes, carol) nodes = append(nodes, carol)
// Send some coins to Carol that can be used for channel funding. // Send some coins to Carol that can be used for channel funding.
@ -1065,9 +1066,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
MinHtlc: defaultMinHtlc, MinHtlc: defaultMinHtlc,
} }
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2}, {net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2},
{carol.PubKeyStr, expectedPolicyCarol, chanPoint2}, {carol.PubKeyStr, expectedPolicyCarol, chanPoint2},
@ -1251,9 +1252,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
} }
// Wait for all nodes to have seen the policy update done by Bob. // Wait for all nodes to have seen the policy update done by Bob.
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Bob.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint},
}, },
@ -1340,9 +1341,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
// Wait for all nodes to have seen the policy updates for both of // Wait for all nodes to have seen the policy updates for both of
// Alice's channels. // Alice's channels.
for _, updates := range nodeUpdates { for _, graphSub := range graphSubs {
waitForChannelUpdate( waitForChannelUpdate(
t, updates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint},
{net.Alice.PubKeyStr, expectedPolicy, chanPoint3}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint3},
@ -3347,12 +3348,11 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
// Wait for listener node to receive the channel update from node. // Wait for listener node to receive the channel update from node.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
listenerUpdates, aQuit := subscribeGraphNotifications(t, ctxt, graphSub := subscribeGraphNotifications(t, ctxt, listenerNode)
listenerNode) defer close(graphSub.quit)
defer close(aQuit)
waitForChannelUpdate( waitForChannelUpdate(
t, listenerUpdates, t, graphSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{node.PubKeyStr, expectedPolicy, chanPoint}, {node.PubKeyStr, expectedPolicy, chanPoint},
}, },
@ -7372,10 +7372,19 @@ out:
} }
} }
// graphSubscription houses the proxied update and error chans for a node's
// graph subscriptions.
type graphSubscription struct {
updateChan chan *lnrpc.GraphTopologyUpdate
errChan chan error
quit chan struct{}
}
// subscribeGraphNotifications subscribes to channel graph updates and launches // subscribeGraphNotifications subscribes to channel graph updates and launches
// a goroutine that forwards these to the returned channel. // a goroutine that forwards these to the returned channel.
func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
node *lntest.HarnessNode) (chan *lnrpc.GraphTopologyUpdate, chan struct{}) { node *lntest.HarnessNode) graphSubscription {
// We'll first start by establishing a notification client which will // We'll first start by establishing a notification client which will
// send us notifications upon detected changes in the channel graph. // send us notifications upon detected changes in the channel graph.
req := &lnrpc.GraphTopologySubscription{} req := &lnrpc.GraphTopologySubscription{}
@ -7387,6 +7396,7 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
// We'll launch a goroutine that will be responsible for proxying all // We'll launch a goroutine that will be responsible for proxying all
// notifications recv'd from the client into the channel below. // notifications recv'd from the client into the channel below.
errChan := make(chan error, 1)
quit := make(chan struct{}) quit := make(chan struct{})
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20)
go func() { go func() {
@ -7407,8 +7417,11 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
if err == io.EOF { if err == io.EOF {
return return
} else if err != nil { } else if err != nil {
t.Fatalf("unable to recv graph update: %v", select {
err) case errChan <- err:
case <-quit:
}
return
} }
select { select {
@ -7419,7 +7432,12 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context,
} }
} }
}() }()
return graphUpdates, quit
return graphSubscription{
updateChan: graphUpdates,
errChan: errChan,
quit: quit,
}
} }
func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) {
@ -7428,7 +7446,10 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
ctxb := context.Background() ctxb := context.Background()
// Let Alice subscribe to graph notifications. // Let Alice subscribe to graph notifications.
graphUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) graphSub := subscribeGraphNotifications(
t, ctxb, net.Alice,
)
defer close(graphSub.quit)
// Open a new channel between Alice and Bob. // Open a new channel between Alice and Bob.
ctxt, _ := context.WithTimeout(ctxb, timeout) ctxt, _ := context.WithTimeout(ctxb, timeout)
@ -7447,7 +7468,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
select { select {
// Ensure that a new update for both created edges is properly // Ensure that a new update for both created edges is properly
// dispatched to our registered client. // dispatched to our registered client.
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.ChannelUpdates) > 0 { if len(graphUpdate.ChannelUpdates) > 0 {
chanUpdate := graphUpdate.ChannelUpdates[0] chanUpdate := graphUpdate.ChannelUpdates[0]
@ -7482,6 +7503,8 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
nodeUpdate.IdentityKey) nodeUpdate.IdentityKey)
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("timeout waiting for graph notification %v", i) t.Fatalf("timeout waiting for graph notification %v", i)
} }
@ -7502,7 +7525,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest)
out: out:
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.ClosedChans) != 1 { if len(graphUpdate.ClosedChans) != 1 {
continue continue
} }
@ -7535,6 +7558,9 @@ out:
} }
break out break out
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("notification for channel closure not " + t.Fatalf("notification for channel closure not " +
"sent") "sent")
@ -7581,7 +7607,7 @@ out:
// Bob's new node announcement, and the channel between Bob and Carol. // Bob's new node announcement, and the channel between Bob and Carol.
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
if len(graphUpdate.NodeUpdates) > 0 { if len(graphUpdate.NodeUpdates) > 0 {
nodeUpdate := graphUpdate.NodeUpdates[0] nodeUpdate := graphUpdate.NodeUpdates[0]
switch nodeUpdate.IdentityKey { switch nodeUpdate.IdentityKey {
@ -7615,6 +7641,8 @@ out:
chanUpdate.ConnectingNode) chanUpdate.ConnectingNode)
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(time.Second * 10): case <-time.After(time.Second * 10):
t.Fatalf("timeout waiting for graph notification %v", i) t.Fatalf("timeout waiting for graph notification %v", i)
} }
@ -7623,8 +7651,6 @@ out:
// Close the channel between Bob and Carol. // Close the channel between Bob and Carol.
ctxt, _ = context.WithTimeout(context.Background(), timeout) ctxt, _ = context.WithTimeout(context.Background(), timeout)
closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false)
close(quit)
} }
// testNodeAnnouncement ensures that when a node is started with one or more // testNodeAnnouncement ensures that when a node is started with one or more
@ -7632,8 +7658,8 @@ out:
// announced to the network and reported in the network graph. // announced to the network and reported in the network graph.
func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background() ctxb := context.Background()
aliceUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice)
defer close(quit) defer close(aliceSub.quit)
advertisedAddrs := []string{ advertisedAddrs := []string{
"192.168.1.1:8333", "192.168.1.1:8333",
@ -7686,12 +7712,12 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
} }
} }
waitForAddrsInUpdate := func(graphUpdates <-chan *lnrpc.GraphTopologyUpdate, waitForAddrsInUpdate := func(graphSub graphSubscription,
nodePubKey string, targetAddrs ...string) { nodePubKey string, targetAddrs ...string) {
for { for {
select { select {
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphSub.updateChan:
for _, update := range graphUpdate.NodeUpdates { for _, update := range graphUpdate.NodeUpdates {
if update.IdentityKey == nodePubKey { if update.IdentityKey == nodePubKey {
assertAddrs( assertAddrs(
@ -7701,13 +7727,17 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) {
return return
} }
} }
case err := <-graphSub.errChan:
t.Fatalf("unable to recv graph update: %v", err)
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
t.Fatalf("did not receive node ann update") t.Fatalf("did not receive node ann update")
} }
} }
} }
waitForAddrsInUpdate(aliceUpdates, dave.PubKeyStr, advertisedAddrs...) waitForAddrsInUpdate(
aliceSub, dave.PubKeyStr, advertisedAddrs...,
)
// Close the channel between Bob and Dave. // Close the channel between Bob and Dave.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
@ -11610,11 +11640,11 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) {
// Wait for Alice to receive the channel update from Carol. // Wait for Alice to receive the channel update from Carol.
ctxt, _ = context.WithTimeout(ctxb, timeout) ctxt, _ = context.WithTimeout(ctxb, timeout)
aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxt, net.Alice) aliceSub := subscribeGraphNotifications(t, ctxt, net.Alice)
defer close(aQuit) defer close(aliceSub.quit)
waitForChannelUpdate( waitForChannelUpdate(
t, aliceUpdates, t, aliceSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{carol.PubKeyStr, expectedPolicy, chanPointCarolDave}, {carol.PubKeyStr, expectedPolicy, chanPointCarolDave},
}, },
@ -11824,8 +11854,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to connect bob to dave: %v", err) t.Fatalf("unable to connect bob to dave: %v", err)
} }
daveUpdates, quit := subscribeGraphNotifications(t, ctxb, dave) daveSub := subscribeGraphNotifications(t, ctxb, dave)
defer close(quit) defer close(daveSub.quit)
// We should expect to see a channel update with the default routing // We should expect to see a channel update with the default routing
// policy, except that it should indicate the channel is disabled. // policy, except that it should indicate the channel is disabled.
@ -11844,7 +11874,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("unable to suspend carol: %v", err) t.Fatalf("unable to suspend carol: %v", err)
} }
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },
@ -11858,7 +11888,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
expectedPolicy.Disabled = false expectedPolicy.Disabled = false
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },
@ -11882,7 +11912,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
// receive an update marking each as disabled. // receive an update marking each as disabled.
expectedPolicy.Disabled = true expectedPolicy.Disabled = true
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob},
{net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol},
@ -11904,7 +11934,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
} }
waitForChannelUpdate( waitForChannelUpdate(
t, daveUpdates, t, daveSub,
[]expectedChanUpdate{ []expectedChanUpdate{
{eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, {eve.PubKeyStr, expectedPolicy, chanPointEveCarol},
}, },