fundingmanager: add max pending channel check

This commit is contained in:
Andrey Samokhvalov 2016-10-15 16:24:56 +03:00
parent 19d8abade8
commit 7196c4bb1c
5 changed files with 220 additions and 18 deletions

@ -13,8 +13,10 @@ import (
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"fmt"
"github.com/BitfuryLightning/tools/rt" "github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph" "github.com/BitfuryLightning/tools/rt/graph"
"google.golang.org/grpc"
) )
const ( const (
@ -88,6 +90,14 @@ type fundingOpenMsg struct {
peer *peer peer *peer
} }
// fundingErrorMsg couples an lnwire.ErrorGeneric message
// with the peer who sent the message. This allows the funding
// manager properly process the error.
type fundingErrorMsg struct {
err *lnwire.ErrorGeneric
peer *peer
}
// pendingChannels is a map instantiated per-peer which tracks all active // pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier. // pending single funded channels indexed by their pending channel identifier.
type pendingChannels map[uint64]*reservationWithCtx type pendingChannels map[uint64]*reservationWithCtx
@ -232,6 +242,8 @@ out:
f.handleFundingSignComplete(fmsg) f.handleFundingSignComplete(fmsg)
case *fundingOpenMsg: case *fundingOpenMsg:
f.handleFundingOpen(fmsg) f.handleFundingOpen(fmsg)
case *fundingErrorMsg:
f.handleErrorGenericMsg(fmsg)
} }
case req := <-f.fundingRequests: case req := <-f.fundingRequests:
f.handleInitFundingMsg(req) f.handleInitFundingMsg(req)
@ -297,6 +309,23 @@ func (f *fundingManager) processFundingRequest(msg *lnwire.SingleFundingRequest,
// TODO(roasbeef): add error chan to all, let channelManager handle // TODO(roasbeef): add error chan to all, let channelManager handle
// error+propagate // error+propagate
func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
// Check number of pending channels to be smaller than maximum allowed
// number and send ErrorGeneric to remote peer if condition is violated.
if len(f.activeReservations[fmsg.peer.id]) >= cfg.MaxPendingChannels {
errMsg := &lnwire.ErrorGeneric{
ChannelPoint: &wire.OutPoint{
Hash: wire.ShaHash{},
Index: 0,
},
Problem: "Number of pending channels exceed maximum",
ErrorID: lnwire.ErrorMaxPendingChannels,
PendingChannelID: fmsg.msg.ChannelID,
}
fmsg.peer.queueMsg(errMsg, nil)
return
}
msg := fmsg.msg msg := fmsg.msg
amt := msg.FundingAmount amt := msg.FundingAmount
delay := msg.CsvDelay delay := msg.CsvDelay
@ -684,6 +713,7 @@ func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq)
// wallet, then sends a funding request to the remote peer kicking off the // wallet, then sends a funding request to the remote peer kicking off the
// funding workflow. // funding workflow.
func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
nodeID := msg.peer.lightningID nodeID := msg.peer.lightningID
localAmt := msg.localFundingAmt localAmt := msg.localFundingAmt
@ -755,3 +785,46 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
) )
msg.peer.queueMsg(fundingReq, nil) msg.peer.queueMsg(fundingReq, nil)
} }
// processErrorGeneric sends a message to the fundingManager allowing it
// to process the occurred generic error.
func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric,
peer *peer) {
f.fundingMsgs <- &fundingErrorMsg{err, peer}
}
// handleErrorGenericMsg process the error which was received from remote peer,
// depends on the type of error we should do different clean up steps and
// inform user about it.
func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
if fmsg.err.ErrorID == lnwire.ErrorMaxPendingChannels {
peerID := fmsg.peer.id
chanID := fmsg.err.PendingChannelID
f.resMtx.RLock()
resCtx, ok := f.activeReservations[peerID][chanID]
f.resMtx.RUnlock()
if !ok {
resCtx.err <- fmt.Errorf("ErrorGeneric error " +
"was returned from remote peer for channel "+
"(id: %v), but it can't be found and thereby "+
"can't be canceled.", chanID)
}
if err := resCtx.reservation.Cancel(); err != nil {
resCtx.err <- fmt.Errorf("Remote peer responded "+
"with: Number of pending channels exceed "+
"maximum, but we can't cancel the reservation "+
"- %v", err)
} else {
resCtx.err <- grpc.Errorf(OpenChannelFundingError,
"Remote peer responded with: Number of "+
"pending channels exceed maximum")
}
f.resMtx.Lock()
delete(f.activeReservations[peerID], chanID)
f.resMtx.Unlock()
}
}

