fundingmanager: move final funding steps from wallet to funding manager.

Once a channel funding process has advanced to the point of broadcasting
the funding transaction, the state of the channel should be persisted
so that the nodes can disconnect or go down without having to wait for the
funding transaction to be confirmed on the blockchain.

Previously, the finalization of the funding process was handled by a
combination of the funding manager, the peer and the wallet, but if
the remote peer is no longer online or no longer connected, this flow
will no longer work. This commit moves all funding steps following
the transaction broadcast into the funding manager, which is available
as long as the daemon is running.
This commit is contained in:
bryanvu 2017-01-23 18:19:54 -08:00 committed by Olaoluwa Osuntokun
parent 508138f8c8
commit e549a3f0ed
10 changed files with 402 additions and 512 deletions

@ -9,6 +9,8 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -83,11 +85,11 @@ type fundingSignCompleteMsg struct {
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
// fundingOpenMsg couples an lnwire.SingleFundingOpenProof message // fundingLockedMsg couples an lnwire.FundingLocked message with the peer who
// with the peer who sent the message. This allows the funding manager to // sent the message. This allows the funding manager to finalize the funding
// queue a response directly to the peer, progressing the funding workflow. // process and announce the existence of the new channel.
type fundingOpenMsg struct { type fundingLockedMsg struct {
msg *lnwire.SingleFundingOpenProof msg *lnwire.FundingLocked
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
@ -99,11 +101,6 @@ type fundingErrorMsg struct {
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
type newChannelReq struct {
channel *lnwallet.LightningChannel
done chan struct{}
}
// pendingChannels is a map instantiated per-peer which tracks all active // pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier. // pending single funded channels indexed by their pending channel identifier.
type pendingChannels map[uint64]*reservationWithCtx type pendingChannels map[uint64]*reservationWithCtx
@ -125,6 +122,10 @@ func newSerializedKey(pubKey *btcec.PublicKey) serializedPubKey {
// within the configuration MUST be non-nil for the FundingManager to carry out // within the configuration MUST be non-nil for the FundingManager to carry out
// its duties. // its duties.
type fundingConfig struct { type fundingConfig struct {
// IDKey is the PublicKey that is used to identify this node within the
// Lightning Network.
IDKey *btcec.PublicKey
// Wallet handles the parts of the funding process that involves moving // Wallet handles the parts of the funding process that involves moving
// funds from on-chain transaction outputs into Lightning channels. // funds from on-chain transaction outputs into Lightning channels.
Wallet *lnwallet.LightningWallet Wallet *lnwallet.LightningWallet
@ -135,6 +136,15 @@ type fundingConfig struct {
// commitment transaction. // commitment transaction.
ArbiterChan chan<- *lnwallet.LightningChannel ArbiterChan chan<- *lnwallet.LightningChannel
// Notifier is used by the FundingManager to determine when the
// channel's funding transaction has been confirmed on the blockchain
// so that the channel creation process can be completed.
Notifier chainntnfs.ChainNotifier
// ProcessRoutingMessage is used by the FundingManager to announce newly created
// channels to the rest of the Lightning Network.
ProcessRoutingMessage func(msg lnwire.Message, src *btcec.PublicKey)
// SendToPeer allows the FundingManager to send messages to the peer // SendToPeer allows the FundingManager to send messages to the peer
// node during the multiple steps involved in the creation of the // node during the multiple steps involved in the creation of the
// channel's funding transaction and initial commitment transaction. // channel's funding transaction and initial commitment transaction.
@ -144,6 +154,10 @@ type fundingConfig struct {
// the FundingManager can notify other daemon subsystems as necessary // the FundingManager can notify other daemon subsystems as necessary
// during the funding process. // during the funding process.
FindPeer func(peerKey *btcec.PublicKey) (*peer, error) FindPeer func(peerKey *btcec.PublicKey) (*peer, error)
// FindChannel queries the database for the channel with the given
// funding transaction outpoint.
FindChannel func(chanPoint wire.OutPoint) (*lnwallet.LightningChannel, error)
} }
// fundingManager acts as an orchestrator/bridge between the wallet's // fundingManager acts as an orchestrator/bridge between the wallet's
@ -231,6 +245,30 @@ func (f *fundingManager) Start() error {
fndgLog.Tracef("Funding manager running") fndgLog.Tracef("Funding manager running")
// Upon restart, the Funding Manager will check the database to load any
// channels that were waiting for their funding transactions to be
// confirmed on the blockchain at the time when the daemon last went
// down.
pendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels()
if err != nil {
return err
}
// For any channels that were in a pending state when the daemon was last
// connected, the Funding Manager will re-initialize the channel barriers
// and will also launch waitForFundingConfirmation to wait for the
// channel's funding transaction to be confirmed on the blockchain.
for _, channel := range pendingChannels {
f.barrierMtx.Lock()
fndgLog.Tracef("Creating chan barrier for "+
"ChannelPoint(%v)", *channel.FundingOutpoint)
f.newChanBarriers[*channel.FundingOutpoint] = make(chan struct{})
f.barrierMtx.Unlock()
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(channel, doneChan)
}
f.wg.Add(1) // TODO(roasbeef): tune f.wg.Add(1) // TODO(roasbeef): tune
go f.reservationCoordinator() go f.reservationCoordinator()
@ -319,8 +357,8 @@ func (f *fundingManager) reservationCoordinator() {
f.handleFundingComplete(fmsg) f.handleFundingComplete(fmsg)
case *fundingSignCompleteMsg: case *fundingSignCompleteMsg:
f.handleFundingSignComplete(fmsg) f.handleFundingSignComplete(fmsg)
case *fundingOpenMsg: case *fundingLockedMsg:
f.handleFundingOpen(fmsg) f.handleFundingLocked(fmsg)
case *fundingErrorMsg: case *fundingErrorMsg:
f.handleErrorGenericMsg(fmsg) f.handleErrorGenericMsg(fmsg)
} }
@ -364,22 +402,6 @@ func (f *fundingManager) handleNumPending(msg *numPendingReq) {
// workflow (funding txn confirmation). // workflow (funding txn confirmation).
func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
var pendingChannels []*pendingChannel var pendingChannels []*pendingChannel
for _, peerChannels := range f.activeReservations {
for _, pendingChan := range peerChannels {
res := pendingChan.reservation
localFund := res.OurContribution().FundingAmount
remoteFund := res.TheirContribution().FundingAmount
pendingChan := &pendingChannel{
identityPub: pendingChan.peerAddress.IdentityKey,
channelPoint: res.FundingOutpoint(),
capacity: localFund + remoteFund,
localBalance: localFund,
remoteBalance: remoteFund,
}
pendingChannels = append(pendingChannels, pendingChan)
}
}
dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels() dbPendingChannels, err := f.cfg.Wallet.ChannelDB.FetchPendingChannels()
if err != nil { if err != nil {
@ -484,8 +506,9 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
// TODO(roasbeef): assuming this was an inbound connection, replace // TODO(roasbeef): assuming this was an inbound connection, replace
// port with default advertised port // port with default advertised port
reservation, err := f.cfg.Wallet.InitChannelReservation(amt, 0, reservation, err := f.cfg.Wallet.InitChannelReservation(amt, 0,
fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address, 1, delay, fmsg.peerAddress.IdentityKey, fmsg.peerAddress.Address,
ourDustLimit, msg.PushSatoshis) uint16(fmsg.msg.ConfirmationDepth), delay, ourDustLimit,
msg.PushSatoshis)
if err != nil { if err != nil {
// TODO(roasbeef): push ErrorGeneric message // TODO(roasbeef): push ErrorGeneric message
fndgLog.Errorf("Unable to initialize reservation: %v", err) fndgLog.Errorf("Unable to initialize reservation: %v", err)
@ -550,7 +573,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID, fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID,
ourContribution.RevocationKey, ourContribution.CommitKey, ourContribution.RevocationKey, ourContribution.CommitKey,
ourContribution.MultiSigKey, ourContribution.CsvDelay, ourContribution.MultiSigKey, ourContribution.CsvDelay,
deliveryScript, ourDustLimit) deliveryScript, ourDustLimit, msg.ConfirmationDepth)
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingResp); err != nil { if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingResp); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err) fndgLog.Errorf("unable to send funding response to peer: %v", err)
@ -704,8 +727,8 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
// With all the necessary data available, attempt to advance the // With all the necessary data available, attempt to advance the
// funding workflow to the next stage. If this succeeds then the // funding workflow to the next stage. If this succeeds then the
// funding transaction will broadcast after our next message. // funding transaction will broadcast after our next message.
err = resCtx.reservation.CompleteReservationSingle(revokeKey, &fundingOut, completeChan, err := resCtx.reservation.CompleteReservationSingle(
commitSig, obsfucator) revokeKey, &fundingOut, commitSig, obsfucator)
if err != nil { if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc. // TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err) fndgLog.Errorf("unable to complete single reservation: %v", err)
@ -744,6 +767,14 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
cancelReservation() cancelReservation()
return return
} }
go func() {
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(completeChan, doneChan)
<-doneChan
f.deleteReservationCtx(peerKey, fmsg.msg.ChannelID)
}()
} }
// processFundingSignComplete sends a single funding sign complete message // processFundingSignComplete sends a single funding sign complete message
@ -753,6 +784,193 @@ func (f *fundingManager) processFundingSignComplete(msg *lnwire.SingleFundingSig
f.fundingMsgs <- &fundingSignCompleteMsg{msg, peerAddress} f.fundingMsgs <- &fundingSignCompleteMsg{msg, peerAddress}
} }
// handleFundingSignComplete processes the final message received in a single
// funder workflow. Once this message is processed, the funding transaction is
// broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) {
chanID := fmsg.msg.ChannelID
peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(peerKey, chanID)
if err != nil {
fndgLog.Warnf("can't find reservation (peerID:%v, chanID:%v)",
peerKey, chanID)
return
}
// The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel.
commitSig := fmsg.msg.CommitSignature.Serialize()
completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig)
if err != nil {
fndgLog.Errorf("unable to complete reservation sign complete: %v", err)
resCtx.err <- err
return
}
fundingPoint := resCtx.reservation.FundingOutpoint()
fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+
"waiting for channel open on-chain", chanID, fundingPoint)
// Send an update to the upstream client that the negotiation process
// is over.
// TODO(roasbeef): add abstraction over updates to accommodate
// long-polling, or SSE, etc.
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanPending{
ChanPending: &lnrpc.PendingUpdate{
Txid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
},
}
go func() {
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(completeChan, doneChan)
<-doneChan
// Finally give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): helper funcs for proto construction
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
},
},
}
f.deleteReservationCtx(peerKey, fmsg.msg.ChannelID)
}()
}
// waitForFundingConfirmation handles the final stages of the channel funding
// process once the funding transaction has been broadcast. The primary
// function of waitForFundingConfirmation is to wait for blockchain
// confirmation, and then to notify the other systems that must be notified
// when a channel has become active for lightning transactions.
func (f *fundingManager) waitForFundingConfirmation(
completeChan *channeldb.OpenChannel, doneChan chan struct{}) {
// Register with the ChainNotifier for a notification once the funding
// transaction reaches `numConfs` confirmations.
txid := completeChan.FundingOutpoint.Hash
numConfs := uint32(completeChan.NumConfsRequired)
confNtfn, _ := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, numConfs)
fndgLog.Infof("Waiting for funding tx (%v) to reach %v confirmations",
txid, numConfs)
// Wait until the specified number of confirmations has been reached,
// or the wallet signals a shutdown.
confDetails := <-confNtfn.Confirmed
fundingPoint := *completeChan.FundingOutpoint
fndgLog.Infof("ChannelPoint(%v) is now active",
fundingPoint)
completeChan.IsPending = false
err := f.cfg.Wallet.ChannelDB.MarkChannelAsOpen(&fundingPoint)
if err != nil {
fndgLog.Errorf("error setting channel pending flag to false: "+
"%v", err)
return
}
// Finally, create and officially open the payment channel!
// TODO(roasbeef): CreationTime once tx is 'open'
channel, err := lnwallet.NewLightningChannel(f.cfg.Wallet.Signer,
f.cfg.Notifier, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel: %v", err)
return
}
// Now that the channel is open, we need to notify a number of
// parties of this event.
// First we send the newly opened channel to the source server
peer, err := f.cfg.FindPeer(completeChan.IdentityPub)
if err != nil {
fndgLog.Errorf("Unable to find peer: %v", err)
return
}
newChanDone := make(chan struct{})
newChanMsg := &newChannelMsg{
channel: channel,
done: newChanDone,
}
peer.newChannels <- newChanMsg
<-newChanDone
// Close the active channel barrier signalling the
// readHandler that commitment related modifications to
// this channel can now proceed.
f.barrierMtx.Lock()
fndgLog.Tracef("Closing chan barrier for ChannelPoint(%v)", fundingPoint)
close(f.newChanBarriers[fundingPoint])
delete(f.newChanBarriers, fundingPoint)
f.barrierMtx.Unlock()
// Afterwards we send the breach arbiter the new channel so it
// can watch for attempts to breach the channel's contract by
// the remote party.
f.cfg.ArbiterChan <- channel
// With the block height and the transaction index known, we
// can construct the compact chainID which is used on the
// network to unique identify channels.
chanID := lnwire.ChannelID{
BlockHeight: confDetails.BlockHeight,
TxIndex: confDetails.TxIndex,
TxPosition: uint16(fundingPoint.Index),
}
// When the funding transaction has been confirmed, the FundingLocked
// message is sent to the remote peer so that the existence of the
// channel can be announced to the network.
fundingLockedMsg := lnwire.NewFundingLocked(fundingPoint, chanID,
f.cfg.IDKey)
f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg)
close(doneChan)
return
}
// processFundingLocked sends a message to the fundingManager allowing it to finish
// the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peerAddress *lnwire.NetAddress) {
f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}
}
// handleFundingLocked finalizes the channel funding process and enables the channel
// to enter normal operating mode.
func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
fundingPoint := fmsg.msg.ChannelOutpoint
channel, err := f.cfg.FindChannel(fundingPoint)
if err != nil {
fndgLog.Errorf("error looking up channel for outpoint: %v", fundingPoint)
return
}
// Register the new link with the L3 routing manager so this
// new channel can be utilized during path
// finding.
f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey, channel,
fmsg.msg.ChannelID, f.fakeProof, f.fakeProof)
}
// channelProof is one half of the proof necessary to create an authenticated // channelProof is one half of the proof necessary to create an authenticated
// announcement on the network. The two signatures individually sign a // announcement on the network. The two signatures individually sign a
// statement of the existence of a channel. // statement of the existence of a channel.
@ -775,17 +993,10 @@ type chanAnnouncement struct {
// identity pub keys of both parties to the channel, and the second segment is // identity pub keys of both parties to the channel, and the second segment is
// authenticated only by us and contains our directional routing policy for the // authenticated only by us and contains our directional routing policy for the
// channel. // channel.
func newChanAnnouncement(localIdentity *btcec.PublicKey, func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
channel *lnwallet.LightningChannel, chanID lnwire.ChannelID, channel *lnwallet.LightningChannel, chanID lnwire.ChannelID,
localProof, remoteProof *channelProof) *chanAnnouncement { localProof, remoteProof *channelProof) *chanAnnouncement {
// First obtain the remote party's identity public key, this will be
// used to determine the order of the keys and signatures in the
// channel announcement.
chanInfo := channel.StateSnapshot()
remotePub := chanInfo.RemoteIdentity
localPub := localIdentity
// The unconditional section of the announcement is the ChannelID // The unconditional section of the announcement is the ChannelID
// itself which compactly encodes the location of the funding output // itself which compactly encodes the location of the funding output
// within the blockchain. // within the blockchain.
@ -806,8 +1017,8 @@ func newChanAnnouncement(localIdentity *btcec.PublicKey,
selfBytes := localIdentity.SerializeCompressed() selfBytes := localIdentity.SerializeCompressed()
remoteBytes := remotePub.SerializeCompressed() remoteBytes := remotePub.SerializeCompressed()
if bytes.Compare(selfBytes, remoteBytes) == -1 { if bytes.Compare(selfBytes, remoteBytes) == -1 {
chanAnn.FirstNodeID = localPub chanAnn.FirstNodeID = localIdentity
chanAnn.SecondNodeID = &remotePub chanAnn.SecondNodeID = remotePub
chanAnn.FirstNodeSig = localProof.nodeSig chanAnn.FirstNodeSig = localProof.nodeSig
chanAnn.SecondNodeSig = remoteProof.nodeSig chanAnn.SecondNodeSig = remoteProof.nodeSig
chanAnn.FirstBitcoinSig = localProof.nodeSig chanAnn.FirstBitcoinSig = localProof.nodeSig
@ -819,8 +1030,8 @@ func newChanAnnouncement(localIdentity *btcec.PublicKey,
// indicate the "direction" of the update. // indicate the "direction" of the update.
chanFlags = 0 chanFlags = 0
} else { } else {
chanAnn.FirstNodeID = &remotePub chanAnn.FirstNodeID = remotePub
chanAnn.SecondNodeID = localPub chanAnn.SecondNodeID = localIdentity
chanAnn.FirstNodeSig = remoteProof.nodeSig chanAnn.FirstNodeSig = remoteProof.nodeSig
chanAnn.SecondNodeSig = localProof.nodeSig chanAnn.SecondNodeSig = localProof.nodeSig
chanAnn.FirstBitcoinSig = remoteProof.nodeSig chanAnn.FirstBitcoinSig = remoteProof.nodeSig
@ -851,253 +1062,22 @@ func newChanAnnouncement(localIdentity *btcec.PublicKey,
} }
} }
// handleFundingSignComplete processes the final message received in a single
// funder workflow. Once this message is processed, the funding transaction is
// broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) {
chanID := fmsg.msg.ChannelID
peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(peerKey, chanID)
if err != nil {
fndgLog.Warnf("can't find reservation (peerID:%v, chanID:%v)",
peerKey, chanID)
return
}
// The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel.
commitSig := fmsg.msg.CommitSignature.Serialize()
if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil {
fndgLog.Errorf("unable to complete reservation sign complete: %v", err)
resCtx.err <- err
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
return
}
fundingPoint := resCtx.reservation.FundingOutpoint()
fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+
"waiting for channel open on-chain", chanID, fundingPoint)
// Send an update to the upstream client that the negotiation process
// is over.
// TODO(roasbeef): add abstraction over updates to accommodate
// long-polling, or SSE, etc.
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanPending{
ChanPending: &lnrpc.PendingUpdate{
Txid: fundingPoint.Hash[:],
},
},
}
// Spawn a goroutine which will send the newly open channel to the
// source peer once the channel is open. A channel is considered "open"
// once it reaches a sufficient number of confirmations.
// TODO(roasbeef): semaphore to limit active chan open goroutines
go func() {
// As this is the final step in the life-time of a single
// funder channel workflow, ensure that the reservation's state
// is cleaned up.
defer f.deleteReservationCtx(peerKey, chanID)
// TODO(roasbeef): need to persist pending broadcast channels,
// send chan open proof during scan of blocks mined while down.
openChanDetails, err := resCtx.reservation.DispatchChan()
if err != nil {
fndgLog.Errorf("Unable to dispatch "+
"ChannelPoint(%v): %v", fundingPoint, err)
return
}
// This reservation is no longer pending as the funding
// transaction has been fully confirmed.
f.deleteReservationCtx(peerKey, chanID)
fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
fundingPoint, peerKey)
// Now that the channel is open, we need to notify a number of
// parties of this event.
// First we send the newly opened channel to the source server
// peer.
peer, err := f.cfg.FindPeer(peerKey)
if err != nil {
fndgLog.Errorf("Error finding peer: %v", err)
return
}
newChanDone := make(chan struct{})
newChanReq := &newChannelReq{
channel: openChanDetails.Channel,
done: newChanDone,
}
peer.newChannels <- newChanReq
<-newChanDone
// Close the active channel barrier signalling the readHandler
// that commitment related modifications to this channel can
// now proceed.
f.barrierMtx.Lock()
fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint)
close(f.newChanBarriers[*fundingPoint])
delete(f.newChanBarriers, *fundingPoint)
f.barrierMtx.Unlock()
// Afterwards we send the breach arbiter the new channel so it
// can watch for attempts to breach the channel's contract by
// the remote party.
f.cfg.ArbiterChan <- openChanDetails.Channel
// With the block height and the transaction index known, we
// can construct the compact chainID which is used on the
// network to unique identify channels.
chainID := lnwire.ChannelID{
BlockHeight: openChanDetails.ConfirmationHeight,
TxIndex: openChanDetails.TransactionIndex,
TxPosition: uint16(fundingPoint.Index),
}
// Next, we queue a message to notify the remote peer that the
// channel is open. We additionally provide the compact
// channelID so they can advertise the channel.
fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, chainID)
if err := f.cfg.SendToPeer(peerKey, fundingOpen); err != nil {
fndgLog.Errorf("unable to send fundingOpen message %v", err)
resCtx.err <- err
return
}
// Register the new link with the L3 routing manager so this
// new channel can be utilized during path
// finding.
// TODO(roasbeef): should include sigs from funding
// locked
// * should be moved to after funding locked is recv'd
f.announceChannel(peer.server, openChanDetails.Channel, chainID, f.fakeProof,
f.fakeProof)
// Finally give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): helper funcs for proto construction
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
},
},
}
return
}()
}
// announceChannel announces a newly created channel to the rest of the network // announceChannel announces a newly created channel to the rest of the network
// by crafting the two authenticated announcements required for the peers on the // by crafting the two authenticated announcements required for the peers on the
// network to recognize the legitimacy of the channel. The crafted // network to recognize the legitimacy of the channel. The crafted
// announcements are then send to the channel router to handle broadcasting to // announcements are then send to the channel router to handle broadcasting to
// the network during its next trickle. // the network during its next trickle.
func (f *fundingManager) announceChannel(s *server, func (f *fundingManager) announceChannel(idKey, remoteIdKey *btcec.PublicKey,
channel *lnwallet.LightningChannel, chanID lnwire.ChannelID, channel *lnwallet.LightningChannel, chanID lnwire.ChannelID, localProof,
localProof, remoteProof *channelProof) { remoteProof *channelProof) {
// TODO(roasbeef): need a Signer.SignMessage method to finalize // TODO(roasbeef): need a Signer.SignMessage method to finalize
// advertisements // advertisements
localIdentity := s.identityPriv.PubKey() chanAnnouncement := newChanAnnouncement(idKey, remoteIdKey, channel, chanID,
chanAnnouncement := newChanAnnouncement(localIdentity, channel, localProof, remoteProof)
chanID, localProof, remoteProof)
s.chanRouter.ProcessRoutingMessage(chanAnnouncement.chanAnn, localIdentity) f.cfg.ProcessRoutingMessage(chanAnnouncement.chanAnn, idKey)
s.chanRouter.ProcessRoutingMessage(chanAnnouncement.edgeUpdate, localIdentity) f.cfg.ProcessRoutingMessage(chanAnnouncement.edgeUpdate, idKey)
}
// processFundingOpenProof sends a message to the fundingManager allowing it
// to process the final message received when the daemon is on the responding
// side of a single funder channel workflow.
func (f *fundingManager) processFundingOpenProof(msg *lnwire.SingleFundingOpenProof,
peerAddress *lnwire.NetAddress) {
f.fundingMsgs <- &fundingOpenMsg{msg, peerAddress}
}
// handleFundingOpen processes the final message when the daemon is the
// responder to a single funder channel workflow.
func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
chanID := fmsg.msg.ChannelID
peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(peerKey, chanID)
if err != nil {
fndgLog.Warnf("can't find reservation (peerID:%v, chanID:%v)",
peerKey, chanID)
return
}
// The channel initiator has claimed the channel is now open, so we'll
// verify the contained SPV proof for validity.
// TODO(roasbeef): send off to the spv proof verifier, in the routing
// submodule.
// Now that we've verified the initiator's proof, we'll commit the
// channel state to disk, and notify the source peer of a newly opened
// channel.
openChan, err := resCtx.reservation.FinalizeReservation()
if err != nil {
fndgLog.Errorf("unable to finalize reservation: %v", err)
return
}
// The reservation has been completed, therefore we can stop tracking
// it within our active reservations map.
f.deleteReservationCtx(peerKey, chanID)
fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerKey(%v) is now open",
resCtx.reservation.FundingOutpoint(), peerKey)
// Notify the L3 routing manager of the newly active channel link.
// TODO(roasbeef): should have sigs, only after funding_locked is
// recv'd
// * also ensure fault tolerance, scan opened chan on start up check
// for graph existence
peer, err := f.cfg.FindPeer(peerKey)
if err != nil {
fndgLog.Errorf("Error finding peer: %v", err)
return
}
f.announceChannel(peer.server, openChan, fmsg.msg.ChanChainID,
f.fakeProof, f.fakeProof)
// Send the newly opened channel to the breach arbiter to it can watch
// for uncooperative channel breaches, potentially punishing the
// counterparty for attempting to cheat us.
f.cfg.ArbiterChan <- openChan
// Finally, notify the target peer of the newly opened channel.
newChanDone := make(chan struct{})
newChanReq := &newChannelReq{
channel: openChan,
done: newChanDone,
}
peer.newChannels <- newChanReq
<-newChanDone
// Close the active channel barrier signalling the readHandler that
// commitment related modifications to this channel can now proceed.
fundingPoint := resCtx.reservation.FundingOutpoint()
f.barrierMtx.Lock()
fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint)
close(f.newChanBarriers[*fundingPoint])
delete(f.newChanBarriers, *fundingPoint)
f.barrierMtx.Unlock()
} }
// initFundingWorkflow sends a message to the funding manager instructing it // initFundingWorkflow sends a message to the funding manager instructing it
@ -1127,13 +1107,15 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+
"capacity=%v, numConfs=%v, addr=%v, dustLimit=%v)", localAmt, "capacity=%v, numConfs=%v, addr=%v, dustLimit=%v)", localAmt,
msg.pushAmt, capacity, numConfs, msg.peerAddress.Address, ourDustLimit) msg.pushAmt, capacity, numConfs, msg.peerAddress.Address,
ourDustLimit)
// Initialize a funding reservation with the local wallet. If the // Initialize a funding reservation with the local wallet. If the
// wallet doesn't have enough funds to commit to this channel, then // wallet doesn't have enough funds to commit to this channel, then
// the request will fail, and be aborted. // the request will fail, and be aborted.
reservation, err := f.cfg.Wallet.InitChannelReservation(capacity, localAmt, reservation, err := f.cfg.Wallet.InitChannelReservation(capacity,
peerKey, msg.peerAddress.Address, uint16(numConfs), 4, ourDustLimit, msg.pushAmt) localAmt, peerKey, msg.peerAddress.Address, uint16(numConfs), 4,
ourDustLimit, msg.pushAmt)
if err != nil { if err != nil {
msg.err <- err msg.err <- err
return return
@ -1192,6 +1174,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
deliveryScript, deliveryScript,
ourDustLimit, ourDustLimit,
msg.pushAmt, msg.pushAmt,
numConfs,
) )
if err := f.cfg.SendToPeer(peerKey, fundingReq); err != nil { if err := f.cfg.SendToPeer(peerKey, fundingReq); err != nil {
fndgLog.Errorf("Unable to send funding request message: %v", err) fndgLog.Errorf("Unable to send funding request message: %v", err)
@ -1248,8 +1231,8 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
return return
} }
fndgLog.Errorf("Received funding error from %v: %v", peerKey.SerializeCompressed(), fndgLog.Errorf("Received funding error from %v: %v",
newLogClosure(func() string { peerKey.SerializeCompressed(), newLogClosure(func() string {
return spew.Sdump(e) return spew.Sdump(e)
}), }),
) )

