diff --git a/lnd_test.go b/lnd_test.go index 11b99934..2ee3e87d 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -798,8 +798,7 @@ type expectedChanUpdate struct { // waitForChannelUpdate waits for a node to receive the expected channel // updates. -func waitForChannelUpdate(t *harnessTest, - graphUpdates chan *lnrpc.GraphTopologyUpdate, +func waitForChannelUpdate(t *harnessTest, subscription graphSubscription, expUpdates []expectedChanUpdate) { // Create an array indicating which expected channel updates we have @@ -808,7 +807,7 @@ func waitForChannelUpdate(t *harnessTest, out: for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-subscription.updateChan: for _, update := range graphUpdate.ChannelUpdates { // For each expected update, check if it matches @@ -857,6 +856,8 @@ out: break } } + case err := <-subscription.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(20 * time.Second): t.Fatalf("did not receive channel update") } @@ -948,10 +949,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Launch notification clients for all nodes, such that we can // get notified when they discover new channels and updates in the // graph. - aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxb, net.Alice) - defer close(aQuit) - bobUpdates, bQuit := subscribeGraphNotifications(t, ctxb, net.Bob) - defer close(bQuit) + aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice) + defer close(aliceSub.quit) + bobSub := subscribeGraphNotifications(t, ctxb, net.Bob) + defer close(bobSub.quit) chanAmt := maxBtcFundingAmount pushAmt := chanAmt / 2 @@ -968,7 +969,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // We add all the nodes' update channels to a slice, such that we can // make sure they all receive the expected updates. - nodeUpdates := []chan *lnrpc.GraphTopologyUpdate{aliceUpdates, bobUpdates} + graphSubs := []graphSubscription{aliceSub, bobSub} nodes := []*lntest.HarnessNode{net.Alice, net.Bob} // Alice and Bob should see each other's ChannelUpdates, advertising the @@ -980,9 +981,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MinHtlc: defaultMinHtlc, } - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, @@ -1019,10 +1020,10 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Clean up carol's node when the test finishes. defer shutdownAndAssert(net, t, carol) - carolUpdates, cQuit := subscribeGraphNotifications(t, ctxb, carol) - defer close(cQuit) + carolSub := subscribeGraphNotifications(t, ctxb, carol) + defer close(carolSub.quit) - nodeUpdates = append(nodeUpdates, carolUpdates) + graphSubs = append(graphSubs, carolSub) nodes = append(nodes, carol) // Send some coins to Carol that can be used for channel funding. @@ -1065,9 +1066,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { MinHtlc: defaultMinHtlc, } - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Bob.PubKeyStr, expectedPolicyBob, chanPoint2}, {carol.PubKeyStr, expectedPolicyCarol, chanPoint2}, @@ -1251,9 +1252,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { } // Wait for all nodes to have seen the policy update done by Bob. - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Bob.PubKeyStr, expectedPolicy, chanPoint}, }, @@ -1340,9 +1341,9 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) { // Wait for all nodes to have seen the policy updates for both of // Alice's channels. - for _, updates := range nodeUpdates { + for _, graphSub := range graphSubs { waitForChannelUpdate( - t, updates, + t, graphSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPoint}, {net.Alice.PubKeyStr, expectedPolicy, chanPoint3}, @@ -3297,12 +3298,11 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode, // Wait for listener node to receive the channel update from node. ctxt, _ = context.WithTimeout(ctxb, timeout) - listenerUpdates, aQuit := subscribeGraphNotifications(t, ctxt, - listenerNode) - defer close(aQuit) + graphSub := subscribeGraphNotifications(t, ctxt, listenerNode) + defer close(graphSub.quit) waitForChannelUpdate( - t, listenerUpdates, + t, graphSub, []expectedChanUpdate{ {node.PubKeyStr, expectedPolicy, chanPoint}, }, @@ -7322,10 +7322,19 @@ out: } } +// graphSubscription houses the proxied update and error chans for a node's +// graph subscriptions. +type graphSubscription struct { + updateChan chan *lnrpc.GraphTopologyUpdate + errChan chan error + quit chan struct{} +} + // subscribeGraphNotifications subscribes to channel graph updates and launches // a goroutine that forwards these to the returned channel. func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, - node *lntest.HarnessNode) (chan *lnrpc.GraphTopologyUpdate, chan struct{}) { + node *lntest.HarnessNode) graphSubscription { + // We'll first start by establishing a notification client which will // send us notifications upon detected changes in the channel graph. req := &lnrpc.GraphTopologySubscription{} @@ -7337,6 +7346,7 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, // We'll launch a goroutine that will be responsible for proxying all // notifications recv'd from the client into the channel below. + errChan := make(chan error, 1) quit := make(chan struct{}) graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 20) go func() { @@ -7357,8 +7367,11 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, if err == io.EOF { return } else if err != nil { - t.Fatalf("unable to recv graph update: %v", - err) + select { + case errChan <- err: + case <-quit: + } + return } select { @@ -7369,7 +7382,12 @@ func subscribeGraphNotifications(t *harnessTest, ctxb context.Context, } } }() - return graphUpdates, quit + + return graphSubscription{ + updateChan: graphUpdates, + errChan: errChan, + quit: quit, + } } func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) { @@ -7378,7 +7396,10 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) ctxb := context.Background() // Let Alice subscribe to graph notifications. - graphUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) + graphSub := subscribeGraphNotifications( + t, ctxb, net.Alice, + ) + defer close(graphSub.quit) // Open a new channel between Alice and Bob. ctxt, _ := context.WithTimeout(ctxb, timeout) @@ -7397,7 +7418,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) select { // Ensure that a new update for both created edges is properly // dispatched to our registered client. - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.ChannelUpdates) > 0 { chanUpdate := graphUpdate.ChannelUpdates[0] @@ -7432,6 +7453,8 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) nodeUpdate.IdentityKey) } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("timeout waiting for graph notification %v", i) } @@ -7452,7 +7475,7 @@ func testGraphTopologyNotifications(net *lntest.NetworkHarness, t *harnessTest) out: for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.ClosedChans) != 1 { continue } @@ -7485,6 +7508,9 @@ out: } break out + + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("notification for channel closure not " + "sent") @@ -7531,7 +7557,7 @@ out: // Bob's new node announcement, and the channel between Bob and Carol. for i := 0; i < 3; i++ { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: if len(graphUpdate.NodeUpdates) > 0 { nodeUpdate := graphUpdate.NodeUpdates[0] switch nodeUpdate.IdentityKey { @@ -7565,6 +7591,8 @@ out: chanUpdate.ConnectingNode) } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(time.Second * 10): t.Fatalf("timeout waiting for graph notification %v", i) } @@ -7573,8 +7601,6 @@ out: // Close the channel between Bob and Carol. ctxt, _ = context.WithTimeout(context.Background(), timeout) closeChannelAndAssert(ctxt, t, net, net.Bob, chanPoint, false) - - close(quit) } // testNodeAnnouncement ensures that when a node is started with one or more @@ -7582,8 +7608,8 @@ out: // announced to the network and reported in the network graph. func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { ctxb := context.Background() - aliceUpdates, quit := subscribeGraphNotifications(t, ctxb, net.Alice) - defer close(quit) + aliceSub := subscribeGraphNotifications(t, ctxb, net.Alice) + defer close(aliceSub.quit) advertisedAddrs := []string{ "192.168.1.1:8333", @@ -7636,12 +7662,12 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { } } - waitForAddrsInUpdate := func(graphUpdates <-chan *lnrpc.GraphTopologyUpdate, + waitForAddrsInUpdate := func(graphSub graphSubscription, nodePubKey string, targetAddrs ...string) { for { select { - case graphUpdate := <-graphUpdates: + case graphUpdate := <-graphSub.updateChan: for _, update := range graphUpdate.NodeUpdates { if update.IdentityKey == nodePubKey { assertAddrs( @@ -7651,13 +7677,17 @@ func testNodeAnnouncement(net *lntest.NetworkHarness, t *harnessTest) { return } } + case err := <-graphSub.errChan: + t.Fatalf("unable to recv graph update: %v", err) case <-time.After(20 * time.Second): t.Fatalf("did not receive node ann update") } } } - waitForAddrsInUpdate(aliceUpdates, dave.PubKeyStr, advertisedAddrs...) + waitForAddrsInUpdate( + aliceSub, dave.PubKeyStr, advertisedAddrs..., + ) // Close the channel between Bob and Dave. ctxt, _ = context.WithTimeout(ctxb, timeout) @@ -11560,11 +11590,11 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) { // Wait for Alice to receive the channel update from Carol. ctxt, _ = context.WithTimeout(ctxb, timeout) - aliceUpdates, aQuit := subscribeGraphNotifications(t, ctxt, net.Alice) - defer close(aQuit) + aliceSub := subscribeGraphNotifications(t, ctxt, net.Alice) + defer close(aliceSub.quit) waitForChannelUpdate( - t, aliceUpdates, + t, aliceSub, []expectedChanUpdate{ {carol.PubKeyStr, expectedPolicy, chanPointCarolDave}, }, @@ -11774,8 +11804,8 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to connect bob to dave: %v", err) } - daveUpdates, quit := subscribeGraphNotifications(t, ctxb, dave) - defer close(quit) + daveSub := subscribeGraphNotifications(t, ctxb, dave) + defer close(daveSub.quit) // We should expect to see a channel update with the default routing // policy, except that it should indicate the channel is disabled. @@ -11794,7 +11824,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { t.Fatalf("unable to suspend carol: %v", err) } waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, }, @@ -11808,7 +11838,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { expectedPolicy.Disabled = false waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, }, @@ -11832,7 +11862,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { // receive an update marking each as disabled. expectedPolicy.Disabled = true waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceBob}, {net.Alice.PubKeyStr, expectedPolicy, chanPointAliceCarol}, @@ -11854,7 +11884,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) { } waitForChannelUpdate( - t, daveUpdates, + t, daveSub, []expectedChanUpdate{ {eve.PubKeyStr, expectedPolicy, chanPointEveCarol}, },