From 7196c4bb1c8327f9f257af764aa4991193d039f8 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Sat, 15 Oct 2016 16:24:56 +0300 Subject: [PATCH] fundingmanager: add max pending channel check --- fundingmanager.go | 73 ++++++++++++++++++++ lnd_test.go | 145 +++++++++++++++++++++++++++++++++++----- lnwire/error_generic.go | 6 ++ peer.go | 10 +++ rpcserver.go | 4 ++ 5 files changed, 220 insertions(+), 18 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index c8cfbe08..6251cfde 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -13,8 +13,10 @@ import ( "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" + "fmt" "github.com/BitfuryLightning/tools/rt" "github.com/BitfuryLightning/tools/rt/graph" + "google.golang.org/grpc" ) const ( @@ -88,6 +90,14 @@ type fundingOpenMsg struct { peer *peer } +// fundingErrorMsg couples an lnwire.ErrorGeneric message +// with the peer who sent the message. This allows the funding +// manager properly process the error. +type fundingErrorMsg struct { + err *lnwire.ErrorGeneric + peer *peer +} + // pendingChannels is a map instantiated per-peer which tracks all active // pending single funded channels indexed by their pending channel identifier. type pendingChannels map[uint64]*reservationWithCtx @@ -232,6 +242,8 @@ out: f.handleFundingSignComplete(fmsg) case *fundingOpenMsg: f.handleFundingOpen(fmsg) + case *fundingErrorMsg: + f.handleErrorGenericMsg(fmsg) } case req := <-f.fundingRequests: f.handleInitFundingMsg(req) @@ -297,6 +309,23 @@ func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest, // TODO(roasbeef): add error chan to all, let channelManager handle // error+propagate func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { + + // Check number of pending channels to be smaller than maximum allowed + // number and send ErrorGeneric to remote peer if condition is violated. + if len(f.activeReservations[fmsg.peer.id]) >= cfg.MaxPendingChannels { + errMsg := &lnwire.ErrorGeneric{ + ChannelPoint: &wire.OutPoint{ + Hash: wire.ShaHash{}, + Index: 0, + }, + Problem: "Number of pending channels exceed maximum", + ErrorID: lnwire.ErrorMaxPendingChannels, + PendingChannelID: fmsg.msg.ChannelID, + } + fmsg.peer.queueMsg(errMsg, nil) + return + } + msg := fmsg.msg amt := msg.FundingAmount delay := msg.CsvDelay @@ -684,6 +713,7 @@ func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) // wallet, then sends a funding request to the remote peer kicking off the // funding workflow. func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { + nodeID := msg.peer.lightningID localAmt := msg.localFundingAmt @@ -755,3 +785,46 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { ) msg.peer.queueMsg(fundingReq, nil) } + +// processErrorGeneric sends a message to the fundingManager allowing it +// to process the occurred generic error. +func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric, + peer *peer) { + f.fundingMsgs <- &fundingErrorMsg{err, peer} +} + +// handleErrorGenericMsg process the error which was received from remote peer, +// depends on the type of error we should do different clean up steps and +// inform user about it. +func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) { + if fmsg.err.ErrorID == lnwire.ErrorMaxPendingChannels { + peerID := fmsg.peer.id + chanID := fmsg.err.PendingChannelID + + f.resMtx.RLock() + resCtx, ok := f.activeReservations[peerID][chanID] + f.resMtx.RUnlock() + + if !ok { + resCtx.err <- fmt.Errorf("ErrorGeneric error " + + "was returned from remote peer for channel "+ + "(id: %v), but it can't be found and thereby "+ + "can't be canceled.", chanID) + } + + if err := resCtx.reservation.Cancel(); err != nil { + resCtx.err <- fmt.Errorf("Remote peer responded "+ + "with: Number of pending channels exceed "+ + "maximum, but we can't cancel the reservation "+ + "- %v", err) + } else { + resCtx.err <- grpc.Errorf(OpenChannelFundingError, + "Remote peer responded with: Number of "+ + "pending channels exceed maximum") + } + + f.resMtx.Lock() + delete(f.activeReservations[peerID], chanID) + f.resMtx.Unlock() + } +} diff --git a/lnd_test.go b/lnd_test.go index 43c0a4c9..5f72d1f6 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -13,6 +13,7 @@ import ( "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcrpcclient" "github.com/roasbeef/btcutil" + "google.golang.org/grpc" "testing" ) @@ -86,6 +87,29 @@ func assertTxInBlock(ct *CT, block *btcutil.Block, txid *wire.ShaHash) { ct.Errorf("funding tx was not included in block") } +// mineBlocks mine 'num' of blocks and check that blocks are present in +// node blockchain. +func mineBlocks(ct *CT, net *networkHarness, num uint32) []*btcutil.Block { + blocks := make([]*btcutil.Block, num) + + blockHashes, err := net.Miner.Node.Generate(num) + if err != nil { + ct.Errorf("unable to generate blocks: %v", err) + } + + for i, blockHash := range blockHashes { + block, err := net.Miner.Node.GetBlock(blockHash) + if err != nil { + ct.Errorf("unable to get block: %v", err) + } + + blocks[i] = block + } + + return blocks +} + + // openChannelAndAssert attempts to open a channel with the specified // parameters extended from Alice to Bob. Additionally, two items are asserted // after the channel is considered open: the funding transaction should be @@ -102,14 +126,8 @@ func openChannelAndAssert(ct *CT, net *networkHarness, ctx context.Context, // Mine a block, then wait for Alice's node to notify us that the // channel has been opened. The funding transaction should be found // within the newly mined block. - blockHash, err := net.Miner.Node.Generate(1) - if err != nil { - ct.Errorf("unable to generate block: %v", err) - } - block, err := net.Miner.Node.GetBlock(blockHash[0]) - if err != nil { - ct.Errorf("unable to get block: %v", err) - } + block := mineBlocks(ct, net, 1)[0] + fundingChanPoint, err := net.WaitForChannelOpen(ctx, chanOpenUpdate) if err != nil { ct.Errorf("error while waiting for channel open: %v", err) @@ -150,14 +168,7 @@ func closeChannelAndAssert(ct *CT, net *networkHarness, ctx context.Context, // Finally, generate a single block, wait for the final close status // update, then ensure that the closing transaction was included in the // block. - blockHash, err := net.Miner.Node.Generate(1) - if err != nil { - ct.Errorf("unable to generate block: %v", err) - } - block, err := net.Miner.Node.GetBlock(blockHash[0]) - if err != nil { - ct.Errorf("unable to get block: %v", err) - } + block := mineBlocks(ct, net, 1)[0] closingTxid, err := net.WaitForChannelClose(ctx, closeUpdates) if err != nil { @@ -752,16 +763,114 @@ func testBasicChannelCreation(net *networkHarness, ct *CT) { } } -type testCase func(net *networkHarness, t *testing.T) +// testMaxPendingChannels checks that error is returned from remote peer if +// max pending channel number was exceeded and that '--maxpendingchannels' flag +// exists and works properly. +func testMaxPendingChannels(net *networkHarness, ct *CT) { + maxPendingChannels := defaultMaxPendingChannels + 1 + amount := btcutil.Amount(btcutil.SatoshiPerBitcoin) + + timeout := time.Duration(time.Second * 10) + ctx, _ := context.WithTimeout(context.Background(), timeout) + + // Create a new node (Carol) with greater number of max pending + // channels. + args := []string{ + fmt.Sprintf("--maxpendingchannels=%v", maxPendingChannels), + } + + carol, err := net.NewNode(args) + + if err != nil { + ct.Errorf("unable to create new nodes: %v", err) + } + if err := net.ConnectNodes(ctx, net.Alice, carol); err != nil { + ct.Errorf("unable to connect carol to alice: %v", err) + } + + carolBalance := btcutil.Amount(maxPendingChannels) * amount + if err := net.SendCoins(ctx, carolBalance, carol); err != nil { + ct.Errorf("unable to send coins to carol: %v", err) + } + + // Send open channel requests without generating new blocks thereby + // increasing pool of pending channels. Then check that we can't + // open the channel if the number of pending channels exceed + // max value. + openStreams := make([]lnrpc.Lightning_OpenChannelClient, maxPendingChannels) + for i := 0; i < maxPendingChannels; i++ { + stream, err := net.OpenChannel(ctx, net.Alice, carol, amount, 1) + if err != nil { + ct.Errorf("unable to open channel: %v", err) + } + openStreams[i] = stream + } + + // Carol exhausted available amount of pending channels, next open + // channel request should cause ErrorGeneric to be sent back to Alice. + _, err = net.OpenChannel(ctx, net.Alice, carol, amount, 1) + if err == nil { + ct.Errorf("error wasn't received") + } else if grpc.Code(err) != OpenChannelFundingError { + ct.Errorf("not expected error was received : %v", err) + } + + // For now our channels are in pending state, in order to not + // interfere with other tests we should clean up - complete opening + // of the channel and then close it. + + // Mine a block, then wait for node's to notify us that the channel + // has been opened. The funding transactions should be found within the + // newly mined block. + block := mineBlocks(ct, net, 1)[0] + + chanPoints := make([]*lnrpc.ChannelPoint, maxPendingChannels) + + for i, stream := range openStreams { + fundingChanPoint, err := net.WaitForChannelOpen(ctx, stream) + if err != nil { + ct.Errorf("error while waiting for channel open: %v", err) + } + + fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid) + if err != nil { + ct.Errorf("unable to create sha hash: %v", err) + } + + assertTxInBlock(ct, block, fundingTxID) + + // The channel should be listed in the peer information + // returned by both peers. + chanPoint := wire.OutPoint{ + Hash: *fundingTxID, + Index: fundingChanPoint.OutputIndex, + } + if err := net.AssertChannelExists(ctx, net.Alice, &chanPoint); err != nil { + ct.Errorf("unable to assert channel existence: %v", err) + } + + chanPoints[i] = fundingChanPoint + } + + // Finally close the channel between Alice and Carol, asserting that the + // channel has been properly closed on-chain. + for _, chanPoint := range chanPoints { + closeChannelAndAssert(ct, net, ctx, net.Alice, chanPoint) + } + +} + +type testCase func(net *networkHarness, ct *CT) var testCases = map[string]testCase{ "basic funding flow": testBasicChannelFunding, "channel force closure": testChannelForceClosure, "channel balance": testChannelBalance, "single hop invoice": testSingleHopInvoice, + "max pending channel": testMaxPendingChannels, "multi-hop payments": testMultiHopPayments, + "multiple channel creation": testBasicChannelCreation, "invoice update subscription": testInvoiceSubscriptions, - "multiple channel creation": testBasicChannelCreation, } // TestLightningNetworkDaemon performs a series of integration tests amongst a diff --git a/lnwire/error_generic.go b/lnwire/error_generic.go index c5ff2f0d..dea6f1d3 100644 --- a/lnwire/error_generic.go +++ b/lnwire/error_generic.go @@ -7,6 +7,12 @@ import ( "github.com/roasbeef/btcd/wire" ) +const ( + // Is returned by remote peer when number of pending channels exceed max + // value. + ErrorMaxPendingChannels = 1 +) + // ErrorGeneric represents a generic error bound to an exact channel. The // message format is purposefully general in order to allow expression of a wide // array of possible errors. Each ErrorGeneric message is directed at a particular diff --git a/peer.go b/peer.go index a996c4be..f970c8d9 100644 --- a/peer.go +++ b/peer.go @@ -373,6 +373,16 @@ out: p.remoteCloseChanReqs <- msg // TODO(roasbeef): interface for htlc update msgs // * .(CommitmentUpdater) + + case *lnwire.ErrorGeneric: + switch msg.ErrorID { + case lnwire.ErrorMaxPendingChannels: + p.server.fundingMgr.processErrorGeneric(msg, p) + default: + peerLog.Warn("ErrorGeneric(%v) handling isn't" + + " implemented.", msg.ErrorID) + } + case *lnwire.HTLCAddRequest: isChanUpdate = true targetChan = msg.ChannelPoint diff --git a/rpcserver.go b/rpcserver.go index c93b4be8..976deccd 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -27,6 +27,10 @@ import ( "golang.org/x/net/context" ) +const ( + OpenChannelFundingError = 100 +) + var ( defaultAccount uint32 = waddrmgr.DefaultAccountNum )