@ -290,7 +290,7 @@ func (r *ChannelReservation) OurSignatures() ([]*InputScript, []byte) {
return r.ourFundingInputScripts, r.ourCommitmentSig return r.ourFundingInputScripts, r.ourCommitmentSig
} }
// CompleteFundingReservation finalizes the pending channel reservation, // CompleteReservation finalizes the pending channel reservation,
// transitioning from a pending payment channel, to an open payment // transitioning from a pending payment channel, to an open payment
// channel. All passed signatures to the counterparty's inputs to the funding // channel. All passed signatures to the counterparty's inputs to the funding
// transaction will be fully verified. Signatures are expected to be passed in // transaction will be fully verified. Signatures are expected to be passed in
@ -303,19 +303,21 @@ func (r *ChannelReservation) OurSignatures() ([]*InputScript, []byte) {
// of confirmations. Once the method unblocks, a LightningChannel instance is // of confirmations. Once the method unblocks, a LightningChannel instance is
// returned, marking the channel available for updates. // returned, marking the channel available for updates.
func (r *ChannelReservation) CompleteReservation(fundingInputScripts []*InputScript, func (r *ChannelReservation) CompleteReservation(fundingInputScripts []*InputScript,
commitmentSig []byte) error { commitmentSig []byte) (*channeldb.OpenChannel, error) {
// TODO(roasbeef): add flag for watch or not? // TODO(roasbeef): add flag for watch or not?
errChan := make(chan error, 1) errChan := make(chan error, 1)
completeChan := make(chan *channeldb.OpenChannel, 1)
r.wallet.msgChan <- &addCounterPartySigsMsg{ r.wallet.msgChan <- &addCounterPartySigsMsg{
pendingFundingID: r.reservationID, pendingFundingID: r.reservationID,
theirFundingInputScripts: fundingInputScripts, theirFundingInputScripts: fundingInputScripts,
theirCommitmentSig: commitmentSig, theirCommitmentSig: commitmentSig,
completeChan: completeChan,
err: errChan, err: errChan,
} }
return <-errChan return <-completeChan, <-errChan
} }
// CompleteReservationSingle finalizes the pending single funder channel // CompleteReservationSingle finalizes the pending single funder channel
@ -329,9 +331,10 @@ func (r *ChannelReservation) CompleteReservation(fundingInputScripts []*InputScr
// populated. // populated.
func (r *ChannelReservation) CompleteReservationSingle( func (r *ChannelReservation) CompleteReservationSingle(
revocationKey *btcec.PublicKey, fundingPoint *wire.OutPoint, revocationKey *btcec.PublicKey, fundingPoint *wire.OutPoint,
commitSig []byte, obsfucator [StateHintSize]byte) error { commitSig []byte, obsfucator [StateHintSize]byte) (*channeldb.OpenChannel, error) {
errChan := make(chan error, 1) errChan := make(chan error, 1)
completeChan := make(chan *channeldb.OpenChannel, 1)
r.wallet.msgChan <- &addSingleFunderSigsMsg{ r.wallet.msgChan <- &addSingleFunderSigsMsg{
pendingFundingID: r.reservationID, pendingFundingID: r.reservationID,
@ -339,10 +342,11 @@ func (r *ChannelReservation) CompleteReservationSingle(
fundingOutpoint: fundingPoint, fundingOutpoint: fundingPoint,
theirCommitmentSig: commitSig, theirCommitmentSig: commitSig,
obsfucator: obsfucator, obsfucator: obsfucator,
completeChan: completeChan,
err: errChan, err: errChan,
} }
return <-errChan return <-completeChan, <-errChan
} }
// OurSignatures returns the counterparty's signatures to all inputs to the // OurSignatures returns the counterparty's signatures to all inputs to the
@ -453,44 +457,3 @@ type OpenChannelDetails struct {
// transaction resides. // transaction resides.
TransactionIndex uint32 TransactionIndex uint32
} }
// DispatchChan returns a channel which will be sent on once the funding
// transaction for this pending payment channel obtains the configured number
// of confirmations. Once confirmations have been obtained, a fully initialized
// LightningChannel instance is returned, allowing for channel updates.
//
// NOTE: If this method is called before .CompleteReservation(), it will block
// indefinitely.
func (r *ChannelReservation) DispatchChan() (*OpenChannelDetails, error) {
if err := <-r.chanOpenErr; err != nil {
return nil, err
}
openDetails := <-r.chanOpen
return &OpenChannelDetails{
Channel: openDetails.channel,
ConfirmationHeight: openDetails.blockHeight,
TransactionIndex: openDetails.txIndex,
}, nil
}
// FinalizeReservation completes the pending reservation, returning an active
// open LightningChannel. This method should be called after the responder to
// the single funder workflow receives and verifies a proof from the initiator
// of an open channel.
//
// NOTE: This method should *only* be called as the last step when one is the
// responder to an initiated single funder workflow.
func (r *ChannelReservation) FinalizeReservation() (*LightningChannel, error) {
errChan := make(chan error, 1)
r.wallet.msgChan <- &channelOpenMsg{
pendingFundingID: r.reservationID,
err: errChan,
}
if err := <-errChan; err != nil {
return nil, err
}
return (<-r.chanOpen).channel, nil
}

@ -1,7 +1,6 @@
package lnwallet package lnwallet
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -185,6 +184,10 @@ type addCounterPartySigsMsg struct {
// version of the commitment transaction. // version of the commitment transaction.
theirCommitmentSig []byte theirCommitmentSig []byte
// This channel is used to return the completed channel after the wallet
// has completed all of its stages in the funding process.
completeChan chan *channeldb.OpenChannel
// NOTE: In order to avoid deadlocks, this channel MUST be buffered. // NOTE: In order to avoid deadlocks, this channel MUST be buffered.
err chan error err chan error
} }
@ -215,19 +218,9 @@ type addSingleFunderSigsMsg struct {
// the commitment transaction. // the commitment transaction.
obsfucator [StateHintSize]byte obsfucator [StateHintSize]byte
// NOTE: In order to avoid deadlocks, this channel MUST be buffered. // This channel is used to return the completed channel after the wallet
err chan error // has completed all of its stages in the funding process.
} completeChan chan *channeldb.OpenChannel
// channelOpenMsg is the final message sent to finalize a single funder channel
// workflow to which we are the responder to. This message is sent once the
// remote peer deems the channel open, meaning it has reached a sufficient
// number of confirmations in the blockchain.
type channelOpenMsg struct {
pendingFundingID uint64
// TODO(roasbeef): move verification up to upper layer, yeh?
spvProof []byte
// NOTE: In order to avoid deadlocks, this channel MUST be buffered. // NOTE: In order to avoid deadlocks, this channel MUST be buffered.
err chan error err chan error
@ -461,8 +454,6 @@ out:
l.handleSingleFunderSigs(msg) l.handleSingleFunderSigs(msg)
case *addCounterPartySigsMsg: case *addCounterPartySigsMsg:
l.handleFundingCounterPartySigs(msg) l.handleFundingCounterPartySigs(msg)
case *channelOpenMsg:
l.handleChannelOpen(msg)
} }
case <-l.quit: case <-l.quit:
// TODO: do some clean up // TODO: do some clean up
@ -1072,10 +1063,7 @@ func (l *LightningWallet) handleFundingCounterPartySigs(msg *addCounterPartySigs
return return
} }
// Create a goroutine to watch the chain so we can open the channel once msg.completeChan <- res.partialState
// the funding tx has enough confirmations.
go l.openChannelAfterConfirmations(res)
msg.err <- nil msg.err <- nil
} }
@ -1192,124 +1180,15 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
} }
pendingReservation.ourCommitmentSig = sigTheirCommit pendingReservation.ourCommitmentSig = sigTheirCommit
req.err <- nil
}
// handleChannelOpen completes a single funder reservation to which we are the
// responder. This method saves the channel state to disk, finally "opening"
// the channel by sending it over to the caller of the reservation via the
// channel dispatch channel.
func (l *LightningWallet) handleChannelOpen(req *channelOpenMsg) {
l.limboMtx.RLock()
res, ok := l.fundingLimbo[req.pendingFundingID]
l.limboMtx.RUnlock()
if !ok {
req.err <- fmt.Errorf("attempted to update non-existant " +
"funding state")
res.chanOpen <- nil
return
}
// Grab the mutex on the ChannelReservation to ensure thead-safety
res.Lock()
defer res.Unlock()
// Funding complete, this entry can be removed from limbo.
l.limboMtx.Lock()
delete(l.fundingLimbo, res.reservationID)
l.limboMtx.Unlock()
// Add the complete funding transaction to the DB, in it's open bucket // Add the complete funding transaction to the DB, in it's open bucket
// which will be used for the lifetime of this channel. // which will be used for the lifetime of this channel.
if err := res.partialState.SyncPending(res.nodeAddr); err != nil { if err := pendingReservation.partialState.SyncPending(pendingReservation.nodeAddr); err != nil {
req.err <- err req.err <- err
res.chanOpen <- nil
return
}
// Finally, create and officially open the payment channel!
// TODO(roasbeef): CreationTime once tx is 'open'
channel, err := NewLightningChannel(l.Signer, l.chainNotifier,
res.partialState)
if err != nil {
req.err <- err
res.chanOpen <- nil
return return
} }
req.completeChan <- pendingReservation.partialState
req.err <- nil req.err <- nil
res.chanOpen <- &openChanDetails{
channel: channel,
}
// As this reservation has concluded, we can now safely remove the
// reservation from the limbo map.
l.limboMtx.Lock()
delete(l.fundingLimbo, req.pendingFundingID)
l.limboMtx.Unlock()
}
// openChannelAfterConfirmations creates, and opens a payment channel after
// the funding transaction created within the passed channel reservation
// obtains the specified number of confirmations.
func (l *LightningWallet) openChannelAfterConfirmations(res *ChannelReservation) {
// Register with the ChainNotifier for a notification once the funding
// transaction reaches `numConfs` confirmations.
txid := res.fundingTx.TxHash()
numConfs := uint32(res.numConfsToOpen)
confNtfn, _ := l.chainNotifier.RegisterConfirmationsNtfn(&txid, numConfs)
walletLog.Infof("Waiting for funding tx (txid: %v) to reach %v confirmations",
txid, numConfs)
// Wait until the specified number of confirmations has been reached,
// or the wallet signals a shutdown.
var (
confDetails *chainntnfs.TxConfirmation
ok bool
)
out:
select {
case confDetails, ok = <-confNtfn.Confirmed:
// Reading a falsey value for the second parameter indicates that
// the notifier is in the process of shutting down. Therefore, we
// don't count this as the signal that the funding transaction has
// been confirmed.
if !ok {
res.chanOpenErr <- errors.New("wallet shutting down")
res.chanOpen <- nil
return
}
break out
case <-l.quit:
res.chanOpenErr <- errors.New("wallet shutting down")
res.chanOpen <- nil
return
}
// Finally, create and officially open the payment channel!
// TODO(roasbeef): CreationTime once tx is 'open'
channel, err := NewLightningChannel(l.Signer, l.chainNotifier,
res.partialState)
if err != nil {
res.chanOpenErr <- err
res.chanOpen <- nil
return
}
res.chanOpenErr <- nil
res.chanOpen <- &openChanDetails{
channel: channel,
blockHeight: confDetails.BlockHeight,
txIndex: confDetails.TxIndex,
}
// As this reservation has concluded, we can now safely remove the
// reservation from the limbo map.
l.limboMtx.Lock()
delete(l.fundingLimbo, res.reservationID)
l.limboMtx.Unlock()
} }
// selectCoinsAndChange performs coin selection in order to obtain witness // selectCoinsAndChange performs coin selection in order to obtain witness

