Merge pull request #1301 from halseth/fundingmanager-double-error

Reduce fundingmanager chan send deadlock scenarios
This commit is contained in:
Olaoluwa Osuntokun 2018-06-01 17:36:02 -07:00 committed by GitHub
commit 7e4abb4f55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 52 deletions

@ -724,15 +724,27 @@ type pendingChansReq struct {
// currently pending at the last state of the funding workflow. // currently pending at the last state of the funding workflow.
func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) { func (f *fundingManager) PendingChannels() ([]*pendingChannel, error) {
respChan := make(chan []*pendingChannel, 1) respChan := make(chan []*pendingChannel, 1)
errChan := make(chan error) errChan := make(chan error, 1)
req := &pendingChansReq{ req := &pendingChansReq{
resp: respChan, resp: respChan,
err: errChan, err: errChan,
} }
f.queries <- req
return <-respChan, <-errChan select {
case f.queries <- req:
case <-f.quit:
return nil, fmt.Errorf("fundingmanager shutting down")
}
select {
case resp := <-respChan:
return resp, nil
case err := <-errChan:
return nil, err
case <-f.quit:
return nil, fmt.Errorf("fundingmanager shutting down")
}
} }
// CancelPeerReservations cancels all active reservations associated with the // CancelPeerReservations cancels all active reservations associated with the
@ -763,13 +775,7 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
"node=%x: %v", nodePub[:], err) "node=%x: %v", nodePub[:], err)
} }
if resCtx.err != nil { resCtx.err <- fmt.Errorf("peer disconnected")
select {
case resCtx.err <- fmt.Errorf("peer disconnected"):
default:
}
}
delete(nodeReservations, pendingID) delete(nodeReservations, pendingID)
} }
@ -787,6 +793,20 @@ func (f *fundingManager) CancelPeerReservations(nodePub [33]byte) {
func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
tempChanID [32]byte, fundingErr error) { tempChanID [32]byte, fundingErr error) {
fndgLog.Debugf("Failing funding flow for pendingID=%x: %v",
tempChanID, fundingErr)
ctx, err := f.cancelReservationCtx(peer, tempChanID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
// In case the case where the reservation existed, send the funding
// error on the error channel.
if ctx != nil {
ctx.err <- fundingErr
}
// We only send the exact error if it is part of out whitelisted set of // We only send the exact error if it is part of out whitelisted set of
// errors (lnwire.ErrorCode or lnwallet.ReservationError). // errors (lnwire.ErrorCode or lnwallet.ReservationError).
var msg lnwire.ErrorData var msg lnwire.ErrorData
@ -810,20 +830,11 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
Data: msg, Data: msg,
} }
fndgLog.Errorf("Failing funding flow: %v (%v)", fundingErr, fndgLog.Debugf("Sending funding error to peer (%x): %v",
spew.Sdump(errMsg)) peer.SerializeCompressed(), spew.Sdump(errMsg))
if err := f.cfg.SendToPeer(peer, errMsg); err != nil {
if _, err := f.cancelReservationCtx(peer, tempChanID); err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err)
}
err := f.cfg.SendToPeer(peer, errMsg)
if err != nil {
fndgLog.Errorf("unable to send error message to peer %v", err) fndgLog.Errorf("unable to send error message to peer %v", err)
return
} }
return
} }
// reservationCoordinator is the primary goroutine tasked with progressing the // reservationCoordinator is the primary goroutine tasked with progressing the
@ -880,7 +891,6 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
dbPendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels() dbPendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels()
if err != nil { if err != nil {
msg.resp <- nil
msg.err <- err msg.err <- err
return return
} }
@ -898,7 +908,6 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) {
} }
msg.resp <- pendingChannels msg.resp <- pendingChannels
msg.err <- nil
} }
// processFundingOpen sends a message to the fundingManager allowing it to // processFundingOpen sends a message to the fundingManager allowing it to
@ -1216,7 +1225,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
fmsg.peerAddress.IdentityKey, err) fmsg.peerAddress.IdentityKey, err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey, f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err) msg.PendingChannelID, err)
resCtx.err <- err
return return
} }
@ -1261,7 +1269,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
fndgLog.Errorf("Unable to parse signature: %v", err) fndgLog.Errorf("Unable to parse signature: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey, f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err) msg.PendingChannelID, err)
resCtx.err <- err
return return
} }
err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated) err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingCreated)
@ -1269,7 +1276,6 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
fndgLog.Errorf("Unable to send funding complete message: %v", err) fndgLog.Errorf("Unable to send funding complete message: %v", err)
f.failFundingFlow(fmsg.peerAddress.IdentityKey, f.failFundingFlow(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID, err) msg.PendingChannelID, err)
resCtx.err <- err
return return
} }
} }
@ -1431,6 +1437,11 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
case <-timeoutChan: case <-timeoutChan:
// We did not see the funding confirmation before // We did not see the funding confirmation before
// timeout, so we forget the channel. // timeout, so we forget the channel.
err := fmt.Errorf("timeout waiting for funding tx "+
"(%v) to confirm", completeChan.FundingOutpoint)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err)
deleteFromDatabase() deleteFromDatabase()
return return
case <-f.quit: case <-f.quit:
@ -1488,9 +1499,8 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
err := fmt.Errorf("Unable to find signed reservation for "+ err := fmt.Errorf("Unable to find signed reservation for "+
"chan_id=%x", fmsg.msg.ChanID) "chan_id=%x", fmsg.msg.ChanID)
fndgLog.Warnf(err.Error()) fndgLog.Warnf(err.Error())
// TODO: add ErrChanNotFound?
f.failFundingFlow(fmsg.peerAddress.IdentityKey, f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err) fmsg.msg.ChanID, err)
return return
} }
@ -1522,7 +1532,6 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig) completeChan, err := resCtx.reservation.CompleteReservation(nil, commitSig)
if err != nil { if err != nil {
fndgLog.Errorf("Unable to complete reservation sign complete: %v", err) fndgLog.Errorf("Unable to complete reservation sign complete: %v", err)
resCtx.err <- err
f.failFundingFlow(fmsg.peerAddress.IdentityKey, f.failFundingFlow(fmsg.peerAddress.IdentityKey,
pendingChanID, err) pendingChanID, err)
return return
@ -2671,13 +2680,13 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
peerKey := fmsg.peerAddress.IdentityKey peerKey := fmsg.peerAddress.IdentityKey
chanID := fmsg.err.ChanID chanID := fmsg.err.ChanID
// First, we'll attempt to retrieve the funding workflow that this // First, we'll attempt to retrieve and cancel the funding workflow
// error was tied to. If we're unable to do so, then we'll exit early // that this error was tied to. If we're unable to do so, then we'll
// as this was an unwarranted error. // exit early as this was an unwarranted error.
resCtx, err := f.getReservationCtx(peerKey, chanID) resCtx, err := f.cancelReservationCtx(peerKey, chanID)
if err != nil { if err != nil {
fndgLog.Warnf("Received error for non-existent funding "+ fndgLog.Warnf("Received error for non-existent funding "+
"flow: %v", spew.Sdump(protocolErr)) "flow: %v (%v)", err, spew.Sdump(protocolErr))
return return
} }
@ -2691,21 +2700,17 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
// If this isn't a simple error code, then we'll display the entire // If this isn't a simple error code, then we'll display the entire
// thing. // thing.
if len(protocolErr.Data) > 1 { if len(protocolErr.Data) > 1 {
resCtx.err <- grpc.Errorf( err = grpc.Errorf(
lnErr.ToGrpcCode(), string(protocolErr.Data), lnErr.ToGrpcCode(), string(protocolErr.Data),
) )
} else { } else {
// Otherwise, we'll attempt to display just the error code // Otherwise, we'll attempt to display just the error code
// itself. // itself.
resCtx.err <- grpc.Errorf( err = grpc.Errorf(
lnErr.ToGrpcCode(), lnErr.String(), lnErr.ToGrpcCode(), lnErr.String(),
) )
} }
resCtx.err <- err
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Warnf("unable to delete reservation: %v", err)
return
}
} }
// pruneZombieReservations loops through all pending reservations and fails the // pruneZombieReservations loops through all pending reservations and fails the
@ -2744,10 +2749,21 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+ fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+
"chan_id=%x", peerKey.SerializeCompressed(), pendingChanID[:]) "chan_id=%x", peerKey.SerializeCompressed(), pendingChanID[:])
ctx, err := f.getReservationCtx(peerKey, pendingChanID) peerIDKey := newSerializedKey(peerKey)
if err != nil { f.resMtx.Lock()
return nil, errors.Errorf("unable to find reservation: %v", defer f.resMtx.Unlock()
err)
nodeReservations, ok := f.activeReservations[peerIDKey]
if !ok {
// No reservations for this node.
return nil, errors.Errorf("no active reservations for peer(%x)",
peerIDKey[:])
}
ctx, ok := nodeReservations[pendingChanID]
if !ok {
return nil, errors.Errorf("unknown channel (id: %x) for "+
"peer(%x)", pendingChanID[:], peerIDKey[:])
} }
if err := ctx.reservation.Cancel(); err != nil { if err := ctx.reservation.Cancel(); err != nil {
@ -2755,7 +2771,13 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
err) err)
} }
f.deleteReservationCtx(peerKey, pendingChanID) delete(nodeReservations, pendingChanID)
// If this was the last active reservation for this peer, delete the
// peer's entry altogether.
if len(nodeReservations) == 0 {
delete(f.activeReservations, peerIDKey)
}
return ctx, nil return ctx, nil
} }
@ -2768,8 +2790,20 @@ func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey,
// channelManager? // channelManager?
peerIDKey := newSerializedKey(peerKey) peerIDKey := newSerializedKey(peerKey)
f.resMtx.Lock() f.resMtx.Lock()
delete(f.activeReservations[peerIDKey], pendingChanID) defer f.resMtx.Unlock()
f.resMtx.Unlock()
nodeReservations, ok := f.activeReservations[peerIDKey]
if !ok {
// No reservations for this node.
return
}
delete(nodeReservations, pendingChanID)
// If this was the last active reservation for this peer, delete the
// peer's entry altogether.
if len(nodeReservations) == 0 {
delete(f.activeReservations, peerIDKey)
}
} }
// getReservationCtx returns the reservation context for a particular pending // getReservationCtx returns the reservation context for a particular pending
@ -2783,8 +2817,8 @@ func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
f.resMtx.RUnlock() f.resMtx.RUnlock()
if !ok { if !ok {
return nil, errors.Errorf("unknown channel (id: %x)", return nil, errors.Errorf("unknown channel (id: %x) for "+
pendingChanID[:]) "peer(%x)", pendingChanID[:], peerIDKey[:])
} }
return resCtx, nil return resCtx, nil

