multi: add fmgr.Manager interface, change usage in peer to use Manager

This gets rid of several config functions in the peer and should pave
the way for easier testing of the funding manager and peer.
This commit is contained in:
eugene 2020-10-01 10:27:13 -04:00
parent 0a8f06651a
commit e0859e121f
5 changed files with 192 additions and 299 deletions

20
fmgr/interfaces.go Normal file
View File

@ -0,0 +1,20 @@
package fmgr
import (
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwire"
)
// Manager is an interface that describes the basic operation of a funding
// manager. It should at a minimum process a subset of lnwire messages that
// are denoted as funding messages.
type Manager interface {
// ProcessFundingMsg processes a funding message represented by the
// lnwire.Message parameter along with the Peer object representing a
// connection to the counterparty.
ProcessFundingMsg(lnwire.Message, lnpeer.Peer)
// IsPendingChannel is used to determine whether to send an Error message
// to the funding manager or not.
IsPendingChannel([32]byte, lnpeer.Peer) bool
}

View File

@ -165,53 +165,13 @@ type initFundingMsg struct {
*openChanReq
}
// fundingOpenMsg couples an lnwire.OpenChannel message with the peer who sent
// the message. This allows the funding manager to queue a response directly to
// the peer, progressing the funding workflow.
type fundingOpenMsg struct {
msg *lnwire.OpenChannel
// fundingMsg is sent by the ProcessFundingMsg function and packages a
// funding-specific lnwire.Message along with the lnpeer.Peer that sent it.
type fundingMsg struct {
msg lnwire.Message
peer lnpeer.Peer
}
// fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingAcceptMsg struct {
msg *lnwire.AcceptChannel
peer lnpeer.Peer
}
// fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingCreatedMsg struct {
msg *lnwire.FundingCreated
peer lnpeer.Peer
}
// fundingSignedMsg couples an lnwire.FundingSigned message with the peer who
// sent the message. This allows the funding manager to queue a response
// directly to the peer, progressing the funding workflow.
type fundingSignedMsg struct {
msg *lnwire.FundingSigned
peer lnpeer.Peer
}
// fundingLockedMsg couples an lnwire.FundingLocked message with the peer who
// sent the message. This allows the funding manager to finalize the funding
// process and announce the existence of the new channel.
type fundingLockedMsg struct {
msg *lnwire.FundingLocked
peer lnpeer.Peer
}
// fundingErrorMsg couples an lnwire.Error message with the peer who sent the
// message. This allows the funding manager to properly process the error.
type fundingErrorMsg struct {
err *lnwire.Error
peerKey *btcec.PublicKey
}
// pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier,
// which is a set of 32-bytes generated via a CSPRNG.
@ -441,9 +401,9 @@ type fundingManager struct {
// goroutine safe.
resMtx sync.RWMutex
// fundingMsgs is a channel which receives wrapped wire messages
// related to funding workflow from outside peers.
fundingMsgs chan interface{}
// fundingMsgs is a channel that relays fundingMsg structs from
// external sub-systems using the ProcessFundingMsg call.
fundingMsgs chan *fundingMsg
// queries is a channel which receives requests to query the internal
// state of the funding manager.
@ -515,7 +475,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
activeReservations: make(map[serializedPubKey]pendingChannels),
signedReservations: make(map[lnwire.ChannelID][32]byte),
newChanBarriers: make(map[lnwire.ChannelID]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize),
fundingMsgs: make(chan *fundingMsg, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize),
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
@ -809,21 +769,21 @@ func (f *fundingManager) reservationCoordinator() {
for {
select {
case msg := <-f.fundingMsgs:
switch fmsg := msg.(type) {
case *fundingOpenMsg:
f.handleFundingOpen(fmsg)
case *fundingAcceptMsg:
f.handleFundingAccept(fmsg)
case *fundingCreatedMsg:
f.handleFundingCreated(fmsg)
case *fundingSignedMsg:
f.handleFundingSigned(fmsg)
case *fundingLockedMsg:
case fmsg := <-f.fundingMsgs:
switch msg := fmsg.msg.(type) {
case *lnwire.OpenChannel:
f.handleFundingOpen(fmsg.peer, msg)
case *lnwire.AcceptChannel:
f.handleFundingAccept(fmsg.peer, msg)
case *lnwire.FundingCreated:
f.handleFundingCreated(fmsg.peer, msg)
case *lnwire.FundingSigned:
f.handleFundingSigned(fmsg.peer, msg)
case *lnwire.FundingLocked:
f.wg.Add(1)
go f.handleFundingLocked(fmsg)
case *fundingErrorMsg:
f.handleErrorMsg(fmsg)
go f.handleFundingLocked(fmsg.peer, msg)
case *lnwire.Error:
f.handleErrorMsg(fmsg.peer, msg)
}
case req := <-f.fundingRequests:
f.handleInitFundingMsg(req)
@ -1152,13 +1112,11 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
msg.resp <- pendingChannels
}
// processFundingOpen sends a message to the fundingManager allowing it to
// initiate the new funding workflow with the source peer.
func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel,
peer lnpeer.Peer) {
// ProcessFundingMsg sends a message to the internal fundingManager goroutine,
// allowing it to handle the lnwire.Message.
func (f *fundingManager) ProcessFundingMsg(msg lnwire.Message, peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingOpenMsg{msg, peer}:
case f.fundingMsgs <- &fundingMsg{msg, peer}:
case <-f.quit:
return
}
@ -1206,14 +1164,15 @@ func commitmentType(localFeatures,
//
// TODO(roasbeef): add error chan to all, let channelManager handle
// error+propagate
func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
func (f *fundingManager) handleFundingOpen(peer lnpeer.Peer,
msg *lnwire.OpenChannel) {
// Check number of pending channels to be smaller than maximum allowed
// number and send ErrorGeneric to remote peer if condition is
// violated.
peerPubKey := fmsg.peer.IdentityKey()
peerPubKey := peer.IdentityKey()
peerIDKey := newSerializedKey(peerPubKey)
msg := fmsg.msg
amt := msg.FundingAmount
// We get all pending channels for this peer. This is the list of the
@ -1237,7 +1196,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey)
if err != nil {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID, err,
peer, msg.PendingChannelID, err,
)
return
}
@ -1258,7 +1217,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// block unless white listed
if numPending >= f.cfg.MaxPendingChannels {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
lnwire.ErrMaxPendingChannels,
)
return
@ -1273,7 +1232,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Errorf("unable to query wallet: %v", err)
}
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
lnwire.ErrSynchronizingChain,
)
return
@ -1282,7 +1241,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Ensure that the remote party respects our maximum channel size.
if amt > f.cfg.MaxChanSize {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize),
)
return
@ -1292,7 +1251,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// a channel that's below our current min channel size.
if amt < f.cfg.MinChanSize {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)),
)
return
@ -1302,7 +1261,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// signal an error.
if f.cfg.RejectPush && msg.PushAmount > 0 {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
lnwallet.ErrNonZeroPushAmount(),
)
return
@ -1311,13 +1270,13 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Send the OpenChannel request to the ChannelAcceptor to determine whether
// this node will accept the channel.
chanReq := &chanacceptor.ChannelAcceptRequest{
Node: fmsg.peer.IdentityKey(),
OpenChanMsg: fmsg.msg,
Node: peer.IdentityKey(),
OpenChanMsg: msg,
}
if !f.cfg.OpenChannelPredicate.Accept(chanReq) {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
fmt.Errorf("open channel request rejected"),
)
return
@ -1326,7 +1285,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
"pendingId=%x) from peer(%x)", amt, msg.PushAmount,
msg.CsvDelay, msg.PendingChannelID,
fmsg.peer.IdentityKey().SerializeCompressed())
peer.IdentityKey().SerializeCompressed())
// Attempt to initialize a reservation within the wallet. If the wallet
// has insufficient resources to create the channel, then the
@ -1339,14 +1298,14 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// case if *both* us and the remote peer are signaling the proper
// feature bit.
commitType := commitmentType(
fmsg.peer.LocalFeatures(), fmsg.peer.RemoteFeatures(),
peer.LocalFeatures(), peer.RemoteFeatures(),
)
chainHash := chainhash.Hash(msg.ChainHash)
req := &lnwallet.InitFundingReserveMsg{
ChainHash: &chainHash,
PendingChanID: msg.PendingChannelID,
NodeID: fmsg.peer.IdentityKey(),
NodeAddr: fmsg.peer.Address(),
NodeID: peer.IdentityKey(),
NodeAddr: peer.Address(),
LocalFundingAmt: 0,
RemoteFundingAmt: amt,
CommitFeePerKw: chainfee.SatPerKWeight(msg.FeePerKiloWeight),
@ -1360,7 +1319,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
reservation, err := f.cfg.Wallet.InitChannelReservation(req)
if err != nil {
fndgLog.Errorf("Unable to initialize reservation: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1385,7 +1344,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.CommitConstraints(channelConstraints)
if err != nil {
fndgLog.Errorf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1394,7 +1353,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// A nil address is set in place of user input, because this channel open
// was not initiated by the user.
shutdown, err := getUpfrontShutdownScript(
f.cfg.EnableUpfrontShutdown, fmsg.peer, nil,
f.cfg.EnableUpfrontShutdown, peer, nil,
func() (lnwire.DeliveryAddress, error) {
addr, err := f.cfg.Wallet.NewAddress(lnwallet.WitnessPubKey, false)
if err != nil {
@ -1405,7 +1364,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
)
if err != nil {
f.failFundingFlow(
fmsg.peer, fmsg.msg.PendingChannelID,
peer, msg.PendingChannelID,
fmt.Errorf("getUpfrontShutdownScript error: %v", err),
)
return
@ -1414,7 +1373,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Requiring %v confirmations for pendingChan(%x): "+
"amt=%v, push_amt=%v, committype=%v, upfrontShutdown=%x", numConfsReq,
fmsg.msg.PendingChannelID, amt, msg.PushAmount,
msg.PendingChannelID, amt, msg.PushAmount,
commitType, msg.UpfrontShutdownScript)
// Generate our required constraints for the remote party.
@ -1439,7 +1398,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
remoteMaxValue: remoteMaxValue,
remoteMaxHtlcs: maxHtlcs,
err: make(chan error, 1),
peer: fmsg.peer,
peer: peer,
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock()
@ -1482,7 +1441,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.ProcessSingleContribution(remoteContribution)
if err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1512,21 +1471,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
UpfrontShutdownScript: ourContribution.UpfrontShutdown,
}
if err := fmsg.peer.SendMessage(true, &fundingAccept); err != nil {
if err := peer.SendMessage(true, &fundingAccept); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
}
// processFundingAccept sends a message to the fundingManager allowing it to
// continue the second phase of a funding workflow with the target peer.
func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}:
case <-f.quit:
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
}
@ -1534,10 +1481,11 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
// handleFundingAccept processes a response to the workflow initiation sent by
// the remote peer. This message then queues a message with the funding
// outpoint, and a commitment signature to the remote peer.
func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
msg := fmsg.msg
pendingChanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peer.IdentityKey()
func (f *fundingManager) handleFundingAccept(peer lnpeer.Peer,
msg *lnwire.AcceptChannel) {
pendingChanID := msg.PendingChannelID
peerKey := peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
@ -1560,7 +1508,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
msg.MinAcceptDepth, chainntnfs.MaxNumConfs,
)
fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1579,7 +1527,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
err = resCtx.reservation.CommitConstraints(channelConstraints)
if err != nil {
fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1635,7 +1583,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
if err != nil {
fndgLog.Errorf("Unable to process PSBT funding params "+
"for contribution from %v: %v", peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
var buf bytes.Buffer
@ -1643,7 +1591,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
if err != nil {
fndgLog.Errorf("Unable to serialize PSBT for "+
"contribution from %v: %v", peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
resCtx.updates <- &lnrpc.OpenStatusUpdate{
@ -1660,7 +1608,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
} else if err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v",
peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
f.failFundingFlow(peer, msg.PendingChannelID, err)
return
}
@ -1813,25 +1761,15 @@ func (f *fundingManager) continueFundingAccept(resCtx *reservationWithCtx,
}
}
// processFundingCreated queues a funding complete message coupled with the
// source peer to the fundingManager.
func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingCreated progresses the funding workflow when the daemon is on
// the responding side of a single funder workflow. Once this message has been
// processed, a signature is sent to the remote peer allowing it to broadcast
// the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
peerKey := fmsg.peer.IdentityKey()
pendingChanID := fmsg.msg.PendingChannelID
func (f *fundingManager) handleFundingCreated(peer lnpeer.Peer,
msg *lnwire.FundingCreated) {
peerKey := peer.IdentityKey()
pendingChanID := msg.PendingChannelID
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
@ -1845,14 +1783,14 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// the commitment transaction. So at this point, we can validate the
// initiator's commitment transaction, then send our own if it's valid.
// TODO(roasbeef): make case (p vs P) consistent throughout
fundingOut := fmsg.msg.FundingPoint
fundingOut := msg.FundingPoint
fndgLog.Infof("completing pending_id(%x) with ChannelPoint(%v)",
pendingChanID[:], fundingOut)
commitSig, err := fmsg.msg.CommitSig.ToSignature()
commitSig, err := msg.CommitSig.ToSignature()
if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
return
}
@ -1867,13 +1805,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
return
}
// The channel is marked IsPending in the database, and can be removed
// from the set of active reservations.
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
f.deleteReservationCtx(peerKey, msg.PendingChannelID)
// If something goes wrong before the funding transaction is confirmed,
// we use this convenience method to delete the pending OpenChannel
@ -1921,7 +1859,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ourCommitSig, err := lnwire.NewSigFromSignature(sig)
if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1930,9 +1868,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ChanID: channelID,
CommitSig: ourCommitSig,
}
if err := fmsg.peer.SendMessage(true, fundingSigned); err != nil {
if err := peer.SendMessage(true, fundingSigned); err != nil {
fndgLog.Errorf("unable to send FundingSigned message: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1976,46 +1914,36 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
go f.advanceFundingState(completeChan, pendingChanID, nil)
}
// processFundingSigned sends a single funding sign complete message along with
// the source peer to the funding manager.
func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingSignedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingSigned processes the final message received in a single funder
// workflow. Once this message is processed, the funding transaction is
// broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
func (f *fundingManager) handleFundingSigned(peer lnpeer.Peer,
msg *lnwire.FundingSigned) {
// As the funding signed message will reference the reservation by its
// permanent channel ID, we'll need to perform an intermediate look up
// before we can obtain the reservation.
f.resMtx.Lock()
pendingChanID, ok := f.signedReservations[fmsg.msg.ChanID]
delete(f.signedReservations, fmsg.msg.ChanID)
pendingChanID, ok := f.signedReservations[msg.ChanID]
delete(f.signedReservations, msg.ChanID)
f.resMtx.Unlock()
if !ok {
err := fmt.Errorf("unable to find signed reservation for "+
"chan_id=%x", fmsg.msg.ChanID)
"chan_id=%x", msg.ChanID)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err)
f.failFundingFlow(peer, msg.ChanID, err)
return
}
peerKey := fmsg.peer.IdentityKey()
peerKey := peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
fndgLog.Warnf("Unable to find reservation (peer_id:%v, "+
"chan_id:%x)", peerKey, pendingChanID[:])
// TODO: add ErrChanNotFound?
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
return
}
@ -2031,10 +1959,10 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel.
commitSig, err := fmsg.msg.CommitSig.ToSignature()
commitSig, err := msg.CommitSig.ToSignature()
if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
return
}
@ -2044,7 +1972,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
if err != nil {
fndgLog.Errorf("Unable to complete reservation sign "+
"complete: %v", err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
f.failFundingFlow(peer, pendingChanID, err)
return
}
@ -2719,50 +2647,40 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
return nil
}
// processFundingLocked sends a message to the fundingManager allowing it to
// finish the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingLockedMsg{msg, peer}:
case <-f.quit:
return
}
}
// handleFundingLocked finalizes the channel funding process and enables the
// channel to enter normal operating mode.
func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
func (f *fundingManager) handleFundingLocked(peer lnpeer.Peer,
msg *lnwire.FundingLocked) {
defer f.wg.Done()
fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+
"peer %x", fmsg.msg.ChanID,
fmsg.peer.IdentityKey().SerializeCompressed())
"peer %x", msg.ChanID,
peer.IdentityKey().SerializeCompressed())
// If we are currently in the process of handling a funding locked
// message for this channel, ignore.
f.handleFundingLockedMtx.Lock()
_, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID]
_, ok := f.handleFundingLockedBarriers[msg.ChanID]
if ok {
fndgLog.Infof("Already handling fundingLocked for "+
"ChannelID(%v), ignoring.", fmsg.msg.ChanID)
"ChannelID(%v), ignoring.", msg.ChanID)
f.handleFundingLockedMtx.Unlock()
return
}
// If not already handling fundingLocked for this channel, set up
// barrier, and move on.
f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{}
f.handleFundingLockedBarriers[msg.ChanID] = struct{}{}
f.handleFundingLockedMtx.Unlock()
defer func() {
f.handleFundingLockedMtx.Lock()
delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID)
delete(f.handleFundingLockedBarriers, msg.ChanID)
f.handleFundingLockedMtx.Unlock()
}()
f.localDiscoveryMtx.Lock()
localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID]
localDiscoverySignal, ok := f.localDiscoverySignals[msg.ChanID]
f.localDiscoveryMtx.Unlock()
if ok {
@ -2781,14 +2699,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// With the signal received, we can now safely delete the entry
// from the map.
f.localDiscoveryMtx.Lock()
delete(f.localDiscoverySignals, fmsg.msg.ChanID)
delete(f.localDiscoverySignals, msg.ChanID)
f.localDiscoveryMtx.Unlock()
}
// First, we'll attempt to locate the channel whose funding workflow is
// being finalized by this message. We go to the database rather than
// our reservation map as we may have restarted, mid funding flow.
chanID := fmsg.msg.ChanID
chanID := msg.ChanID
channel, err := f.cfg.FindChannel(chanID)
if err != nil {
fndgLog.Errorf("Unable to locate ChannelID(%v), cannot complete "+
@ -2808,7 +2726,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// need to create the next commitment state for the remote party. So
// we'll insert that into the channel now before passing it along to
// other sub-systems.
err = channel.InsertNextRevocation(fmsg.msg.NextPerCommitmentPoint)
err = channel.InsertNextRevocation(msg.NextPerCommitmentPoint)
if err != nil {
fndgLog.Errorf("unable to insert next commitment point: %v", err)
return
@ -2831,11 +2749,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
f.barrierMtx.Unlock()
}()
if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil {
if err := peer.AddNewChannel(channel, f.quit); err != nil {
fndgLog.Errorf("Unable to add new channel %v with peer %x: %v",
channel.FundingOutpoint,
fmsg.peer.IdentityKey().SerializeCompressed(),
err)
peer.IdentityKey().SerializeCompressed(), err,
)
}
}
@ -3355,40 +3273,29 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
}
}
// processFundingError sends a message to the fundingManager allowing it to
// process the occurred generic error.
func (f *fundingManager) processFundingError(err *lnwire.Error,
peerKey *btcec.PublicKey) {
select {
case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}:
case <-f.quit:
return
}
}
// handleErrorMsg processes the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and
// inform the user about it.
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
protocolErr := fmsg.err
func (f *fundingManager) handleErrorMsg(peer lnpeer.Peer,
msg *lnwire.Error) {
chanID := fmsg.err.ChanID
chanID := msg.ChanID
peerKey := peer.IdentityKey()
// First, we'll attempt to retrieve and cancel the funding workflow
// that this error was tied to. If we're unable to do so, then we'll
// exit early as this was an unwarranted error.
resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID, true)
resCtx, err := f.cancelReservationCtx(peerKey, chanID, true)
if err != nil {
fndgLog.Warnf("Received error for non-existent funding "+
"flow: %v (%v)", err, protocolErr.Error())
"flow: %v (%v)", err, msg.Error())
return
}
// If we did indeed find the funding workflow, then we'll return the
// error back to the caller (if any), and cancel the workflow itself.
fundingErr := fmt.Errorf("received funding error from %x: %v",
fmsg.peerKey.SerializeCompressed(), protocolErr.Error(),
peerKey.SerializeCompressed(), msg.Error(),
)
fndgLog.Errorf(fundingErr.Error())
@ -3536,9 +3443,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
// channel will receive a new, permanent channel ID, and will no longer be
// considered pending.
func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte,
peerKey *btcec.PublicKey) bool {
peer lnpeer.Peer) bool {
peerIDKey := newSerializedKey(peerKey)
peerIDKey := newSerializedKey(peer.IdentityKey())
f.resMtx.RLock()
_, ok := f.activeReservations[peerIDKey][pendingChanID]
f.resMtx.RUnlock()

View File

@ -691,7 +691,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
}
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent(
@ -704,7 +704,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
assertNumPendingReservations(t, bob, alicePubKey, 1)
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
@ -712,7 +712,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingCreated)
// Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice)
bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent(
@ -720,7 +720,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
).(*lnwire.FundingSigned)
// Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bob)
alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a
@ -1250,8 +1250,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1378,8 +1378,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
}
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1540,8 +1540,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1678,7 +1678,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel.
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
@ -1749,7 +1749,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, alice, bobPubKey, 1)
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel.
acceptChannelResponse := assertFundingMsgSent(
@ -1760,7 +1760,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
assertNumPendingReservations(t, bob, alicePubKey, 1)
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated messages.
assertFundingMsgSent(t, alice.msgChan, "FundingCreated")
@ -1948,9 +1948,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Send the fundingLocked message twice to Alice, and once to Bob.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -1967,7 +1967,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
// Another fundingLocked should also be ignored, since Alice should
// have updated her database at this point.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
select {
case <-alice.newChannels:
t.Fatalf("alice sent new channel to peer a second time")
@ -2056,8 +2056,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
recreateAliceFundingManager(t, alice)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -2128,10 +2128,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
// Let Alice immediately get the fundingLocked message.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
// Also let Bob get the fundingLocked message.
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -2220,8 +2220,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -2338,8 +2338,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) {
waitForOpenUpdate(t, updateChan)
// Exchange the fundingLocked messages.
alice.fundingMgr.processFundingLocked(fundingLockedBob, bob)
bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice)
alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob)
bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
@ -2493,7 +2493,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
chanID := openChannelReq.PendingChannelID
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent(
@ -2521,7 +2521,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
}
// Forward the response to Alice.
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
@ -2632,7 +2632,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
t.Fatal(err)
}
// Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice)
bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent(
@ -2640,7 +2640,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
).(*lnwire.FundingSigned)
// Forward the signature to Alice.
alice.fundingMgr.processFundingSigned(fundingSigned, bob)
alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob)
// After Alice processes the singleFundingSignComplete message, she will
// broadcast the funding transaction to the network. We expect to get a
@ -2761,7 +2761,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
}
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message for the
// first maxPending channels.
@ -2784,7 +2784,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Forward the responses to Alice.
var signs []*lnwire.FundingSigned
for _, accept := range accepts {
alice.fundingMgr.processFundingAccept(accept, bob)
alice.fundingMgr.ProcessFundingMsg(accept, bob)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
@ -2792,7 +2792,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
).(*lnwire.FundingCreated)
// Give the message to Bob.
bob.fundingMgr.processFundingCreated(fundingCreated, alice)
bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice)
// Finally, Bob should send the FundingSigned message.
fundingSigned := assertFundingMsgSent(
@ -2804,7 +2804,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Sending another init request from Alice should still make Bob
// respond with an error.
bob.fundingMgr.processFundingOpen(lastOpen, alice)
bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
_ = assertFundingMsgSent(
t, bob.msgChan, "Error",
).(*lnwire.Error)
@ -2812,7 +2812,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Give the FundingSigned messages to Alice.
var txs []*wire.MsgTx
for i, sign := range signs {
alice.fundingMgr.processFundingSigned(sign, bob)
alice.fundingMgr.ProcessFundingMsg(sign, bob)
// Alice should send a status update for each channel, and
// publish a funding tx to the network.
@ -2840,7 +2840,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
// Sending another init request from Alice should still make Bob
// respond with an error, since the funding transactions are not
// confirmed yet,
bob.fundingMgr.processFundingOpen(lastOpen, alice)
bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
_ = assertFundingMsgSent(
t, bob.msgChan, "Error",
).(*lnwire.Error)
@ -2866,7 +2866,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) {
}
// Now opening another channel should work.
bob.fundingMgr.processFundingOpen(lastOpen, alice)
bob.fundingMgr.ProcessFundingMsg(lastOpen, alice)
// Bob should answer with an AcceptChannel message.
_ = assertFundingMsgSent(
@ -2925,7 +2925,7 @@ func TestFundingManagerRejectPush(t *testing.T) {
}
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Assert Bob responded with an ErrNonZeroPushAmount error.
err := assertFundingMsgSent(t, bob.msgChan, "Error").(*lnwire.Error)
@ -2982,7 +2982,7 @@ func TestFundingManagerMaxConfs(t *testing.T) {
}
// Let Bob handle the init message.
bob.fundingMgr.processFundingOpen(openChannelReq, alice)
bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice)
// Bob should answer with an AcceptChannel message.
acceptChannelResponse := assertFundingMsgSent(
@ -2993,7 +2993,7 @@ func TestFundingManagerMaxConfs(t *testing.T) {
// MinAcceptDepth Alice won't be willing to accept.
acceptChannelResponse.MinAcceptDepth = chainntnfs.MaxNumConfs + 1
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob)
alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Alice should respond back with an error indicating MinAcceptDepth is
// too large.
@ -3243,7 +3243,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// an error rejecting the channel that exceeds size limit.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
// Create a set of funding managers that will reject wumbo
@ -3258,7 +3258,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// We expect Bob to respond with an Accept channel message.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
// Verify that wumbo accepting funding managers will respect --maxchansize
@ -3278,7 +3278,7 @@ func TestMaxChannelSizeConfig(t *testing.T) {
// an error rejecting the channel that exceeds size limit.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
}
@ -3311,7 +3311,7 @@ func TestWumboChannelConfig(t *testing.T) {
// We expect Bob to respond with an Accept channel message.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg := expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
// We'll now attempt to create a channel above the wumbo mark, which
@ -3322,7 +3322,7 @@ func TestWumboChannelConfig(t *testing.T) {
// an error rejecting the channel.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertErrorSent(t, bob.msgChan)
// Next, we'll re-create the funding managers, but this time allowing
@ -3337,6 +3337,6 @@ func TestWumboChannelConfig(t *testing.T) {
// issues.
alice.fundingMgr.initFundingWorkflow(bob, initReq)
openChanMsg = expectOpenChannelMsg(t, alice.msgChan)
bob.fundingMgr.processFundingOpen(openChanMsg, alice)
bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice)
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
}

View File

@ -24,6 +24,7 @@ import (
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/fmgr"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
@ -268,33 +269,8 @@ type Config struct {
FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate,
error)
// ProcessFundingOpen is used to hand off an OpenChannel message to the
// funding manager.
ProcessFundingOpen func(*lnwire.OpenChannel, lnpeer.Peer)
// ProcessFundingAccept is used to hand off an AcceptChannel message to the
// funding manager.
ProcessFundingAccept func(*lnwire.AcceptChannel, lnpeer.Peer)
// ProcessFundingCreated is used to hand off a FundingCreated message to
// the funding manager.
ProcessFundingCreated func(*lnwire.FundingCreated, lnpeer.Peer)
// ProcessFundingSigned is used to hand off a FundingSigned message to the
// funding manager.
ProcessFundingSigned func(*lnwire.FundingSigned, lnpeer.Peer)
// ProcessFundingLocked is used to hand off a FundingLocked message to the
// funding manager.
ProcessFundingLocked func(*lnwire.FundingLocked, lnpeer.Peer)
// ProcessFundingError is used to hand off an Error message to the funding
// manager.
ProcessFundingError func(*lnwire.Error, *btcec.PublicKey)
// IsPendingChannel is used to determine whether to send an Error message
// to the funding manager or not.
IsPendingChannel func([32]byte, *btcec.PublicKey) bool
// FundingManager is an implementation of the fmgr.Manager interface.
FundingManager fmgr.Manager
// Hodl is used when creating ChannelLinks to specify HodlFlags as
// breakpoints in dev builds.
@ -1336,16 +1312,13 @@ out:
pongBytes := make([]byte, msg.NumPongBytes)
p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel:
p.cfg.ProcessFundingOpen(msg, p)
case *lnwire.AcceptChannel:
p.cfg.ProcessFundingAccept(msg, p)
case *lnwire.FundingCreated:
p.cfg.ProcessFundingCreated(msg, p)
case *lnwire.FundingSigned:
p.cfg.ProcessFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.cfg.ProcessFundingLocked(msg, p)
case *lnwire.OpenChannel,
*lnwire.AcceptChannel,
*lnwire.FundingCreated,
*lnwire.FundingSigned,
*lnwire.FundingLocked:
p.cfg.FundingManager.ProcessFundingMsg(msg, p)
case *lnwire.Shutdown:
select {
@ -1483,8 +1456,6 @@ func (p *Brontide) storeError(err error) {
//
// NOTE: This method should only be called from within the readHandler.
func (p *Brontide) handleError(msg *lnwire.Error) bool {
key := p.cfg.Addr.IdentityKey
// Store the error we have received.
p.storeError(msg)
@ -1500,8 +1471,8 @@ func (p *Brontide) handleError(msg *lnwire.Error) bool {
// If the channel ID for the error message corresponds to a pending
// channel, then the funding manager will handle the error.
case p.cfg.IsPendingChannel(msg.ChanID, key):
p.cfg.ProcessFundingError(msg, key)
case p.cfg.FundingManager.IsPendingChannel(msg.ChanID, p):
p.cfg.FundingManager.ProcessFundingMsg(msg, p)
return false
// If not we hand the error to the channel link for this channel.

View File

@ -2982,14 +2982,9 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
PrunePersistentPeerConnection: s.prunePersistentPeerConnection,
FetchLastChanUpdate: s.fetchLastChanUpdate(),
ProcessFundingOpen: s.fundingMgr.processFundingOpen,
ProcessFundingAccept: s.fundingMgr.processFundingAccept,
ProcessFundingCreated: s.fundingMgr.processFundingCreated,
ProcessFundingSigned: s.fundingMgr.processFundingSigned,
ProcessFundingLocked: s.fundingMgr.processFundingLocked,
ProcessFundingError: s.fundingMgr.processFundingError,
IsPendingChannel: s.fundingMgr.IsPendingChannel,
FetchLastChanUpdate: s.fetchLastChanUpdate(),
FundingManager: s.fundingMgr,
Hodl: s.cfg.Hodl,
UnsafeReplay: s.cfg.UnsafeReplay,