@ -77,14 +77,18 @@ type SingleFundingRequest struct {
// this amount are not enforceable onchain from our point view. // this amount are not enforceable onchain from our point view.
DustLimit btcutil.Amount DustLimit btcutil.Amount
// TODO(roasbeef): confirmation depth // ConfirmationDepth is the number of confirmations that the initiator
// of a funding workflow is requesting be required before the channel
// is considered fully open.
ConfirmationDepth uint32
} }
// NewSingleFundingRequest creates, and returns a new empty SingleFundingRequest. // NewSingleFundingRequest creates, and returns a new empty SingleFundingRequest.
func NewSingleFundingRequest(chanID uint64, chanType uint8, coinType uint64, func NewSingleFundingRequest(chanID uint64, chanType uint8, coinType uint64,
fee btcutil.Amount, amt btcutil.Amount, delay uint32, ck, fee btcutil.Amount, amt btcutil.Amount, delay uint32, ck,
cdp *btcec.PublicKey, deliveryScript PkScript, cdp *btcec.PublicKey, deliveryScript PkScript,
dustLimit btcutil.Amount, pushSat btcutil.Amount) *SingleFundingRequest { dustLimit btcutil.Amount, pushSat btcutil.Amount,
confDepth uint32) *SingleFundingRequest {
return &SingleFundingRequest{ return &SingleFundingRequest{
ChannelID: chanID, ChannelID: chanID,
@ -98,6 +102,7 @@ func NewSingleFundingRequest(chanID uint64, chanType uint8, coinType uint64,
DeliveryPkScript: deliveryScript, DeliveryPkScript: deliveryScript,
DustLimit: dustLimit, DustLimit: dustLimit,
PushSatoshis: pushSat, PushSatoshis: pushSat,
ConfirmationDepth: confDepth,
} }
} }
@ -107,16 +112,6 @@ func NewSingleFundingRequest(chanID uint64, chanType uint8, coinType uint64,
// //
// This is part of the lnwire.Message interface. // This is part of the lnwire.Message interface.
func (c *SingleFundingRequest) Decode(r io.Reader, pver uint32) error { func (c *SingleFundingRequest) Decode(r io.Reader, pver uint32) error {
// ChannelID (8)
// ChannelType (1)
// CoinType (8)
// FeePerKb (8)
// PaymentAmount (8)
// Delay (4)
// Pubkey (33)
// Pubkey (33)
// DeliveryPkScript (final delivery)
// DustLimit (8)
err := readElements(r, err := readElements(r,
&c.ChannelID, &c.ChannelID,
&c.ChannelType, &c.ChannelType,
@ -128,7 +123,8 @@ func (c *SingleFundingRequest) Decode(r io.Reader, pver uint32) error {
&c.CommitmentKey, &c.CommitmentKey,
&c.ChannelDerivationPoint, &c.ChannelDerivationPoint,
&c.DeliveryPkScript, &c.DeliveryPkScript,
&c.DustLimit) &c.DustLimit,
&c.ConfirmationDepth)
if err != nil { if err != nil {
return err return err
} }
@ -142,16 +138,6 @@ func (c *SingleFundingRequest) Decode(r io.Reader, pver uint32) error {
// //
// This is part of the lnwire.Message interface. // This is part of the lnwire.Message interface.
func (c *SingleFundingRequest) Encode(w io.Writer, pver uint32) error { func (c *SingleFundingRequest) Encode(w io.Writer, pver uint32) error {
// ChannelID (8)
// ChannelType (1)
// CoinType (8)
// FeePerKb (8)
// PaymentAmount (8)
// Delay (4)
// Pubkey (33)
// Pubkey (33)
// DeliveryPkScript (final delivery)
// DustLimit (8)
err := writeElements(w, err := writeElements(w,
c.ChannelID, c.ChannelID,
c.ChannelType, c.ChannelType,
@ -163,7 +149,8 @@ func (c *SingleFundingRequest) Encode(w io.Writer, pver uint32) error {
c.CommitmentKey, c.CommitmentKey,
c.ChannelDerivationPoint, c.ChannelDerivationPoint,
c.DeliveryPkScript, c.DeliveryPkScript,
c.DustLimit) c.DustLimit,
c.ConfirmationDepth)
if err != nil { if err != nil {
return err return err
} }
@ -183,12 +170,48 @@ func (c *SingleFundingRequest) Command() uint32 {
// SingleFundingRequest. This is calculated by summing the max length of all // SingleFundingRequest. This is calculated by summing the max length of all
// the fields within a SingleFundingRequest. To enforce a maximum // the fields within a SingleFundingRequest. To enforce a maximum
// DeliveryPkScript size, the size of a P2PKH public key script is used. // DeliveryPkScript size, the size of a P2PKH public key script is used.
// Therefore, the final breakdown is: 8 + 1 + 8 + 8 + 8 + 4 + 33 + 33 + 25 + 8
// + 9 = 166.
// //
// This is part of the lnwire.Message interface. // This is part of the lnwire.Message interface.
func (c *SingleFundingRequest) MaxPayloadLength(uint32) uint32 { func (c *SingleFundingRequest) MaxPayloadLength(uint32) uint32 {
return 174 var length uint32
// ChannelID - 8 bytes
length += 8
// ChannelType - 1 byte
length += 1
// CoinType - 8 bytes
length += 8
// FeePerKb - 8 bytes
length += 8
// FundingAmount - 8 bytes
length += 8
// PushSatoshis - 8 bytes
length += 8
// CsvDelay - 4 bytes
length += 4
// CommitmentKey - 33 bytes
length += 33
// ChannelDerivationPoint - 33 bytes
length += 33
// DeliveryPkScript - 25 bytes
length += 25
// DustLimit - 8 bytes
length += 8
// ConfirmationDepth - 4 bytes
length += 4
return length
} }
// Validate examines each populated field within the SingleFundingRequest for // Validate examines each populated field within the SingleFundingRequest for
@ -233,6 +256,10 @@ func (c *SingleFundingRequest) Validate() error {
return fmt.Errorf("Dust limit should be greater than zero.") return fmt.Errorf("Dust limit should be greater than zero.")
} }
if c.ConfirmationDepth == 0 {
return fmt.Errorf("ConfirmationDepth must be non-zero")
}
// We're good! // We're good!
return nil return nil
} }

@ -11,7 +11,7 @@ func TestSingleFundingRequestWire(t *testing.T) {
cdp := pubKey cdp := pubKey
delivery := PkScript(bytes.Repeat([]byte{0x02}, 25)) delivery := PkScript(bytes.Repeat([]byte{0x02}, 25))
sfr := NewSingleFundingRequest(20, 21, 22, 23, 5, 5, cdp, cdp, sfr := NewSingleFundingRequest(20, 21, 22, 23, 5, 5, cdp, cdp,
delivery, 540, 10000) delivery, 540, 10000, 6)
// Next encode the SFR message into an empty bytes buffer. // Next encode the SFR message into an empty bytes buffer.
var b bytes.Buffer var b bytes.Buffer

@ -51,13 +51,18 @@ type SingleFundingResponse struct {
// generated for remote commitment transaction; ie. HTLCs below // generated for remote commitment transaction; ie. HTLCs below
// this amount are not enforceable onchain for their point of view. // this amount are not enforceable onchain for their point of view.
DustLimit btcutil.Amount DustLimit btcutil.Amount
// ConfirmationDepth is the number of confirmations that the initiator
// of a funding workflow is requesting be required before the channel
// is considered fully open.
ConfirmationDepth uint32
} }
// NewSingleFundingResponse creates, and returns a new empty // NewSingleFundingResponse creates, and returns a new empty
// SingleFundingResponse. // SingleFundingResponse.
func NewSingleFundingResponse(chanID uint64, rk, ck, cdp *btcec.PublicKey, func NewSingleFundingResponse(chanID uint64, rk, ck, cdp *btcec.PublicKey,
delay uint32, deliveryScript PkScript, delay uint32, deliveryScript PkScript,
dustLimit btcutil.Amount) *SingleFundingResponse { dustLimit btcutil.Amount, confDepth uint32) *SingleFundingResponse {
return &SingleFundingResponse{ return &SingleFundingResponse{
ChannelID: chanID, ChannelID: chanID,
@ -67,6 +72,7 @@ func NewSingleFundingResponse(chanID uint64, rk, ck, cdp *btcec.PublicKey,
CsvDelay: delay, CsvDelay: delay,
DeliveryPkScript: deliveryScript, DeliveryPkScript: deliveryScript,
DustLimit: dustLimit, DustLimit: dustLimit,
ConfirmationDepth: confDepth,
} }
} }
@ -87,6 +93,7 @@ func (c *SingleFundingResponse) Decode(r io.Reader, pver uint32) error {
// CsvDelay (4) // CsvDelay (4)
// DeliveryPkScript (final delivery) // DeliveryPkScript (final delivery)
// DustLimit (8) // DustLimit (8)
// ConfirmationDepth (4)
err := readElements(r, err := readElements(r,
&c.ChannelID, &c.ChannelID,
&c.ChannelDerivationPoint, &c.ChannelDerivationPoint,
@ -94,7 +101,8 @@ func (c *SingleFundingResponse) Decode(r io.Reader, pver uint32) error {
&c.RevocationKey, &c.RevocationKey,
&c.CsvDelay, &c.CsvDelay,
&c.DeliveryPkScript, &c.DeliveryPkScript,
&c.DustLimit) &c.DustLimit,
&c.ConfirmationDepth)
if err != nil { if err != nil {
return err return err
} }
@ -108,13 +116,6 @@ func (c *SingleFundingResponse) Decode(r io.Reader, pver uint32) error {
// //
// This is part of the lnwire.Message interface. // This is part of the lnwire.Message interface.
func (c *SingleFundingResponse) Encode(w io.Writer, pver uint32) error { func (c *SingleFundingResponse) Encode(w io.Writer, pver uint32) error {
// ChannelID (8)
// ChannelDerivationPoint (33)
// CommitmentKey (33)
// RevocationKey (33)
// CsvDelay (4)
// DeliveryPkScript (final delivery)
// DustLimit (8)
err := writeElements(w, err := writeElements(w,
c.ChannelID, c.ChannelID,
c.ChannelDerivationPoint, c.ChannelDerivationPoint,
@ -122,7 +123,8 @@ func (c *SingleFundingResponse) Encode(w io.Writer, pver uint32) error {
c.RevocationKey, c.RevocationKey,
c.CsvDelay, c.CsvDelay,
c.DeliveryPkScript, c.DeliveryPkScript,
c.DustLimit) c.DustLimit,
c.ConfirmationDepth)
if err != nil { if err != nil {
return err return err
} }
@ -142,11 +144,36 @@ func (c *SingleFundingResponse) Command() uint32 {
// SingleFundingResponse. This is calculated by summing the max length of all // SingleFundingResponse. This is calculated by summing the max length of all
// the fields within a SingleFundingResponse. To enforce a maximum // the fields within a SingleFundingResponse. To enforce a maximum
// DeliveryPkScript size, the size of a P2PKH public key script is used. // DeliveryPkScript size, the size of a P2PKH public key script is used.
// Therefore, the final breakdown is: 8 + (33 * 3) + 8 + 25 + 8
// //
// This is part of the lnwire.Message interface. // This is part of the lnwire.Message interface.
func (c *SingleFundingResponse) MaxPayloadLength(uint32) uint32 { func (c *SingleFundingResponse) MaxPayloadLength(uint32) uint32 {
return 148 var length uint32
// ChannelID - 8 bytes
length += 8
// ChannelDerivationPoint - 33 bytes
length += 33
// CommitmentKey - 33 bytes
length += 33
// RevocationKey - 33 bytes
length += 33
// CsvDelay - 4 bytes
length += 4
// DeliveryPkScript - 25 bytes
length += 25
// DustLimit - 8 bytes
length += 8
// ConfirmationDepth - 4 bytes
length += 4
return length
} }
// Validate examines each populated field within the SingleFundingResponse for // Validate examines each populated field within the SingleFundingResponse for
@ -177,6 +204,10 @@ func (c *SingleFundingResponse) Validate() error {
"zero.") "zero.")
} }
if c.ConfirmationDepth == 0 {
return fmt.Errorf("ConfirmationDepth must be non-zero")
}
// We're good! // We're good!
return nil return nil
} }

@ -10,7 +10,7 @@ func TestSingleFundingResponseWire(t *testing.T) {
// First create a new SFR message. // First create a new SFR message.
delivery := PkScript(bytes.Repeat([]byte{0x02}, 25)) delivery := PkScript(bytes.Repeat([]byte{0x02}, 25))
sfr := NewSingleFundingResponse(22, pubKey, pubKey, pubKey, 5, sfr := NewSingleFundingResponse(22, pubKey, pubKey, pubKey, 5,
delivery, 540) delivery, 540, 4)
// Next encode the SFR message into an empty bytes buffer. // Next encode the SFR message into an empty bytes buffer.
var b bytes.Buffer var b bytes.Buffer

@ -688,7 +688,7 @@ func (n *networkHarness) OpenChannel(ctx context.Context,
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before chan pending "+ return nil, fmt.Errorf("timeout reached before chan pending "+
"update sent") "update sent: %v", err)
case err := <-errChan: case err := <-errChan:
return nil, err return nil, err
case <-chanOpen: case <-chanOpen:

14
peer.go

@ -49,6 +49,14 @@ type outgoinMsg struct {
sentChan chan struct{} // MUST be buffered. sentChan chan struct{} // MUST be buffered.
} }
// newChannelMsg packages a lnwallet.LightningChannel with a channel that
// allows the receiver of the request to report when the funding transaction
// has been confirmed and the channel creation process completed.
type newChannelMsg struct {
channel *lnwallet.LightningChannel
done chan struct{}
}
// chanSnapshotReq is a message sent by outside subsystems to a peer in order // chanSnapshotReq is a message sent by outside subsystems to a peer in order
// to gain a snapshot of the peer's currently active channels. // to gain a snapshot of the peer's currently active channels.
type chanSnapshotReq struct { type chanSnapshotReq struct {
@ -121,7 +129,7 @@ type peer struct {
// newChannels is used by the fundingManager to send fully opened // newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
newChannels chan *newChannelReq newChannels chan *newChannelMsg
// localCloseChanReqs is a channel in which any local requests to close // localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over. // a particular channel are sent over.
@ -185,7 +193,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel),
htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), htlcManagers: make(map[wire.OutPoint]chan lnwire.Message),
chanSnapshotReqs: make(chan *chanSnapshotReq), chanSnapshotReqs: make(chan *chanSnapshotReq),
newChannels: make(chan *newChannelReq, 1), newChannels: make(chan *newChannelMsg, 1),
localCloseChanReqs: make(chan *closeLinkReq), localCloseChanReqs: make(chan *closeLinkReq),
remoteCloseChanReqs: make(chan *lnwire.CloseRequest), remoteCloseChanReqs: make(chan *lnwire.CloseRequest),
@ -438,8 +446,6 @@ out:
p.server.fundingMgr.processFundingComplete(msg, p.addr) p.server.fundingMgr.processFundingComplete(msg, p.addr)
case *lnwire.SingleFundingSignComplete: case *lnwire.SingleFundingSignComplete:
p.server.fundingMgr.processFundingSignComplete(msg, p.addr) p.server.fundingMgr.processFundingSignComplete(msg, p.addr)
case *lnwire.SingleFundingOpenProof:
p.server.fundingMgr.processFundingOpenProof(msg, p.addr)
case *lnwire.CloseRequest: case *lnwire.CloseRequest:
p.remoteCloseChanReqs <- msg p.remoteCloseChanReqs <- msg

@ -199,6 +199,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
s.fundingMgr, err = newFundingManager(fundingConfig{ s.fundingMgr, err = newFundingManager(fundingConfig{
Wallet: wallet, Wallet: wallet,
Notifier: s.chainNotifier,
ArbiterChan: s.breachArbiter.newContracts, ArbiterChan: s.breachArbiter.newContracts,
SendToPeer: s.sendToPeer, SendToPeer: s.sendToPeer,
FindPeer: s.findPeer, FindPeer: s.findPeer,