fundingmanager: send messages to peers directly

In this commit, we modify the existing message sending functionality
within the fundingmanager. Due to each mesage send requiring to hold the
server's lock to retrieve the peer, we might run into a case where the
lock is held for a larger than usual amount of time and would therefore
block on sending the message within the fundingmanager. We remedy this
by taking a similar approach to some recent changes within the gossiper.
We now keep track of each peer within the internal fundingmanager
messages and send messages directly to them.
This commit is contained in:
Wilmer Paulino 2018-07-05 13:41:51 -07:00
parent e669e90017
commit 9cedef9245
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 117 additions and 152 deletions

@ -105,7 +105,7 @@ var (
// * deadlines, etc.
type reservationWithCtx struct {
reservation *lnwallet.ChannelReservation
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
chanAmt btcutil.Amount
@ -152,7 +152,7 @@ func (r *reservationWithCtx) updateTimestamp() {
// embedded within this message giving the funding manager full context w.r.t
// the workflow.
type initFundingMsg struct {
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
*openChanReq
}
@ -161,7 +161,7 @@ type initFundingMsg struct {
// the peer, progressing the funding workflow.
type fundingOpenMsg struct {
msg *lnwire.OpenChannel
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
}
// fundingAcceptMsg couples an lnwire.AcceptChannel message with the peer who
@ -169,7 +169,7 @@ type fundingOpenMsg struct {
// directly to the peer, progressing the funding workflow.
type fundingAcceptMsg struct {
msg *lnwire.AcceptChannel
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
}
// fundingCreatedMsg couples an lnwire.FundingCreated message with the peer who
@ -177,7 +177,7 @@ type fundingAcceptMsg struct {
// directly to the peer, progressing the funding workflow.
type fundingCreatedMsg struct {
msg *lnwire.FundingCreated
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
}
// fundingSignedMsg couples an lnwire.FundingSigned message with the peer who
@ -185,7 +185,7 @@ type fundingCreatedMsg struct {
// directly to the peer, progressing the funding workflow.
type fundingSignedMsg struct {
msg *lnwire.FundingSigned
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
}
// fundingLockedMsg couples an lnwire.FundingLocked message with the peer who
@ -193,7 +193,7 @@ type fundingSignedMsg struct {
// process and announce the existence of the new channel.
type fundingLockedMsg struct {
msg *lnwire.FundingLocked
peerAddress *lnwire.NetAddress
peer lnpeer.Peer
}
// fundingErrorMsg couples an lnwire.Error message with the peer who sent the
@ -270,10 +270,12 @@ type fundingConfig struct {
SendToPeer func(target *btcec.PublicKey, msgs ...lnwire.Message) error
// NotifyWhenOnline allows the FundingManager to register with a
// subsystem that will notify it when the peer comes online.
// This is used when sending the fundingLocked message, since it MUST be
// subsystem that will notify it when the peer comes online. This is
// used when sending the fundingLocked message, since it MUST be
// delivered after the funding transaction is confirmed.
NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
//
// NOTE: The peerChan channel must be buffered.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// FindPeer searches the list of peers connected to the node so that
// the FundingManager can notify other daemon subsystems as necessary
@ -827,13 +829,13 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
//
// TODO(roasbeef): if peer disconnects, and haven't yet broadcast funding
// transaction, then all reservations should be cleared.
func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
tempChanID [32]byte, fundingErr error) {
func (f *fundingManager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
fundingErr error) {
fndgLog.Debugf("Failing funding flow for pendingID=%x: %v",
tempChanID, fundingErr)
ctx, err := f.cancelReservationCtx(peer, tempChanID)
ctx, err := f.cancelReservationCtx(peer.IdentityKey(), tempChanID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
@ -868,8 +870,8 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
}
fndgLog.Debugf("Sending funding error to peer (%x): %v",
peer.SerializeCompressed(), spew.Sdump(errMsg))
if err := f.cfg.SendToPeer(peer, errMsg); err != nil {
peer.IdentityKey().SerializeCompressed(), spew.Sdump(errMsg))
if err := peer.SendMessage(false, errMsg); err != nil {
fndgLog.Errorf("unable to send error message to peer %v", err)
}
}
@ -950,10 +952,10 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
// processFundingOpen sends a message to the fundingManager allowing it to
// initiate the new funding workflow with the source peer.
func (f *fundingManager) processFundingOpen(msg *lnwire.OpenChannel,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingOpenMsg{msg, peer}:
case <-f.quit:
return
}
@ -969,7 +971,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// Check number of pending channels to be smaller than maximum allowed
// number and send ErrorGeneric to remote peer if condition is
// violated.
peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey)
peerIDKey := newSerializedKey(fmsg.peer.IdentityKey())
msg := fmsg.msg
amt := msg.FundingAmount
@ -980,8 +982,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels {
f.resMtx.RUnlock()
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
lnwire.ErrMaxPendingChannels)
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrMaxPendingChannels,
)
return
}
f.resMtx.RUnlock()
@ -995,8 +998,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Errorf("unable to query wallet: %v", err)
}
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
lnwire.ErrSynchronizingChain)
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrSynchronizingChain,
)
return
}
@ -1004,7 +1008,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// current soft-limit for channel size.
if msg.FundingAmount > maxFundingAmount {
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
fmsg.peer, fmsg.msg.PendingChannelID,
lnwire.ErrChanTooLarge,
)
return
@ -1014,7 +1018,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// a channel that's below our current min channel size.
if amt < f.cfg.MinChanSize {
f.failFundingFlow(
fmsg.peerAddress.IdentityKey, fmsg.msg.PendingChannelID,
fmsg.peer, fmsg.msg.PendingChannelID,
lnwallet.ErrChanTooSmall(amt, btcutil.Amount(f.cfg.MinChanSize)),
)
return
@ -1023,7 +1027,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+
"pendingId=%x) from peer(%x)", amt, msg.PushAmount,
msg.CsvDelay, msg.PendingChannelID,
fmsg.peerAddress.IdentityKey.SerializeCompressed())
fmsg.peer.IdentityKey().SerializeCompressed())
// Attempt to initialize a reservation within the wallet. If the wallet
// has insufficient resources to create the channel, then the
@ -1034,13 +1038,12 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
reservation, err := f.cfg.Wallet.InitChannelReservation(
amt, 0, msg.PushAmount,
lnwallet.SatPerKWeight(msg.FeePerKiloWeight), 0,
fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address,
fmsg.peer.IdentityKey(), fmsg.peer.Address(),
&chainHash, msg.ChannelFlags,
)
if err != nil {
fndgLog.Errorf("Unable to initialize reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1060,9 +1063,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
)
if err != nil {
fndgLog.Errorf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.PendingChannelID, err,
)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
return
}
@ -1090,7 +1091,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
err: make(chan error, 1),
peerAddress: fmsg.peerAddress,
peer: fmsg.peer,
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock()
@ -1132,8 +1133,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
err = reservation.ProcessSingleContribution(remoteContribution)
if err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1161,11 +1161,9 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
HtlcPoint: ourContribution.HtlcBasePoint.PubKey,
FirstCommitmentPoint: ourContribution.FirstCommitmentPoint,
}
err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, &fundingAccept)
if err != nil {
if err := fmsg.peer.SendMessage(false, &fundingAccept); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
}
@ -1173,10 +1171,10 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// processFundingAccept sends a message to the fundingManager allowing it to
// continue the second phase of a funding workflow with the target peer.
func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingAcceptMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingAcceptMsg{msg, peer}:
case <-f.quit:
return
}
@ -1188,7 +1186,7 @@ func (f *fundingManager) processFundingAccept(msg *lnwire.AcceptChannel,
func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
msg := fmsg.msg
pendingChanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peerAddress.IdentityKey
peerKey := fmsg.peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
@ -1212,8 +1210,7 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
)
if err != nil {
fndgLog.Warnf("Unacceptable channel constraints: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, fmsg.msg.PendingChannelID, err)
return
}
@ -1259,9 +1256,8 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
err = resCtx.reservation.ProcessContribution(remoteContribution)
if err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v",
fmsg.peerAddress.IdentityKey, err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
peerKey, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
@ -1304,15 +1300,12 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
fundingCreated.CommitSig, err = lnwire.NewSigFromRawSignature(sig)
if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated)
if err != nil {
if err := fmsg.peer.SendMessage(false, fundingCreated); err != nil {
fndgLog.Errorf("Unable to send funding complete message: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err)
f.failFundingFlow(fmsg.peer, msg.PendingChannelID, err)
return
}
}
@ -1320,10 +1313,10 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
// processFundingCreated queues a funding complete message coupled with the
// source peer to the fundingManager.
func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingCreatedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingCreatedMsg{msg, peer}:
case <-f.quit:
return
}
@ -1334,7 +1327,7 @@ func (f *fundingManager) processFundingCreated(msg *lnwire.FundingCreated,
// processed, a signature is sent to the remote peer allowing it to broadcast
// the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
peerKey := fmsg.peerAddress.IdentityKey
peerKey := fmsg.peer.IdentityKey()
pendingChanID := fmsg.msg.PendingChannelID
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
@ -1364,8 +1357,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1405,8 +1397,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ourCommitSig, err := lnwire.NewSigFromRawSignature(sig)
if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1415,10 +1406,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
ChanID: channelID,
CommitSig: ourCommitSig,
}
if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil {
if err := fmsg.peer.SendMessage(false, fundingSigned); err != nil {
fndgLog.Errorf("unable to send FundingSigned message: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
}
@ -1476,8 +1466,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
err := fmt.Errorf("timeout waiting for funding tx "+
"(%v) to confirm", completeChan.FundingOutpoint)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
case <-f.quit:
@ -1496,8 +1485,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
err := f.handleFundingConfirmation(completeChan,
shortChanID)
err := f.handleFundingConfirmation(
fmsg.peer, completeChan, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
@ -1509,10 +1499,10 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// processFundingSigned sends a single funding sign complete message along with
// the source peer to the funding manager.
func (f *fundingManager) processFundingSigned(msg *lnwire.FundingSigned,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingSignedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingSignedMsg{msg, peer}:
case <-f.quit:
return
}
@ -1535,20 +1525,17 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
err := fmt.Errorf("Unable to find signed reservation for "+
"chan_id=%x", fmsg.msg.ChanID)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
fmsg.msg.ChanID, err)
f.failFundingFlow(fmsg.peer, fmsg.msg.ChanID, err)
return
}
peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(fmsg.peerAddress.IdentityKey,
pendingChanID)
peerKey := fmsg.peer.IdentityKey()
resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil {
fndgLog.Warnf("Unable to find reservation (peerID:%v, chanID:%x)",
peerKey, pendingChanID[:])
// TODO: add ErrChanNotFound?
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1568,8 +1555,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig)
if err != nil {
fndgLog.Errorf("Unable to complete reservation sign complete: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
f.failFundingFlow(fmsg.peer, pendingChanID, err)
return
}
@ -1583,7 +1569,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}
fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+
"waiting for channel open on-chain", pendingChanID[:], fundingPoint)
"waiting for channel open on-chain", pendingChanID[:],
fundingPoint)
// Send an update to the upstream client that the negotiation process
// is over.
@ -1648,7 +1635,9 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}
defer lnChannel.Stop()
err = f.sendFundingLocked(completeChan, lnChannel, shortChanID)
err = f.sendFundingLocked(
fmsg.peer, completeChan, lnChannel, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed sending fundingLocked: %v", err)
return
@ -1894,10 +1883,11 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
}
// handleFundingConfirmation is a wrapper method for creating a new
// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph,
// and annAfterSixConfs. This is called after the funding transaction is
// confirmed.
func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel,
// lnwallet.LightningChannel object, calling sendFundingLocked,
// addToRouterGraph, and annAfterSixConfs. This is called after the funding
// transaction is confirmed.
func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer,
completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// We create the state-machine object which wraps the database state.
@ -1913,7 +1903,7 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC
fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID)
err = f.sendFundingLocked(completeChan, lnChannel, shortChanID)
err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v", err)
}
@ -1933,11 +1923,12 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel,
func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
peerKey := completeChan.IdentityPub
// Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our
@ -1962,29 +1953,26 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
// down.
for {
fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+
"peer %x", chanID,
completeChan.IdentityPub.SerializeCompressed())
"peer %x", chanID, peerKey.SerializeCompressed())
err = f.cfg.SendToPeer(completeChan.IdentityPub,
fundingLockedMsg)
if err == nil {
// Sending succeeded, we can break out and continue
// the funding flow.
if err := peer.SendMessage(false, fundingLockedMsg); err == nil {
// Sending succeeded, we can break out and continue the
// funding flow.
break
}
fndgLog.Warnf("unable to send fundingLocked to peer %x: "+
"%v. Will retry when online",
completeChan.IdentityPub.SerializeCompressed(), err)
fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+
"Will retry when online", peerKey.SerializeCompressed(),
err)
connected := make(chan struct{})
connected := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(completeChan.IdentityPub, connected)
select {
case <-connected:
fndgLog.Infof("Peer(%x) came back online, will retry "+
"sending FundingLocked for ChannelID(%v)",
completeChan.IdentityPub.SerializeCompressed(),
chanID)
peerKey.SerializeCompressed(), chanID)
// Retry sending.
case <-f.quit:
@ -2165,10 +2153,10 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
// processFundingLocked sends a message to the fundingManager allowing it to
// finish the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peerAddress *lnwire.NetAddress) {
peer lnpeer.Peer) {
select {
case f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}:
case f.fundingMsgs <- &fundingLockedMsg{msg, peer}:
case <-f.quit:
return
}
@ -2180,7 +2168,7 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
defer f.wg.Done()
fndgLog.Debugf("Received FundingLocked for ChannelID(%v) from "+
"peer %x", fmsg.msg.ChanID,
fmsg.peerAddress.IdentityKey.SerializeCompressed())
fmsg.peer.IdentityKey().SerializeCompressed())
// If we are currently in the process of handling a funding locked
// message for this channel, ignore.
@ -2276,32 +2264,11 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
f.barrierMtx.Unlock()
}()
// Finally, we'll find the peer that sent us this message so we can
// provide it with the fully initialized channel state.
peer, err := f.cfg.FindPeer(fmsg.peerAddress.IdentityKey)
if err != nil {
fndgLog.Errorf("Unable to find peer: %v", err)
if err := fmsg.peer.AddNewChannel(channel, f.quit); err != nil {
fndgLog.Errorf("Unable to add new channel %v with peer %x: %v",
fmsg.peer.IdentityKey().SerializeCompressed(),
*channel.ChanPoint, err)
channel.Stop()
return
}
newChanDone := make(chan struct{})
newChanMsg := &newChannelMsg{
channel: channel,
done: newChanDone,
}
select {
case peer.newChannels <- newChanMsg:
case <-f.quit:
return
}
// We pause here to wait for the peer to recognize the new channel
// before we close the channel barrier corresponding to the channel.
select {
case <-f.quit:
return
case <-newChanDone: // Fallthrough if we're not quitting.
}
}
@ -2520,11 +2487,9 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
// initFundingWorkflow sends a message to the funding manager instructing it
// to initiate a single funder workflow with the source peer.
// TODO(roasbeef): re-visit blocking nature..
func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress,
req *openChanReq) {
func (f *fundingManager) initFundingWorkflow(peer lnpeer.Peer, req *openChanReq) {
f.fundingRequests <- &initFundingMsg{
peerAddress: peerAddress,
peer: peer,
openChanReq: req,
}
}
@ -2534,7 +2499,7 @@ func (f *fundingManager) initFundingWorkflow(peerAddress *lnwire.NetAddress,
// funding workflow.
func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
var (
peerKey = msg.peerAddress.IdentityKey
peerKey = msg.peer.IdentityKey()
localAmt = msg.localFundingAmt
remoteAmt = msg.remoteFundingAmt
capacity = localAmt + remoteAmt
@ -2553,8 +2518,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+
"capacity=%v, chainhash=%v, addr=%v, dustLimit=%v)", localAmt,
msg.pushAmt, capacity, msg.chainHash, msg.peerAddress.Address,
ourDustLimit)
msg.pushAmt, capacity, msg.chainHash, peerKey, ourDustLimit)
// First, we'll query the fee estimator for a fee that should get the
// commitment transaction confirmed by the next few blocks (conf target
@ -2590,7 +2554,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// request will fail, and be aborted.
reservation, err := f.cfg.Wallet.InitChannelReservation(
capacity, localAmt, msg.pushAmt, commitFeePerKw,
msg.fundingFeePerVSize, peerKey, msg.peerAddress.Address,
msg.fundingFeePerVSize, peerKey, msg.peer.Address(),
&msg.chainHash, channelFlags,
)
if err != nil {
@ -2631,7 +2595,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
reservation: reservation,
peerAddress: msg.peerAddress,
peer: msg.peer,
updates: msg.updates,
err: msg.err,
}
@ -2653,7 +2617,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
maxHtlcs := f.cfg.RequiredRemoteMaxHTLCs(capacity)
fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)",
msg.peerAddress.Address, chanID)
msg.peer.Address(), chanID)
fundingOpen := lnwire.OpenChannel{
ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash,
@ -2675,7 +2639,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
FirstCommitmentPoint: ourContribution.FirstCommitmentPoint,
ChannelFlags: channelFlags,
}
if err := f.cfg.SendToPeer(peerKey, &fundingOpen); err != nil {
if err := msg.peer.SendMessage(false, &fundingOpen); err != nil {
e := fmt.Errorf("Unable to send funding request message: %v",
err)
fndgLog.Errorf(e.Error())
@ -2786,10 +2750,11 @@ func (f *fundingManager) pruneZombieReservations() {
f.resMtx.RUnlock()
for pendingChanID, resCtx := range zombieReservations {
err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+
"chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:])
err := fmt.Errorf("reservation timed out waiting for peer "+
"(peerID:%v, chanID:%x)", resCtx.peer.IdentityKey(),
pendingChanID[:])
fndgLog.Warnf(err.Error())
f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err)
f.failFundingFlow(resCtx.peer, pendingChanID, err)
}
}

10
peer.go

@ -911,15 +911,15 @@ out:
p.queueMsg(lnwire.NewPong(pongBytes), nil)
case *lnwire.OpenChannel:
p.server.fundingMgr.processFundingOpen(msg, p.addr)
p.server.fundingMgr.processFundingOpen(msg, p)
case *lnwire.AcceptChannel:
p.server.fundingMgr.processFundingAccept(msg, p.addr)
p.server.fundingMgr.processFundingAccept(msg, p)
case *lnwire.FundingCreated:
p.server.fundingMgr.processFundingCreated(msg, p.addr)
p.server.fundingMgr.processFundingCreated(msg, p)
case *lnwire.FundingSigned:
p.server.fundingMgr.processFundingSigned(msg, p.addr)
p.server.fundingMgr.processFundingSigned(msg, p)
case *lnwire.FundingLocked:
p.server.fundingMgr.processFundingLocked(msg, p.addr)
p.server.fundingMgr.processFundingLocked(msg, p)
case *lnwire.Shutdown:
select {

@ -2520,7 +2520,7 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey,
// TODO(roasbeef): pass in chan that's closed if/when funding succeeds
// so can track as persistent peer?
go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req)
go s.fundingMgr.initFundingWorkflow(targetPeer, req)
return updateChan, errChan
}