From d911107ec692fba8685232e3ba0d6b9cd23d0bf6 Mon Sep 17 00:00:00 2001 From: bryanvu Date: Mon, 30 Jan 2017 20:21:52 -0800 Subject: [PATCH] fundingmanager: Update tests for funding manager persistence This commit adds the FundingManagerPersistence test to ensure that the funding process completes as expected when nodes shutdown after the the funding transaction has been broadcast. Note that the final parts of several wallet tests have been removed, as functionality has been moved to the Funding Manager and should now be tested there. --- lnd_test.go | 150 +++++++++++++++++++++++++++++++++++++ lnwallet/interface_test.go | 80 +------------------- lnwallet/wallet.go | 4 + networktest.go | 53 +++++++++++++ 4 files changed, 211 insertions(+), 76 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index c1335c3a..f181b008 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -197,6 +197,45 @@ func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarn return closingTxid } +// numChannelsPending sends an RPC request to a node to get a count of the +// node's channels that are currently in a pending state (with a broadcast, +// but not confirmed funding transaction). +func numChannelsPending(node *lightningNode, ctxt context.Context) (int, error) { + pendingChansRequest := &lnrpc.PendingChannelRequest{ + Status: lnrpc.ChannelStatus_OPENING, + } + resp, err := node.PendingChannels(ctxt, pendingChansRequest) + if err != nil { + return 0, err + } + return len(resp.PendingChannels), nil +} + +// assertNumChannelsPending asserts that a pair of nodes have the expected +// number of pending channels between them. +func assertNumChannelsPending(t *harnessTest, ctxt context.Context, + alice, bob *lightningNode, expected int) { + aliceNumChans, err := numChannelsPending(alice, ctxt) + if err != nil { + t.Fatalf("error fetching alice's node (%v) pending channels %v", + alice.nodeID, err) + } + bobNumChans, err := numChannelsPending(bob, ctxt) + if err != nil { + t.Fatalf("error fetching bob's node (%v) pending channels %v", + bob.nodeID, err) + } + if aliceNumChans != expected { + t.Fatalf("number of pending channels for alice incorrect. "+ + "expected %v, got %v", expected, aliceNumChans) + } + if bobNumChans != expected { + t.Fatalf("number of pending channels for bob incorrect. "+ + "expected %v, got %v", + expected, bobNumChans) + } +} + // testBasicChannelFunding performs a test exercising expected behavior from a // basic funding workflow. The test creates a new channel between Alice and // Bob, then immediately closes the channel after asserting some expected post @@ -246,6 +285,113 @@ func testBasicChannelFunding(net *networkHarness, t *harnessTest) { closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false) } +// testFundingPersistence is intended to ensure that the Funding Manager +// persists the state of new channels prior to broadcasting the channel's +// funding transaction. This ensures that the daemon maintains an up-to-date +// representation of channels if the system is restarted or disconnected. +// testFundingPersistence mirrors testBasicChannelFunding, but adds restarts +// and checks for the state of channels with unconfirmed funding transactions. +func testChannelFundingPersistence(net *networkHarness, t *harnessTest) { + timeout := time.Duration(time.Second * 25) + ctxb := context.Background() + ctxt, _ := context.WithTimeout(ctxb, timeout) + + chanAmt := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2) + pushAmt := btcutil.Amount(0) + + // Create a new channel, then broadcast the funding transaction. + pendingUpdate, err := net.OpenPendingChannel(ctxt, net.Alice, net.Bob, + chanAmt, pushAmt, 1) + if err != nil { + t.Fatalf("unable to open channel: %v", err) + } + + // At this point, the channel's funding transaction will have + // been broadcast, but not confirmed. Alice and Bob's nodes + // should reflect this when queried via RPC. + assertNumChannelsPending(t, ctxt, net.Alice, net.Bob, 1) + + // Restart both nodes to test that the appropriate state has been + // persisted and that both nodes recover gracefully. + if err := net.RestartNode(net.Alice, nil); err != nil { + t.Fatalf("Node restart failed: %v", err) + } + if err := net.RestartNode(net.Bob, nil); err != nil { + t.Fatalf("Node restart failed: %v", err) + } + + fundingTxID, err := chainhash.NewHash(pendingUpdate.Txid) + if err != nil { + t.Fatalf("unable to convert funding txid into chainhash.Hash:"+ + " %v", err) + } + + // Mine a block, then wait for Alice's node to notify us that the + // channel has been opened. The funding transaction should be found + // within the newly mined block. + block := mineBlocks(t, net, 1)[0] + assertTxInBlock(t, block, fundingTxID) + + // Restart both nodes to test that the appropriate state has been + // persisted and that both nodes recover gracefully. + if err := net.RestartNode(net.Alice, nil); err != nil { + t.Fatalf("Node restart failed: %v", err) + } + if err := net.RestartNode(net.Bob, nil); err != nil { + t.Fatalf("Node restart failed: %v", err) + } + + // The following block ensures that after both nodes have restarted, + // they have reconnected before the execution of the next test. + peersTimeout := time.After(3 * time.Second) + checkPeersTick := time.NewTicker(100 * time.Millisecond) + defer checkPeersTick.Stop() +peersPoll: + for { + select { + case <-peersTimeout: + t.Fatalf("peers unable to reconnect after restart") + case <-checkPeersTick.C: + peers, err := net.Bob.ListPeers(ctxt, + &lnrpc.ListPeersRequest{}) + if err != nil { + t.Fatalf("ListPeers error: %v\n", err) + } + if len(peers.Peers) > 0 { + break peersPoll + } + } + } + + // At this point, the channel should be fully opened and there should + // be no pending channels remaining for either node. + assertNumChannelsPending(t, ctxt, net.Alice, net.Bob, 0) + + // The channel should be listed in the peer information returned by + // both peers. + outPoint := wire.OutPoint{ + Hash: *fundingTxID, + Index: pendingUpdate.OutputIndex, + } + + // Check both nodes to ensure that the channel is ready for operation. + if err := net.AssertChannelExists(ctxt, net.Alice, &outPoint); err != nil { + t.Fatalf("unable to assert channel existence: %v", err) + } + if err := net.AssertChannelExists(ctxt, net.Bob, &outPoint); err != nil { + t.Fatalf("unable to assert channel existence: %v", err) + } + + // Finally, immediately close the channel. This function will also + // block until the channel is closed and will additionally assert the + // relevant channel closing post conditions. + chanPoint := &lnrpc.ChannelPoint{ + FundingTxid: pendingUpdate.Txid, + OutputIndex: pendingUpdate.OutputIndex, + } + closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, true) +} + // testChannelBalance creates a new channel between Alice and Bob, then // checks channel balance to be equal amount specified while creation of channel. func testChannelBalance(net *networkHarness, t *harnessTest) { @@ -1725,6 +1871,10 @@ var testsCases = []*testCase{ name: "basic funding flow", test: testBasicChannelFunding, }, + { + name: "funding flow persistence", + test: testChannelFundingPersistence, + }, { name: "channel force closure", test: testChannelForceClosure, diff --git a/lnwallet/interface_test.go b/lnwallet/interface_test.go index 29c1b560..bb443736 100644 --- a/lnwallet/interface_test.go +++ b/lnwallet/interface_test.go @@ -8,7 +8,6 @@ import ( "net" "os" "path/filepath" - "strings" "testing" "time" @@ -470,13 +469,11 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, wallet *lnwallet if err != nil { t.Fatalf("bob is unable to sign alice's commit tx: %v", err) } - if err := chanReservation.CompleteReservation(bobsSigs, commitSig); err != nil { + _, err = chanReservation.CompleteReservation(bobsSigs, commitSig) + if err != nil { t.Fatalf("unable to complete funding tx: %v", err) } - // At this point, the channel can be considered "open" when the funding - // txn hits a "comfortable" depth. - // The resulting active channel state should have been persisted to the DB. fundingSha := fundingTx.TxHash() channels, err := wallet.ChannelDB.FetchOpenChannels(bobNode.id) @@ -486,56 +483,6 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, wallet *lnwallet if !bytes.Equal(channels[0].FundingOutpoint.Hash[:], fundingSha[:]) { t.Fatalf("channel state not properly saved") } - - // Assert that the channel opens after a single block. - lnChan := make(chan *lnwallet.LightningChannel, 1) - go func() { - openDetails, err := chanReservation.DispatchChan() - if err != nil { - t.Fatalf("unable to finalize reservation: %v", err) - } - - lnChan <- openDetails.Channel - }() - lnc := assertChannelOpen(t, miner, uint32(numReqConfs), lnChan) - - // Now that the channel is open, execute a cooperative closure of the - // now open channel. - aliceCloseSig, _, err := lnc.InitCooperativeClose() - if err != nil { - t.Fatalf("unable to init cooperative closure: %v", err) - } - aliceCloseSig = append(aliceCloseSig, byte(txscript.SigHashAll)) - - chanInfo := lnc.StateSnapshot() - - // Obtain bob's signature for the closure transaction. - witnessScript := lnc.FundingWitnessScript - fundingOut := lnc.ChannelPoint() - fundingTxIn := wire.NewTxIn(fundingOut, nil, nil) - bobCloseTx := lnwallet.CreateCooperativeCloseTx(fundingTxIn, - chanInfo.RemoteBalance, chanInfo.LocalBalance, - lnc.RemoteDeliveryScript, lnc.LocalDeliveryScript, - true) - bobSig, err := bobNode.signCommitTx(bobCloseTx, witnessScript, int64(lnc.Capacity)) - if err != nil { - t.Fatalf("unable to generate bob's signature for closing tx: %v", err) - } - - // Broadcast the transaction to the network. This transaction should - // be accepted, and found in the next mined block. - ourKey := chanReservation.OurContribution().MultiSigKey.SerializeCompressed() - theirKey := chanReservation.TheirContribution().MultiSigKey.SerializeCompressed() - witness := lnwallet.SpendMultiSig(witnessScript, ourKey, aliceCloseSig, - theirKey, bobSig) - bobCloseTx.TxIn[0].Witness = witness - if err := wallet.PublishTransaction(bobCloseTx); err != nil { - t.Fatalf("broadcast of close tx rejected: %v", err) - } - - // Now that the reservation has conclued, ensure that the wallet has - // cleaned up the state allocated to the reservation. - assertReservationDeleted(chanReservation, t) } func testFundingTransactionLockedOutputs(miner *rpctest.Harness, @@ -746,7 +693,7 @@ func testSingleFunderReservationWorkflowInitiator(miner *rpctest.Harness, if err != nil { t.Fatalf("bob is unable to sign alice's commit tx: %v", err) } - if err := chanReservation.CompleteReservation(nil, bobCommitSig); err != nil { + if _, err := chanReservation.CompleteReservation(nil, bobCommitSig); err != nil { t.Fatalf("unable to complete funding tx: %v", err) } @@ -774,17 +721,6 @@ func testSingleFunderReservationWorkflowInitiator(miner *rpctest.Harness, channeldb.SingleFunder, channels[0].ChanType) } - lnChan := make(chan *lnwallet.LightningChannel, 1) - go func() { - openDetails, err := chanReservation.DispatchChan() - if err != nil { - t.Fatalf("unable to open channel: %v", err) - } - - lnChan <- openDetails.Channel - }() - assertChannelOpen(t, miner, uint32(numReqConfs), lnChan) - assertReservationDeleted(chanReservation, t) } @@ -925,7 +861,7 @@ func testSingleFunderReservationWorkflowResponder(miner *rpctest.Harness, // With this stage complete, Alice can now complete the reservation. bobRevokeKey := bobContribution.RevocationKey - err = chanReservation.CompleteReservationSingle(bobRevokeKey, + _, err = chanReservation.CompleteReservationSingle(bobRevokeKey, fundingOutpoint, bobCommitSig, bobObsfucator) if err != nil { t.Fatalf("unable to complete reservation: %v", err) @@ -937,14 +873,6 @@ func testSingleFunderReservationWorkflowResponder(miner *rpctest.Harness, chanReservation.FundingOutpoint(), fundingOutpoint) } - // Some period of time later, Bob presents us with an SPV proof - // attesting to an open channel. At this point Alice recognizes the - // channel, saves the state to disk, and creates the channel itself. - _, err = chanReservation.FinalizeReservation() - if err != nil && !strings.Contains(err.Error(), "No information") { - t.Fatalf("unable to finalize reservation: %v", err) - } - // TODO(roasbeef): bob verify alice's sig assertReservationDeleted(chanReservation, t) } diff --git a/lnwallet/wallet.go b/lnwallet/wallet.go index 79de9aba..8f255bbd 100644 --- a/lnwallet/wallet.go +++ b/lnwallet/wallet.go @@ -1189,6 +1189,10 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) { req.completeChan <- pendingReservation.partialState req.err <- nil + + l.limboMtx.Lock() + delete(l.fundingLimbo, req.pendingFundingID) + l.limboMtx.Unlock() } // selectCoinsAndChange performs coin selection in order to obtain witness diff --git a/networktest.go b/networktest.go index d2ee8ad5..e19e84f3 100644 --- a/networktest.go +++ b/networktest.go @@ -696,6 +696,59 @@ func (n *networkHarness) OpenChannel(ctx context.Context, } } +// OpenPendingChannel attempts to open a channel between srcNode and destNode with the +// passed channel funding parameters. If the passed context has a timeout, then +// if the timeout is reached before the channel pending notification is +// received, an error is returned. +func (n *networkHarness) OpenPendingChannel(ctx context.Context, + srcNode, destNode *lightningNode, amt btcutil.Amount, + pushAmt btcutil.Amount, numConfs uint32) (*lnrpc.PendingUpdate, error) { + + openReq := &lnrpc.OpenChannelRequest{ + NodePubkey: destNode.PubKey[:], + LocalFundingAmount: int64(amt), + PushSat: int64(pushAmt), + NumConfs: numConfs, + } + + respStream, err := srcNode.OpenChannel(ctx, openReq) + if err != nil { + return nil, fmt.Errorf("unable to open channel between "+ + "alice and bob: %v", err) + } + + chanPending := make(chan *lnrpc.PendingUpdate) + errChan := make(chan error) + go func() { + // Consume the "channel pending" update. This waits until the node + // notifies us that the final message in the channel funding workflow + // has been sent to the remote node. + resp, err := respStream.Recv() + if err != nil { + errChan <- err + return + } + pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending) + if !ok { + errChan <- fmt.Errorf("expected channel pending update, "+ + "instead got %v", resp) + return + } + + chanPending <- pendingResp.ChanPending + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("timeout reached before chan pending " + + "update sent") + case err := <-errChan: + return nil, err + case pendingChan := <-chanPending: + return pendingChan, nil + } +} + // WaitForChannelOpen waits for a notification that a channel is open by // consuming a message from the past open channel stream. If the passed context // has a timeout, then if the timeout is reached before the channel has been