funding: decouple funding wait from fundingLocked and chanAnn

This commit decouples the wait for funding transaction confirmations
in the waitForFundingConfirmation function from the announcement of
the channel in the sendFundingLockedAndAnnounceChannel function.
Additionally, the sendFundingLockedAndAnnounceChannel function is
now decoupled into the sendFundingLocked and sendChannelAnnouncement
functions. There is also now a helper function that houses creation
of a lnwire.LightningChannel object, calls to both sendFundingLocked
and sendChannelAnnouncement.
This commit is contained in:
nsa 2017-09-17 15:34:59 -04:00 committed by Olaoluwa Osuntokun
parent 3b2e4caa93
commit 9c0c889131

@ -380,11 +380,11 @@ func (f *fundingManager) Start() error {
f.localDiscoverySignals[chanID] = make(chan struct{}) f.localDiscoverySignals[chanID] = make(chan struct{})
doneChan := make(chan struct{}) confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{}) timeoutChan := make(chan struct{})
go func(ch *channeldb.OpenChannel) { go func(ch *channeldb.OpenChannel) {
go f.waitForFundingWithTimeout(ch, doneChan, timeoutChan) go f.waitForFundingWithTimeout(ch, confChan, timeoutChan)
select { select {
case <-timeoutChan: case <-timeoutChan:
@ -403,8 +403,19 @@ func (f *fundingManager) Start() error {
case <-f.quit: case <-f.quit:
// The fundingManager is shutting down, and will // The fundingManager is shutting down, and will
// resume wait on startup. // resume wait on startup.
case <-doneChan: case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding" +
"confirmation failed")
return
}
// Success, funding transaction was confirmed. // Success, funding transaction was confirmed.
err := f.handleFundingConfirmation(ch, shortChanID)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
return
}
} }
}(channel) }(channel)
} }
@ -461,8 +472,13 @@ func (f *fundingManager) Start() error {
f.wg.Add(1) f.wg.Add(1)
go func() { go func() {
defer f.wg.Done() defer f.wg.Done()
f.sendFundingLockedAndAnnounceChannel(channel,
shortChanID) err := f.handleFundingConfirmation(channel, shortChanID)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
return
}
}() }()
case fundingLockedSent: case fundingLockedSent:
@ -477,11 +493,17 @@ func (f *fundingManager) Start() error {
if err != nil { if err != nil {
fndgLog.Errorf("error creating "+ fndgLog.Errorf("error creating "+
"lightning channel: %v", err) "lightning channel: %v", err)
return
} }
defer lnChannel.Stop() defer lnChannel.Stop()
f.sendChannelAnnouncement(channel, lnChannel, err = f.sendChannelAnnouncement(channel, lnChannel,
shortChanID) shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}() }()
default: default:
@ -1123,9 +1145,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation // transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation. // and canceling the wait for the funding confirmation.
go func() { go func() {
doneChan := make(chan struct{}) confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{}) timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, doneChan, go f.waitForFundingWithTimeout(completeChan, confChan,
timeoutChan) timeoutChan)
select { select {
@ -1135,11 +1157,24 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
deleteFromDatabase() deleteFromDatabase()
case <-f.quit: case <-f.quit:
// The fundingManager is shutting down, will resume // The fundingManager is shutting down, will resume
// wait for funding transaction on startup. // wait for funding transaction on startup.
case <-doneChan: case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding confirmation" +
" failed")
return
}
// Success, funding transaction was confirmed. // Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey, err := f.handleFundingConfirmation(completeChan,
fmsg.msg.PendingChannelID) shortChanID)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
return
}
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
} }
}() }()
} }
@ -1227,8 +1262,10 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}, },
} }
f.wg.Add(1)
go func() { go func() {
doneChan := make(chan struct{}) defer f.wg.Done()
confChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{}) cancelChan := make(chan struct{})
// In case the fundingManager is stopped at some point during // In case the fundingManager is stopped at some point during
@ -1239,13 +1276,27 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
go func() { go func() {
defer f.wg.Done() defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan, f.waitForFundingConfirmation(completeChan, cancelChan,
doneChan) confChan)
}() }()
select { select {
case <-f.quit: case <-f.quit:
return return
case <-doneChan: case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding confirmation" +
" failed")
return
}
// Success, funding transaction was confirmed
err := f.handleFundingConfirmation(completeChan,
shortChanID)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
return
}
} }
// Finally give the caller a final update notifying them that // Finally give the caller a final update notifying them that
@ -1269,21 +1320,22 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that // waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
// will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has // will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has
// passed from bestHeight. In the case of timeout, the timeoutChan will be // passed from bestHeight. In the case of timeout, the timeoutChan will be
// closed. In case of confirmation or error, doneChan will be closed. // closed. In case of error, confChan will be closed. In case of success,
// a *lnwire.ShortChannelID will be passed to confChan.
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
doneChan chan<- struct{}, timeoutChan chan<- struct{}) { confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn() epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil { if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v", fndgLog.Errorf("unable to register for epoch notification: %v",
err) err)
close(doneChan) close(confChan)
return return
} }
defer epochClient.Cancel() defer epochClient.Cancel()
waitingDoneChan := make(chan struct{}) waitingConfChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{}) cancelChan := make(chan struct{})
// Add this goroutine to wait group so we can be sure that it is // Add this goroutine to wait group so we can be sure that it is
@ -1292,7 +1344,7 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
go func() { go func() {
defer f.wg.Done() defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan, f.waitForFundingConfirmation(completeChan, cancelChan,
waitingDoneChan) waitingConfChan)
}() }()
// On block maxHeight we will cancel the funding confirmation wait. // On block maxHeight we will cancel the funding confirmation wait.
@ -1322,9 +1374,19 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// The fundingManager is shutting down, will resume // The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup. // waiting for the funding transaction on startup.
return return
case <-waitingDoneChan: case shortChanID, ok := <-waitingConfChan:
close(doneChan) if !ok {
return // Failed waiting for confirmation, close
// confChan to indicate failure.
close(confChan)
return
}
select {
case confChan <- shortChanID:
case <-f.quit:
return
}
} }
} }
} }
@ -1334,11 +1396,12 @@ func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenC
// function of waitForFundingConfirmation is to wait for blockchain // function of waitForFundingConfirmation is to wait for blockchain
// confirmation, and then to notify the other systems that must be notified // confirmation, and then to notify the other systems that must be notified
// when a channel has become active for lightning transactions. // when a channel has become active for lightning transactions.
// The wait can be canceled by closing the cancelChan. // The wait can be canceled by closing the cancelChan. In case of success,
// a *lnwire.ShortChannelID will be passed to confChan.
func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel, func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel,
cancelChan <-chan struct{}, doneChan chan<- struct{}) { cancelChan <-chan struct{}, confChan chan<- *lnwire.ShortChannelID) {
defer close(doneChan) defer close(confChan)
// Register with the ChainNotifier for a notification once the funding // Register with the ChainNotifier for a notification once the funding
// transaction reaches `numConfs` confirmations. // transaction reaches `numConfs` confirmations.
@ -1425,28 +1488,49 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
return return
} }
// Now that the funding transaction has the required number of select {
// confirmations, we send the fundingLocked message to the peer. case confChan <- &shortChanID:
f.sendFundingLockedAndAnnounceChannel(completeChan, &shortChanID) case <-f.quit:
}
// sendFundingLockedAndAnnounceChannel creates and sends the fundingLocked
// message, and then the channel announcement. This should be called after the
// funding transaction has been confirmed, and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
channel, err := lnwallet.NewLightningChannel(nil, nil,
f.cfg.FeeEstimator, completeChan)
if err != nil {
fndgLog.Errorf("error creating new lightning channel: %v", err)
return return
} }
defer channel.Stop() }
// handleFundingConfirmation is a wrapper method for creating a new
// lnwallet.LightningChannel object, calling sendFundingLocked, and for calling
// sendChannelAnnouncement. This is called after the funding transaction is
// confirmed.
func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil, f.cfg.FeeEstimator,
completeChan)
if err != nil {
return err
}
defer lnChannel.Stop()
err = f.sendFundingLocked(completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v", err)
}
err = f.sendChannelAnnouncement(completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending channel announcement: %v",
err)
}
return nil
}
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
// Next, we'll send over the funding locked message which marks that we // Next, we'll send over the funding locked message which marks that we
// consider the channel open by presenting the remote party with our // consider the channel open by presenting the remote party with our
@ -1454,8 +1538,7 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
// will be unable to propose state transitions. // will be unable to propose state transitions.
nextRevocation, err := channel.NextRevocationKey() nextRevocation, err := channel.NextRevocationKey()
if err != nil { if err != nil {
fndgLog.Errorf("unable to create next revocation: %v", err) return fmt.Errorf("unable to create next revocation: %v", err)
return
} }
fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation) fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation)
@ -1489,7 +1572,7 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
case <-connected: case <-connected:
// Retry sending. // Retry sending.
case <-f.quit: case <-f.quit:
return return nil
} }
} }
@ -1500,14 +1583,11 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, err = f.saveChannelOpeningState(&completeChan.FundingOutpoint,
fundingLockedSent, shortChanID) fundingLockedSent, shortChanID)
if err != nil { if err != nil {
fndgLog.Errorf("error setting channel state to "+ return fmt.Errorf("error setting channel state to"+
"fundingLockedSent: %v", err) " fundingLockedSent: %v", err)
return
} }
// TODO(roasbeef): wait 6 blocks before announcing return nil
f.sendChannelAnnouncement(completeChan, channel, shortChanID)
} }
// sendChannelAnnouncement broadcast the necessary channel announcement // sendChannelAnnouncement broadcast the necessary channel announcement
@ -1515,7 +1595,9 @@ func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
// is sent (channelState is 'fundingLockedSent') and the channel is ready to // is sent (channelState is 'fundingLockedSent') and the channel is ready to
// be used. // be used.
func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel, func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) { channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) error {
// TODO(eugene) wait for 6 confirmations here
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
fundingPoint := completeChan.FundingOutpoint fundingPoint := completeChan.FundingOutpoint
@ -1529,8 +1611,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
channel.LocalFundingKey, channel.RemoteFundingKey, channel.LocalFundingKey, channel.RemoteFundingKey,
*shortChanID, chanID) *shortChanID, chanID)
if err != nil { if err != nil {
fndgLog.Errorf("channel announcement failed: %v", err) return fmt.Errorf("channel announcement failed: %v", err)
return
} }
// After the channel is successfully announced from the // After the channel is successfully announced from the
@ -1540,8 +1621,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
// messages, and persists them in case of a daemon shutdown. // messages, and persists them in case of a daemon shutdown.
err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint) err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil { if err != nil {
fndgLog.Errorf("error deleting channel state: %v", err) return fmt.Errorf("error deleting channel state: %v", err)
return
} }
// Finally, as the local channel discovery has been fully processed, // Finally, as the local channel discovery has been fully processed,
@ -1553,7 +1633,7 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
} }
f.localDiscoveryMtx.Unlock() f.localDiscoveryMtx.Unlock()
return return nil
} }
// processFundingLocked sends a message to the fundingManager allowing it to // processFundingLocked sends a message to the fundingManager allowing it to