fundingmanager: make waitForFundingTimeout sync
This commit makes the waitForFundingTimeout method synchronous, and return ErrConfirmationTimeout in case the timeout is reached. We also simplify the internals by using waitForTimout defined earlier.
This commit is contained in:
parent
47fae26dc4
commit
893c6cbc59
@ -85,6 +85,12 @@ var (
|
||||
// been signaled to shut down.
|
||||
ErrFundingManagerShuttingDown = errors.New("funding manager shutting " +
|
||||
"down")
|
||||
|
||||
// ErrConfirmationTimeout is an error returned when we as a responder
|
||||
// are waiting for a funding transaction to confirm, but too many
|
||||
// blocks pass without confirmation.
|
||||
ErrConfirmationTimeout = errors.New("timeout waiting for funding " +
|
||||
"confirmation")
|
||||
)
|
||||
|
||||
// reservationWithCtx encapsulates a pending channel reservation. This wrapper
|
||||
@ -519,20 +525,20 @@ func (f *fundingManager) start() error {
|
||||
}
|
||||
}
|
||||
|
||||
confChan := make(chan *lnwire.ShortChannelID)
|
||||
timeoutChan := make(chan struct{})
|
||||
|
||||
f.wg.Add(1)
|
||||
go func(ch *channeldb.OpenChannel) {
|
||||
go f.waitForFundingWithTimeout(ch, confChan, timeoutChan)
|
||||
defer f.wg.Done()
|
||||
|
||||
var shortChanID *lnwire.ShortChannelID
|
||||
var ok bool
|
||||
select {
|
||||
case <-timeoutChan:
|
||||
// Timeout channel will be triggered if the number of blocks
|
||||
// mined since the channel was initiated reaches
|
||||
// maxWaitNumBlocksFundingConf and we are not the channel
|
||||
// initiator.
|
||||
shortChanID, err := f.waitForFundingWithTimeout(ch)
|
||||
if err == ErrConfirmationTimeout {
|
||||
fndgLog.Warnf("Timeout waiting for funding "+
|
||||
"confirmation of ChannelPoint(%v)",
|
||||
ch.FundingOutpoint)
|
||||
|
||||
// We'll get a timeout if the number of blocks
|
||||
// mined since the channel was initiated
|
||||
// reaches maxWaitNumBlocksFundingConf and we
|
||||
// are not the channel initiator.
|
||||
localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis()
|
||||
closeInfo := &channeldb.ChannelCloseSummary{
|
||||
ChainHash: ch.ChainHash,
|
||||
@ -552,19 +558,11 @@ func (f *fundingManager) start() error {
|
||||
return
|
||||
}
|
||||
return
|
||||
|
||||
case <-f.quit:
|
||||
// The fundingManager is shutting down, and will
|
||||
// resume wait on startup.
|
||||
} else if err != nil {
|
||||
fndgLog.Errorf("Failed waiting for funding "+
|
||||
"confirmation for ChannelPoint(%v): %v",
|
||||
ch.FundingOutpoint, err)
|
||||
return
|
||||
|
||||
case shortChanID, ok = <-confChan:
|
||||
if !ok {
|
||||
fndgLog.Errorf("Waiting for funding" +
|
||||
"confirmation failed")
|
||||
return
|
||||
}
|
||||
// Fallthrough.
|
||||
}
|
||||
|
||||
// Success, funding transaction was confirmed.
|
||||
@ -1609,15 +1607,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
|
||||
f.wg.Add(1)
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
confChan := make(chan *lnwire.ShortChannelID)
|
||||
timeoutChan := make(chan struct{})
|
||||
go f.waitForFundingWithTimeout(completeChan, confChan,
|
||||
timeoutChan)
|
||||
|
||||
var shortChanID *lnwire.ShortChannelID
|
||||
var ok bool
|
||||
select {
|
||||
case <-timeoutChan:
|
||||
shortChanID, err := f.waitForFundingWithTimeout(completeChan)
|
||||
if err == ErrConfirmationTimeout {
|
||||
// We did not see the funding confirmation before
|
||||
// timeout, so we forget the channel.
|
||||
err := fmt.Errorf("timeout waiting for funding tx "+
|
||||
@ -1626,17 +1618,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
|
||||
f.failFundingFlow(fmsg.peer, pendingChanID, err)
|
||||
deleteFromDatabase()
|
||||
return
|
||||
case <-f.quit:
|
||||
// The fundingManager is shutting down, will resume
|
||||
// wait for funding transaction on startup.
|
||||
} else if err != nil {
|
||||
fndgLog.Errorf(err.Error())
|
||||
return
|
||||
case shortChanID, ok = <-confChan:
|
||||
if !ok {
|
||||
fndgLog.Errorf("waiting for funding confirmation" +
|
||||
" failed")
|
||||
return
|
||||
}
|
||||
// Fallthrough.
|
||||
}
|
||||
|
||||
// Success, funding transaction was confirmed.
|
||||
@ -1781,28 +1765,15 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
|
||||
f.wg.Add(1)
|
||||
go func() {
|
||||
defer f.wg.Done()
|
||||
confChan := make(chan *lnwire.ShortChannelID)
|
||||
cancelChan := make(chan struct{})
|
||||
|
||||
// In case the fundingManager is stopped at some point during
|
||||
// the remaining part of the opening process, we must wait for
|
||||
// this process to finish (either successfully or with some
|
||||
// error), before the fundingManager can be shut down.
|
||||
f.wg.Add(1)
|
||||
go f.waitForFundingConfirmation(completeChan, cancelChan,
|
||||
confChan)
|
||||
|
||||
var shortChanID *lnwire.ShortChannelID
|
||||
var ok bool
|
||||
select {
|
||||
case <-f.quit:
|
||||
shortChanID, err := f.waitForFundingWithTimeout(completeChan)
|
||||
if err != nil {
|
||||
// Since we are the channel initiator, we don't expect
|
||||
// to get ErrConfirmationTimeout.
|
||||
fndgLog.Errorf("Failed waiting for funding "+
|
||||
"confirmation for ChannelPoint(%v): %v",
|
||||
completeChan.FundingOutpoint, err)
|
||||
return
|
||||
case shortChanID, ok = <-confChan:
|
||||
if !ok {
|
||||
fndgLog.Errorf("waiting for funding " +
|
||||
"confirmation failed")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err = f.handleFundingConfirmation(completeChan, *shortChanID)
|
||||
@ -1821,82 +1792,50 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
|
||||
}()
|
||||
}
|
||||
|
||||
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
|
||||
// will cancel the wait for confirmation if we are not the channel initiator and
|
||||
// the maxWaitNumBlocksFundingConf has passed from bestHeight.
|
||||
// In the case of timeout, the timeoutChan will be closed. In case of error,
|
||||
// confChan will be closed. In case of success, a *lnwire.ShortChannelID will be
|
||||
// passed to confChan.
|
||||
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
|
||||
confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {
|
||||
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation and
|
||||
// waitForTimeout that will return ErrConfirmationTimeout if we are not the
|
||||
// channel initiator and the maxWaitNumBlocksFundingConf has passed from the
|
||||
// funding broadcast height. In case of confirmation, the short channel ID of
|
||||
// the channel will be returned.
|
||||
func (f *fundingManager) waitForFundingWithTimeout(
|
||||
ch *channeldb.OpenChannel) (*lnwire.ShortChannelID, error) {
|
||||
|
||||
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil)
|
||||
if err != nil {
|
||||
fndgLog.Errorf("unable to register for epoch notification: %v",
|
||||
err)
|
||||
close(confChan)
|
||||
return
|
||||
}
|
||||
|
||||
defer epochClient.Cancel()
|
||||
|
||||
waitingConfChan := make(chan *lnwire.ShortChannelID)
|
||||
confChan := make(chan *lnwire.ShortChannelID)
|
||||
timeoutChan := make(chan error, 1)
|
||||
cancelChan := make(chan struct{})
|
||||
|
||||
// Add this goroutine to wait group so we can be sure that it is
|
||||
// properly stopped before the funding manager can be shut down.
|
||||
f.wg.Add(1)
|
||||
go f.waitForFundingConfirmation(completeChan, cancelChan,
|
||||
waitingConfChan)
|
||||
go f.waitForFundingConfirmation(ch, cancelChan, confChan)
|
||||
|
||||
// On block maxHeight we will cancel the funding confirmation wait.
|
||||
maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf
|
||||
for {
|
||||
select {
|
||||
case epoch, ok := <-epochClient.Epochs:
|
||||
if !ok {
|
||||
fndgLog.Warnf("Epoch client shutting down")
|
||||
return
|
||||
}
|
||||
// If we are not the initiator, we have no money at stake and will
|
||||
// timeout waiting for the funding transaction to confirm after a
|
||||
// while.
|
||||
if !ch.IsInitiator {
|
||||
f.wg.Add(1)
|
||||
go f.waitForTimeout(ch, cancelChan, timeoutChan)
|
||||
}
|
||||
defer close(cancelChan)
|
||||
|
||||
// If we are not the channel initiator it's safe
|
||||
// to timeout the channel
|
||||
if uint32(epoch.Height) >= maxHeight && !completeChan.IsInitiator {
|
||||
fndgLog.Warnf("waited for %v blocks without "+
|
||||
"seeing funding transaction confirmed,"+
|
||||
" cancelling.", maxWaitNumBlocksFundingConf)
|
||||
|
||||
// Cancel the waitForFundingConfirmation
|
||||
// goroutine.
|
||||
close(cancelChan)
|
||||
|
||||
// Notify the caller of the timeout.
|
||||
close(timeoutChan)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: If we are the channel initiator implement
|
||||
// a method for recovering the funds from the funding
|
||||
// transaction
|
||||
|
||||
case <-f.quit:
|
||||
// The fundingManager is shutting down, will resume
|
||||
// waiting for the funding transaction on startup.
|
||||
return
|
||||
case shortChanID, ok := <-waitingConfChan:
|
||||
if !ok {
|
||||
// Failed waiting for confirmation, close
|
||||
// confChan to indicate failure.
|
||||
close(confChan)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case confChan <- shortChanID:
|
||||
case <-f.quit:
|
||||
return
|
||||
}
|
||||
var shortChanID *lnwire.ShortChannelID
|
||||
var ok bool
|
||||
select {
|
||||
case err := <-timeoutChan:
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, ErrConfirmationTimeout
|
||||
|
||||
case <-f.quit:
|
||||
// The fundingManager is shutting down, and will resume wait on
|
||||
// startup.
|
||||
return nil, ErrFundingManagerShuttingDown
|
||||
|
||||
case shortChanID, ok = <-confChan:
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("waiting for funding" +
|
||||
"confirmation failed")
|
||||
}
|
||||
return shortChanID, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user