fundingmanager: Update tests for funding manager persistence

This commit adds the FundingManagerPersistence test to ensure that the
funding process completes as expected when nodes shutdown after the the
funding transaction has been broadcast. Note that the final parts of
several wallet tests have been removed, as functionality has been moved
to the Funding Manager and should now be tested there.
This commit is contained in:
bryanvu 2017-01-30 20:21:52 -08:00 committed by Olaoluwa Osuntokun
parent eb490b8833
commit d911107ec6
4 changed files with 211 additions and 76 deletions

@ -197,6 +197,45 @@ func closeChannelAndAssert(ctx context.Context, t *harnessTest, net *networkHarn
return closingTxid
}
// numChannelsPending sends an RPC request to a node to get a count of the
// node's channels that are currently in a pending state (with a broadcast,
// but not confirmed funding transaction).
func numChannelsPending(node *lightningNode, ctxt context.Context) (int, error) {
pendingChansRequest := &lnrpc.PendingChannelRequest{
Status: lnrpc.ChannelStatus_OPENING,
}
resp, err := node.PendingChannels(ctxt, pendingChansRequest)
if err != nil {
return 0, err
}
return len(resp.PendingChannels), nil
}
// assertNumChannelsPending asserts that a pair of nodes have the expected
// number of pending channels between them.
func assertNumChannelsPending(t *harnessTest, ctxt context.Context,
alice, bob *lightningNode, expected int) {
aliceNumChans, err := numChannelsPending(alice, ctxt)
if err != nil {
t.Fatalf("error fetching alice's node (%v) pending channels %v",
alice.nodeID, err)
}
bobNumChans, err := numChannelsPending(bob, ctxt)
if err != nil {
t.Fatalf("error fetching bob's node (%v) pending channels %v",
bob.nodeID, err)
}
if aliceNumChans != expected {
t.Fatalf("number of pending channels for alice incorrect. "+
"expected %v, got %v", expected, aliceNumChans)
}
if bobNumChans != expected {
t.Fatalf("number of pending channels for bob incorrect. "+
"expected %v, got %v",
expected, bobNumChans)
}
}
// testBasicChannelFunding performs a test exercising expected behavior from a
// basic funding workflow. The test creates a new channel between Alice and
// Bob, then immediately closes the channel after asserting some expected post
@ -246,6 +285,113 @@ func testBasicChannelFunding(net *networkHarness, t *harnessTest) {
closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, false)
}
// testFundingPersistence is intended to ensure that the Funding Manager
// persists the state of new channels prior to broadcasting the channel's
// funding transaction. This ensures that the daemon maintains an up-to-date
// representation of channels if the system is restarted or disconnected.
// testFundingPersistence mirrors testBasicChannelFunding, but adds restarts
// and checks for the state of channels with unconfirmed funding transactions.
func testChannelFundingPersistence(net *networkHarness, t *harnessTest) {
timeout := time.Duration(time.Second * 25)
ctxb := context.Background()
ctxt, _ := context.WithTimeout(ctxb, timeout)
chanAmt := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2)
pushAmt := btcutil.Amount(0)
// Create a new channel, then broadcast the funding transaction.
pendingUpdate, err := net.OpenPendingChannel(ctxt, net.Alice, net.Bob,
chanAmt, pushAmt, 1)
if err != nil {
t.Fatalf("unable to open channel: %v", err)
}
// At this point, the channel's funding transaction will have
// been broadcast, but not confirmed. Alice and Bob's nodes
// should reflect this when queried via RPC.
assertNumChannelsPending(t, ctxt, net.Alice, net.Bob, 1)
// Restart both nodes to test that the appropriate state has been
// persisted and that both nodes recover gracefully.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
if err := net.RestartNode(net.Bob, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
fundingTxID, err := chainhash.NewHash(pendingUpdate.Txid)
if err != nil {
t.Fatalf("unable to convert funding txid into chainhash.Hash:"+
" %v", err)
}
// Mine a block, then wait for Alice's node to notify us that the
// channel has been opened. The funding transaction should be found
// within the newly mined block.
block := mineBlocks(t, net, 1)[0]
assertTxInBlock(t, block, fundingTxID)
// Restart both nodes to test that the appropriate state has been
// persisted and that both nodes recover gracefully.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
if err := net.RestartNode(net.Bob, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
// The following block ensures that after both nodes have restarted,
// they have reconnected before the execution of the next test.
peersTimeout := time.After(3 * time.Second)
checkPeersTick := time.NewTicker(100 * time.Millisecond)
defer checkPeersTick.Stop()
peersPoll:
for {
select {
case <-peersTimeout:
t.Fatalf("peers unable to reconnect after restart")
case <-checkPeersTick.C:
peers, err := net.Bob.ListPeers(ctxt,
&lnrpc.ListPeersRequest{})
if err != nil {
t.Fatalf("ListPeers error: %v\n", err)
}
if len(peers.Peers) > 0 {
break peersPoll
}
}
}
// At this point, the channel should be fully opened and there should
// be no pending channels remaining for either node.
assertNumChannelsPending(t, ctxt, net.Alice, net.Bob, 0)
// The channel should be listed in the peer information returned by
// both peers.
outPoint := wire.OutPoint{
Hash: *fundingTxID,
Index: pendingUpdate.OutputIndex,
}
// Check both nodes to ensure that the channel is ready for operation.
if err := net.AssertChannelExists(ctxt, net.Alice, &outPoint); err != nil {
t.Fatalf("unable to assert channel existence: %v", err)
}
if err := net.AssertChannelExists(ctxt, net.Bob, &outPoint); err != nil {
t.Fatalf("unable to assert channel existence: %v", err)
}
// Finally, immediately close the channel. This function will also
// block until the channel is closed and will additionally assert the
// relevant channel closing post conditions.
chanPoint := &lnrpc.ChannelPoint{
FundingTxid: pendingUpdate.Txid,
OutputIndex: pendingUpdate.OutputIndex,
}
closeChannelAndAssert(ctxt, t, net, net.Alice, chanPoint, true)
}
// testChannelBalance creates a new channel between Alice and Bob, then
// checks channel balance to be equal amount specified while creation of channel.
func testChannelBalance(net *networkHarness, t *harnessTest) {
@ -1725,6 +1871,10 @@ var testsCases = []*testCase{
name: "basic funding flow",
test: testBasicChannelFunding,
},
{
name: "funding flow persistence",
test: testChannelFundingPersistence,
},
{
name: "channel force closure",
test: testChannelForceClosure,

@ -8,7 +8,6 @@ import (
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"
@ -470,13 +469,11 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, wallet *lnwallet
if err != nil {
t.Fatalf("bob is unable to sign alice's commit tx: %v", err)
}
if err := chanReservation.CompleteReservation(bobsSigs, commitSig); err != nil {
_, err = chanReservation.CompleteReservation(bobsSigs, commitSig)
if err != nil {
t.Fatalf("unable to complete funding tx: %v", err)
}
// At this point, the channel can be considered "open" when the funding
// txn hits a "comfortable" depth.
// The resulting active channel state should have been persisted to the DB.
fundingSha := fundingTx.TxHash()
channels, err := wallet.ChannelDB.FetchOpenChannels(bobNode.id)
@ -486,56 +483,6 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness, wallet *lnwallet
if !bytes.Equal(channels[0].FundingOutpoint.Hash[:], fundingSha[:]) {
t.Fatalf("channel state not properly saved")
}
// Assert that the channel opens after a single block.
lnChan := make(chan *lnwallet.LightningChannel, 1)
go func() {
openDetails, err := chanReservation.DispatchChan()
if err != nil {
t.Fatalf("unable to finalize reservation: %v", err)
}
lnChan <- openDetails.Channel
}()
lnc := assertChannelOpen(t, miner, uint32(numReqConfs), lnChan)
// Now that the channel is open, execute a cooperative closure of the
// now open channel.
aliceCloseSig, _, err := lnc.InitCooperativeClose()
if err != nil {
t.Fatalf("unable to init cooperative closure: %v", err)
}
aliceCloseSig = append(aliceCloseSig, byte(txscript.SigHashAll))
chanInfo := lnc.StateSnapshot()
// Obtain bob's signature for the closure transaction.
witnessScript := lnc.FundingWitnessScript
fundingOut := lnc.ChannelPoint()
fundingTxIn := wire.NewTxIn(fundingOut, nil, nil)
bobCloseTx := lnwallet.CreateCooperativeCloseTx(fundingTxIn,
chanInfo.RemoteBalance, chanInfo.LocalBalance,
lnc.RemoteDeliveryScript, lnc.LocalDeliveryScript,
true)
bobSig, err := bobNode.signCommitTx(bobCloseTx, witnessScript, int64(lnc.Capacity))
if err != nil {
t.Fatalf("unable to generate bob's signature for closing tx: %v", err)
}
// Broadcast the transaction to the network. This transaction should
// be accepted, and found in the next mined block.
ourKey := chanReservation.OurContribution().MultiSigKey.SerializeCompressed()
theirKey := chanReservation.TheirContribution().MultiSigKey.SerializeCompressed()
witness := lnwallet.SpendMultiSig(witnessScript, ourKey, aliceCloseSig,
theirKey, bobSig)
bobCloseTx.TxIn[0].Witness = witness
if err := wallet.PublishTransaction(bobCloseTx); err != nil {
t.Fatalf("broadcast of close tx rejected: %v", err)
}
// Now that the reservation has conclued, ensure that the wallet has
// cleaned up the state allocated to the reservation.
assertReservationDeleted(chanReservation, t)
}
func testFundingTransactionLockedOutputs(miner *rpctest.Harness,
@ -746,7 +693,7 @@ func testSingleFunderReservationWorkflowInitiator(miner *rpctest.Harness,
if err != nil {
t.Fatalf("bob is unable to sign alice's commit tx: %v", err)
}
if err := chanReservation.CompleteReservation(nil, bobCommitSig); err != nil {
if _, err := chanReservation.CompleteReservation(nil, bobCommitSig); err != nil {
t.Fatalf("unable to complete funding tx: %v", err)
}
@ -774,17 +721,6 @@ func testSingleFunderReservationWorkflowInitiator(miner *rpctest.Harness,
channeldb.SingleFunder, channels[0].ChanType)
}
lnChan := make(chan *lnwallet.LightningChannel, 1)
go func() {
openDetails, err := chanReservation.DispatchChan()
if err != nil {
t.Fatalf("unable to open channel: %v", err)
}
lnChan <- openDetails.Channel
}()
assertChannelOpen(t, miner, uint32(numReqConfs), lnChan)
assertReservationDeleted(chanReservation, t)
}
@ -925,7 +861,7 @@ func testSingleFunderReservationWorkflowResponder(miner *rpctest.Harness,
// With this stage complete, Alice can now complete the reservation.
bobRevokeKey := bobContribution.RevocationKey
err = chanReservation.CompleteReservationSingle(bobRevokeKey,
_, err = chanReservation.CompleteReservationSingle(bobRevokeKey,
fundingOutpoint, bobCommitSig, bobObsfucator)
if err != nil {
t.Fatalf("unable to complete reservation: %v", err)
@ -937,14 +873,6 @@ func testSingleFunderReservationWorkflowResponder(miner *rpctest.Harness,
chanReservation.FundingOutpoint(), fundingOutpoint)
}
// Some period of time later, Bob presents us with an SPV proof
// attesting to an open channel. At this point Alice recognizes the
// channel, saves the state to disk, and creates the channel itself.
_, err = chanReservation.FinalizeReservation()
if err != nil && !strings.Contains(err.Error(), "No information") {
t.Fatalf("unable to finalize reservation: %v", err)
}
// TODO(roasbeef): bob verify alice's sig
assertReservationDeleted(chanReservation, t)
}

@ -1189,6 +1189,10 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
req.completeChan <- pendingReservation.partialState
req.err <- nil
l.limboMtx.Lock()
delete(l.fundingLimbo, req.pendingFundingID)
l.limboMtx.Unlock()
}
// selectCoinsAndChange performs coin selection in order to obtain witness

@ -696,6 +696,59 @@ func (n *networkHarness) OpenChannel(ctx context.Context,
}
}
// OpenPendingChannel attempts to open a channel between srcNode and destNode with the
// passed channel funding parameters. If the passed context has a timeout, then
// if the timeout is reached before the channel pending notification is
// received, an error is returned.
func (n *networkHarness) OpenPendingChannel(ctx context.Context,
srcNode, destNode *lightningNode, amt btcutil.Amount,
pushAmt btcutil.Amount, numConfs uint32) (*lnrpc.PendingUpdate, error) {
openReq := &lnrpc.OpenChannelRequest{
NodePubkey: destNode.PubKey[:],
LocalFundingAmount: int64(amt),
PushSat: int64(pushAmt),
NumConfs: numConfs,
}
respStream, err := srcNode.OpenChannel(ctx, openReq)
if err != nil {
return nil, fmt.Errorf("unable to open channel between "+
"alice and bob: %v", err)
}
chanPending := make(chan *lnrpc.PendingUpdate)
errChan := make(chan error)
go func() {
// Consume the "channel pending" update. This waits until the node
// notifies us that the final message in the channel funding workflow
// has been sent to the remote node.
resp, err := respStream.Recv()
if err != nil {
errChan <- err
return
}
pendingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending)
if !ok {
errChan <- fmt.Errorf("expected channel pending update, "+
"instead got %v", resp)
return
}
chanPending <- pendingResp.ChanPending
}()
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before chan pending " +
"update sent")
case err := <-errChan:
return nil, err
case pendingChan := <-chanPending:
return pendingChan, nil
}
}
// WaitForChannelOpen waits for a notification that a channel is open by
// consuming a message from the past open channel stream. If the passed context
// has a timeout, then if the timeout is reached before the channel has been