lnd: remove openChanReq, call InitFundingWorkflow with InitFundingMsg

This commit is contained in:
eugene 2020-11-24 12:19:39 -05:00
parent e85baea1b3
commit 936227798a
5 changed files with 312 additions and 283 deletions

View File

@ -120,8 +120,76 @@ func (r *reservationWithCtx) updateTimestamp() {
// embedded within this message giving the funding manager full context w.r.t
// the workflow.
type InitFundingMsg struct {
peer lnpeer.Peer
*openChanReq
// Peer is the peer that we want to open a channel to.
Peer lnpeer.Peer
// TargetPubkey is the public key of the peer.
TargetPubkey *btcec.PublicKey
// ChainHash is the target genesis hash for this channel.
ChainHash chainhash.Hash
// SubtractFees set to true means that fees will be subtracted
// from the LocalFundingAmt.
SubtractFees bool
// LocalFundingAmt is the size of the channel.
LocalFundingAmt btcutil.Amount
// PushAmt is the amount pushed to the counterparty.
PushAmt lnwire.MilliSatoshi
// FundingFeePerKw is the fee for the funding transaction.
FundingFeePerKw chainfee.SatPerKWeight
// Private determines whether or not this channel will be private.
Private bool
// MinHtlcIn is the minimum incoming HTLC that we accept.
MinHtlcIn lnwire.MilliSatoshi
// RemoteCsvDelay is the CSV delay we require for the remote peer.
RemoteCsvDelay uint16
// MinConfs indicates the minimum number of confirmations that each
// output selected to fund the channel should satisfy.
MinConfs int32
// ShutdownScript is an optional upfront shutdown script for the
// channel. This value is optional, so may be nil.
ShutdownScript lnwire.DeliveryAddress
// MaxValueInFlight is the maximum amount of coins in MilliSatoshi
// that can be pending within the channel. It only applies to the
// remote party.
MaxValueInFlight lnwire.MilliSatoshi
// MaxHtlcs is the maximum number of HTLCs that the remote peer
// can offer us.
MaxHtlcs uint16
// MaxLocalCsv is the maximum local csv delay we will accept from our
// peer.
MaxLocalCsv uint16
// ChanFunder is an optional channel funder that allows the caller to
// control exactly how the channel funding is carried out. If not
// specified, then the default chanfunding.WalletAssembler will be
// used.
ChanFunder chanfunding.Assembler
// PendingChanID is not all zeroes (the default value), then this will
// be the pending channel ID used for the funding flow within the wire
// protocol.
PendingChanID [32]byte
// Updates is a channel which updates to the opening status of the channel
// are sent on.
Updates chan *lnrpc.OpenStatusUpdate
// Err is a channel which errors encountered during the funding flow are
// sent on.
Err chan error
}
// fundingMsg is sent by the ProcessFundingMsg function and packages a
@ -2919,11 +2987,8 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
// InitFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *Manager) InitFundingWorkflow(peer lnpeer.Peer, req *openChanReq) {
f.fundingRequests <- &InitFundingMsg{
peer: peer,
openChanReq: req,
}
func (f *Manager) InitFundingWorkflow(msg *InitFundingMsg) {
f.fundingRequests <- msg
}
// getUpfrontShutdownScript takes a user provided script and a getScript
@ -2975,13 +3040,13 @@ func getUpfrontShutdownScript(enableUpfrontShutdown bool, peer lnpeer.Peer,
// funding workflow.
func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
var (
peerKey = msg.peer.IdentityKey()
localAmt = msg.localFundingAmt
minHtlcIn = msg.minHtlcIn
remoteCsvDelay = msg.remoteCsvDelay
maxValue = msg.maxValueInFlight
maxHtlcs = msg.maxHtlcs
maxCSV = msg.maxLocalCsv
peerKey = msg.Peer.IdentityKey()
localAmt = msg.LocalFundingAmt
minHtlcIn = msg.MinHtlcIn
remoteCsvDelay = msg.RemoteCsvDelay
maxValue = msg.MaxValueInFlight
maxHtlcs = msg.MaxHtlcs
maxCSV = msg.MaxLocalCsv
)
// If no maximum CSV delay was set for this channel, we use our default
@ -3000,14 +3065,14 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
}
fndgLog.Infof("Initiating fundingRequest(local_amt=%v "+
"(subtract_fees=%v), push_amt=%v, chain_hash=%v, peer=%x, "+
"dust_limit=%v, min_confs=%v)", localAmt, msg.subtractFees,
msg.pushAmt, msg.chainHash, peerKey.SerializeCompressed(),
ourDustLimit, msg.minConfs)
"dust_limit=%v, min_confs=%v)", localAmt, msg.SubtractFees,
msg.PushAmt, msg.ChainHash, peerKey.SerializeCompressed(),
ourDustLimit, msg.MinConfs)
// We set the channel flags to indicate whether we want this channel to
// be announced to the network.
var channelFlags lnwire.FundingFlag
if !msg.openChanReq.private {
if !msg.Private {
// This channel will be announced.
channelFlags = lnwire.FFAnnounceChannel
}
@ -3016,15 +3081,15 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
// Otherwise we'll generate a fresh one as normal. This will be used
// to track this reservation throughout its lifetime.
var chanID [32]byte
if msg.pendingChanID == zeroID {
if msg.PendingChanID == zeroID {
chanID = f.nextPendingChanID()
} else {
// If the user specified their own pending channel ID, then
// we'll ensure it doesn't collide with any existing pending
// channel ID.
chanID = msg.pendingChanID
chanID = msg.PendingChanID
if _, err := f.getReservationCtx(peerKey, chanID); err == nil {
msg.err <- fmt.Errorf("pendingChannelID(%x) "+
msg.Err <- fmt.Errorf("pendingChannelID(%x) "+
"already present", chanID[:])
return
}
@ -3035,8 +3100,8 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
// address from the wallet if our node is configured to set shutdown
// address by default).
shutdown, err := getUpfrontShutdownScript(
f.cfg.EnableUpfrontShutdown, msg.peer,
msg.openChanReq.shutdownScript,
f.cfg.EnableUpfrontShutdown, msg.Peer,
msg.ShutdownScript,
func() (lnwire.DeliveryAddress, error) {
addr, err := f.cfg.Wallet.NewAddress(
lnwallet.WitnessPubKey, false,
@ -3048,7 +3113,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
},
)
if err != nil {
msg.err <- err
msg.Err <- err
return
}
@ -3060,7 +3125,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
// format we can use with this peer. This is dependent on *both* us and
// the remote peer are signaling the proper feature bit.
commitType := commitmentType(
msg.peer.LocalFeatures(), msg.peer.RemoteFeatures(),
msg.Peer.LocalFeatures(), msg.Peer.RemoteFeatures(),
)
// First, we'll query the fee estimator for a fee that should get the
@ -3069,7 +3134,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
// to execute a timely unilateral channel closure if needed.
commitFeePerKw, err := f.cfg.FeeEstimator.EstimateFeePerKW(3)
if err != nil {
msg.err <- err
msg.Err <- err
return
}
@ -3082,25 +3147,25 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
}
req := &lnwallet.InitFundingReserveMsg{
ChainHash: &msg.chainHash,
ChainHash: &msg.ChainHash,
PendingChanID: chanID,
NodeID: peerKey,
NodeAddr: msg.peer.Address(),
SubtractFees: msg.subtractFees,
NodeAddr: msg.Peer.Address(),
SubtractFees: msg.SubtractFees,
LocalFundingAmt: localAmt,
RemoteFundingAmt: 0,
CommitFeePerKw: commitFeePerKw,
FundingFeePerKw: msg.fundingFeePerKw,
PushMSat: msg.pushAmt,
FundingFeePerKw: msg.FundingFeePerKw,
PushMSat: msg.PushAmt,
Flags: channelFlags,
MinConfs: msg.minConfs,
MinConfs: msg.MinConfs,
CommitType: commitType,
ChanFunder: msg.chanFunder,
ChanFunder: msg.ChanFunder,
}
reservation, err := f.cfg.Wallet.InitChannelReservation(req)
if err != nil {
msg.err <- err
msg.Err <- err
return
}
@ -3154,9 +3219,9 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
remoteMaxHtlcs: maxHtlcs,
maxLocalCsv: maxCSV,
reservation: reservation,
peer: msg.peer,
updates: msg.updates,
err: msg.err,
peer: msg.Peer,
updates: msg.Updates,
err: msg.Err,
}
f.activeReservations[peerIDKey][chanID] = resCtx
f.resMtx.Unlock()
@ -3174,13 +3239,13 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
chanReserve := f.cfg.RequiredRemoteChanReserve(capacity, ourDustLimit)
fndgLog.Infof("Starting funding workflow with %v for pending_id(%x), "+
"committype=%v", msg.peer.Address(), chanID, commitType)
"committype=%v", msg.Peer.Address(), chanID, commitType)
fundingOpen := lnwire.OpenChannel{
ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash,
PendingChannelID: chanID,
FundingAmount: capacity,
PushAmount: msg.pushAmt,
PushAmount: msg.PushAmt,
DustLimit: ourContribution.DustLimit,
MaxValueInFlight: maxValue,
ChannelReserve: chanReserve,
@ -3197,7 +3262,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
ChannelFlags: channelFlags,
UpfrontShutdownScript: shutdown,
}
if err := msg.peer.SendMessage(true, &fundingOpen); err != nil {
if err := msg.Peer.SendMessage(true, &fundingOpen); err != nil {
e := fmt.Errorf("unable to send funding request message: %v",
err)
fndgLog.Errorf(e.Error())
@ -3209,7 +3274,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
msg.err <- e
msg.Err <- e
return
}
}

View File

@ -657,25 +657,26 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
// Create a funding request and start the workflow.
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
subtractFees: subtractFees,
localFundingAmt: localFundingAmt,
pushAmt: lnwire.NewMSatFromSatoshis(pushAmt),
fundingFeePerKw: 1000,
private: !announceChan,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
SubtractFees: subtractFees,
LocalFundingAmt: localFundingAmt,
PushAmt: lnwire.NewMSatFromSatoshis(pushAmt),
FundingFeePerKw: 1000,
Private: !announceChan,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -1317,22 +1318,23 @@ func testLocalCSVLimit(t *testing.T, aliceMaxCSV, bobRequiredCSV uint16) {
// First, we will initiate an outgoing channel from Alice -> Bob.
errChan := make(chan error, 1)
updateChan := make(chan *lnrpc.OpenStatusUpdate)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 200000,
fundingFeePerKw: 1000,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 200000,
FundingFeePerKw: 1000,
Updates: updateChan,
Err: errChan,
}
// Alice should have sent the OpenChannel message to Bob.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
@ -1382,23 +1384,24 @@ func testLocalCSVLimit(t *testing.T, aliceMaxCSV, bobRequiredCSV uint16) {
// handle incoming channels, opening a channel from Bob->Alice.
errChan = make(chan error, 1)
updateChan = make(chan *lnrpc.OpenStatusUpdate)
initReq = &openChanReq{
targetPubkey: alice.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 200000,
fundingFeePerKw: 1000,
updates: updateChan,
err: errChan,
initReq = &InitFundingMsg{
Peer: alice,
TargetPubkey: alice.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 200000,
FundingFeePerKw: 1000,
Updates: updateChan,
Err: errChan,
}
bob.fundingMgr.InitFundingWorkflow(alice, initReq)
bob.fundingMgr.InitFundingWorkflow(initReq)
// Bob should have sent the OpenChannel message to Alice.
var bobMsg lnwire.Message
select {
case bobMsg = <-bob.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("bob OpenChannel message failed: %v", err)
case <-time.After(time.Second * 5):
@ -1734,23 +1737,24 @@ func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) {
// Create a funding request and start the workflow.
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 500000,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 500000,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -1796,23 +1800,24 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
// Create a funding request and start the workflow.
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 500000,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 500000,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -1867,23 +1872,24 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
// Create a funding request and start the workflow.
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 500000,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 500000,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -2591,26 +2597,27 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
// Create a funding request with the custom parameters and start the
// workflow.
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: localAmt,
pushAmt: lnwire.NewMSatFromSatoshis(pushAmt),
private: false,
maxValueInFlight: maxValueInFlight,
minHtlcIn: minHtlcIn,
remoteCsvDelay: csvDelay,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: localAmt,
PushAmt: lnwire.NewMSatFromSatoshis(pushAmt),
Private: false,
MaxValueInFlight: maxValueInFlight,
MinHtlcIn: minHtlcIn,
RemoteCsvDelay: csvDelay,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -2871,19 +2878,20 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
)
defer tearDownFundingManagers(t, alice, bob)
// Create openChanReqs for maxPending+1 channels.
var initReqs []*openChanReq
// Create InitFundingMsg structs for maxPending+1 channels.
var initReqs []*InitFundingMsg
for i := 0; i < maxPending+1; i++ {
updateChan := make(chan *lnrpc.OpenStatusUpdate)
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 5000000,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 5000000,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
initReqs = append(initReqs, initReq)
}
@ -2892,13 +2900,13 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
var accepts []*lnwire.AcceptChannel
var lastOpen *lnwire.OpenChannel
for i, initReq := range initReqs {
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -2974,7 +2982,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// publish a funding tx to the network.
var pendingUpdate *lnrpc.OpenStatusUpdate
select {
case pendingUpdate = <-initReqs[i].updates:
case pendingUpdate = <-initReqs[i].Updates:
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenStatusUpdate_ChanPending")
}
@ -3046,23 +3054,24 @@ func TestFundingManagerRejectPush(t *testing.T) {
// Create a funding request and start the workflow.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 500000,
pushAmt: lnwire.NewMSatFromSatoshis(10),
private: true,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 500000,
PushAmt: lnwire.NewMSatFromSatoshis(10),
Private: true,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -3103,23 +3112,24 @@ func TestFundingManagerMaxConfs(t *testing.T) {
// Create a funding request and start the workflow.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: 500000,
pushAmt: lnwire.NewMSatFromSatoshis(10),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: 500000,
PushAmt: lnwire.NewMSatFromSatoshis(10),
Private: false,
Updates: updateChan,
Err: errChan,
}
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
// Alice should have sent the OpenChannel message to Bob.
var aliceMsg lnwire.Message
select {
case aliceMsg = <-alice.msgChan:
case err := <-initReq.err:
case err := <-initReq.Err:
t.Fatalf("error init funding workflow: %v", err)
case <-time.After(time.Second * 5):
t.Fatalf("alice did not send OpenChannel message")
@ -3385,19 +3395,20 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// imposed by --maxchansize, which should be rejected.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: funding.MaxBtcFundingAmount,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: funding.MaxBtcFundingAmount,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
// After processing the funding open message, bob should respond with
// an error rejecting the channel that exceeds size limit.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
@ -3411,8 +3422,11 @@ func TestMaxChannelSizeConfig(t *testing.T) {
cfg.MaxChanSize = funding.MaxBtcFundingAmount + 1
})
// Reset the Peer to the newly created one.
initReq.Peer = bob
// We expect Bob to respond with an Accept channel message.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
@ -3426,13 +3440,16 @@ func TestMaxChannelSizeConfig(t *testing.T) {
cfg.MaxChanSize = btcutil.Amount(100000000)
})
// Reset the Peer to the newly created one.
initReq.Peer = bob
// Attempt to create a channel above the limit
// imposed by --maxchansize, which should be rejected.
initReq.localFundingAmt = btcutil.SatoshiPerBitcoin + 1
initReq.LocalFundingAmt = btcutil.SatoshiPerBitcoin + 1
// After processing the funding open message, bob should respond with
// an error rejecting the channel that exceeds size limit.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
@ -3454,29 +3471,30 @@ func TestWumboChannelConfig(t *testing.T) {
// funding process w/o issue.
updateChan := make(chan *lnrpc.OpenStatusUpdate)
errChan := make(chan error, 1)
initReq := &openChanReq{
targetPubkey: bob.privKey.PubKey(),
chainHash: *fundingNetParams.GenesisHash,
localFundingAmt: funding.MaxBtcFundingAmount,
pushAmt: lnwire.NewMSatFromSatoshis(0),
private: false,
updates: updateChan,
err: errChan,
initReq := &InitFundingMsg{
Peer: bob,
TargetPubkey: bob.privKey.PubKey(),
ChainHash: *fundingNetParams.GenesisHash,
LocalFundingAmt: funding.MaxBtcFundingAmount,
PushAmt: lnwire.NewMSatFromSatoshis(0),
Private: false,
Updates: updateChan,
Err: errChan,
}
// We expect Bob to respond with an Accept channel message.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
// We'll now attempt to create a channel above the wumbo mark, which
// should be rejected.
initReq.localFundingAmt = btcutil.SatoshiPerBitcoin
initReq.LocalFundingAmt = btcutil.SatoshiPerBitcoin
// After processing the funding open message, bob should respond with
// an error rejecting the channel.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
@ -3489,9 +3507,12 @@ func TestWumboChannelConfig(t *testing.T) {
cfg.MaxChanSize = funding.MaxBtcFundingAmountWumbo
})
// Reset the Peer to the newly created one.
initReq.Peer = bob
// We should now be able to initiate a wumbo channel funding w/o any
// issues.
alice.fundingMgr.InitFundingWorkflow(bob, initReq)
alice.fundingMgr.InitFundingWorkflow(initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")

View File

@ -97,18 +97,18 @@ func (c *chanController) OpenChannel(target *btcec.PublicKey,
// Construct the open channel request and send it to the server to begin
// the funding workflow.
req := &openChanReq{
targetPubkey: target,
chainHash: *c.netParams.GenesisHash,
subtractFees: true,
localFundingAmt: amt,
pushAmt: 0,
minHtlcIn: c.chanMinHtlcIn,
fundingFeePerKw: feePerKw,
private: c.private,
remoteCsvDelay: 0,
minConfs: c.minConfs,
maxValueInFlight: 0,
req := &InitFundingMsg{
TargetPubkey: target,
ChainHash: *c.netParams.GenesisHash,
SubtractFees: true,
LocalFundingAmt: amt,
PushAmt: 0,
MinHtlcIn: c.chanMinHtlcIn,
FundingFeePerKw: feePerKw,
Private: c.private,
RemoteCsvDelay: 0,
MinConfs: c.minConfs,
MaxValueInFlight: 0,
}
updateStream, errChan := c.server.OpenChannel(req)

View File

@ -1761,11 +1761,11 @@ func (r *rpcServer) canOpenChannel() error {
return nil
}
// praseOpenChannelReq parses an OpenChannelRequest message into the server's
// native openChanReq struct. The logic is abstracted so that it can be shared
// between OpenChannel and OpenChannelSync.
// praseOpenChannelReq parses an OpenChannelRequest message into an InitFundingMsg
// struct. The logic is abstracted so that it can be shared between OpenChannel
// and OpenChannelSync.
func (r *rpcServer) parseOpenChannelReq(in *lnrpc.OpenChannelRequest,
isSync bool) (*openChanReq, error) {
isSync bool) (*InitFundingMsg, error) {
rpcsLog.Debugf("[openchannel] request to NodeKey(%x) "+
"allocation(us=%v, them=%v)", in.NodePubkey,
@ -1892,20 +1892,20 @@ func (r *rpcServer) parseOpenChannelReq(in *lnrpc.OpenChannelRequest,
// Instruct the server to trigger the necessary events to attempt to
// open a new channel. A stream is returned in place, this stream will
// be used to consume updates of the state of the pending channel.
return &openChanReq{
targetPubkey: nodePubKey,
chainHash: *r.cfg.ActiveNetParams.GenesisHash,
localFundingAmt: localFundingAmt,
pushAmt: lnwire.NewMSatFromSatoshis(remoteInitialBalance),
minHtlcIn: minHtlcIn,
fundingFeePerKw: feeRate,
private: in.Private,
remoteCsvDelay: remoteCsvDelay,
minConfs: minConfs,
shutdownScript: script,
maxValueInFlight: maxValue,
maxHtlcs: maxHtlcs,
maxLocalCsv: uint16(in.MaxLocalCsv),
return &InitFundingMsg{
TargetPubkey: nodePubKey,
ChainHash: *r.cfg.ActiveNetParams.GenesisHash,
LocalFundingAmt: localFundingAmt,
PushAmt: lnwire.NewMSatFromSatoshis(remoteInitialBalance),
MinHtlcIn: minHtlcIn,
FundingFeePerKw: feeRate,
Private: in.Private,
RemoteCsvDelay: remoteCsvDelay,
MinConfs: minConfs,
ShutdownScript: script,
MaxValueInFlight: maxValue,
MaxHtlcs: maxHtlcs,
MaxLocalCsv: uint16(in.MaxLocalCsv),
}, nil
}
@ -1936,8 +1936,8 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
// Map the channel point shim into a new
// chanfunding.CannedAssembler that the wallet will use
// to obtain the channel point details.
copy(req.pendingChanID[:], chanPointShim.PendingChanId)
req.chanFunder, err = newFundingShimAssembler(
copy(req.PendingChanID[:], chanPointShim.PendingChanId)
req.ChanFunder, err = newFundingShimAssembler(
chanPointShim, true, r.server.cc.KeyRing,
)
if err != nil {
@ -1954,9 +1954,9 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
// Instruct the wallet to use the new
// chanfunding.PsbtAssembler to construct the funding
// transaction.
copy(req.pendingChanID[:], psbtShim.PendingChanId)
req.chanFunder, err = newPsbtAssembler(
in, req.minConfs, psbtShim,
copy(req.PendingChanID[:], psbtShim.PendingChanId)
req.ChanFunder, err = newPsbtAssembler(
in, req.MinConfs, psbtShim,
&r.server.cc.Wallet.Cfg.NetParams,
)
if err != nil {
@ -1973,7 +1973,7 @@ out:
select {
case err := <-errChan:
rpcsLog.Errorf("unable to open channel to NodeKey(%x): %v",
req.targetPubkey.SerializeCompressed(), err)
req.TargetPubkey.SerializeCompressed(), err)
return err
case fundingUpdate := <-updateChan:
rpcsLog.Tracef("[openchannel] sending update: %v",
@ -2005,7 +2005,7 @@ out:
}
rpcsLog.Tracef("[openchannel] success NodeKey(%x), ChannelPoint(%v)",
req.targetPubkey.SerializeCompressed(), outpoint)
req.TargetPubkey.SerializeCompressed(), outpoint)
return nil
}
@ -2030,7 +2030,7 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context,
// If an error occurs them immediately return the error to the client.
case err := <-errChan:
rpcsLog.Errorf("unable to open channel to NodeKey(%x): %v",
req.targetPubkey.SerializeCompressed(), err)
req.TargetPubkey.SerializeCompressed(), err)
return nil, err
// Otherwise, wait for the first channel update. The first update sent

View File

@ -52,7 +52,6 @@ import (
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwallet/chanfunding"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/netann"
@ -3484,63 +3483,6 @@ func (s *server) removePeer(p *peer.Brontide) {
s.peerNotifier.NotifyPeerOffline(pubKey)
}
// openChanReq is a message sent to the server in order to request the
// initiation of a channel funding workflow to the peer with either the
// specified relative peer ID, or a global lightning ID.
type openChanReq struct {
targetPubkey *btcec.PublicKey
chainHash chainhash.Hash
subtractFees bool
localFundingAmt btcutil.Amount
pushAmt lnwire.MilliSatoshi
fundingFeePerKw chainfee.SatPerKWeight
private bool
// minHtlcIn is the minimum incoming htlc that we accept.
minHtlcIn lnwire.MilliSatoshi
remoteCsvDelay uint16
// minConfs indicates the minimum number of confirmations that each
// output selected to fund the channel should satisfy.
minConfs int32
// shutdownScript is an optional upfront shutdown script for the channel.
// This value is optional, so may be nil.
shutdownScript lnwire.DeliveryAddress
// maxValueInFlight is the maximum amount of coins in millisatoshi that can
// be pending within the channel. It only applies to the remote party.
maxValueInFlight lnwire.MilliSatoshi
maxHtlcs uint16
// maxLocalCsv is the maximum local csv delay we will accept from our
// peer.
maxLocalCsv uint16
// TODO(roasbeef): add ability to specify channel constraints as well
// chanFunder is an optional channel funder that allows the caller to
// control exactly how the channel funding is carried out. If not
// specified, then the default chanfunding.WalletAssembler will be
// used.
chanFunder chanfunding.Assembler
// pendingChanID is not all zeroes (the default value), then this will
// be the pending channel ID used for the funding flow within the wire
// protocol.
pendingChanID [32]byte
updates chan *lnrpc.OpenStatusUpdate
err chan error
}
// ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails.
@ -3685,25 +3627,26 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
//
// NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(
req *openChanReq) (chan *lnrpc.OpenStatusUpdate, chan error) {
req *InitFundingMsg) (chan *lnrpc.OpenStatusUpdate, chan error) {
// The updateChan will have a buffer of 2, since we expect a ChanPending
// + a ChanOpen update, and we want to make sure the funding process is
// not blocked if the caller is not reading the updates.
req.updates = make(chan *lnrpc.OpenStatusUpdate, 2)
req.err = make(chan error, 1)
req.Updates = make(chan *lnrpc.OpenStatusUpdate, 2)
req.Err = make(chan error, 1)
// First attempt to locate the target peer to open a channel with, if
// we're unable to locate the peer then this request will fail.
pubKeyBytes := req.targetPubkey.SerializeCompressed()
pubKeyBytes := req.TargetPubkey.SerializeCompressed()
s.mu.RLock()
peer, ok := s.peersByPub[string(pubKeyBytes)]
if !ok {
s.mu.RUnlock()
req.err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
return req.updates, req.err
req.Err <- fmt.Errorf("peer %x is not online", pubKeyBytes)
return req.Updates, req.Err
}
req.Peer = peer
s.mu.RUnlock()
// We'll wait until the peer is active before beginning the channel
@ -3711,32 +3654,32 @@ func (s *server) OpenChannel(
select {
case <-peer.ActiveSignal():
case <-peer.QuitSignal():
req.err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
return req.updates, req.err
req.Err <- fmt.Errorf("peer %x disconnected", pubKeyBytes)
return req.Updates, req.Err
case <-s.quit:
req.err <- ErrServerShuttingDown
return req.updates, req.err
req.Err <- ErrServerShuttingDown
return req.Updates, req.Err
}
// If the fee rate wasn't specified, then we'll use a default
// confirmation target.
if req.fundingFeePerKw == 0 {
if req.FundingFeePerKw == 0 {
estimator := s.cc.FeeEstimator
feeRate, err := estimator.EstimateFeePerKW(6)
if err != nil {
req.err <- err
return req.updates, req.err
req.Err <- err
return req.Updates, req.Err
}
req.fundingFeePerKw = feeRate
req.FundingFeePerKw = feeRate
}
// Spawn a goroutine to send the funding workflow request to the funding
// manager. This allows the server to continue handling queries instead
// of blocking on this request which is exported as a synchronous
// request to the outside world.
go s.fundingMgr.InitFundingWorkflow(peer, req)
go s.fundingMgr.InitFundingWorkflow(req)
return req.updates, req.err
return req.Updates, req.Err
}
// Peers returns a slice of all active peers.