test: remove many sleeps from integration test by using new topology ntfns

This commit removes a number of sleeps from the set of current
integration tests by replacing them with a synchronous (w/ a timeout)
block until one or many channels are detected as being open within the
network.

As a result, the tests are now more robust, many flakes have been
eliminated, and finally this shaves a few second off of the integration
testing runs.
This commit is contained in:
Olaoluwa Osuntokun 2017-03-14 20:06:33 -07:00
parent 8ed79ae497
commit e4e63eb3a3
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 97 additions and 80 deletions

@ -258,6 +258,12 @@ func testBasicChannelFunding(net *networkHarness, t *harnessTest) {
chanPoint := openChannelAndAssert(ctxt, t, net, net.Alice, net.Bob, chanPoint := openChannelAndAssert(ctxt, t, net, net.Alice, net.Bob,
chanAmt, pushAmt) chanAmt, pushAmt)
ctxt, _ = context.WithTimeout(ctxb, time.Second*15)
err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't report channel: %v", err)
}
// With then channel open, ensure that the amount specified above has // With then channel open, ensure that the amount specified above has
// properly been pushed to Bob. // properly been pushed to Bob.
balReq := &lnrpc.ChannelBalanceRequest{} balReq := &lnrpc.ChannelBalanceRequest{}
@ -447,18 +453,24 @@ func testChannelBalance(net *networkHarness, t *harnessTest) {
chanPoint := openChannelAndAssert(ctx, t, net, net.Alice, net.Bob, chanPoint := openChannelAndAssert(ctx, t, net, net.Alice, net.Bob,
amount, 0) amount, 0)
// Wait for both Alice and Bob to recognize this new channel.
ctxt, _ := context.WithTimeout(context.Background(), timeout)
err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't advertise channel before "+
"timeout: %v", err)
}
ctxt, _ = context.WithTimeout(context.Background(), timeout)
err = net.Bob.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("bob didn't advertise channel before "+
"timeout: %v", err)
}
// As this is a single funder channel, Alice's balance should be // As this is a single funder channel, Alice's balance should be
// exactly 0.5 BTC since now state transitions have taken place yet. // exactly 0.5 BTC since now state transitions have taken place yet.
checkChannelBalance(net.Alice, amount) checkChannelBalance(net.Alice, amount)
// Since we only explicitly wait for Alice's channel open notification,
// Bob might not yet have updated his internal state in response to
// Alice's channel open proof. So we sleep here for a second to let Bob
// catch up.
// TODO(roasbeef): Bob should also watch for the channel on-chain after
// the changes to restrict the number of pending channels are in.
time.Sleep(time.Second)
// Ensure Bob currently has no available balance within the channel. // Ensure Bob currently has no available balance within the channel.
checkChannelBalance(net.Bob, 0) checkChannelBalance(net.Bob, 0)
@ -727,9 +739,17 @@ func testSingleHopInvoice(net *networkHarness, t *harnessTest) {
t.Fatalf("unable to add invoice: %v", err) t.Fatalf("unable to add invoice: %v", err)
} }
// Wait for Alice to recognize and advertise the new channel generated
// above.
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't advertise channel before "+
"timeout: %v", err)
}
// With the invoice for Bob added, send a payment towards Alice paying // With the invoice for Bob added, send a payment towards Alice paying
// to the above generated invoice. // to the above generated invoice.
time.Sleep(time.Millisecond * 500)
sendStream, err := net.Alice.SendPayment(ctxb) sendStream, err := net.Alice.SendPayment(ctxb)
if err != nil { if err != nil {
t.Fatalf("unable to create alice payment stream: %v", err) t.Fatalf("unable to create alice payment stream: %v", err)
@ -754,8 +774,6 @@ func testSingleHopInvoice(net *networkHarness, t *harnessTest) {
} }
// Bob's invoice should now be found and marked as settled. // Bob's invoice should now be found and marked as settled.
// TODO(roasbeef): remove sleep after hooking into the to-be-written
// invoice settlement notification stream
payHash := &lnrpc.PaymentHash{ payHash := &lnrpc.PaymentHash{
RHash: invoiceResp.RHash, RHash: invoiceResp.RHash,
} }
@ -848,9 +866,17 @@ func testListPayments(net *networkHarness, t *harnessTest) {
t.Fatalf("unable to add invoice: %v", err) t.Fatalf("unable to add invoice: %v", err)
} }
// Wait for Alice to recognize and advertise the new channel generated
// above.
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't advertise channel before "+
"timeout: %v", err)
}
// With the invoice for Bob added, send a payment towards Alice paying // With the invoice for Bob added, send a payment towards Alice paying
// to the above generated invoice. // to the above generated invoice.
time.Sleep(time.Millisecond * 300)
sendStream, err := net.Alice.SendPayment(ctxb) sendStream, err := net.Alice.SendPayment(ctxb)
if err != nil { if err != nil {
t.Fatalf("unable to create alice payment stream: %v", err) t.Fatalf("unable to create alice payment stream: %v", err)
@ -998,51 +1024,17 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) {
rHashes[i] = resp.RHash rHashes[i] = resp.RHash
} }
// Carol's routing table should show a path from Carol -> Alice -> Bob, // Wait for carol to recognize both the Channel from herself to Carol,
// with the two channels above recognized as the only links within the // and also the channel from Alice to Bob.
// network. ctxt, _ = context.WithTimeout(ctxb, timeout)
time.Sleep(time.Second * 2) err = carol.WaitForNetworkChannelOpen(ctxt, chanPointCarol)
req := &lnrpc.ChannelGraphRequest{}
chanGraph, err := carol.DescribeGraph(ctxb, req)
if err != nil { if err != nil {
t.Fatalf("unable to query for carol's routing table: %v", err) t.Fatalf("carol didn't advertise her channel: %v", err)
}
if len(chanGraph.Edges) != 2 {
t.Fatalf("only two channels should be seen as active in the "+
"network, instead %v are: %v", len(chanGraph.Edges),
chanGraph.Edges)
}
for _, link := range chanGraph.Edges {
switch {
case link.ChanPoint == aliceFundPoint.String():
switch {
case link.Node1Pub == net.Alice.PubKeyStr &&
link.Node2Pub == net.Bob.PubKeyStr:
continue
case link.Node1Pub == net.Bob.PubKeyStr &&
link.Node2Pub == net.Alice.PubKeyStr:
continue
default:
t.Fatalf("unknown link within routing "+
"table: %v", spew.Sdump(link))
}
case link.ChanPoint == carolFundPoint.String():
switch {
case link.Node1Pub == net.Alice.PubKeyStr &&
link.Node2Pub == carol.PubKeyStr:
continue
case link.Node1Pub == carol.PubKeyStr &&
link.Node2Pub == net.Alice.PubKeyStr:
continue
default:
t.Fatalf("unknown link within routing "+
"table: %v", spew.Sdump(link))
}
default:
t.Fatalf("unknown channel %v found in routing table, "+
"only %v and %v should exist", spew.Sdump(link),
aliceFundPoint, carolFundPoint)
} }
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = carol.WaitForNetworkChannelOpen(ctxt, chanPointAlice)
if err != nil {
t.Fatalf("carol didn't see the alice->bob channel before timeout: %v", err)
} }
// Using Carol as the source, pay to the 5 invoices from Bob created above. // Using Carol as the source, pay to the 5 invoices from Bob created above.
@ -1167,7 +1159,7 @@ func testMultiHopPayments(net *networkHarness, t *harnessTest) {
// Finally, shutdown the node we created for the duration of the tests, // Finally, shutdown the node we created for the duration of the tests,
// only leaving the two seed nodes (Alice and Bob) within our test // only leaving the two seed nodes (Alice and Bob) within our test
// network. // network.
if err := carol.shutdown(); err != nil { if err := carol.Shutdown(); err != nil {
t.Fatalf("unable to shutdown carol: %v", err) t.Fatalf("unable to shutdown carol: %v", err)
} }
} }
@ -1226,9 +1218,18 @@ func testInvoiceSubscriptions(net *networkHarness, t *harnessTest) {
close(updateSent) close(updateSent)
}() }()
// Wait for the channel to be recognized by both Alice and Bob before
// continuing the rest of the test.
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
// TODO(roasbeef): will need to make num blocks to advertise a
// node param
t.Fatalf("channel not seen by alice before timeout: %v", err)
}
// With the assertion above set up, send a payment from Alice to Bob // With the assertion above set up, send a payment from Alice to Bob
// which should finalize and settle the invoice. // which should finalize and settle the invoice.
time.Sleep(time.Millisecond * 500)
sendStream, err := net.Alice.SendPayment(ctxb) sendStream, err := net.Alice.SendPayment(ctxb)
if err != nil { if err != nil {
t.Fatalf("unable to create alice payment stream: %v", err) t.Fatalf("unable to create alice payment stream: %v", err)
@ -1312,9 +1313,8 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) {
} }
// Send open channel requests without generating new blocks thereby // Send open channel requests without generating new blocks thereby
// increasing pool of pending channels. Then check that we can't // increasing pool of pending channels. Then check that we can't open
// open the channel if the number of pending channels exceed // the channel if the number of pending channels exceed max value.
// max value.
openStreams := make([]lnrpc.Lightning_OpenChannelClient, maxPendingChannels) openStreams := make([]lnrpc.Lightning_OpenChannelClient, maxPendingChannels)
for i := 0; i < maxPendingChannels; i++ { for i := 0; i < maxPendingChannels; i++ {
ctx, _ = context.WithTimeout(context.Background(), timeout) ctx, _ = context.WithTimeout(context.Background(), timeout)
@ -1336,12 +1336,12 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) {
t.Fatalf("not expected error was received: %v", err) t.Fatalf("not expected error was received: %v", err)
} }
// For now our channels are in pending state, in order to not // For now our channels are in pending state, in order to not interfere
// interfere with other tests we should clean up - complete opening // with other tests we should clean up - complete opening of the
// of the channel and then close it. // channel and then close it.
// Mine a block, then wait for node's to notify us that the channel // Mine a block, then wait for node's to notify us that the channel has
// has been opened. The funding transactions should be found within the // been opened. The funding transactions should be found within the
// newly mined block. // newly mined block.
block := mineBlocks(t, net, 1)[0] block := mineBlocks(t, net, 1)[0]
@ -1358,7 +1358,15 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) {
t.Fatalf("unable to create sha hash: %v", err) t.Fatalf("unable to create sha hash: %v", err)
} }
// Ensure that the funding transaction enters a block, and is
// properly advertised by Alice.
assertTxInBlock(t, block, fundingTxID) assertTxInBlock(t, block, fundingTxID)
ctxt, _ = context.WithTimeout(context.Background(), timeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, fundingChanPoint)
if err != nil {
t.Fatalf("channel not seen on network before "+
"timeout: %v", err)
}
// The channel should be listed in the peer information // The channel should be listed in the peer information
// returned by both peers. // returned by both peers.
@ -1366,7 +1374,6 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) {
Hash: *fundingTxID, Hash: *fundingTxID,
Index: fundingChanPoint.OutputIndex, Index: fundingChanPoint.OutputIndex,
} }
time.Sleep(time.Millisecond * 500)
if err := net.AssertChannelExists(ctx, net.Alice, &chanPoint); err != nil { if err := net.AssertChannelExists(ctx, net.Alice, &chanPoint); err != nil {
t.Fatalf("unable to assert channel existence: %v", err) t.Fatalf("unable to assert channel existence: %v", err)
} }
@ -1384,7 +1391,7 @@ func testMaxPendingChannels(net *networkHarness, t *harnessTest) {
// Finally, shutdown the node we created for the duration of the tests, // Finally, shutdown the node we created for the duration of the tests,
// only leaving the two seed nodes (Alice and Bob) within our test // only leaving the two seed nodes (Alice and Bob) within our test
// network. // network.
if err := carol.shutdown(); err != nil { if err := carol.Shutdown(); err != nil {
t.Fatalf("unable to shutdown carol: %v", err) t.Fatalf("unable to shutdown carol: %v", err)
} }
} }
@ -1618,7 +1625,6 @@ poll:
// Finally, obtain Alice's channel state, she shouldn't report any // Finally, obtain Alice's channel state, she shouldn't report any
// channel as she just successfully brought Bob to justice by sweeping // channel as she just successfully brought Bob to justice by sweeping
// all the channel funds. // all the channel funds.
time.Sleep(time.Second * 2)
req := &lnrpc.ListChannelsRequest{} req := &lnrpc.ListChannelsRequest{}
aliceChanInfo, err := net.Alice.ListChannels(ctxb, req) aliceChanInfo, err := net.Alice.ListChannels(ctxb, req)
if err != nil { if err != nil {
@ -1644,6 +1650,10 @@ func testHtlcErrorPropagation(net *networkHarness, t *harnessTest) {
ctxt, _ := context.WithTimeout(ctxb, timeout) ctxt, _ := context.WithTimeout(ctxb, timeout)
chanPointAlice := openChannelAndAssert(ctxt, t, net, net.Alice, net.Bob, chanPointAlice := openChannelAndAssert(ctxt, t, net, net.Alice, net.Bob,
chanAmt, 0) chanAmt, 0)
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPointAlice); err != nil {
t.Fatalf("channel not seen by alice before timeout: %v", err)
}
assertBaseBalance := func() { assertBaseBalance := func() {
balReq := &lnrpc.ChannelBalanceRequest{} balReq := &lnrpc.ChannelBalanceRequest{}
@ -1693,6 +1703,7 @@ func testHtlcErrorPropagation(net *networkHarness, t *harnessTest) {
defer checkTableTicker.Stop() defer checkTableTicker.Stop()
out: out:
// TODO(roasbeef): make into async hook for node announcements
for { for {
select { select {
case <-checkTableTicker.C: case <-checkTableTicker.C:
@ -1723,6 +1734,13 @@ out:
t.Fatalf("unable to generate carol invoice: %v", err) t.Fatalf("unable to generate carol invoice: %v", err)
} }
// Before we send the payment, ensure that the announcement of the new
// channel has been processed by Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.Alice.WaitForNetworkChannelOpen(ctxt, chanPointBob); err != nil {
t.Fatalf("channel not seen by alice before timeout: %v", err)
}
// TODO(roasbeef): return failure response rather than failing entire // TODO(roasbeef): return failure response rather than failing entire
// stream on payment error. // stream on payment error.
alicePayStream, err := net.Alice.SendPayment(ctxb) alicePayStream, err := net.Alice.SendPayment(ctxb)
@ -1850,7 +1868,7 @@ out:
// We'll attempt to complete the original invoice we created with Carol // We'll attempt to complete the original invoice we created with Carol
// above, but before we do so, Carol will go offline, resulting in a // above, but before we do so, Carol will go offline, resulting in a
// failed payment. // failed payment.
if err := carol.shutdown(); err != nil { if err := carol.Shutdown(); err != nil {
t.Fatalf("unable to shutdown carol: %v", err) t.Fatalf("unable to shutdown carol: %v", err)
} }
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
@ -1909,7 +1927,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
// We'll launch a goroutine that'll be responsible for proxying all // We'll launch a goroutine that'll be responsible for proxying all
// notifications recv'd from the client into the channel below. // notifications recv'd from the client into the channel below.
quit := make(chan struct{}) quit := make(chan struct{})
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) graphUpdates := make(chan *lnrpc.GraphTopologyUpdate, 3)
go func() { go func() {
for { for {
select { select {
@ -1933,9 +1951,6 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
const numExpectedUpdates = 2 const numExpectedUpdates = 2
for i := 0; i < numExpectedUpdates; i++ { for i := 0; i < numExpectedUpdates; i++ {
select { select {
case <-time.After(time.Second * 5):
t.Fatalf("notification for new channel not sent")
// Ensure that a new update for both created edges is properly // Ensure that a new update for both created edges is properly
// dispatched to our registered client. // dispatched to our registered client.
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphUpdates:
@ -1963,6 +1978,8 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
t.Fatalf("unknown connecting node: %v", t.Fatalf("unknown connecting node: %v",
chanUpdate.ConnectingNode) chanUpdate.ConnectingNode)
} }
case <-time.After(time.Second * 10):
t.Fatalf("notification for new channel not sent")
} }
} }
@ -1979,9 +1996,6 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
// Similar to the case above, we should receive another notification // Similar to the case above, we should receive another notification
// detailing the channel closure. // detailing the channel closure.
select { select {
case <-time.After(time.Second * 5):
t.Fatalf("notification for channel closure not " +
"sent")
case graphUpdate := <-graphUpdates: case graphUpdate := <-graphUpdates:
if len(graphUpdate.ClosedChans) != 1 { if len(graphUpdate.ClosedChans) != 1 {
t.Fatalf("expected a single update, instead "+ t.Fatalf("expected a single update, instead "+
@ -2003,6 +2017,9 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
t.Fatalf("output index mismatch: expected %v, got %v", t.Fatalf("output index mismatch: expected %v, got %v",
chanPoint.OutputIndex, closedChan.ChanPoint) chanPoint.OutputIndex, closedChan.ChanPoint)
} }
case <-time.After(time.Second * 10):
t.Fatalf("notification for channel closure not " +
"sent")
} }
// For the final portion of the test, we'll ensure that once a new node // For the final portion of the test, we'll ensure that once a new node
@ -2032,14 +2049,14 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
t.Fatalf("node update pubkey mismatch: expected %v, got %v", t.Fatalf("node update pubkey mismatch: expected %v, got %v",
carol.PubKeyStr, nodeUpdate.IdentityKey) carol.PubKeyStr, nodeUpdate.IdentityKey)
} }
case <-time.After(time.Second * 5): case <-time.After(time.Second * 10):
t.Fatalf("node update ntfn not sent") t.Fatalf("node update ntfn not sent")
} }
close(quit) close(quit)
// Finally, shutdown carol as our test has concluded successfully. // Finally, shutdown carol as our test has concluded successfully.
if err := carol.shutdown(); err != nil { if err := carol.Shutdown(); err != nil {
t.Fatalf("unable to shutdown carol: %v", err) t.Fatalf("unable to shutdown carol: %v", err)
} }
} }

@ -1686,7 +1686,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
// notifications from the channel router. // notifications from the channel router.
client, err := r.server.chanRouter.SubscribeTopology() client, err := r.server.chanRouter.SubscribeTopology()
if err != nil { if err != nil {
return nil return err
} }
// Ensure that the resources for the topology update client is cleaned // Ensure that the resources for the topology update client is cleaned
@ -1697,7 +1697,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
select { select {
// A new update has been sent by the channel router, we'll // A new update has been sent by the channel router, we'll
// marshall it into the form expected by the gRPC client, then // marshal it into the form expected by the gRPC client, then
// send it off. // send it off.
case topChange, ok := <-client.TopologyChanges: case topChange, ok := <-client.TopologyChanges:
// If the second value from the channel read is nil, // If the second value from the channel read is nil,