funding: ensure reservation contexts are remove in the case of workflow error

This commit is contained in:
Olaoluwa Osuntokun 2017-02-21 22:14:22 -08:00
parent bd775b9bb3
commit 2636e654be
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 60 additions and 13 deletions

@ -108,10 +108,18 @@ type pendingChannels map[uint64]*reservationWithCtx
// initiate new channels. // initiate new channels.
type serializedPubKey [33]byte type serializedPubKey [33]byte
// FundingConfig defines the configuration for the FundingManager. All elements // newSerializedKey creates a new serialized public key from an instance of a
// live pubkey object.
func newSerializedKey(pubKey *btcec.PublicKey) serializedPubKey {
var s serializedPubKey
copy(s[:], pubKey.SerializeCompressed())
return s
}
// fundingConfig defines the configuration for the FundingManager. All elements
// within the configuration MUST be non-nil for the FundingManager to carry out // within the configuration MUST be non-nil for the FundingManager to carry out
// its duties. // its duties.
type FundingConfig struct { type fundingConfig struct {
// Wallet handles the parts of the funding process that involves moving // Wallet handles the parts of the funding process that involves moving
// funds from on-chain transaction outputs into Lightning channels. // funds from on-chain transaction outputs into Lightning channels.
Wallet *lnwallet.LightningWallet Wallet *lnwallet.LightningWallet
@ -149,7 +157,7 @@ type fundingManager struct {
// cfg is a copy of the configuration struct that the FundingManager was // cfg is a copy of the configuration struct that the FundingManager was
// initialized with. // initialized with.
cfg *FundingConfig cfg *fundingConfig
// channelReservations is a map which houses the state of all pending // channelReservations is a map which houses the state of all pending
// funding workflows. // funding workflows.
@ -176,7 +184,7 @@ type fundingManager struct {
// newFundingManager creates and initializes a new instance of the // newFundingManager creates and initializes a new instance of the
// fundingManager. // fundingManager.
func newFundingManager(cfg FundingConfig) (*fundingManager, error) { func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
// TODO(roasbeef): remove once we actually sign the funding_locked // TODO(roasbeef): remove once we actually sign the funding_locked
// stuffs // stuffs
s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" + s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" +
@ -439,15 +447,24 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
} }
f.activeReservations[peerIDKey][msg.ChannelID] = &reservationWithCtx{ f.activeReservations[peerIDKey][msg.ChannelID] = &reservationWithCtx{
reservation: reservation, reservation: reservation,
err: make(chan error, 1),
peerAddress: fmsg.peerAddress, peerAddress: fmsg.peerAddress,
} }
f.resMtx.Unlock() f.resMtx.Unlock()
cancelReservation := func() {
_, err := f.cancelReservationCtx(fmsg.peerAddress.IdentityKey, msg.ChannelID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
}
// With our portion of the reservation initialized, process the // With our portion of the reservation initialized, process the
// initiators contribution to the channel. // initiators contribution to the channel.
_, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params) _, addrs, _, err := txscript.ExtractPkScriptAddrs(msg.DeliveryPkScript, activeNetParams.Params)
if err != nil { if err != nil {
fndgLog.Errorf("Unable to extract addresses from script: %v", err) fndgLog.Errorf("Unable to extract addresses from script: %v", err)
cancelReservation()
return return
} }
contribution := &lnwallet.ChannelContribution{ contribution := &lnwallet.ChannelContribution{
@ -459,6 +476,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
} }
if err := reservation.ProcessSingleContribution(contribution); err != nil { if err := reservation.ProcessSingleContribution(contribution); err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err) fndgLog.Errorf("unable to add contribution reservation: %v", err)
cancelReservation()
return return
} }
@ -470,6 +488,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
deliveryScript, err := txscript.PayToAddrScript(ourContribution.DeliveryAddress) deliveryScript, err := txscript.PayToAddrScript(ourContribution.DeliveryAddress)
if err != nil { if err != nil {
fndgLog.Errorf("unable to convert address to pkscript: %v", err) fndgLog.Errorf("unable to convert address to pkscript: %v", err)
cancelReservation()
return return
} }
fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID, fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID,
@ -479,6 +498,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingResp); err != nil { if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingResp); err != nil {
fndgLog.Errorf("unable to send funding response to peer: %v", err) fndgLog.Errorf("unable to send funding response to peer: %v", err)
cancelReservation()
return return
} }
} }
@ -505,6 +525,12 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
return return
} }
cancelReservation := func() {
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
}
fndgLog.Infof("Recv'd fundingResponse for pendingID(%v)", msg.ChannelID) fndgLog.Infof("Recv'd fundingResponse for pendingID(%v)", msg.ChannelID)
resCtx.reservation.SetTheirDustLimit(msg.DustLimit) resCtx.reservation.SetTheirDustLimit(msg.DustLimit)
@ -517,6 +543,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
activeNetParams.Params) activeNetParams.Params)
if err != nil { if err != nil {
fndgLog.Errorf("Unable to extract addresses from script: %v", err) fndgLog.Errorf("Unable to extract addresses from script: %v", err)
cancelReservation()
resCtx.err <- err resCtx.err <- err
return return
} }
@ -531,6 +558,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
if err := resCtx.reservation.ProcessContribution(contribution); err != nil { if err := resCtx.reservation.ProcessContribution(contribution); err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v", fndgLog.Errorf("Unable to process contribution from %v: %v",
fmsg.peerAddress.IdentityKey, err) fmsg.peerAddress.IdentityKey, err)
cancelReservation()
resCtx.err <- err resCtx.err <- err
return return
} }
@ -543,6 +571,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
commitSig, err := btcec.ParseSignature(sig, btcec.S256()) commitSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil { if err != nil {
fndgLog.Errorf("Unable to parse signature: %v", err) fndgLog.Errorf("Unable to parse signature: %v", err)
cancelReservation()
resCtx.err <- err resCtx.err <- err
return return
} }
@ -552,6 +581,8 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
peer, err := f.cfg.FindPeer(peerKey) peer, err := f.cfg.FindPeer(peerKey)
if err != nil { if err != nil {
fndgLog.Errorf("Error finding peer: %v", err) fndgLog.Errorf("Error finding peer: %v", err)
cancelReservation()
resCtx.err <- err
return return
} }
peer.barrierInits <- *outPoint peer.barrierInits <- *outPoint
@ -567,6 +598,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingComplete); err != nil { if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingComplete); err != nil {
fndgLog.Errorf("Unable to send funding complete message: %v", err) fndgLog.Errorf("Unable to send funding complete message: %v", err)
cancelReservation()
resCtx.err <- err resCtx.err <- err
return return
} }
@ -594,6 +626,12 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
return return
} }
cancelReservation := func() {
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
}
// The channel initiator has responded with the funding outpoint of the // The channel initiator has responded with the funding outpoint of the
// final funding transaction, as well as a signature for our version of // final funding transaction, as well as a signature for our version of
// the commitment transaction. So at this point, we can validate the // the commitment transaction. So at this point, we can validate the
@ -616,6 +654,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
if err != nil { if err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc. // TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err) fndgLog.Errorf("unable to complete single reservation: %v", err)
cancelReservation()
return return
} }
@ -627,6 +666,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
ourCommitSig, err := btcec.ParseSignature(sig, btcec.S256()) ourCommitSig, err := btcec.ParseSignature(sig, btcec.S256())
if err != nil { if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err) fndgLog.Errorf("unable to parse signature: %v", err)
cancelReservation()
return return
} }
@ -635,6 +675,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
peer, err := f.cfg.FindPeer(peerKey) peer, err := f.cfg.FindPeer(peerKey)
if err != nil { if err != nil {
fndgLog.Errorf("Error finding peer: %v", err) fndgLog.Errorf("Error finding peer: %v", err)
cancelReservation()
return return
} }
peer.barrierInits <- fundingOut peer.barrierInits <- fundingOut
@ -645,6 +686,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
signComplete := lnwire.NewSingleFundingSignComplete(chanID, ourCommitSig) signComplete := lnwire.NewSingleFundingSignComplete(chanID, ourCommitSig)
if err := f.cfg.SendToPeer(peerKey, signComplete); err != nil { if err := f.cfg.SendToPeer(peerKey, signComplete); err != nil {
fndgLog.Errorf("unable to send signComplete message: %v", err) fndgLog.Errorf("unable to send signComplete message: %v", err)
cancelReservation()
return return
} }
} }
@ -777,6 +819,10 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil { if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil {
fndgLog.Errorf("unable to complete reservation sign complete: %v", err) fndgLog.Errorf("unable to complete reservation sign complete: %v", err)
resCtx.err <- err resCtx.err <- err
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
return return
} }
@ -801,6 +847,11 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
// once it reaches a sufficient number of confirmations. // once it reaches a sufficient number of confirmations.
// TODO(roasbeef): semaphore to limit active chan open goroutines // TODO(roasbeef): semaphore to limit active chan open goroutines
go func() { go func() {
// As this is the final step in the life-time of a single
// funder channel workflow, ensure that the reservation's state
// is cleaned up.
defer f.deleteReservationCtx(peerKey, chanID)
// TODO(roasbeef): need to persist pending broadcast channels, // TODO(roasbeef): need to persist pending broadcast channels,
// send chan open proof during scan of blocks mined while down. // send chan open proof during scan of blocks mined while down.
openChanDetails, err := resCtx.reservation.DispatchChan() openChanDetails, err := resCtx.reservation.DispatchChan()
@ -1009,7 +1060,6 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
return return
} }
// TODO(bvu): add comment
chanID := peer.fetchNextPendingChanID() chanID := peer.fetchNextPendingChanID()
// If a pending channel map for this peer isn't already created, then // If a pending channel map for this peer isn't already created, then
@ -1087,6 +1137,7 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
ctx, err := f.cancelReservationCtx(peerKey, chanID) ctx, err := f.cancelReservationCtx(peerKey, chanID)
if err != nil { if err != nil {
fndgLog.Warnf("unable to delete reservation: %v", err) fndgLog.Warnf("unable to delete reservation: %v", err)
ctx.err <- err
return return
} }
@ -1109,6 +1160,9 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
chanID uint64) (*reservationWithCtx, error) { chanID uint64) (*reservationWithCtx, error) {
fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+
"chan_id=%v", peerKey.SerializeCompressed(), chanID)
ctx, err := f.getReservationCtx(peerKey, chanID) ctx, err := f.getReservationCtx(peerKey, chanID)
if err != nil { if err != nil {
return nil, errors.Errorf("can't find reservation: %v", return nil, errors.Errorf("can't find reservation: %v",
@ -1116,7 +1170,6 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
} }
if err := ctx.reservation.Cancel(); err != nil { if err := ctx.reservation.Cancel(); err != nil {
ctx.err <- err
return nil, errors.Errorf("can't cancel reservation: %v", return nil, errors.Errorf("can't cancel reservation: %v",
err) err)
} }
@ -1158,9 +1211,3 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
Y: pub.Y, Y: pub.Y,
} }
} }
func newSerializedKey(pubKey *btcec.PublicKey) serializedPubKey {
serializedKey := serializedPubKey{}
copy(serializedKey[:33], pubKey.SerializeCompressed()[:])
return serializedKey
}

@ -197,7 +197,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
s.rpcServer = newRpcServer(s) s.rpcServer = newRpcServer(s)
s.breachArbiter = newBreachArbiter(wallet, chanDB, notifier, s.htlcSwitch) s.breachArbiter = newBreachArbiter(wallet, chanDB, notifier, s.htlcSwitch)
s.fundingMgr, err = newFundingManager(FundingConfig{ s.fundingMgr, err = newFundingManager(fundingConfig{
Wallet: wallet, Wallet: wallet,
ArbiterChan: s.breachArbiter.newContracts, ArbiterChan: s.breachArbiter.newContracts,
SendToPeer: s.sendToPeer, SendToPeer: s.sendToPeer,