diff --git a/fmgr/interfaces.go b/fmgr/interfaces.go new file mode 100644 index 00000000..19fe8630 --- /dev/null +++ b/fmgr/interfaces.go @@ -0,0 +1,20 @@ +package fmgr + +import ( + "github.com/lightningnetwork/lnd/lnpeer" + "github.com/lightningnetwork/lnd/lnwire" +) + +// Manager is an interface that describes the basic operation of a funding +// manager. It should at a minimum process a subset of lnwire messages that +// are denoted as funding messages. +type Manager interface { + // ProcessFundingMsg processes a funding message represented by the + // lnwire.Message parameter along with the Peer object representing a + // connection to the counterparty. + ProcessFundingMsg(lnwire.Message, lnpeer.Peer) + + // IsPendingChannel is used to determine whether to send an Error message + // to the funding manager or not. + IsPendingChannel([32]byte, lnpeer.Peer) bool +} diff --git a/fundingmanager.go b/fundingmanager.go index a9276f1c..1f16fefc 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -165,53 +165,13 @@ type initFundingMsg struct { *openChanReq } -// fundingOpenMsg couples an lnwire.OpenChannel message with the peer who sent -// the message. This allows the funding manager to queue a response directly to -// the peer, progressing the funding workflow. -type fundingOpenMsg struct { - msg *lnwire.OpenChannel +// fundingMsg is sent by the ProcessFundingMsg function and packages a +// funding-specific lnwire.Message along with the lnpeer.Peer that sent it. +type fundingMsg struct { + msg lnwire.Message peer lnpeer.Peer } -// fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who -// sent the message. This allows the funding manager to queue a response -// directly to the peer, progressing the funding workflow. -type fundingAcceptMsg struct { - msg *lnwire.AcceptChannel - peer lnpeer.Peer -} - -// fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who -// sent the message. This allows the funding manager to queue a response -// directly to the peer, progressing the funding workflow. -type fundingCreatedMsg struct { - msg *lnwire.FundingCreated - peer lnpeer.Peer -} - -// fundingSignedMsg couples an lnwire.FundingSigned message with the peer who -// sent the message. This allows the funding manager to queue a response -// directly to the peer, progressing the funding workflow. -type fundingSignedMsg struct { - msg *lnwire.FundingSigned - peer lnpeer.Peer -} - -// fundingLockedMsg couples an lnwire.FundingLocked message with the peer who -// sent the message. This allows the funding manager to finalize the funding -// process and announce the existence of the new channel. -type fundingLockedMsg struct { - msg *lnwire.FundingLocked - peer lnpeer.Peer -} - -// fundingErrorMsg couples an lnwire.Error message with the peer who sent the -// message. This allows the funding manager to properly process the error. -type fundingErrorMsg struct { - err *lnwire.Error - peerKey *btcec.PublicKey -} - // pendingChannels is a map instantiated per-peer which tracks all active // pending single funded channels indexed by their pending channel identifier, // which is a set of 32-bytes generated via a CSPRNG. @@ -441,9 +401,9 @@ type fundingManager struct { // goroutine safe. resMtx sync.RWMutex - // fundingMsgs is a channel which receives wrapped wire messages - // related to funding workflow from outside peers. - fundingMsgs chan interface{} + // fundingMsgs is a channel that relays fundingMsg structs from + // external sub-systems using the ProcessFundingMsg call. + fundingMsgs chan *fundingMsg // queries is a channel which receives requests to query the internal // state of the funding manager. @@ -515,7 +475,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) { activeReservations: make(map[serializedPubKey]pendingChannels), signedReservations: make(map[lnwire.ChannelID][32]byte), newChanBarriers: make(map[lnwire.ChannelID]chan struct{}), - fundingMsgs: make(chan interface{}, msgBufferSize), + fundingMsgs: make(chan *fundingMsg, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize), localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}), handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}), @@ -809,21 +769,21 @@ func (f *fundingManager) reservationCoordinator() { for { select { - case msg := <-f.fundingMsgs: - switch fmsg := msg.(type) { - case *fundingOpenMsg: - f.handleFundingOpen(fmsg) - case *fundingAcceptMsg: - f.handleFundingAccept(fmsg) - case *fundingCreatedMsg: - f.handleFundingCreated(fmsg) - case *fundingSignedMsg: - f.handleFundingSigned(fmsg) - case *fundingLockedMsg: + case fmsg := <-f.fundingMsgs: + switch msg := fmsg.msg.(type) { + case *lnwire.OpenChannel: + f.handleFundingOpen(fmsg.peer, msg) + case *lnwire.AcceptChannel: + f.handleFundingAccept(fmsg.peer, msg) + case *lnwire.FundingCreated: + f.handleFundingCreated(fmsg.peer, msg) + case *lnwire.FundingSigned: + f.handleFundingSigned(fmsg.peer, msg) + case *lnwire.FundingLocked: f.wg.Add(1) - go f.handleFundingLocked(fmsg) - case *fundingErrorMsg: - f.handleErrorMsg(fmsg) + go f.handleFundingLocked(fmsg.peer, msg) + case *lnwire.Error: + f.handleErrorMsg(fmsg.peer, msg) } case req := <-f.fundingRequests: f.handleInitFundingMsg(req) @@ -1152,13 +1112,11 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { msg.resp <- pendingChannels } -// processFundingOpen sends a message to the fundingManager allowing it to -// initiate the new funding workflow with the source peer. -func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel, - peer lnpeer.Peer) { - +// ProcessFundingMsg sends a message to the internal fundingManager goroutine, +// allowing it to handle the lnwire.Message. +func (f *fundingManager) ProcessFundingMsg(msg lnwire.Message, peer lnpeer.Peer) { select { - case f.fundingMsgs <- &fundingOpenMsg{msg, peer}: + case f.fundingMsgs <- &fundingMsg{msg, peer}: case <-f.quit: return } @@ -1206,14 +1164,15 @@ func commitmentType(localFeatures, // // TODO(roasbeef): add error chan to all, let channelManager handle // error+propagate -func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { +func (f *fundingManager) handleFundingOpen(peer lnpeer.Peer, + msg *lnwire.OpenChannel) { + // Check number of pending channels to be smaller than maximum allowed // number and send ErrorGeneric to remote peer if condition is // violated. - peerPubKey := fmsg.peer.IdentityKey() + peerPubKey := peer.IdentityKey() peerIDKey := newSerializedKey(peerPubKey) - msg := fmsg.msg amt := msg.FundingAmount // We get all pending channels for this peer. This is the list of the @@ -1237,7 +1196,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey) if err != nil { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, err, + peer, msg.PendingChannelID, err, ) return } @@ -1258,7 +1217,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // block unless white listed if numPending >= f.cfg.MaxPendingChannels { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, lnwire.ErrMaxPendingChannels, ) return @@ -1273,7 +1232,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Errorf("unable to query wallet: %v", err) } f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, lnwire.ErrSynchronizingChain, ) return @@ -1282,7 +1241,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Ensure that the remote party respects our maximum channel size. if amt > f.cfg.MaxChanSize { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, lnwallet.ErrChanTooLarge(amt, f.cfg.MaxChanSize), ) return @@ -1292,7 +1251,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // a channel that's below our current min channel size. if amt < f.cfg.MinChanSize { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)), ) return @@ -1302,7 +1261,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // signal an error. if f.cfg.RejectPush && msg.PushAmount > 0 { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, lnwallet.ErrNonZeroPushAmount(), ) return @@ -1311,13 +1270,13 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Send the OpenChannel request to the ChannelAcceptor to determine whether // this node will accept the channel. chanReq := &chanacceptor.ChannelAcceptRequest{ - Node: fmsg.peer.IdentityKey(), - OpenChanMsg: fmsg.msg, + Node: peer.IdentityKey(), + OpenChanMsg: msg, } if !f.cfg.OpenChannelPredicate.Accept(chanReq) { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, fmt.Errorf("open channel request rejected"), ) return @@ -1326,7 +1285,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ "pendingId=%x) from peer(%x)", amt, msg.PushAmount, msg.CsvDelay, msg.PendingChannelID, - fmsg.peer.IdentityKey().SerializeCompressed()) + peer.IdentityKey().SerializeCompressed()) // Attempt to initialize a reservation within the wallet. If the wallet // has insufficient resources to create the channel, then the @@ -1339,14 +1298,14 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // case if *both* us and the remote peer are signaling the proper // feature bit. commitType := commitmentType( - fmsg.peer.LocalFeatures(), fmsg.peer.RemoteFeatures(), + peer.LocalFeatures(), peer.RemoteFeatures(), ) chainHash := chainhash.Hash(msg.ChainHash) req := &lnwallet.InitFundingReserveMsg{ ChainHash: &chainHash, PendingChanID: msg.PendingChannelID, - NodeID: fmsg.peer.IdentityKey(), - NodeAddr: fmsg.peer.Address(), + NodeID: peer.IdentityKey(), + NodeAddr: peer.Address(), LocalFundingAmt: 0, RemoteFundingAmt: amt, CommitFeePerKw: chainfee.SatPerKWeight(msg.FeePerKiloWeight), @@ -1360,7 +1319,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { reservation, err := f.cfg.Wallet.InitChannelReservation(req) if err != nil { fndgLog.Errorf("Unable to initialize reservation: %v", err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1385,7 +1344,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { err = reservation.CommitConstraints(channelConstraints) if err != nil { fndgLog.Errorf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1394,7 +1353,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // A nil address is set in place of user input, because this channel open // was not initiated by the user. shutdown, err := getUpfrontShutdownScript( - f.cfg.EnableUpfrontShutdown, fmsg.peer, nil, + f.cfg.EnableUpfrontShutdown, peer, nil, func() (lnwire.DeliveryAddress, error) { addr, err := f.cfg.Wallet.NewAddress(lnwallet.WitnessPubKey, false) if err != nil { @@ -1405,7 +1364,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { ) if err != nil { f.failFundingFlow( - fmsg.peer, fmsg.msg.PendingChannelID, + peer, msg.PendingChannelID, fmt.Errorf("getUpfrontShutdownScript error: %v", err), ) return @@ -1414,7 +1373,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { fndgLog.Infof("Requiring %v confirmations for pendingChan(%x): "+ "amt=%v, push_amt=%v, committype=%v, upfrontShutdown=%x", numConfsReq, - fmsg.msg.PendingChannelID, amt, msg.PushAmount, + msg.PendingChannelID, amt, msg.PushAmount, commitType, msg.UpfrontShutdownScript) // Generate our required constraints for the remote party. @@ -1439,7 +1398,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { remoteMaxValue: remoteMaxValue, remoteMaxHtlcs: maxHtlcs, err: make(chan error, 1), - peer: fmsg.peer, + peer: peer, } f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() @@ -1482,7 +1441,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { err = reservation.ProcessSingleContribution(remoteContribution) if err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1512,21 +1471,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { UpfrontShutdownScript: ourContribution.UpfrontShutdown, } - if err := fmsg.peer.SendMessage(true, &fundingAccept); err != nil { + if err := peer.SendMessage(true, &fundingAccept); err != nil { fndgLog.Errorf("unable to send funding response to peer: %v", err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) - return - } -} - -// processFundingAccept sends a message to the fundingManager allowing it to -// continue the second phase of a funding workflow with the target peer. -func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, - peer lnpeer.Peer) { - - select { - case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}: - case <-f.quit: + f.failFundingFlow(peer, msg.PendingChannelID, err) return } } @@ -1534,10 +1481,11 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel, // handleFundingAccept processes a response to the workflow initiation sent by // the remote peer. This message then queues a message with the funding // outpoint, and a commitment signature to the remote peer. -func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { - msg := fmsg.msg - pendingChanID := fmsg.msg.PendingChannelID - peerKey := fmsg.peer.IdentityKey() +func (f *fundingManager) handleFundingAccept(peer lnpeer.Peer, + msg *lnwire.AcceptChannel) { + + pendingChanID := msg.PendingChannelID + peerKey := peer.IdentityKey() resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { @@ -1560,7 +1508,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { msg.MinAcceptDepth, chainntnfs.MaxNumConfs, ) fndgLog.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1579,7 +1527,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { err = resCtx.reservation.CommitConstraints(channelConstraints) if err != nil { fndgLog.Warnf("Unacceptable channel constraints: %v", err) - f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1635,7 +1583,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { if err != nil { fndgLog.Errorf("Unable to process PSBT funding params "+ "for contribution from %v: %v", peerKey, err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } var buf bytes.Buffer @@ -1643,7 +1591,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { if err != nil { fndgLog.Errorf("Unable to serialize PSBT for "+ "contribution from %v: %v", peerKey, err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } resCtx.updates <- &lnrpc.OpenStatusUpdate{ @@ -1660,7 +1608,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { } else if err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", peerKey, err) - f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err) + f.failFundingFlow(peer, msg.PendingChannelID, err) return } @@ -1813,25 +1761,15 @@ func (f *fundingManager) continueFundingAccept(resCtx *reservationWithCtx, } } -// processFundingCreated queues a funding complete message coupled with the -// source peer to the fundingManager. -func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated, - peer lnpeer.Peer) { - - select { - case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}: - case <-f.quit: - return - } -} - // handleFundingCreated progresses the funding workflow when the daemon is on // the responding side of a single funder workflow. Once this message has been // processed, a signature is sent to the remote peer allowing it to broadcast // the funding transaction, progressing the workflow into the final stage. -func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { - peerKey := fmsg.peer.IdentityKey() - pendingChanID := fmsg.msg.PendingChannelID +func (f *fundingManager) handleFundingCreated(peer lnpeer.Peer, + msg *lnwire.FundingCreated) { + + peerKey := peer.IdentityKey() + pendingChanID := msg.PendingChannelID resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { @@ -1845,14 +1783,14 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // the commitment transaction. So at this point, we can validate the // initiator's commitment transaction, then send our own if it's valid. // TODO(roasbeef): make case (p vs P) consistent throughout - fundingOut := fmsg.msg.FundingPoint + fundingOut := msg.FundingPoint fndgLog.Infof("completing pending_id(%x) with ChannelPoint(%v)", pendingChanID[:], fundingOut) - commitSig, err := fmsg.msg.CommitSig.ToSignature() + commitSig, err := msg.CommitSig.ToSignature() if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) return } @@ -1867,13 +1805,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) return } // The channel is marked IsPending in the database, and can be removed // from the set of active reservations. - f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) + f.deleteReservationCtx(peerKey, msg.PendingChannelID) // If something goes wrong before the funding transaction is confirmed, // we use this convenience method to delete the pending OpenChannel @@ -1921,7 +1859,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ourCommitSig, err := lnwire.NewSigFromSignature(sig) if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) deleteFromDatabase() return } @@ -1930,9 +1868,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { ChanID: channelID, CommitSig: ourCommitSig, } - if err := fmsg.peer.SendMessage(true, fundingSigned); err != nil { + if err := peer.SendMessage(true, fundingSigned); err != nil { fndgLog.Errorf("unable to send FundingSigned message: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) deleteFromDatabase() return } @@ -1976,46 +1914,36 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { go f.advanceFundingState(completeChan, pendingChanID, nil) } -// processFundingSigned sends a single funding sign complete message along with -// the source peer to the funding manager. -func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned, - peer lnpeer.Peer) { - - select { - case f.fundingMsgs <- &fundingSignedMsg{msg, peer}: - case <-f.quit: - return - } -} - // handleFundingSigned processes the final message received in a single funder // workflow. Once this message is processed, the funding transaction is // broadcast. Once the funding transaction reaches a sufficient number of // confirmations, a message is sent to the responding peer along with a compact // encoding of the location of the channel within the blockchain. -func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { +func (f *fundingManager) handleFundingSigned(peer lnpeer.Peer, + msg *lnwire.FundingSigned) { + // As the funding signed message will reference the reservation by its // permanent channel ID, we'll need to perform an intermediate look up // before we can obtain the reservation. f.resMtx.Lock() - pendingChanID, ok := f.signedReservations[fmsg.msg.ChanID] - delete(f.signedReservations, fmsg.msg.ChanID) + pendingChanID, ok := f.signedReservations[msg.ChanID] + delete(f.signedReservations, msg.ChanID) f.resMtx.Unlock() if !ok { err := fmt.Errorf("unable to find signed reservation for "+ - "chan_id=%x", fmsg.msg.ChanID) + "chan_id=%x", msg.ChanID) fndgLog.Warnf(err.Error()) - f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err) + f.failFundingFlow(peer, msg.ChanID, err) return } - peerKey := fmsg.peer.IdentityKey() + peerKey := peer.IdentityKey() resCtx, err := f.getReservationCtx(peerKey, pendingChanID) if err != nil { fndgLog.Warnf("Unable to find reservation (peer_id:%v, "+ "chan_id:%x)", peerKey, pendingChanID[:]) // TODO: add ErrChanNotFound? - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) return } @@ -2031,10 +1959,10 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { // The remote peer has responded with a signature for our commitment // transaction. We'll verify the signature for validity, then commit // the state to disk as we can now open the channel. - commitSig, err := fmsg.msg.CommitSig.ToSignature() + commitSig, err := msg.CommitSig.ToSignature() if err != nil { fndgLog.Errorf("Unable to parse signature: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) return } @@ -2044,7 +1972,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { if err != nil { fndgLog.Errorf("Unable to complete reservation sign "+ "complete: %v", err) - f.failFundingFlow(fmsg.peer, pendingChanID, err) + f.failFundingFlow(peer, pendingChanID, err) return } @@ -2719,50 +2647,40 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel, return nil } -// processFundingLocked sends a message to the fundingManager allowing it to -// finish the funding workflow. -func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked, - peer lnpeer.Peer) { - - select { - case f.fundingMsgs <- &fundingLockedMsg{msg, peer}: - case <-f.quit: - return - } -} - // handleFundingLocked finalizes the channel funding process and enables the // channel to enter normal operating mode. -func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { +func (f *fundingManager) handleFundingLocked(peer lnpeer.Peer, + msg *lnwire.FundingLocked) { + defer f.wg.Done() fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+ - "peer %x", fmsg.msg.ChanID, - fmsg.peer.IdentityKey().SerializeCompressed()) + "peer %x", msg.ChanID, + peer.IdentityKey().SerializeCompressed()) // If we are currently in the process of handling a funding locked // message for this channel, ignore. f.handleFundingLockedMtx.Lock() - _, ok := f.handleFundingLockedBarriers[fmsg.msg.ChanID] + _, ok := f.handleFundingLockedBarriers[msg.ChanID] if ok { fndgLog.Infof("Already handling fundingLocked for "+ - "ChannelID(%v), ignoring.", fmsg.msg.ChanID) + "ChannelID(%v), ignoring.", msg.ChanID) f.handleFundingLockedMtx.Unlock() return } // If not already handling fundingLocked for this channel, set up // barrier, and move on. - f.handleFundingLockedBarriers[fmsg.msg.ChanID] = struct{}{} + f.handleFundingLockedBarriers[msg.ChanID] = struct{}{} f.handleFundingLockedMtx.Unlock() defer func() { f.handleFundingLockedMtx.Lock() - delete(f.handleFundingLockedBarriers, fmsg.msg.ChanID) + delete(f.handleFundingLockedBarriers, msg.ChanID) f.handleFundingLockedMtx.Unlock() }() f.localDiscoveryMtx.Lock() - localDiscoverySignal, ok := f.localDiscoverySignals[fmsg.msg.ChanID] + localDiscoverySignal, ok := f.localDiscoverySignals[msg.ChanID] f.localDiscoveryMtx.Unlock() if ok { @@ -2781,14 +2699,14 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { // With the signal received, we can now safely delete the entry // from the map. f.localDiscoveryMtx.Lock() - delete(f.localDiscoverySignals, fmsg.msg.ChanID) + delete(f.localDiscoverySignals, msg.ChanID) f.localDiscoveryMtx.Unlock() } // First, we'll attempt to locate the channel whose funding workflow is // being finalized by this message. We go to the database rather than // our reservation map as we may have restarted, mid funding flow. - chanID := fmsg.msg.ChanID + chanID := msg.ChanID channel, err := f.cfg.FindChannel(chanID) if err != nil { fndgLog.Errorf("Unable to locate ChannelID(%v), cannot complete "+ @@ -2808,7 +2726,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { // need to create the next commitment state for the remote party. So // we'll insert that into the channel now before passing it along to // other sub-systems. - err = channel.InsertNextRevocation(fmsg.msg.NextPerCommitmentPoint) + err = channel.InsertNextRevocation(msg.NextPerCommitmentPoint) if err != nil { fndgLog.Errorf("unable to insert next commitment point: %v", err) return @@ -2831,11 +2749,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) { f.barrierMtx.Unlock() }() - if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil { + if err := peer.AddNewChannel(channel, f.quit); err != nil { fndgLog.Errorf("Unable to add new channel %v with peer %x: %v", channel.FundingOutpoint, - fmsg.peer.IdentityKey().SerializeCompressed(), - err) + peer.IdentityKey().SerializeCompressed(), err, + ) } } @@ -3355,40 +3273,29 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { } } -// processFundingError sends a message to the fundingManager allowing it to -// process the occurred generic error. -func (f *fundingManager) processFundingError(err *lnwire.Error, - peerKey *btcec.PublicKey) { - - select { - case f.fundingMsgs <- &fundingErrorMsg{err, peerKey}: - case <-f.quit: - return - } -} - // handleErrorMsg processes the error which was received from remote peer, // depending on the type of error we should do different clean up steps and // inform the user about it. -func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { - protocolErr := fmsg.err +func (f *fundingManager) handleErrorMsg(peer lnpeer.Peer, + msg *lnwire.Error) { - chanID := fmsg.err.ChanID + chanID := msg.ChanID + peerKey := peer.IdentityKey() // First, we'll attempt to retrieve and cancel the funding workflow // that this error was tied to. If we're unable to do so, then we'll // exit early as this was an unwarranted error. - resCtx, err := f.cancelReservationCtx(fmsg.peerKey, chanID, true) + resCtx, err := f.cancelReservationCtx(peerKey, chanID, true) if err != nil { fndgLog.Warnf("Received error for non-existent funding "+ - "flow: %v (%v)", err, protocolErr.Error()) + "flow: %v (%v)", err, msg.Error()) return } // If we did indeed find the funding workflow, then we'll return the // error back to the caller (if any), and cancel the workflow itself. fundingErr := fmt.Errorf("received funding error from %x: %v", - fmsg.peerKey.SerializeCompressed(), protocolErr.Error(), + peerKey.SerializeCompressed(), msg.Error(), ) fndgLog.Errorf(fundingErr.Error()) @@ -3536,9 +3443,9 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey, // channel will receive a new, permanent channel ID, and will no longer be // considered pending. func (f *fundingManager) IsPendingChannel(pendingChanID [32]byte, - peerKey *btcec.PublicKey) bool { + peer lnpeer.Peer) bool { - peerIDKey := newSerializedKey(peerKey) + peerIDKey := newSerializedKey(peer.IdentityKey()) f.resMtx.RLock() _, ok := f.activeReservations[peerIDKey][pendingChanID] f.resMtx.RUnlock() diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 41173203..114829b0 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -691,7 +691,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -704,7 +704,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, assertNumPendingReservations(t, bob, alicePubKey, 1) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) + alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -712,7 +712,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, alice) + bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -720,7 +720,7 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt, ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bob) + alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a @@ -1250,8 +1250,8 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1378,8 +1378,8 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1540,8 +1540,8 @@ func TestFundingManagerOfflinePeer(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1678,7 +1678,7 @@ func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel. assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") @@ -1749,7 +1749,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, alice, bobPubKey, 1) // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel. acceptChannelResponse := assertFundingMsgSent( @@ -1760,7 +1760,7 @@ func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { assertNumPendingReservations(t, bob, alicePubKey, 1) // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) + alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob) // Alice responds with a FundingCreated messages. assertFundingMsgSent(t, alice.msgChan, "FundingCreated") @@ -1948,9 +1948,9 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { waitForOpenUpdate(t, updateChan) // Send the fundingLocked message twice to Alice, and once to Bob. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -1967,7 +1967,7 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // Another fundingLocked should also be ignored, since Alice should // have updated her database at this point. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) select { case <-alice.newChannels: t.Fatalf("alice sent new channel to peer a second time") @@ -2056,8 +2056,8 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { recreateAliceFundingManager(t, alice) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -2128,10 +2128,10 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { assertFundingLockedSent(t, alice, bob, fundingOutPoint) // Let Alice immediately get the fundingLocked message. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) // Also let Bob get the fundingLocked message. - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -2220,8 +2220,8 @@ func TestFundingManagerPrivateChannel(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -2338,8 +2338,8 @@ func TestFundingManagerPrivateRestart(t *testing.T) { waitForOpenUpdate(t, updateChan) // Exchange the fundingLocked messages. - alice.fundingMgr.processFundingLocked(fundingLockedBob, bob) - bob.fundingMgr.processFundingLocked(fundingLockedAlice, alice) + alice.fundingMgr.ProcessFundingMsg(fundingLockedBob, bob) + bob.fundingMgr.ProcessFundingMsg(fundingLockedAlice, alice) // Check that they notify the breach arbiter and peer about the new // channel. @@ -2493,7 +2493,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { chanID := openChannelReq.PendingChannelID // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -2521,7 +2521,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { } // Forward the response to Alice. - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) + alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -2632,7 +2632,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { t.Fatal(err) } // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, alice) + bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -2640,7 +2640,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) { ).(*lnwire.FundingSigned) // Forward the signature to Alice. - alice.fundingMgr.processFundingSigned(fundingSigned, bob) + alice.fundingMgr.ProcessFundingMsg(fundingSigned, bob) // After Alice processes the singleFundingSignComplete message, she will // broadcast the funding transaction to the network. We expect to get a @@ -2761,7 +2761,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel message for the // first maxPending channels. @@ -2784,7 +2784,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { // Forward the responses to Alice. var signs []*lnwire.FundingSigned for _, accept := range accepts { - alice.fundingMgr.processFundingAccept(accept, bob) + alice.fundingMgr.ProcessFundingMsg(accept, bob) // Alice responds with a FundingCreated message. fundingCreated := assertFundingMsgSent( @@ -2792,7 +2792,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { ).(*lnwire.FundingCreated) // Give the message to Bob. - bob.fundingMgr.processFundingCreated(fundingCreated, alice) + bob.fundingMgr.ProcessFundingMsg(fundingCreated, alice) // Finally, Bob should send the FundingSigned message. fundingSigned := assertFundingMsgSent( @@ -2804,7 +2804,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { // Sending another init request from Alice should still make Bob // respond with an error. - bob.fundingMgr.processFundingOpen(lastOpen, alice) + bob.fundingMgr.ProcessFundingMsg(lastOpen, alice) _ = assertFundingMsgSent( t, bob.msgChan, "Error", ).(*lnwire.Error) @@ -2812,7 +2812,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { // Give the FundingSigned messages to Alice. var txs []*wire.MsgTx for i, sign := range signs { - alice.fundingMgr.processFundingSigned(sign, bob) + alice.fundingMgr.ProcessFundingMsg(sign, bob) // Alice should send a status update for each channel, and // publish a funding tx to the network. @@ -2840,7 +2840,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { // Sending another init request from Alice should still make Bob // respond with an error, since the funding transactions are not // confirmed yet, - bob.fundingMgr.processFundingOpen(lastOpen, alice) + bob.fundingMgr.ProcessFundingMsg(lastOpen, alice) _ = assertFundingMsgSent( t, bob.msgChan, "Error", ).(*lnwire.Error) @@ -2866,7 +2866,7 @@ func TestFundingManagerMaxPendingChannels(t *testing.T) { } // Now opening another channel should work. - bob.fundingMgr.processFundingOpen(lastOpen, alice) + bob.fundingMgr.ProcessFundingMsg(lastOpen, alice) // Bob should answer with an AcceptChannel message. _ = assertFundingMsgSent( @@ -2925,7 +2925,7 @@ func TestFundingManagerRejectPush(t *testing.T) { } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Assert Bob responded with an ErrNonZeroPushAmount error. err := assertFundingMsgSent(t, bob.msgChan, "Error").(*lnwire.Error) @@ -2982,7 +2982,7 @@ func TestFundingManagerMaxConfs(t *testing.T) { } // Let Bob handle the init message. - bob.fundingMgr.processFundingOpen(openChannelReq, alice) + bob.fundingMgr.ProcessFundingMsg(openChannelReq, alice) // Bob should answer with an AcceptChannel message. acceptChannelResponse := assertFundingMsgSent( @@ -2993,7 +2993,7 @@ func TestFundingManagerMaxConfs(t *testing.T) { // MinAcceptDepth Alice won't be willing to accept. acceptChannelResponse.MinAcceptDepth = chainntnfs.MaxNumConfs + 1 - alice.fundingMgr.processFundingAccept(acceptChannelResponse, bob) + alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob) // Alice should respond back with an error indicating MinAcceptDepth is // too large. @@ -3243,7 +3243,7 @@ func TestMaxChannelSizeConfig(t *testing.T) { // an error rejecting the channel that exceeds size limit. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg := expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertErrorSent(t, bob.msgChan) // Create a set of funding managers that will reject wumbo @@ -3258,7 +3258,7 @@ func TestMaxChannelSizeConfig(t *testing.T) { // We expect Bob to respond with an Accept channel message. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg = expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") // Verify that wumbo accepting funding managers will respect --maxchansize @@ -3278,7 +3278,7 @@ func TestMaxChannelSizeConfig(t *testing.T) { // an error rejecting the channel that exceeds size limit. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg = expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertErrorSent(t, bob.msgChan) } @@ -3311,7 +3311,7 @@ func TestWumboChannelConfig(t *testing.T) { // We expect Bob to respond with an Accept channel message. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg := expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") // We'll now attempt to create a channel above the wumbo mark, which @@ -3322,7 +3322,7 @@ func TestWumboChannelConfig(t *testing.T) { // an error rejecting the channel. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg = expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertErrorSent(t, bob.msgChan) // Next, we'll re-create the funding managers, but this time allowing @@ -3337,6 +3337,6 @@ func TestWumboChannelConfig(t *testing.T) { // issues. alice.fundingMgr.initFundingWorkflow(bob, initReq) openChanMsg = expectOpenChannelMsg(t, alice.msgChan) - bob.fundingMgr.processFundingOpen(openChanMsg, alice) + bob.fundingMgr.ProcessFundingMsg(openChanMsg, alice) assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") } diff --git a/peer/brontide.go b/peer/brontide.go index 0cf0b24d..c50f4eba 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -24,6 +24,7 @@ import ( "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/feature" + "github.com/lightningnetwork/lnd/fmgr" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hop" @@ -268,33 +269,8 @@ type Config struct { FetchLastChanUpdate func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) - // ProcessFundingOpen is used to hand off an OpenChannel message to the - // funding manager. - ProcessFundingOpen func(*lnwire.OpenChannel, lnpeer.Peer) - - // ProcessFundingAccept is used to hand off an AcceptChannel message to the - // funding manager. - ProcessFundingAccept func(*lnwire.AcceptChannel, lnpeer.Peer) - - // ProcessFundingCreated is used to hand off a FundingCreated message to - // the funding manager. - ProcessFundingCreated func(*lnwire.FundingCreated, lnpeer.Peer) - - // ProcessFundingSigned is used to hand off a FundingSigned message to the - // funding manager. - ProcessFundingSigned func(*lnwire.FundingSigned, lnpeer.Peer) - - // ProcessFundingLocked is used to hand off a FundingLocked message to the - // funding manager. - ProcessFundingLocked func(*lnwire.FundingLocked, lnpeer.Peer) - - // ProcessFundingError is used to hand off an Error message to the funding - // manager. - ProcessFundingError func(*lnwire.Error, *btcec.PublicKey) - - // IsPendingChannel is used to determine whether to send an Error message - // to the funding manager or not. - IsPendingChannel func([32]byte, *btcec.PublicKey) bool + // FundingManager is an implementation of the fmgr.Manager interface. + FundingManager fmgr.Manager // Hodl is used when creating ChannelLinks to specify HodlFlags as // breakpoints in dev builds. @@ -1336,16 +1312,13 @@ out: pongBytes := make([]byte, msg.NumPongBytes) p.queueMsg(lnwire.NewPong(pongBytes), nil) - case *lnwire.OpenChannel: - p.cfg.ProcessFundingOpen(msg, p) - case *lnwire.AcceptChannel: - p.cfg.ProcessFundingAccept(msg, p) - case *lnwire.FundingCreated: - p.cfg.ProcessFundingCreated(msg, p) - case *lnwire.FundingSigned: - p.cfg.ProcessFundingSigned(msg, p) - case *lnwire.FundingLocked: - p.cfg.ProcessFundingLocked(msg, p) + case *lnwire.OpenChannel, + *lnwire.AcceptChannel, + *lnwire.FundingCreated, + *lnwire.FundingSigned, + *lnwire.FundingLocked: + + p.cfg.FundingManager.ProcessFundingMsg(msg, p) case *lnwire.Shutdown: select { @@ -1483,8 +1456,6 @@ func (p *Brontide) storeError(err error) { // // NOTE: This method should only be called from within the readHandler. func (p *Brontide) handleError(msg *lnwire.Error) bool { - key := p.cfg.Addr.IdentityKey - // Store the error we have received. p.storeError(msg) @@ -1500,8 +1471,8 @@ func (p *Brontide) handleError(msg *lnwire.Error) bool { // If the channel ID for the error message corresponds to a pending // channel, then the funding manager will handle the error. - case p.cfg.IsPendingChannel(msg.ChanID, key): - p.cfg.ProcessFundingError(msg, key) + case p.cfg.FundingManager.IsPendingChannel(msg.ChanID, p): + p.cfg.FundingManager.ProcessFundingMsg(msg, p) return false // If not we hand the error to the channel link for this channel. diff --git a/server.go b/server.go index 34acc992..177d0892 100644 --- a/server.go +++ b/server.go @@ -2982,14 +2982,9 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, PrunePersistentPeerConnection: s.prunePersistentPeerConnection, - FetchLastChanUpdate: s.fetchLastChanUpdate(), - ProcessFundingOpen: s.fundingMgr.processFundingOpen, - ProcessFundingAccept: s.fundingMgr.processFundingAccept, - ProcessFundingCreated: s.fundingMgr.processFundingCreated, - ProcessFundingSigned: s.fundingMgr.processFundingSigned, - ProcessFundingLocked: s.fundingMgr.processFundingLocked, - ProcessFundingError: s.fundingMgr.processFundingError, - IsPendingChannel: s.fundingMgr.IsPendingChannel, + FetchLastChanUpdate: s.fetchLastChanUpdate(), + + FundingManager: s.fundingMgr, Hodl: s.cfg.Hodl, UnsafeReplay: s.cfg.UnsafeReplay,