@ -13,6 +13,7 @@ import (
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcrpcclient" "github.com/roasbeef/btcrpcclient"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"google.golang.org/grpc"
"testing" "testing"
) )
@ -86,6 +87,29 @@ func assertTxInBlock(ct *CT, block *btcutil.Block, txid *wire.ShaHash) {
ct.Errorf("funding tx was not included in block") ct.Errorf("funding tx was not included in block")
} }
// mineBlocks mine 'num' of blocks and check that blocks are present in
// node blockchain.
func mineBlocks(ct *CT, net *networkHarness, num uint32) []*btcutil.Block {
blocks := make([]*btcutil.Block, num)
blockHashes, err := net.Miner.Node.Generate(num)
if err != nil {
ct.Errorf("unable to generate blocks: %v", err)
}
for i, blockHash := range blockHashes {
block, err := net.Miner.Node.GetBlock(blockHash)
if err != nil {
ct.Errorf("unable to get block: %v", err)
}
blocks[i] = block
}
return blocks
}
// openChannelAndAssert attempts to open a channel with the specified // openChannelAndAssert attempts to open a channel with the specified
// parameters extended from Alice to Bob. Additionally, two items are asserted // parameters extended from Alice to Bob. Additionally, two items are asserted
// after the channel is considered open: the funding transaction should be // after the channel is considered open: the funding transaction should be
@ -102,14 +126,8 @@ func openChannelAndAssert(ct *CT, net *networkHarness, ctx context.Context,
// Mine a block, then wait for Alice's node to notify us that the // Mine a block, then wait for Alice's node to notify us that the
// channel has been opened. The funding transaction should be found // channel has been opened. The funding transaction should be found
// within the newly mined block. // within the newly mined block.
blockHash, err := net.Miner.Node.Generate(1) block := mineBlocks(ct, net, 1)[0]
if err != nil {
ct.Errorf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
ct.Errorf("unable to get block: %v", err)
}
fundingChanPoint, err := net.WaitForChannelOpen(ctx, chanOpenUpdate) fundingChanPoint, err := net.WaitForChannelOpen(ctx, chanOpenUpdate)
if err != nil { if err != nil {
ct.Errorf("error while waiting for channel open: %v", err) ct.Errorf("error while waiting for channel open: %v", err)
@ -150,14 +168,7 @@ func closeChannelAndAssert(ct *CT, net *networkHarness, ctx context.Context,
// Finally, generate a single block, wait for the final close status // Finally, generate a single block, wait for the final close status
// update, then ensure that the closing transaction was included in the // update, then ensure that the closing transaction was included in the
// block. // block.
blockHash, err := net.Miner.Node.Generate(1) block := mineBlocks(ct, net, 1)[0]
if err != nil {
ct.Errorf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
ct.Errorf("unable to get block: %v", err)
}
closingTxid, err := net.WaitForChannelClose(ctx, closeUpdates) closingTxid, err := net.WaitForChannelClose(ctx, closeUpdates)
if err != nil { if err != nil {
@ -752,16 +763,114 @@ func testBasicChannelCreation(net *networkHarness, ct *CT) {
} }
} }
type testCase func(net *networkHarness, t *testing.T) // testMaxPendingChannels checks that error is returned from remote peer if
// max pending channel number was exceeded and that '--maxpendingchannels' flag
// exists and works properly.
func testMaxPendingChannels(net *networkHarness, ct *CT) {
maxPendingChannels := defaultMaxPendingChannels + 1
amount := btcutil.Amount(btcutil.SatoshiPerBitcoin)
timeout := time.Duration(time.Second * 10)
ctx, _ := context.WithTimeout(context.Background(), timeout)
// Create a new node (Carol) with greater number of max pending
// channels.
args := []string{
fmt.Sprintf("--maxpendingchannels=%v", maxPendingChannels),
}
carol, err := net.NewNode(args)
if err != nil {
ct.Errorf("unable to create new nodes: %v", err)
}
if err := net.ConnectNodes(ctx, net.Alice, carol); err != nil {
ct.Errorf("unable to connect carol to alice: %v", err)
}
carolBalance := btcutil.Amount(maxPendingChannels) * amount
if err := net.SendCoins(ctx, carolBalance, carol); err != nil {
ct.Errorf("unable to send coins to carol: %v", err)
}
// Send open channel requests without generating new blocks thereby
// increasing pool of pending channels. Then check that we can't
// open the channel if the number of pending channels exceed
// max value.
openStreams := make([]lnrpc.Lightning_OpenChannelClient, maxPendingChannels)
for i := 0; i < maxPendingChannels; i++ {
stream, err := net.OpenChannel(ctx, net.Alice, carol, amount, 1)
if err != nil {
ct.Errorf("unable to open channel: %v", err)
}
openStreams[i] = stream
}
// Carol exhausted available amount of pending channels, next open
// channel request should cause ErrorGeneric to be sent back to Alice.
_, err = net.OpenChannel(ctx, net.Alice, carol, amount, 1)
if err == nil {
ct.Errorf("error wasn't received")
} else if grpc.Code(err) != OpenChannelFundingError {
ct.Errorf("not expected error was received : %v", err)
}
// For now our channels are in pending state, in order to not
// interfere with other tests we should clean up - complete opening
// of the channel and then close it.
// Mine a block, then wait for node's to notify us that the channel
// has been opened. The funding transactions should be found within the
// newly mined block.
block := mineBlocks(ct, net, 1)[0]
chanPoints := make([]*lnrpc.ChannelPoint, maxPendingChannels)
for i, stream := range openStreams {
fundingChanPoint, err := net.WaitForChannelOpen(ctx, stream)
if err != nil {
ct.Errorf("error while waiting for channel open: %v", err)
}
fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid)
if err != nil {
ct.Errorf("unable to create sha hash: %v", err)
}
assertTxInBlock(ct, block, fundingTxID)
// The channel should be listed in the peer information
// returned by both peers.
chanPoint := wire.OutPoint{
Hash: *fundingTxID,
Index: fundingChanPoint.OutputIndex,
}
if err := net.AssertChannelExists(ctx, net.Alice, &chanPoint); err != nil {
ct.Errorf("unable to assert channel existence: %v", err)
}
chanPoints[i] = fundingChanPoint
}
// Finally close the channel between Alice and Carol, asserting that the
// channel has been properly closed on-chain.
for _, chanPoint := range chanPoints {
closeChannelAndAssert(ct, net, ctx, net.Alice, chanPoint)
}
}
type testCase func(net *networkHarness, ct *CT)
var testCases = map[string]testCase{ var testCases = map[string]testCase{
"basic funding flow": testBasicChannelFunding, "basic funding flow": testBasicChannelFunding,
"channel force closure": testChannelForceClosure, "channel force closure": testChannelForceClosure,
"channel balance": testChannelBalance, "channel balance": testChannelBalance,
"single hop invoice": testSingleHopInvoice, "single hop invoice": testSingleHopInvoice,
"max pending channel": testMaxPendingChannels,
"multi-hop payments": testMultiHopPayments, "multi-hop payments": testMultiHopPayments,
"invoice update subscription": testInvoiceSubscriptions,
"multiple channel creation": testBasicChannelCreation, "multiple channel creation": testBasicChannelCreation,
"invoice update subscription": testInvoiceSubscriptions,
} }
// TestLightningNetworkDaemon performs a series of integration tests amongst a // TestLightningNetworkDaemon performs a series of integration tests amongst a

@ -7,6 +7,12 @@ import (
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
) )
const (
// Is returned by remote peer when number of pending channels exceed max
// value.
ErrorMaxPendingChannels = 1
)
// ErrorGeneric represents a generic error bound to an exact channel. The // ErrorGeneric represents a generic error bound to an exact channel. The
// message format is purposefully general in order to allow expression of a wide // message format is purposefully general in order to allow expression of a wide
// array of possible errors. Each ErrorGeneric message is directed at a particular // array of possible errors. Each ErrorGeneric message is directed at a particular

10
peer.go

@ -373,6 +373,16 @@ out:
p.remoteCloseChanReqs <- msg p.remoteCloseChanReqs <- msg
// TODO(roasbeef): interface for htlc update msgs // TODO(roasbeef): interface for htlc update msgs
// * .(CommitmentUpdater) // * .(CommitmentUpdater)
case *lnwire.ErrorGeneric:
switch msg.ErrorID {
case lnwire.ErrorMaxPendingChannels:
p.server.fundingMgr.processErrorGeneric(msg, p)
default:
peerLog.Warn("ErrorGeneric(%v) handling isn't" +
" implemented.", msg.ErrorID)
}
case *lnwire.HTLCAddRequest: case *lnwire.HTLCAddRequest:
isChanUpdate = true isChanUpdate = true
targetChan = msg.ChannelPoint targetChan = msg.ChannelPoint

@ -27,6 +27,10 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
OpenChannelFundingError = 100
)
var ( var (
defaultAccount uint32 = waddrmgr.DefaultAccountNum defaultAccount uint32 = waddrmgr.DefaultAccountNum
) )