diff --git a/lnd_test.go b/lnd_test.go index 696b1b9f..f1783179 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -28,81 +28,87 @@ func assertTxInBlock(block *btcutil.Block, txid *wire.ShaHash, t *testing.T) { t.Fatalf("funding tx was not included in block") } -// getChannelHelpers returns a series of helper functions as closures which may -// be useful within tests to execute common activities such as synchronously -// waiting for channels to open/close. -func getChannelHelpers(ctxb context.Context, net *networkHarness, - t *testing.T) (func(*lightningNode, *lightningNode, btcutil.Amount) *lnrpc.ChannelPoint, - func(*lightningNode, *lnrpc.ChannelPoint)) { +// openChannelAndAssert attempts to open a channel with the specified +// parameters extended from Alice to Bob. Additionally, two items are asserted +// after the channel is considered open: the funding transactino should be +// found within a block, and that Alice can report the status of the new +// channel. +func openChannelAndAssert(t *testing.T, net *networkHarness, ctx context.Context, + alice, bob *lightningNode, amount btcutil.Amount) *lnrpc.ChannelPoint { - openChannel := func(alice *lightningNode, bob *lightningNode, amount btcutil.Amount) *lnrpc.ChannelPoint { - chanOpenUpdate, err := net.OpenChannel(ctxb, alice, bob, amount, 1) - if err != nil { - t.Fatalf("unable to open channel: %v", err) - } - - // Mine a block, then wait for Alice's node to notify us that the - // channel has been opened. The funding transaction should be found - // within the newly mined block. - blockHash, err := net.Miner.Node.Generate(1) - if err != nil { - t.Fatalf("unable to generate block: %v", err) - } - block, err := net.Miner.Node.GetBlock(blockHash[0]) - if err != nil { - t.Fatalf("unable to get block: %v", err) - } - fundingChanPoint, err := net.WaitForChannelOpen(chanOpenUpdate) - if err != nil { - t.Fatalf("error while waiting for channel open: %v", err) - } - fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid) - if err != nil { - t.Fatalf("unable to create sha hash: %v", err) - } - assertTxInBlock(block, fundingTxID, t) - - // The channel should be listed in the peer information returned by - // both peers. - chanPoint := wire.OutPoint{ - Hash: *fundingTxID, - Index: fundingChanPoint.OutputIndex, - } - err = net.AssertChannelExists(ctxb, alice, &chanPoint) - if err != nil { - t.Fatalf("unable to assert channel existence: %v", err) - } - - return fundingChanPoint + chanOpenUpdate, err := net.OpenChannel(ctx, alice, bob, amount, 1) + if err != nil { + t.Fatalf("unable to open channel: %v", err) } - closeChannel := func(node *lightningNode, fundingChanPoint *lnrpc.ChannelPoint) { - closeUpdates, err := net.CloseChannel(ctxb, node, fundingChanPoint, false) - if err != nil { - t.Fatalf("unable to close channel: %v", err) - } - - // Finally, generate a single block, wait for the final close status - // update, then ensure that the closing transaction was included in the - // block. - blockHash, err := net.Miner.Node.Generate(1) - if err != nil { - t.Fatalf("unable to generate block: %v", err) - } - block, err := net.Miner.Node.GetBlock(blockHash[0]) - if err != nil { - t.Fatalf("unable to get block: %v", err) - } - - closingTxid, err := net.WaitForChannelClose(closeUpdates) - if err != nil { - t.Fatalf("error while waiting for channel close: %v", err) - } - assertTxInBlock(block, closingTxid, t) + // Mine a block, then wait for Alice's node to notify us that the + // channel has been opened. The funding transaction should be found + // within the newly mined block. + blockHash, err := net.Miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate block: %v", err) + } + block, err := net.Miner.Node.GetBlock(blockHash[0]) + if err != nil { + t.Fatalf("unable to get block: %v", err) + } + fundingChanPoint, err := net.WaitForChannelOpen(ctx, chanOpenUpdate) + if err != nil { + t.Fatalf("error while waiting for channel open: %v", err) + } + fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid) + if err != nil { + t.Fatalf("unable to create sha hash: %v", err) + } + assertTxInBlock(block, fundingTxID, t) + // The channel should be listed in the peer information returned by + // both peers. + chanPoint := wire.OutPoint{ + Hash: *fundingTxID, + Index: fundingChanPoint.OutputIndex, + } + err = net.AssertChannelExists(ctx, alice, &chanPoint) + if err != nil { + t.Fatalf("unable to assert channel existence: %v", err) } - return openChannel, closeChannel + return fundingChanPoint +} + +// closeChannelAndAssert attemps to close a channel identified by the passed +// channel point owned by the passed lighting node. A fully blocking channel +// closure is attempted, therefore the passed context should be a child derived +// via timeout from a base parent. Additionally, once the channel has been +// detected as closed, an assertion checks that the transaction is found within +// a block. +func closeChannelAndAssert(t *testing.T, net *networkHarness, + ctx context.Context, node *lightningNode, + fundingChanPoint *lnrpc.ChannelPoint) { + + closeUpdates, err := net.CloseChannel(ctx, node, fundingChanPoint, false) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + + // Finally, generate a single block, wait for the final close status + // update, then ensure that the closing transaction was included in the + // block. + blockHash, err := net.Miner.Node.Generate(1) + if err != nil { + t.Fatalf("unable to generate block: %v", err) + } + block, err := net.Miner.Node.GetBlock(blockHash[0]) + if err != nil { + t.Fatalf("unable to get block: %v", err) + } + + closingTxid, err := net.WaitForChannelClose(ctx, closeUpdates) + if err != nil { + t.Fatalf("error while waiting for channel close: %v", err) + } + + assertTxInBlock(block, closingTxid, t) } // testBasicChannelFunding performs a test exercising expected behavior from a @@ -111,8 +117,8 @@ func getChannelHelpers(ctxb context.Context, net *networkHarness, // conditions. Finally, the chain itself is checked to ensure the closing // transaction was mined. func testBasicChannelFunding(net *networkHarness, t *testing.T) { + timeout := time.Duration(time.Second * 5) ctxb := context.Background() - openChannel, closeChannel := getChannelHelpers(ctxb, net, t) chanAmt := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2) @@ -121,19 +127,21 @@ func testBasicChannelFunding(net *networkHarness, t *testing.T) { // open or an error occurs in the funding process. A series of // assertions will be executed to ensure the funding process completed // successfully. - chanPoint := openChannel(net.Alice, net.Bob, chanAmt) + ctxt, _ := context.WithTimeout(ctxb, timeout) + chanPoint := openChannelAndAssert(t, net, ctxt, net.Alice, net.Bob, chanAmt) // Finally, immediately close the channel. This function will also // block until the channel is closed and will additionally assert the // relevant channel closing post conditions. - closeChannel(net.Alice, chanPoint) + ctxt, _ = context.WithTimeout(ctxb, timeout) + closeChannelAndAssert(t, net, ctxt, net.Alice, chanPoint) } // testChannelBalance creates a new channel between Alice and Bob, then // checks channel balance to be equal amount specified while creation of channel. func testChannelBalance(net *networkHarness, t *testing.T) { + timeout := time.Duration(time.Second * 5) ctxb := context.Background() - openChannel, closeChannel := getChannelHelpers(ctxb, net, t) // Creates a helper closure to be used below which asserts the proper // response to a channel balance RPC. @@ -152,7 +160,8 @@ func testChannelBalance(net *networkHarness, t *testing.T) { // Open a channel with 0.5 BTC between Alice and Bob, ensuring the // channel has been opened properly. amount := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2) - chanPoint := openChannel(net.Alice, net.Bob, amount) + ctxt, _ := context.WithTimeout(ctxb, timeout) + chanPoint := openChannelAndAssert(t, net, ctxt, net.Alice, net.Bob, amount) // As this is a single funder channel, Alice's balance should be // exactly 0.5 BTC since now state transitions have taken place yet. @@ -171,7 +180,8 @@ func testChannelBalance(net *networkHarness, t *testing.T) { // Finally close the channel between Alice and Bob, asserting that the // channel has been properly closed on-chain. - closeChannel(net.Alice, chanPoint) + ctxt, _ = context.WithTimeout(ctxb, timeout) + closeChannelAndAssert(t, net, ctxt, net.Alice, chanPoint) } // testChannelForceClosure performs a test to exercise the behavior of "force" @@ -184,6 +194,7 @@ func testChannelBalance(net *networkHarness, t *testing.T) { // // TODO(roabeef): also add an unsettled HTLC before force closing. func testChannelForceClosure(net *networkHarness, t *testing.T) { + timeout := time.Duration(time.Second * 5) ctxb := context.Background() // First establish a channel ween with a capacity of 100k satoshis @@ -198,7 +209,8 @@ func testChannelForceClosure(net *networkHarness, t *testing.T) { if _, err := net.Miner.Node.Generate(numFundingConfs); err != nil { t.Fatalf("unable to mine block: %v", err) } - chanPoint, err := net.WaitForChannelOpen(chanOpenUpdate) + ctxt, _ := context.WithTimeout(ctxb, timeout) + chanPoint, err := net.WaitForChannelOpen(ctxt, chanOpenUpdate) if err != nil { t.Fatalf("error while waiting for channel to open: %v", err) } @@ -217,7 +229,8 @@ func testChannelForceClosure(net *networkHarness, t *testing.T) { if _, err := net.Miner.Node.Generate(1); err != nil { t.Fatalf("unable to generate block: %v", err) } - closingTxID, err := net.WaitForChannelClose(closeUpdate) + ctxt, _ = context.WithTimeout(ctxb, timeout) + closingTxID, err := net.WaitForChannelClose(ctxt, closeUpdate) if err != nil { t.Fatalf("error while waiting for channel close: %v", err) } diff --git a/networktest.go b/networktest.go index 9ff14d48..acf60e66 100644 --- a/networktest.go +++ b/networktest.go @@ -274,7 +274,6 @@ func newNetworkHarness() (*networkHarness, error) { }, nil } - // InitializeSeedNodes initialized alice and bob nodes given an already // running instance of btcd's rpctest harness and extra command line flags, // which should be formatted properly - "--arg=value". @@ -421,6 +420,8 @@ out: bobResp.Balance == expectedBalance { break out } + case <-time.After(time.Second * 30): + return fmt.Errorf("balances not synced after deadline") } } @@ -502,17 +503,26 @@ func (n *networkHarness) OnTxAccepted(hash *wire.ShaHash, amt btcutil.Amount) { }() } -// WaitForTxBroadcast blocks until the target txid is seen on the network. -func (n *networkHarness) WaitForTxBroadcast(txid wire.ShaHash) { +// WaitForTxBroadcast blocks until the target txid is seen on the network. If +// the transaction isn't seen within the network before the passed timeout, +// then an error is returend. +func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid wire.ShaHash) error { eventChan := make(chan struct{}) n.watchRequests <- &watchRequest{txid, eventChan} - <-eventChan + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("tx not seen before context timeout") + } } // OpenChannel attemps to open a channel between srcNode and destNode with the -// passed channel funding parameters. +// passed channel funding parameters. If the passed context has a timeout, then +// if the timeout is reeached before the channel pending notification is +// received, an error is returned. func (n *networkHarness) OpenChannel(ctx context.Context, srcNode, destNode *lightningNode, amt btcutil.Amount, numConfs uint32) (lnrpc.Lightning_OpenChannelClient, error) { @@ -529,38 +539,73 @@ func (n *networkHarness) OpenChannel(ctx context.Context, "alice and bob: %v", err) } - // Consume the "channel pending" update. This waits until the node - // notifies us that the final message in the channel funding workflow - // has been sent to the remote node. - resp, err := respStream.Recv() - if err != nil { - return nil, fmt.Errorf("unable to read rpc resp: %v", err) - } - if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok { - return nil, fmt.Errorf("expected channel pending update, "+ - "instead got %v", resp) - } + chanOpen := make(chan struct{}) + errChan := make(chan error) + go func() { + // Consume the "channel pending" update. This waits until the node + // notifies us that the final message in the channel funding workflow + // has been sent to the remote node. + resp, err := respStream.Recv() + if err != nil { + errChan <- err + } + if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok { + errChan <- fmt.Errorf("expected channel pending update, "+ + "instead got %v", resp) + } - return respStream, nil + close(chanOpen) + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before chan pending " + + "update sent") + case err := <-errChan: + return nil, err + case <-chanOpen: + return respStream, nil + } } // WaitForChannelOpen waits for a notification that a channel is open by -// consuming a message from the past open channel stream. -func (n *networkHarness) WaitForChannelOpen(openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { - resp, err := openChanStream.Recv() - if err != nil { - return nil, fmt.Errorf("unable to read rpc resp: %v", err) - } - fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) - if !ok { - return nil, fmt.Errorf("expected channel open update, instead got %v", resp) - } +// consuming a message from the past open channel stream. If the passed context +// has a timeout, then if the timeout is reached before the channel has been +// opened, then an error is returned. +func (n *networkHarness) WaitForChannelOpen(ctx context.Context, + openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) { - return fundingResp.ChanOpen.ChannelPoint, nil + errChan := make(chan error) + respChan := make(chan *lnrpc.ChannelPoint) + go func() { + resp, err := openChanStream.Recv() + if err != nil { + errChan <- fmt.Errorf("unable to read rpc resp: %v", err) + } + fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + if !ok { + errChan <- fmt.Errorf("expected channel open update, "+ + "instead got %v", resp) + } + + respChan <- fundingResp.ChanOpen.ChannelPoint + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached while waiting for " + + "channel open") + case err := <-errChan: + return nil, err + case chanPoint := <-respChan: + return chanPoint, nil + } } // CloseChannel close channel attempts to close the channel indicated by the -// passed channel point, initiated by the passed lnNode. +// passed channel point, initiated by the passed lnNode. If the passed context +// has a timeout, then if the timeout is reached before the channel close is +// pending, then an error is returned. func (n *networkHarness) CloseChannel(ctx context.Context, lnNode *lightningNode, cp *lnrpc.ChannelPoint, force bool) (lnrpc.Lightning_CloseChannelClient, error) { @@ -574,41 +619,86 @@ func (n *networkHarness) CloseChannel(ctx context.Context, return nil, fmt.Errorf("unable to close channel: %v", err) } - // Consume the "channel close" update in order to wait for the closing - // transaction to be broadcast, then wait for the closing tx to be seen - // within the network. - closeResp, err := closeRespStream.Recv() - if err != nil { - return nil, fmt.Errorf("unable to read rpc resp: %v", err) - } - pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) - if !ok { - return nil, fmt.Errorf("expected close pending update, got %v", pendingClose) - } - closeTxid, err := wire.NewShaHash(pendingClose.ClosePending.Txid) - if err != nil { - return nil, err - } - n.WaitForTxBroadcast(*closeTxid) + errChan := make(chan error) + fin := make(chan struct{}) + go func() { + // Consume the "channel close" update in order to wait for the closing + // transaction to be broadcast, then wait for the closing tx to be seen + // within the network. + closeResp, err := closeRespStream.Recv() + if err != nil { + errChan <- err + return + } + pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending) + if !ok { + errChan <- fmt.Errorf("expected channel close update, "+ + "instead got %v", pendingClose) + return + } - return closeRespStream, nil + closeTxid, err := wire.NewShaHash(pendingClose.ClosePending.Txid) + if err != nil { + errChan <- err + return + } + if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil { + errChan <- err + return + } + + close(fin) + }() + + // Wait until either the deadline for the context expires, an error + // occurs, or the channel close update is received. + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before channel close " + + "initiated") + case err := <-errChan: + return nil, err + case <-fin: + return closeRespStream, nil + } } // WaitForChannelClose waits for a notification from the passed channel close -// stream that the node has deemed the channel has been fully closed. -func (n *networkHarness) WaitForChannelClose(closeChanStream lnrpc.Lightning_CloseChannelClient) (*wire.ShaHash, error) { - // TODO(roasbeef): use passed ctx to set a deadline on amount of time to - // wait. - closeResp, err := closeChanStream.Recv() - if err != nil { - return nil, fmt.Errorf("unable to read rpc resp: %v", err) - } - closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose) - if !ok { - return nil, fmt.Errorf("expected channel open update, instead got %v", closeFin) - } +// stream that the node has deemed the channel has been fully closed. If the +// passed context has a timeout, then if the timeout is reached before the +// notification is received then an error is returned. +func (n *networkHarness) WaitForChannelClose(ctx context.Context, + closeChanStream lnrpc.Lightning_CloseChannelClient) (*wire.ShaHash, error) { - return wire.NewShaHash(closeFin.ChanClose.ClosingTxid) + errChan := make(chan error) + updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose) + go func() { + closeResp, err := closeChanStream.Recv() + if err != nil { + errChan <- err + return + } + + closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose) + if !ok { + errChan <- fmt.Errorf("expected channel close update, "+ + "instead got %v", closeFin) + return + } + + updateChan <- closeFin + }() + + // Wait until either the deadline for the context expires, an error + // occurs, or the channel close update is received. + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before update sent") + case err := <-errChan: + return nil, err + case update := <-updateChan: + return wire.NewShaHash(update.ChanClose.ClosingTxid) + } } // AssertChannelExists asserts that an active channel identified by