Merge pull request #4649 from Crypt-iQ/fmgr_abstract

multi: add fmgr.Manager interface, change usage in peer to use Manager
This commit is contained in:
Eugene 2020-10-02 08:21:15 -04:00 committed by GitHub
commit 10a84f2c75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 192 additions and 299 deletions

20
fmgr/interfaces.go Normal file

@ -0,0 +1,20 @@
package fmgr
import (
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// Manager is an interface that describes the basic operation of a funding
// manager. It should at a minimum process a subset of lnwire messages that
// are denoted as funding messages.
type Manager interface {
// ProcessFundingMsg processes a funding message represented by the
// lnwire.Message parameter along with the Peer object representing a
// connection to the counterparty.
ProcessFundingMsg(lnwire.Message, lnpeer.Peer)
// IsPendingChannel is used to determine whether to send an Error message
// to the funding manager or not.
IsPendingChannel([32]byte, lnpeer.Peer) bool
}

@ -165,53 +165,13 @@ type initFundingMsg struct {
*openChanReq *openChanReq
} }
// fundingOpenMsg couples an lnwire.OpenChannel message with the peer who sent // fundingMsg is sent by the ProcessFundingMsg function and packages a
// the message. This allows the funding manager to queue a response directly to // funding-specific lnwire.Message along with the lnpeer.Peer that sent it.
// the peer, progressing the funding workflow. type fundingMsg struct {
type fundingOpenMsg struct { msg lnwire.Message
msg *lnwire.OpenChannel
peer lnpeer.Peer peer lnpeer.Peer
} }
// fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingAcceptMsg struct {
msg *lnwire.AcceptChannel
peer lnpeer.Peer
}
// fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingCreatedMsg struct {
msg *lnwire.FundingCreated
peer lnpeer.Peer
}
// fundingSignedMsg couples an lnwire.FundingSigned message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingSignedMsg struct {
msg *lnwire.FundingSigned
peer lnpeer.Peer
}
// fundingLockedMsg couples an lnwire.FundingLocked message with the peer who
// sent the message. This allows the funding manager to finalize the funding
// process and announce the existence of the new channel.
type fundingLockedMsg struct {
msg *lnwire.FundingLocked
peer lnpeer.Peer
}
// fundingErrorMsg couples an lnwire.Error message with the peer who sent the
// message. This allows the funding manager to properly process the error.
type fundingErrorMsg struct {
err *lnwire.Error
peerKey *btcec.PublicKey
}
// pendingChannels is a map instantiated per-peer which tracks all active // pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier, // pending single funded channels indexed by their pending channel identifier,
// which is a set of 32-bytes generated via a CSPRNG. // which is a set of 32-bytes generated via a CSPRNG.
@ -441,9 +401,9 @@ type fundingManager struct {
// goroutine safe. // goroutine safe.
resMtx sync.RWMutex resMtx sync.RWMutex
// fundingMsgs is a channel which receives wrapped wire messages // fundingMsgs is a channel that relays fundingMsg structs from
// related to funding workflow from outside peers. // external sub-systems using the ProcessFundingMsg call.
fundingMsgs chan interface{} fundingMsgs chan *fundingMsg
// queries is a channel which receives requests to query the internal // queries is a channel which receives requests to query the internal
// state of the funding manager. // state of the funding manager.
@ -515,7 +475,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
activeReservations: make(map[serializedPubKey]pendingChannels), activeReservations: make(map[serializedPubKey]pendingChannels),
signedReservations: make(map[lnwire.ChannelID][32]byte), signedReservations: make(map[lnwire.ChannelID][32]byte),
newChanBarriers: make(map[lnwire.ChannelID]chan struct{}), newChanBarriers: make(map[lnwire.ChannelID]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize), fundingMsgs: make(chan *fundingMsg, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize),
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}), handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
@ -809,21 +769,21 @@ func (f *fundingManager) reservationCoordinator() {
for { for {
select { select {
case msg := <-f.fundingMsgs: case fmsg := <-f.fundingMsgs:
switch fmsg := msg.(type) { switch msg := fmsg.msg.(type) {
case *fundingOpenMsg: case *lnwire.OpenChannel:
f.handleFundingOpen(fmsg) f.handleFundingOpen(fmsg.peer, msg)
case *fundingAcceptMsg: case *lnwire.AcceptChannel:
f.handleFundingAccept(fmsg) f.handleFundingAccept(fmsg.peer, msg)
case *fundingCreatedMsg: case *lnwire.FundingCreated:
f.handleFundingCreated(fmsg) f.handleFundingCreated(fmsg.peer, msg)
case *fundingSignedMsg: case *lnwire.FundingSigned:
f.handleFundingSigned(fmsg) f.handleFundingSigned(fmsg.peer, msg)
case *fundingLockedMsg: case *lnwire.FundingLocked:
f.wg.Add(1) f.wg.Add(1)
go f.handleFundingLocked(fmsg) go f.handleFundingLocked(fmsg.peer, msg)
case *fundingErrorMsg: case *lnwire.Error:
f.handleErrorMsg(fmsg) f.handleErrorMsg(fmsg.peer, msg)
} }
case req := <-f.fundingRequests: case req := <-f.fundingRequests:
f.handleInitFundingMsg(req) f.handleInitFundingMsg(req)
@ -1152,13 +1112,11 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
msg.resp <- pendingChannels msg.resp <- pendingChannels
} }
// processFundingOpen sends a message to the fundingManager allowing it to // ProcessFundingMsg sends a message to the internal fundingManager goroutine,
// initiate the new funding workflow with the source peer. // allowing it to handle the lnwire.Message.
func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel, func (f *fundingManager) ProcessFundingMsg(msg lnwire.Message, peer lnpeer.Peer) {
peer lnpeer.Peer) {
select { select {
case f.fundingMsgs <- &fundingOpenMsg{msg, peer}: case f.fundingMsgs <- &fundingMsg{msg, peer}:
case <-f.quit: case <-f.quit:
return return
} }
@ -1206,14 +1164,15 @@ func commitmentType(localFeatures,
// //
// TODO(roasbeef): add error chan to all, let channelManager handle // TODO(roasbeef): add error chan to all, let channelManager handle
// error+propagate // error+propagate
func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { func (f *fundingManager) handleFundingOpen(peer lnpeer.Peer,
msg *lnwire.OpenChannel) {
// Check number of pending channels to be smaller than maximum allowed // Check number of pending channels to be smaller than maximum allowed
// number and send ErrorGeneric to remote peer if condition is // number and send ErrorGeneric to remote peer if condition is
// violated. // violated.
peerPubKey := fmsg.peer.IdentityKey() peerPubKey := peer.IdentityKey()
peerIDKey := newSerializedKey(peerPubKey) peerIDKey := newSerializedKey(peerPubKey)
msg := fmsg.msg
amt := msg.FundingAmount amt := msg.FundingAmount
// We get all pending channels for this peer. This is the list of the // We get all pending channels for this peer. This is the list of the
@ -1237,7 +1196,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey) channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey)
if err != nil { if err != nil {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, err, peer, msg.PendingChannelID, err,
) )
return return
} }
@ -1258,7 +1217,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// block unless white listed // block unless white listed
if numPending >= f.cfg.MaxPendingChannels { if numPending >= f.cfg.MaxPendingChannels {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
lnwire.ErrMaxPendingChannels, lnwire.ErrMaxPendingChannels,
) )
return return
@ -1273,7 +1232,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Errorf("unable to query wallet: %v", err) fndgLog.Errorf("unable to query wallet: %v", err)
} }
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
lnwire.ErrSynchronizingChain, lnwire.ErrSynchronizingChain,
) )
return return
@ -1282,7 +1241,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Ensure that the remote party respects our maximum channel size. // Ensure that the remote party respects our maximum channel size.
if amt > f.cfg.MaxChanSize { if amt > f.cfg.MaxChanSize {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize), lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize),
) )
return return
@ -1292,7 +1251,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// a channel that's below our current min channel size. // a channel that's below our current min channel size.
if amt < f.cfg.MinChanSize { if amt < f.cfg.MinChanSize {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)), lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)),
) )
return return
@ -1302,7 +1261,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// signal an error. // signal an error.
if f.cfg.RejectPush && msg.PushAmount > 0 { if f.cfg.RejectPush && msg.PushAmount > 0 {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
lnwallet.ErrNonZeroPushAmount(), lnwallet.ErrNonZeroPushAmount(),
) )
return return
@ -1311,13 +1270,13 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Send the OpenChannel request to the ChannelAcceptor to determine whether // Send the OpenChannel request to the ChannelAcceptor to determine whether
// this node will accept the channel. // this node will accept the channel.
chanReq := &chanacceptor.ChannelAcceptRequest{ chanReq := &chanacceptor.ChannelAcceptRequest{
Node: fmsg.peer.IdentityKey(), Node: peer.IdentityKey(),
OpenChanMsg: fmsg.msg, OpenChanMsg: msg,
} }
if !f.cfg.OpenChannelPredicate.Accept(chanReq) { if !f.cfg.OpenChannelPredicate.Accept(chanReq) {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
fmt.Errorf("open channel request rejected"), fmt.Errorf("open channel request rejected"),
) )
return return
@ -1326,7 +1285,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
"pendingId=%x) from peer(%x)", amt, msg.PushAmount, "pendingId=%x) from peer(%x)", amt, msg.PushAmount,
msg.CsvDelay, msg.PendingChannelID, msg.CsvDelay, msg.PendingChannelID,
fmsg.peer.IdentityKey().SerializeCompressed()) peer.IdentityKey().SerializeCompressed())
// Attempt to initialize a reservation within the wallet. If the wallet // Attempt to initialize a reservation within the wallet. If the wallet
// has insufficient resources to create the channel, then the // has insufficient resources to create the channel, then the
@ -1339,14 +1298,14 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// case if *both* us and the remote peer are signaling the proper // case if *both* us and the remote peer are signaling the proper
// feature bit. // feature bit.
commitType := commitmentType( commitType := commitmentType(
fmsg.peer.LocalFeatures(), fmsg.peer.RemoteFeatures(), peer.LocalFeatures(), peer.RemoteFeatures(),
) )
chainHash := chainhash.Hash(msg.ChainHash) chainHash := chainhash.Hash(msg.ChainHash)
req := &lnwallet.InitFundingReserveMsg{ req := &lnwallet.InitFundingReserveMsg{
ChainHash: &chainHash, ChainHash: &chainHash,
PendingChanID: msg.PendingChannelID, PendingChanID: msg.PendingChannelID,
NodeID: fmsg.peer.IdentityKey(), NodeID: peer.IdentityKey(),
NodeAddr: fmsg.peer.Address(), NodeAddr: peer.Address(),
LocalFundingAmt: 0, LocalFundingAmt: 0,
RemoteFundingAmt: amt, RemoteFundingAmt: amt,
CommitFeePerKw: chainfee.SatPerKWeight(msg.FeePerKiloWeight), CommitFeePerKw: chainfee.SatPerKWeight(msg.FeePerKiloWeight),
@ -1360,7 +1319,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
reservation, err := f.cfg.Wallet.InitChannelReservation(req) reservation, err := f.cfg.Wallet.InitChannelReservation(req)
if err != nil { if err != nil {
fndgLog.Errorf("Unable to initialize reservation: %v", err) fndgLog.Errorf("Unable to initialize reservation: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1385,7 +1344,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.CommitConstraints(channelConstraints) err = reservation.CommitConstraints(channelConstraints)
if err != nil { if err != nil {
fndgLog.Errorf("Unacceptable channel constraints: %v", err) fndgLog.Errorf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1394,7 +1353,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// A nil address is set in place of user input, because this channel open // A nil address is set in place of user input, because this channel open
// was not initiated by the user. // was not initiated by the user.
shutdown, err := getUpfrontShutdownScript( shutdown, err := getUpfrontShutdownScript(
f.cfg.EnableUpfrontShutdown, fmsg.peer, nil, f.cfg.EnableUpfrontShutdown, peer, nil,
func() (lnwire.DeliveryAddress, error) { func() (lnwire.DeliveryAddress, error) {
addr, err := f.cfg.Wallet.NewAddress(lnwallet.WitnessPubKey, false) addr, err := f.cfg.Wallet.NewAddress(lnwallet.WitnessPubKey, false)
if err != nil { if err != nil {
@ -1405,7 +1364,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
) )
if err != nil { if err != nil {
f.failFundingFlow( f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, peer, msg.PendingChannelID,
fmt.Errorf("getUpfrontShutdownScript error: %v", err), fmt.Errorf("getUpfrontShutdownScript error: %v", err),
) )
return return
@ -1414,7 +1373,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Requiring %v confirmations for pendingChan(%x): "+ fndgLog.Infof("Requiring %v confirmations for pendingChan(%x): "+
"amt=%v, push_amt=%v, committype=%v, upfrontShutdown=%x", numConfsReq, "amt=%v, push_amt=%v, committype=%v, upfrontShutdown=%x", numConfsReq,
fmsg.msg.PendingChannelID, amt, msg.PushAmount, msg.PendingChannelID, amt, msg.PushAmount,
commitType, msg.UpfrontShutdownScript) commitType, msg.UpfrontShutdownScript)
// Generate our required constraints for the remote party. // Generate our required constraints for the remote party.
@ -1439,7 +1398,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
remoteMaxValue: remoteMaxValue, remoteMaxValue: remoteMaxValue,
remoteMaxHtlcs: maxHtlcs, remoteMaxHtlcs: maxHtlcs,
err: make(chan error, 1), err: make(chan error, 1),
peer: fmsg.peer, peer: peer,
} }
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock() f.resMtx.Unlock()
@ -1482,7 +1441,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.ProcessSingleContribution(remoteContribution) err = reservation.ProcessSingleContribution(remoteContribution)
if err != nil { if err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err) fndgLog.Errorf("unable to add contribution reservation: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1512,21 +1471,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
UpfrontShutdownScript: ourContribution.UpfrontShutdown, UpfrontShutdownScript: ourContribution.UpfrontShutdown,
} }
if err := fmsg.peer.SendMessage(true, &fundingAccept); err != nil { if err := peer.SendMessage(true, &fundingAccept); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err) fndgLog.Errorf("unable to send funding response to peer: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
}
// processFundingAccept sends a message to the fundingManager allowing it to
// continue the second phase of a funding workflow with the target peer.
func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}:
case <-f.quit:
return return
} }
} }
@ -1534,10 +1481,11 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
// handleFundingAccept processes a response to the workflow initiation sent by // handleFundingAccept processes a response to the workflow initiation sent by
// the remote peer. This message then queues a message with the funding // the remote peer. This message then queues a message with the funding
// outpoint, and a commitment signature to the remote peer. // outpoint, and a commitment signature to the remote peer.
func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { func (f *fundingManager) handleFundingAccept(peer lnpeer.Peer,
msg := fmsg.msg msg *lnwire.AcceptChannel) {
pendingChanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peer.IdentityKey() pendingChanID := msg.PendingChannelID
peerKey := peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID) resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil { if err != nil {
@ -1560,7 +1508,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
msg.MinAcceptDepth, chainntnfs.MaxNumConfs, msg.MinAcceptDepth, chainntnfs.MaxNumConfs,
) )
fndgLog.Warnf("Unacceptable channel constraints: %v", err) fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1579,7 +1527,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
err = resCtx.reservation.CommitConstraints(channelConstraints) err = resCtx.reservation.CommitConstraints(channelConstraints)
if err != nil { if err != nil {
fndgLog.Warnf("Unacceptable channel constraints: %v", err) fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1635,7 +1583,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
if err != nil { if err != nil {
fndgLog.Errorf("Unable to process PSBT funding params "+ fndgLog.Errorf("Unable to process PSBT funding params "+
"for contribution from %v: %v", peerKey, err) "for contribution from %v: %v", peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
var buf bytes.Buffer var buf bytes.Buffer
@ -1643,7 +1591,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
if err != nil { if err != nil {
fndgLog.Errorf("Unable to serialize PSBT for "+ fndgLog.Errorf("Unable to serialize PSBT for "+
"contribution from %v: %v", peerKey, err) "contribution from %v: %v", peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
resCtx.updates <- &lnrpc.OpenStatusUpdate{ resCtx.updates <- &lnrpc.OpenStatusUpdate{
@ -1660,7 +1608,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
} else if err != nil { } else if err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v", fndgLog.Errorf("Unable to process contribution from %v: %v",
peerKey, err) peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) f.failFundingFlow(peer, msg.PendingChannelID, err)
return return
} }
@ -1813,25 +1761,15 @@ func (f *fundingManager) continueFundingAccept(resCtx *reservationWithCtx,
} }
} }
// processFundingCreated queues a funding complete message coupled with the
// source peer to the fundingManager.
func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingCreated progresses the funding workflow when the daemon is on // handleFundingCreated progresses the funding workflow when the daemon is on
// the responding side of a single funder workflow. Once this message has been // the responding side of a single funder workflow. Once this message has been
// processed, a signature is sent to the remote peer allowing it to broadcast // processed, a signature is sent to the remote peer allowing it to broadcast
// the funding transaction, progressing the workflow into the final stage. // the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { func (f *fundingManager) handleFundingCreated(peer lnpeer.Peer,
peerKey := fmsg.peer.IdentityKey() msg *lnwire.FundingCreated) {
pendingChanID := fmsg.msg.PendingChannelID
peerKey := peer.IdentityKey()
pendingChanID := msg.PendingChannelID
resCtx, err := f.getReservationCtx(peerKey, pendingChanID) resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil { if err != nil {
@ -1845,14 +1783,14 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// the commitment transaction. So at this point, we can validate the // the commitment transaction. So at this point, we can validate the
// initiator's commitment transaction, then send our own if it's valid. // initiator's commitment transaction, then send our own if it's valid.
// TODO(roasbeef): make case (p vs P) consistent throughout // TODO(roasbeef): make case (p vs P) consistent throughout
fundingOut := fmsg.msg.FundingPoint fundingOut := msg.FundingPoint
fndgLog.Infof("completing pending_id(%x) with ChannelPoint(%v)", fndgLog.Infof("completing pending_id(%x) with ChannelPoint(%v)",
pendingChanID[:], fundingOut) pendingChanID[:], fundingOut)
commitSig, err := fmsg.msg.CommitSig.ToSignature() commitSig, err := msg.CommitSig.ToSignature()
if err != nil { if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err) fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
return return
} }
@ -1867,13 +1805,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err != nil { if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc. // TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err) fndgLog.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
return return
} }
// The channel is marked IsPending in the database, and can be removed // The channel is marked IsPending in the database, and can be removed
// from the set of active reservations. // from the set of active reservations.
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) f.deleteReservationCtx(peerKey, msg.PendingChannelID)
// If something goes wrong before the funding transaction is confirmed, // If something goes wrong before the funding transaction is confirmed,
// we use this convenience method to delete the pending OpenChannel // we use this convenience method to delete the pending OpenChannel
@ -1921,7 +1859,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ourCommitSig, err := lnwire.NewSigFromSignature(sig) ourCommitSig, err := lnwire.NewSigFromSignature(sig)
if err != nil { if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err) fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
deleteFromDatabase() deleteFromDatabase()
return return
} }
@ -1930,9 +1868,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ChanID: channelID, ChanID: channelID,
CommitSig: ourCommitSig, CommitSig: ourCommitSig,
} }
if err := fmsg.peer.SendMessage(true, fundingSigned); err != nil { if err := peer.SendMessage(true, fundingSigned); err != nil {
fndgLog.Errorf("unable to send FundingSigned message: %v", err) fndgLog.Errorf("unable to send FundingSigned message: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
deleteFromDatabase() deleteFromDatabase()
return return
} }
@ -1976,46 +1914,36 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
go f.advanceFundingState(completeChan, pendingChanID, nil) go f.advanceFundingState(completeChan, pendingChanID, nil)
} }
// processFundingSigned sends a single funding sign complete message along with
// the source peer to the funding manager.
func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingSignedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingSigned processes the final message received in a single funder // handleFundingSigned processes the final message received in a single funder
// workflow. Once this message is processed, the funding transaction is // workflow. Once this message is processed, the funding transaction is
// broadcast. Once the funding transaction reaches a sufficient number of // broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with a compact // confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain. // encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { func (f *fundingManager) handleFundingSigned(peer lnpeer.Peer,
msg *lnwire.FundingSigned) {
// As the funding signed message will reference the reservation by its // As the funding signed message will reference the reservation by its
// permanent channel ID, we'll need to perform an intermediate look up // permanent channel ID, we'll need to perform an intermediate look up
// before we can obtain the reservation. // before we can obtain the reservation.
f.resMtx.Lock() f.resMtx.Lock()
pendingChanID, ok := f.signedReservations[fmsg.msg.ChanID] pendingChanID, ok := f.signedReservations[msg.ChanID]
delete(f.signedReservations, fmsg.msg.ChanID) delete(f.signedReservations, msg.ChanID)
f.resMtx.Unlock() f.resMtx.Unlock()
if !ok { if !ok {
err := fmt.Errorf("unable to find signed reservation for "+ err := fmt.Errorf("unable to find signed reservation for "+
"chan_id=%x", fmsg.msg.ChanID) "chan_id=%x", msg.ChanID)
fndgLog.Warnf(err.Error()) fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err) f.failFundingFlow(peer, msg.ChanID, err)
return return
} }
peerKey := fmsg.peer.IdentityKey() peerKey := peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID) resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil { if err != nil {
fndgLog.Warnf("Unable to find reservation (peer_id:%v, "+ fndgLog.Warnf("Unable to find reservation (peer_id:%v, "+
"chan_id:%x)", peerKey, pendingChanID[:]) "chan_id:%x)", peerKey, pendingChanID[:])
// TODO: add ErrChanNotFound? // TODO: add ErrChanNotFound?
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
return return
} }
@ -2031,10 +1959,10 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// The remote peer has responded with a signature for our commitment // The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit // transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel. // the state to disk as we can now open the channel.
commitSig, err := fmsg.msg.CommitSig.ToSignature() commitSig, err := msg.CommitSig.ToSignature()
if err != nil { if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err) fndgLog.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
return return
} }
@ -2044,7 +1972,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
if err != nil { if err != nil {
fndgLog.Errorf("Unable to complete reservation sign "+ fndgLog.Errorf("Unable to complete reservation sign "+
"complete: %v", err) "complete: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err) f.failFundingFlow(peer, pendingChanID, err)
return return
} }
@ -2719,50 +2647,40 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
return nil return nil
} }
// processFundingLocked sends a message to the fundingManager allowing it to
// finish the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingLockedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingLocked finalizes the channel funding process and enables the // handleFundingLocked finalizes the channel funding process and enables the
// channel to enter normal operating mode. // channel to enter normal operating mode.
func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { func (f *fundingManager) handleFundingLocked(peer lnpeer.Peer,
msg *lnwire.FundingLocked) {
defer f.wg.Done() defer f.wg.Done()
fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+ fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+
"peer %x", fmsg.msg.ChanID, "peer %x", msg.ChanID,
fmsg.peer.IdentityKey().SerializeCompressed()) peer.IdentityKey().SerializeCompressed())
// If we are currently in the process of handling a funding locked // If we are currently in the process of handling a funding locked
// message for this channel, ignore. // message for this channel, ignore.
f.handleFundingLockedMtx.Lock() f.handleFundingLockedMtx.Lock()
_, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID] _, ok := f.handleFundingLockedBarriers[msg.ChanID]
if ok { if ok {
fndgLog.Infof("Already handling fundingLocked for "+ fndgLog.Infof("Already handling fundingLocked for "+
"ChannelID(%v), ignoring.", fmsg.msg.ChanID) "ChannelID(%v), ignoring.", msg.ChanID)
f.handleFundingLockedMtx.Unlock() f.handleFundingLockedMtx.Unlock()
return return
} }
// If not already handling fundingLocked for this channel, set up // If not already handling fundingLocked for this channel, set up
// barrier, and move on. // barrier, and move on.
f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{} f.handleFundingLockedBarriers[msg.ChanID] = struct{}{}
f.handleFundingLockedMtx.Unlock() f.handleFundingLockedMtx.Unlock()
defer func() { defer func() {
f.handleFundingLockedMtx.Lock() f.handleFundingLockedMtx.Lock()
delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID) delete(f.handleFundingLockedBarriers, msg.ChanID)
f.handleFundingLockedMtx.Unlock() f.handleFundingLockedMtx.Unlock()
}() }()
f.localDiscoveryMtx.Lock() f.localDiscoveryMtx.Lock()
localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID] localDiscoverySignal, ok := f.localDiscoverySignals[msg.ChanID]
f.localDiscoveryMtx.Unlock() f.localDiscoveryMtx.Unlock()
if ok { if ok {
@ -2781,14 +2699,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// With the signal received, we can now safely delete the entry // With the signal received, we can now safely delete the entry
// from the map. // from the map.
f.localDiscoveryMtx.Lock() f.localDiscoveryMtx.Lock()
delete(f.localDiscoverySignals, fmsg.msg.ChanID) delete(f.localDiscoverySignals, msg.ChanID)
f.localDiscoveryMtx.Unlock() f.localDiscoveryMtx.Unlock()
} }
// First, we'll attempt to locate the channel whose funding workflow is // First, we'll attempt to locate the channel whose funding workflow is
// being finalized by this message. We go to the database rather than // being finalized by this message. We go to the database rather than
// our reservation map as we may have restarted, mid funding flow. // our reservation map as we may have restarted, mid funding flow.
chanID := fmsg.msg.ChanID chanID := msg.ChanID
channel, err := f.cfg.FindChannel(chanID) channel, err := f.cfg.FindChannel(chanID)
if err != nil { if err != nil {
fndgLog.Errorf("Unable to locate ChannelID(%v), cannot complete "+ fndgLog.Errorf("Unable to locate ChannelID(%v), cannot complete "+
@ -2808,7 +2726,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// need to create the next commitment state for the remote party. So // need to create the next commitment state for the remote party. So
// we'll insert that into the channel now before passing it along to // we'll insert that into the channel now before passing it along to
// other sub-systems. // other sub-systems.
err = channel.InsertNextRevocation(fmsg.msg.NextPerCommitmentPoint) err = channel.InsertNextRevocation(msg.NextPerCommitmentPoint)
if err != nil { if err != nil {
fndgLog.Errorf("unable to insert next commitment point: %v", err) fndgLog.Errorf("unable to insert next commitment point: %v", err)
return return
@ -2831,11 +2749,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
f.barrierMtx.Unlock() f.barrierMtx.Unlock()
}() }()
if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { if err := peer.AddNewChannel(channel, f.quit); err != nil {
fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", fndgLog.Errorf("Unable to add new channel %v with peer %x: %v",
channel.FundingOutpoint, channel.FundingOutpoint,
fmsg.peer.IdentityKey().SerializeCompressed(), peer.IdentityKey().SerializeCompressed(), err,
err) )
} }
} }
@ -3355,40 +3273,29 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
} }
} }
// processFundingError sends a message to the fundingManager allowing it to
// process the occurred generic error.
func (f *fundingManager) processFundingError(err *lnwire.Error,
peerKey *btcec.PublicKey) {
select {
case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}:
case <-f.quit:
return
}
}
// handleErrorMsg processes the error which was received from remote peer, // handleErrorMsg processes the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and // depending on the type of error we should do different clean up steps and
// inform the user about it. // inform the user about it.
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { func (f *fundingManager) handleErrorMsg(peer lnpeer.Peer,
protocolErr := fmsg.err msg *lnwire.Error) {
chanID := fmsg.err.ChanID chanID := msg.ChanID
peerKey := peer.IdentityKey()
// First, we'll attempt to retrieve and cancel the funding workflow // First, we'll attempt to retrieve and cancel the funding workflow
// that this error was tied to. If we're unable to do so, then we'll // that this error was tied to. If we're unable to do so, then we'll
// exit early as this was an unwarranted error. // exit early as this was an unwarranted error.
resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID, true) resCtx, err := f.cancelReservationCtx(peerKey, chanID, true)
if err != nil { if err != nil {
fndgLog.Warnf("Received error for non-existent funding "+ fndgLog.Warnf("Received error for non-existent funding "+
"flow: %v (%v)", err, protocolErr.Error()) "flow: %v (%v)", err, msg.Error())
return return
} }
// If we did indeed find the funding workflow, then we'll return the // If we did indeed find the funding workflow, then we'll return the
// error back to the caller (if any), and cancel the workflow itself. // error back to the caller (if any), and cancel the workflow itself.
fundingErr := fmt.Errorf("received funding error from %x: %v", fundingErr := fmt.Errorf("received funding error from %x: %v",
fmsg.peerKey.SerializeCompressed(), protocolErr.Error(), peerKey.SerializeCompressed(), msg.Error(),
) )
fndgLog.Errorf(fundingErr.Error()) fndgLog.Errorf(fundingErr.Error())
@ -3536,9 +3443,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
// channel will receive a new, permanent channel ID, and will no longer be // channel will receive a new, permanent channel ID, and will no longer be
// considered pending. // considered pending.
func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte, func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte,
peerKey *btcec.PublicKey) bool { peer lnpeer.Peer) bool {
peerIDKey := newSerializedKey(peerKey) peerIDKey := newSerializedKey(peer.IdentityKey())
f.resMtx.RLock() f.resMtx.RLock()
_, ok := f.activeReservations[peerIDKey][pendingChanID] _, ok := f.activeReservations[peerIDKey][pendingChanID]
f.resMtx.RUnlock() f.resMtx.RUnlock()

@ -691,7 +691,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
} }
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message. // Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent( acceptChannelResponse := assertFundingMsgSent(
@ -704,7 +704,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
assertNumPendingReservations(t, bob, alicePubKey, 1) assertNumPendingReservations(t, bob, alicePubKey, 1)
// Forward the response to Alice. // Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message. // Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent( fundingCreated := assertFundingMsgSent(
@ -712,7 +712,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingCreated) ).(*lnwire.FundingCreated)
// Give the message to Bob. // Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice) bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message. // Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent( fundingSigned := assertFundingMsgSent(
@ -720,7 +720,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingSigned) ).(*lnwire.FundingSigned)
// Forward the signature to Alice. // Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bob) alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will // After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a // broadcast the funding transaction to the network. We expect to get a
@ -1250,8 +1250,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
waitForOpenUpdate(t, updateChan) waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -1378,8 +1378,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
} }
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -1540,8 +1540,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
waitForOpenUpdate(t, updateChan) waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -1678,7 +1678,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1) assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel. // Bob should answer with an AcceptChannel.
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
@ -1749,7 +1749,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1) assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel. // Bob should answer with an AcceptChannel.
acceptChannelResponse := assertFundingMsgSent( acceptChannelResponse := assertFundingMsgSent(
@ -1760,7 +1760,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, bob, alicePubKey, 1) assertNumPendingReservations(t, bob, alicePubKey, 1)
// Forward the response to Alice. // Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated messages. // Alice responds with a FundingCreated messages.
assertFundingMsgSent(t, alice.msgChan, "FundingCreated") assertFundingMsgSent(t, alice.msgChan, "FundingCreated")
@ -1948,9 +1948,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
waitForOpenUpdate(t, updateChan) waitForOpenUpdate(t, updateChan)
// Send the fundingLocked message twice to Alice, and once to Bob. // Send the fundingLocked message twice to Alice, and once to Bob.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -1967,7 +1967,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
// Another fundingLocked should also be ignored, since Alice should // Another fundingLocked should also be ignored, since Alice should
// have updated her database at this point. // have updated her database at this point.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
select { select {
case <-alice.newChannels: case <-alice.newChannels:
t.Fatalf("alice sent new channel to peer a second time") t.Fatalf("alice sent new channel to peer a second time")
@ -2056,8 +2056,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
recreateAliceFundingManager(t, alice) recreateAliceFundingManager(t, alice)
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -2128,10 +2128,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
assertFundingLockedSent(t, alice, bob, fundingOutPoint) assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Let Alice immediately get the fundingLocked message. // Let Alice immediately get the fundingLocked message.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
// Also let Bob get the fundingLocked message. // Also let Bob get the fundingLocked message.
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -2220,8 +2220,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
waitForOpenUpdate(t, updateChan) waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -2338,8 +2338,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) {
waitForOpenUpdate(t, updateChan) waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages. // Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new // Check that they notify the breach arbiter and peer about the new
// channel. // channel.
@ -2493,7 +2493,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
chanID := openChannelReq.PendingChannelID chanID := openChannelReq.PendingChannelID
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message. // Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent( acceptChannelResponse := assertFundingMsgSent(
@ -2521,7 +2521,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
} }
// Forward the response to Alice. // Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message. // Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent( fundingCreated := assertFundingMsgSent(
@ -2632,7 +2632,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// Give the message to Bob. // Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice) bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message. // Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent( fundingSigned := assertFundingMsgSent(
@ -2640,7 +2640,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
).(*lnwire.FundingSigned) ).(*lnwire.FundingSigned)
// Forward the signature to Alice. // Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bob) alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will // After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a // broadcast the funding transaction to the network. We expect to get a
@ -2761,7 +2761,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
} }
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message for the // Bob should answer with an AcceptChannel message for the
// first maxPending channels. // first maxPending channels.
@ -2784,7 +2784,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Forward the responses to Alice. // Forward the responses to Alice.
var signs []*lnwire.FundingSigned var signs []*lnwire.FundingSigned
for _, accept := range accepts { for _, accept := range accepts {
alice.fundingMgr.processFundingAccept(accept, bob) alice.fundingMgr.ProcessFundingMsg(accept, bob)
// Alice responds with a FundingCreated message. // Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent( fundingCreated := assertFundingMsgSent(
@ -2792,7 +2792,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
).(*lnwire.FundingCreated) ).(*lnwire.FundingCreated)
// Give the message to Bob. // Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice) bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message. // Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent( fundingSigned := assertFundingMsgSent(
@ -2804,7 +2804,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Sending another init request from Alice should still make Bob // Sending another init request from Alice should still make Bob
// respond with an error. // respond with an error.
bob.fundingMgr.processFundingOpen(lastOpen, alice) bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
_ = assertFundingMsgSent( _ = assertFundingMsgSent(
t, bob.msgChan, "Error", t, bob.msgChan, "Error",
).(*lnwire.Error) ).(*lnwire.Error)
@ -2812,7 +2812,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Give the FundingSigned messages to Alice. // Give the FundingSigned messages to Alice.
var txs []*wire.MsgTx var txs []*wire.MsgTx
for i, sign := range signs { for i, sign := range signs {
alice.fundingMgr.processFundingSigned(sign, bob) alice.fundingMgr.ProcessFundingMsg(sign, bob)
// Alice should send a status update for each channel, and // Alice should send a status update for each channel, and
// publish a funding tx to the network. // publish a funding tx to the network.
@ -2840,7 +2840,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Sending another init request from Alice should still make Bob // Sending another init request from Alice should still make Bob
// respond with an error, since the funding transactions are not // respond with an error, since the funding transactions are not
// confirmed yet, // confirmed yet,
bob.fundingMgr.processFundingOpen(lastOpen, alice) bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
_ = assertFundingMsgSent( _ = assertFundingMsgSent(
t, bob.msgChan, "Error", t, bob.msgChan, "Error",
).(*lnwire.Error) ).(*lnwire.Error)
@ -2866,7 +2866,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
} }
// Now opening another channel should work. // Now opening another channel should work.
bob.fundingMgr.processFundingOpen(lastOpen, alice) bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
// Bob should answer with an AcceptChannel message. // Bob should answer with an AcceptChannel message.
_ = assertFundingMsgSent( _ = assertFundingMsgSent(
@ -2925,7 +2925,7 @@ func TestFundingManagerRejectPush(t *testing.T) {
} }
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Assert Bob responded with an ErrNonZeroPushAmount error. // Assert Bob responded with an ErrNonZeroPushAmount error.
err := assertFundingMsgSent(t, bob.msgChan, "Error").(*lnwire.Error) err := assertFundingMsgSent(t, bob.msgChan, "Error").(*lnwire.Error)
@ -2982,7 +2982,7 @@ func TestFundingManagerMaxConfs(t *testing.T) {
} }
// Let Bob handle the init message. // Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice) bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message. // Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent( acceptChannelResponse := assertFundingMsgSent(
@ -2993,7 +2993,7 @@ func TestFundingManagerMaxConfs(t *testing.T) {
// MinAcceptDepth Alice won't be willing to accept. // MinAcceptDepth Alice won't be willing to accept.
acceptChannelResponse.MinAcceptDepth = chainntnfs.MaxNumConfs + 1 acceptChannelResponse.MinAcceptDepth = chainntnfs.MaxNumConfs + 1
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice should respond back with an error indicating MinAcceptDepth is // Alice should respond back with an error indicating MinAcceptDepth is
// too large. // too large.
@ -3243,7 +3243,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// an error rejecting the channel that exceeds size limit. // an error rejecting the channel that exceeds size limit.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan) openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan) assertErrorSent(t, bob.msgChan)
// Create a set of funding managers that will reject wumbo // Create a set of funding managers that will reject wumbo
@ -3258,7 +3258,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// We expect Bob to respond with an Accept channel message. // We expect Bob to respond with an Accept channel message.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan) openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
// Verify that wumbo accepting funding managers will respect --maxchansize // Verify that wumbo accepting funding managers will respect --maxchansize
@ -3278,7 +3278,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// an error rejecting the channel that exceeds size limit. // an error rejecting the channel that exceeds size limit.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan) openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan) assertErrorSent(t, bob.msgChan)
} }
@ -3311,7 +3311,7 @@ func TestWumboChannelConfig(t *testing.T) {
// We expect Bob to respond with an Accept channel message. // We expect Bob to respond with an Accept channel message.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan) openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
// We'll now attempt to create a channel above the wumbo mark, which // We'll now attempt to create a channel above the wumbo mark, which
@ -3322,7 +3322,7 @@ func TestWumboChannelConfig(t *testing.T) {
// an error rejecting the channel. // an error rejecting the channel.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan) openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan) assertErrorSent(t, bob.msgChan)
// Next, we'll re-create the funding managers, but this time allowing // Next, we'll re-create the funding managers, but this time allowing
@ -3337,6 +3337,6 @@ func TestWumboChannelConfig(t *testing.T) {
// issues. // issues.
alice.fundingMgr.initFundingWorkflow(bob, initReq) alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan) openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice) bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
} }