@ -1447,6 +1447,9 @@ func TestFundingManagerFundingTimeout(t *testing.T) {
Height: fundingBroadcastHeight + 288, Height: fundingBroadcastHeight + 288,
} }
// Bob should have sent an Error message to Alice.
assertErrorSent(t, bob.msgChan)
// Should not be pending anymore. // Should not be pending anymore.
assertNumPendingChannelsBecomes(t, bob, 0) assertNumPendingChannelsBecomes(t, bob, 0)
} }
@ -1511,6 +1514,9 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
// Since Alice was the initiator, the channel should not have timed out // Since Alice was the initiator, the channel should not have timed out
assertNumPendingChannelsRemains(t, alice, 1) assertNumPendingChannelsRemains(t, alice, 1)
// Bob should have sent an Error message to Alice.
assertErrorSent(t, bob.msgChan)
// Since Bob was not the initiator, the channel should timeout // Since Bob was not the initiator, the channel should timeout
assertNumPendingChannelsBecomes(t, bob, 0) assertNumPendingChannelsBecomes(t, bob, 0)
} }

@ -2006,7 +2006,11 @@ func (s *server) OpenChannel(nodeKey *btcec.PublicKey,
fundingFeePerVSize lnwallet.SatPerVByte, private bool, fundingFeePerVSize lnwallet.SatPerVByte, private bool,
remoteCsvDelay uint16) (chan *lnrpc.OpenStatusUpdate, chan error) { remoteCsvDelay uint16) (chan *lnrpc.OpenStatusUpdate, chan error) {
updateChan := make(chan *lnrpc.OpenStatusUpdate, 1) // The updateChan will have a buffer of 2, since we expect a
// ChanPending + a ChanOpen update, and we want to make sure the
// funding process is not blocked if the caller is not reading the
// updates.
updateChan := make(chan *lnrpc.OpenStatusUpdate, 2)
errChan := make(chan error, 1) errChan := make(chan error, 1)
var ( var (