@ -24,6 +24,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature" "github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/fmgr"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/htlcswitch/hop"
@ -268,33 +269,8 @@ type Config struct {
FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate,
error) error)
// ProcessFundingOpen is used to hand off an OpenChannel message to the // FundingManager is an implementation of the fmgr.Manager interface.
// funding manager. FundingManager fmgr.Manager
ProcessFundingOpen func(*lnwire.OpenChannel, lnpeer.Peer)
// ProcessFundingAccept is used to hand off an AcceptChannel message to the
// funding manager.
ProcessFundingAccept func(*lnwire.AcceptChannel, lnpeer.Peer)
// ProcessFundingCreated is used to hand off a FundingCreated message to
// the funding manager.
ProcessFundingCreated func(*lnwire.FundingCreated, lnpeer.Peer)
// ProcessFundingSigned is used to hand off a FundingSigned message to the
// funding manager.
ProcessFundingSigned func(*lnwire.FundingSigned, lnpeer.Peer)
// ProcessFundingLocked is used to hand off a FundingLocked message to the
// funding manager.
ProcessFundingLocked func(*lnwire.FundingLocked, lnpeer.Peer)
// ProcessFundingError is used to hand off an Error message to the funding
// manager.
ProcessFundingError func(*lnwire.Error, *btcec.PublicKey)
// IsPendingChannel is used to determine whether to send an Error message
// to the funding manager or not.
IsPendingChannel func([32]byte, *btcec.PublicKey) bool
// Hodl is used when creating ChannelLinks to specify HodlFlags as // Hodl is used when creating ChannelLinks to specify HodlFlags as
// breakpoints in dev builds. // breakpoints in dev builds.
@ -1336,16 +1312,13 @@ out:
pongBytes := make([]byte, msg.NumPongBytes) pongBytes := make([]byte, msg.NumPongBytes)
p.queueMsg(lnwire.NewPong(pongBytes), nil) p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel: case *lnwire.OpenChannel,
p.cfg.ProcessFundingOpen(msg, p) *lnwire.AcceptChannel,
case *lnwire.AcceptChannel: *lnwire.FundingCreated,
p.cfg.ProcessFundingAccept(msg, p) *lnwire.FundingSigned,
case *lnwire.FundingCreated: *lnwire.FundingLocked:
p.cfg.ProcessFundingCreated(msg, p)
case *lnwire.FundingSigned: p.cfg.FundingManager.ProcessFundingMsg(msg, p)
p.cfg.ProcessFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.cfg.ProcessFundingLocked(msg, p)
case *lnwire.Shutdown: case *lnwire.Shutdown:
select { select {
@ -1483,8 +1456,6 @@ func (p *Brontide) storeError(err error) {
// //
// NOTE: This method should only be called from within the readHandler. // NOTE: This method should only be called from within the readHandler.
func (p *Brontide) handleError(msg *lnwire.Error) bool { func (p *Brontide) handleError(msg *lnwire.Error) bool {
key := p.cfg.Addr.IdentityKey
// Store the error we have received. // Store the error we have received.
p.storeError(msg) p.storeError(msg)
@ -1500,8 +1471,8 @@ func (p *Brontide) handleError(msg *lnwire.Error) bool {
// If the channel ID for the error message corresponds to a pending // If the channel ID for the error message corresponds to a pending
// channel, then the funding manager will handle the error. // channel, then the funding manager will handle the error.
case p.cfg.IsPendingChannel(msg.ChanID, key): case p.cfg.FundingManager.IsPendingChannel(msg.ChanID, p):
p.cfg.ProcessFundingError(msg, key) p.cfg.FundingManager.ProcessFundingMsg(msg, p)
return false return false
// If not we hand the error to the channel link for this channel. // If not we hand the error to the channel link for this channel.

@ -2982,14 +2982,9 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
PrunePersistentPeerConnection: s.prunePersistentPeerConnection, PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
FetchLastChanUpdate: s.fetchLastChanUpdate(), FetchLastChanUpdate: s.fetchLastChanUpdate(),
ProcessFundingOpen: s.fundingMgr.processFundingOpen,
ProcessFundingAccept: s.fundingMgr.processFundingAccept, FundingManager: s.fundingMgr,
ProcessFundingCreated: s.fundingMgr.processFundingCreated,
ProcessFundingSigned: s.fundingMgr.processFundingSigned,
ProcessFundingLocked: s.fundingMgr.processFundingLocked,
ProcessFundingError: s.fundingMgr.processFundingError,
IsPendingChannel: s.fundingMgr.IsPendingChannel,
Hodl: s.cfg.Hodl, Hodl: s.cfg.Hodl,
UnsafeReplay: s.cfg.UnsafeReplay, UnsafeReplay: s.cfg.UnsafeReplay,