Merge pull request #1910 from halseth/fundingmanager-state-machine

Fundingmanager state machine cleanup
This commit is contained in:
Olaoluwa Osuntokun 2019-09-25 16:31:51 -07:00 committed by GitHub
commit b1d122773e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 489 additions and 522 deletions

@ -86,6 +86,12 @@ var (
// been signaled to shut down.
ErrFundingManagerShuttingDown = errors.New("funding manager shutting " +
"down")
// ErrConfirmationTimeout is an error returned when we as a responder
// are waiting for a funding transaction to confirm, but too many
// blocks pass without confirmation.
ErrConfirmationTimeout = errors.New("timeout waiting for funding " +
"confirmation")
)
// reservationWithCtx encapsulates a pending channel reservation. This wrapper
@ -489,230 +495,53 @@ func (f *fundingManager) start() error {
// down.
// TODO(roasbeef): store height that funding finished?
// * would then replace call below
pendingChannels, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels()
allChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels()
if err != nil {
return err
}
// For any channels that were in a pending state when the daemon was
// last connected, the Funding Manager will re-initialize the channel
// barriers and will also launch waitForFundingConfirmation to wait for
// the channel's funding transaction to be confirmed on the blockchain.
for _, channel := range pendingChannels {
f.barrierMtx.Lock()
fndgLog.Tracef("Loading pending ChannelPoint(%v), creating chan "+
"barrier", channel.FundingOutpoint)
for _, channel := range allChannels {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock()
f.localDiscoverySignals[chanID] = make(chan struct{})
// Rebroadcast the funding transaction for any pending channel
// that we initiated. If this operation fails due to a reported
// double spend, we treat this as an indicator that we have
// already broadcast this transaction. Otherwise, we simply log
// the error as there isn't anything we can currently do to
// recover.
if channel.ChanType == channeldb.SingleFunder &&
channel.IsInitiator {
err := f.cfg.PublishTransaction(channel.FundingTxn)
if err != nil {
fndgLog.Errorf("Unable to rebroadcast funding "+
"tx for ChannelPoint(%v): %v",
channel.FundingOutpoint, err)
}
}
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})
go func(ch *channeldb.OpenChannel) {
go f.waitForFundingWithTimeout(ch, confChan, timeoutChan)
select {
case <-timeoutChan:
// Timeout channel will be triggered if the number of blocks
// mined since the channel was initiated reaches
// maxWaitNumBlocksFundingConf and we are not the channel
// initiator.
localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis()
closeInfo := &channeldb.ChannelCloseSummary{
ChainHash: ch.ChainHash,
ChanPoint: ch.FundingOutpoint,
RemotePub: ch.IdentityPub,
Capacity: ch.Capacity,
SettledBalance: localBalance,
CloseType: channeldb.FundingCanceled,
RemoteCurrentRevocation: ch.RemoteCurrentRevocation,
RemoteNextRevocation: ch.RemoteNextRevocation,
LocalChanConfig: ch.LocalChanCfg,
}
if err := ch.CloseChannel(closeInfo); err != nil {
fndgLog.Errorf("Failed closing channel "+
"%v: %v", ch.FundingOutpoint, err)
}
case <-f.quit:
// The fundingManager is shutting down, and will
// resume wait on startup.
case shortChanID, ok := <-confChan:
if !ok {
fndgLog.Errorf("Waiting for funding" +
"confirmation failed")
return
}
// The funding transaction has confirmed, so
// we'll attempt to retrieve the remote peer
// to complete the rest of the funding flow.
peerChan := make(chan lnpeer.Peer, 1)
var peerKey [33]byte
copy(peerKey[:], ch.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer
select {
case peer = <-peerChan:
case <-f.quit:
return
}
err := f.handleFundingConfirmation(
peer, ch, shortChanID,
)
if err != nil {
fndgLog.Errorf("Failed to handle "+
"funding confirmation: %v", err)
return
}
}
}(channel)
}
// Fetch all our open channels, and make sure they all finalized the
// opening process.
// TODO(halseth): this check is only done on restart atm, but should
// also be done if a peer that disappeared during the opening process
// reconnects.
openChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels()
if err != nil {
return err
}
for _, channel := range openChannels {
channelState, shortChanID, err := f.getChannelOpeningState(
&channel.FundingOutpoint)
if err == ErrChannelNotFound {
// Channel not in fundingManager's opening database,
// meaning it was successfully announced to the
// network.
continue
} else if err != nil {
return err
}
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
fndgLog.Debugf("channel (%v) with opening state %v found",
chanID, channelState)
// For any channels that were in a pending state when the
// daemon was last connected, the Funding Manager will
// re-initialize the channel barriers, and republish the
// funding transaction if we're the initiator.
if channel.IsPending {
// Set up the channel barriers again, to make sure
// waitUntilChannelOpen correctly waits until the
// opening process is completely over.
f.barrierMtx.Lock()
fndgLog.Tracef("Loading pending ChannelPoint(%v), "+
"creating chan barrier", channel.FundingOutpoint)
"creating chan barrier",
channel.FundingOutpoint)
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock()
}
// If we did find the channel in the opening state database, we
// have seen the funding transaction being confirmed, but we
// did not finish the rest of the setup procedure before we shut
// down. We handle the remaining steps of this setup by
// continuing the procedure where we left off.
switch channelState {
case markedOpen:
// The funding transaction was confirmed, but we did not
// successfully send the fundingLocked message to the
// peer, so let's do that now.
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
f.localDiscoverySignals[chanID] = make(chan struct{})
peerChan := make(chan lnpeer.Peer, 1)
// Rebroadcast the funding transaction for any pending
// channel that we initiated. No error will be returned
// if the transaction already has been broadcasted.
if channel.ChanType == channeldb.SingleFunder &&
channel.IsInitiator {
var peerKey [33]byte
copy(peerKey[:], dbChan.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer
select {
case peer = <-peerChan:
case <-f.quit:
return
}
err := f.handleFundingConfirmation(
peer, dbChan, shortChanID,
err := f.cfg.PublishTransaction(
channel.FundingTxn,
)
if err != nil {
fndgLog.Errorf("Failed to handle "+
"funding confirmation: %v", err)
return
fndgLog.Errorf("Unable to rebroadcast "+
"funding tx for "+
"ChannelPoint(%v): %v",
channel.FundingOutpoint, err)
}
}(channel)
case fundingLockedSent:
// fundingLocked was sent to peer, but the channel
// was not added to the router graph and the channel
// announcement was not sent.
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err = f.addToRouterGraph(dbChan, shortChanID)
if err != nil {
fndgLog.Errorf("failed adding to "+
"router graph: %v", err)
return
}
// TODO(halseth): should create a state machine
// that can more easily be resumed from
// different states, to avoid this code
// duplication.
err = f.annAfterSixConfs(dbChan, shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcements: %v", err)
return
}
}(channel)
case addedToRouterGraph:
// The channel was added to the Router's topology, but
// the channel announcement was not sent.
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err = f.annAfterSixConfs(dbChan, shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}(channel)
default:
fndgLog.Errorf("undefined channelState: %v",
channelState)
}
}
// We will restart the funding state machine for all channels,
// which will wait for the channel's funding transaction to be
// confirmed on the blockchain, and transmit the messages
// necessary for the channel to be operational.
f.wg.Add(1)
go f.advanceFundingState(channel, chanID, nil)
}
f.wg.Add(1) // TODO(roasbeef): tune
@ -937,6 +766,283 @@ func (f *fundingManager) reservationCoordinator() {
}
}
// advanceFundingState will advance the channel through the steps after the
// funding transaction is broadcasted, up until the point where the channel is
// ready for operation. This includes waiting for the funding transaction to
// confirm, sending funding locked to the peer, adding the channel to the
// router graph, and announcing the channel. The updateChan can be set non-nil
// to get OpenStatusUpdates.
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) advanceFundingState(channel *channeldb.OpenChannel,
pendingChanID [32]byte, updateChan chan<- *lnrpc.OpenStatusUpdate) {
defer f.wg.Done()
// If the channel is still pending we must wait for the funding
// transaction to confirm.
if channel.IsPending {
err := f.advancePendingChannelState(channel, pendingChanID)
if err != nil {
fndgLog.Errorf("Unable to advance pending state of "+
"ChannelPoint(%v): %v",
channel.FundingOutpoint, err)
return
}
}
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, channel, nil,
)
if err != nil {
fndgLog.Errorf("Unable to create LightningChannel(%v): %v",
channel.FundingOutpoint, err)
return
}
for {
channelState, shortChanID, err := f.getChannelOpeningState(
&channel.FundingOutpoint,
)
if err == ErrChannelNotFound {
// Channel not in fundingManager's opening database,
// meaning it was successfully announced to the
// network.
// TODO(halseth): could do graph consistency check
// here, and re-add the edge if missing.
fndgLog.Debugf("ChannlPoint(%v) with chanID=%v not "+
"found in opening database, assuming already "+
"announced to the network",
channel.FundingOutpoint, pendingChanID)
return
} else if err != nil {
fndgLog.Errorf("Unable to query database for "+
"channel opening state(%v): %v",
channel.FundingOutpoint, err)
return
}
// If we did find the channel in the opening state database, we
// have seen the funding transaction being confirmed, but there
// are still steps left of the setup procedure. We continue the
// procedure where we left off.
err = f.stateStep(
channel, lnChannel, shortChanID, channelState,
updateChan,
)
if err != nil {
fndgLog.Errorf("Unable to advance state(%v): %v",
channel.FundingOutpoint, err)
return
}
}
}
// stateStep advances the confirmed channel one step in the funding state
// machine. This method is synchronous and the new channel opening state will
// have been written to the database when it successfully returns. The
// updateChan can be set non-nil to get OpenStatusUpdates.
func (f *fundingManager) stateStep(channel *channeldb.OpenChannel,
lnChannel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID, channelState channelOpeningState,
updateChan chan<- *lnrpc.OpenStatusUpdate) error {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
fndgLog.Debugf("Channel(%v) with ShortChanID %v has opening state %v",
chanID, shortChanID, channelState)
switch channelState {
// The funding transaction was confirmed, but we did not successfully
// send the fundingLocked message to the peer, so let's do that now.
case markedOpen:
err := f.sendFundingLocked(channel, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v",
err)
}
// As the fundingLocked message is now sent to the peer, the
// channel is moved to the next state of the state machine. It
// will be moved to the last state (actually deleted from the
// database) after the channel is finally announced.
err = f.saveChannelOpeningState(
&channel.FundingOutpoint, fundingLockedSent,
shortChanID,
)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" fundingLockedSent: %v", err)
}
fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"sent FundingLocked", chanID, shortChanID)
return nil
// fundingLocked was sent to peer, but the channel was not added to the
// router graph and the channel announcement was not sent.
case fundingLockedSent:
err := f.addToRouterGraph(channel, shortChanID)
if err != nil {
return fmt.Errorf("failed adding to "+
"router graph: %v", err)
}
// As the channel is now added to the ChannelRouter's topology,
// the channel is moved to the next state of the state machine.
// It will be moved to the last state (actually deleted from
// the database) after the channel is finally announced.
err = f.saveChannelOpeningState(
&channel.FundingOutpoint, addedToRouterGraph,
shortChanID,
)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" addedToRouterGraph: %v", err)
}
fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"added to router graph", chanID, shortChanID)
// Give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): only notify after recv of funding locked?
fundingPoint := channel.FundingOutpoint
cp := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: fundingPoint.Hash[:],
},
OutputIndex: fundingPoint.Index,
}
if updateChan != nil {
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: cp,
},
},
}
select {
case updateChan <- upd:
case <-f.quit:
return ErrFundingManagerShuttingDown
}
}
return nil
// The channel was added to the Router's topology, but the channel
// announcement was not sent.
case addedToRouterGraph:
err := f.annAfterSixConfs(channel, shortChanID)
if err != nil {
return fmt.Errorf("error sending channel "+
"announcement: %v", err)
}
// We delete the channel opening state from our internal
// database as the opening process has succeeded. We can do
// this because we assume the AuthenticatedGossiper queues the
// announcement messages, and persists them in case of a daemon
// shutdown.
err = f.deleteChannelOpeningState(&channel.FundingOutpoint)
if err != nil {
return fmt.Errorf("error deleting channel state: %v",
err)
}
fndgLog.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"announced", chanID, shortChanID)
return nil
}
return fmt.Errorf("undefined channelState: %v", channelState)
}
// advancePendingChannelState waits for a pending channel's funding tx to
// confirm, and marks it open in the database when that happens.
func (f *fundingManager) advancePendingChannelState(
channel *channeldb.OpenChannel, pendingChanID [32]byte) error {
shortChanID, err := f.waitForFundingWithTimeout(channel)
if err == ErrConfirmationTimeout {
// We'll get a timeout if the number of blocks mined
// since the channel was initiated reaches
// maxWaitNumBlocksFundingConf and we are not the
// channel initiator.
ch := channel
localBalance := ch.LocalCommitment.LocalBalance.ToSatoshis()
closeInfo := &channeldb.ChannelCloseSummary{
ChainHash: ch.ChainHash,
ChanPoint: ch.FundingOutpoint,
RemotePub: ch.IdentityPub,
Capacity: ch.Capacity,
SettledBalance: localBalance,
CloseType: channeldb.FundingCanceled,
RemoteCurrentRevocation: ch.RemoteCurrentRevocation,
RemoteNextRevocation: ch.RemoteNextRevocation,
LocalChanConfig: ch.LocalChanCfg,
}
if err := ch.CloseChannel(closeInfo); err != nil {
return fmt.Errorf("failed closing channel "+
"%v: %v", ch.FundingOutpoint, err)
}
timeoutErr := fmt.Errorf("timeout waiting for funding tx "+
"(%v) to confirm", channel.FundingOutpoint)
// When the peer comes online, we'll notify it that we
// are now considering the channel flow canceled.
f.wg.Add(1)
go func() {
defer f.wg.Done()
peerChan := make(chan lnpeer.Peer, 1)
var peerKey [33]byte
copy(peerKey[:], ch.IdentityPub.SerializeCompressed())
f.cfg.NotifyWhenOnline(peerKey, peerChan)
var peer lnpeer.Peer
select {
case peer = <-peerChan:
case <-f.quit:
return
}
// TODO(halseth): should this send be made
// reliable?
f.failFundingFlow(peer, pendingChanID, timeoutErr)
}()
return timeoutErr
} else if err != nil {
return fmt.Errorf("error waiting for funding "+
"confirmation for ChannelPoint(%v): %v",
channel.FundingOutpoint, err)
}
// Success, funding transaction was confirmed.
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
fndgLog.Debugf("ChannelID(%v) is now fully confirmed! "+
"(shortChanID=%v)", chanID, shortChanID)
err = f.handleFundingConfirmation(channel, *shortChanID)
if err != nil {
return fmt.Errorf("unable to handle funding "+
"confirmation for ChannelPoint(%v): %v",
channel.FundingOutpoint, err)
}
return nil
}
// handlePendingChannels responds to a request for details concerning all
// currently pending channels waiting for the final phase of the funding
// workflow (funding txn confirmation).
@ -1544,48 +1650,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation.
f.wg.Add(1)
go func() {
defer f.wg.Done()
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, confChan,
timeoutChan)
var shortChanID *lnwire.ShortChannelID
var ok bool
select {
case <-timeoutChan:
// We did not see the funding confirmation before
// timeout, so we forget the channel.
err := fmt.Errorf("timeout waiting for funding tx "+
"(%v) to confirm", completeChan.FundingOutpoint)
fndgLog.Warnf(err.Error())
f.failFundingFlow(fmsg.peer, pendingChanID, err)
deleteFromDatabase()
return
case <-f.quit:
// The fundingManager is shutting down, will resume
// wait for funding transaction on startup.
return
case shortChanID, ok = <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding confirmation" +
" failed")
return
}
// Fallthrough.
}
// Success, funding transaction was confirmed.
err := f.handleFundingConfirmation(
fmsg.peer, completeChan, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed to handle funding"+
"confirmation: %v", err)
return
}
}()
go f.advanceFundingState(completeChan, pendingChanID, nil)
}
// processFundingSigned sends a single funding sign complete message along with
@ -1710,176 +1775,53 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
// At this point we have broadcast the funding transaction and done all
// necessary processing.
f.wg.Add(1)
go func() {
defer f.wg.Done()
confChan := make(chan *lnwire.ShortChannelID)
cancelChan := make(chan struct{})
// In case the fundingManager is stopped at some point during
// the remaining part of the opening process, we must wait for
// this process to finish (either successfully or with some
// error), before the fundingManager can be shut down.
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
confChan)
}()
var shortChanID *lnwire.ShortChannelID
var ok bool
select {
case <-f.quit:
return
case shortChanID, ok = <-confChan:
if !ok {
fndgLog.Errorf("waiting for funding " +
"confirmation failed")
return
}
}
fndgLog.Debugf("Channel with ShortChanID %v now confirmed",
shortChanID.ToUint64())
// Go on adding the channel to the channel graph, and crafting
// channel announcements.
lnChannel, err := lnwallet.NewLightningChannel(
nil, completeChan, nil,
)
if err != nil {
fndgLog.Errorf("failed creating lnChannel: %v", err)
return
}
err = f.sendFundingLocked(
fmsg.peer, completeChan, lnChannel, shortChanID,
)
if err != nil {
fndgLog.Errorf("failed sending fundingLocked: %v", err)
return
}
fndgLog.Debugf("FundingLocked for channel with ShortChanID "+
"%v sent", shortChanID.ToUint64())
err = f.addToRouterGraph(completeChan, shortChanID)
if err != nil {
fndgLog.Errorf("failed adding to router graph: %v", err)
return
}
fndgLog.Debugf("Channel with ShortChanID %v added to "+
"router graph", shortChanID.ToUint64())
// Give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): only notify after recv of funding locked?
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: fundingPoint.Hash[:],
},
OutputIndex: fundingPoint.Index,
},
},
},
}
select {
case resCtx.updates <- upd:
case <-f.quit:
return
}
err = f.annAfterSixConfs(completeChan, shortChanID)
if err != nil {
fndgLog.Errorf("failed sending channel announcement: %v",
err)
return
}
}()
go f.advanceFundingState(completeChan, pendingChanID, resCtx.updates)
}
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
// will cancel the wait for confirmation if we are not the channel initiator and
// the maxWaitNumBlocksFundingConf has passed from bestHeight.
// In the case of timeout, the timeoutChan will be closed. In case of error,
// confChan will be closed. In case of success, a *lnwire.ShortChannelID will be
// passed to confChan.
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
confChan chan<- *lnwire.ShortChannelID, timeoutChan chan<- struct{}) {
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation and
// waitForTimeout that will return ErrConfirmationTimeout if we are not the
// channel initiator and the maxWaitNumBlocksFundingConf has passed from the
// funding broadcast height. In case of confirmation, the short channel ID of
// the channel will be returned.
func (f *fundingManager) waitForFundingWithTimeout(
ch *channeldb.OpenChannel) (*lnwire.ShortChannelID, error) {
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v",
err)
close(confChan)
return
}
defer epochClient.Cancel()
waitingConfChan := make(chan *lnwire.ShortChannelID)
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan error, 1)
cancelChan := make(chan struct{})
// Add this goroutine to wait group so we can be sure that it is
// properly stopped before the funding manager can be shut down.
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
waitingConfChan)
}()
go f.waitForFundingConfirmation(ch, cancelChan, confChan)
// On block maxHeight we will cancel the funding confirmation wait.
maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf
for {
select {
case epoch, ok := <-epochClient.Epochs:
if !ok {
fndgLog.Warnf("Epoch client shutting down")
return
}
// If we are not the initiator, we have no money at stake and will
// timeout waiting for the funding transaction to confirm after a
// while.
if !ch.IsInitiator {
f.wg.Add(1)
go f.waitForTimeout(ch, cancelChan, timeoutChan)
}
defer close(cancelChan)
// If we are not the channel initiator it's safe
// to timeout the channel
if uint32(epoch.Height) >= maxHeight && !completeChan.IsInitiator {
fndgLog.Warnf("waited for %v blocks without "+
"seeing funding transaction confirmed,"+
" cancelling.", maxWaitNumBlocksFundingConf)
// Cancel the waitForFundingConfirmation
// goroutine.
close(cancelChan)
// Notify the caller of the timeout.
close(timeoutChan)
return
}
// TODO: If we are the channel initiator implement
// a method for recovering the funds from the funding
// transaction
case <-f.quit:
// The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup.
return
case shortChanID, ok := <-waitingConfChan:
if !ok {
// Failed waiting for confirmation, close
// confChan to indicate failure.
close(confChan)
return
}
select {
case confChan <- shortChanID:
case <-f.quit:
return
}
var shortChanID *lnwire.ShortChannelID
var ok bool
select {
case err := <-timeoutChan:
if err != nil {
return nil, err
}
return nil, ErrConfirmationTimeout
case <-f.quit:
// The fundingManager is shutting down, and will resume wait on
// startup.
return nil, ErrFundingManagerShuttingDown
case shortChanID, ok = <-confChan:
if !ok {
return nil, fmt.Errorf("waiting for funding" +
"confirmation failed")
}
return shortChanID, nil
}
}
@ -1904,10 +1846,13 @@ func makeFundingScript(channel *channeldb.OpenChannel) ([]byte, error) {
// when a channel has become active for lightning transactions.
// The wait can be canceled by closing the cancelChan. In case of success,
// a *lnwire.ShortChannelID will be passed to confChan.
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) waitForFundingConfirmation(
completeChan *channeldb.OpenChannel, cancelChan <-chan struct{},
confChan chan<- *lnwire.ShortChannelID) {
defer f.wg.Done()
defer close(confChan)
// Register with the ChainNotifier for a notification once the funding
@ -1991,17 +1936,82 @@ func (f *fundingManager) waitForFundingConfirmation(
TxPosition: uint16(fundingPoint.Index),
}
// Now that the channel has been fully confirmed, we'll mark it as open
// within the database.
if err := completeChan.MarkAsOpen(shortChanID); err != nil {
fndgLog.Errorf("error setting channel pending flag to false: "+
"%v", err)
select {
case confChan <- &shortChanID:
case <-f.quit:
return
}
}
// waitForTimeout will close the timeout channel if maxWaitNumBlocksFundingConf
// has passed from the broadcast height of the given channel. In case of error,
// the error is sent on timeoutChan. The wait can be canceled by closing the
// cancelChan.
//
// NOTE: timeoutChan MUST be buffered.
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) waitForTimeout(completeChan *channeldb.OpenChannel,
cancelChan <-chan struct{}, timeoutChan chan<- error) {
defer f.wg.Done()
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
timeoutChan <- fmt.Errorf("unable to register for epoch "+
"notification: %v", err)
return
}
// Inform the ChannelNotifier that the channel has transitioned from
// pending open to open.
f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint)
defer epochClient.Cancel()
// On block maxHeight we will cancel the funding confirmation wait.
maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf
for {
select {
case epoch, ok := <-epochClient.Epochs:
if !ok {
timeoutChan <- fmt.Errorf("epoch client " +
"shutting down")
return
}
// Close the timeout channel and exit if the block is
// aboce the max height.
if uint32(epoch.Height) >= maxHeight {
fndgLog.Warnf("Waited for %v blocks without "+
"seeing funding transaction confirmed,"+
" cancelling.",
maxWaitNumBlocksFundingConf)
// Notify the caller of the timeout.
close(timeoutChan)
return
}
// TODO: If we are the channel initiator implement
// a method for recovering the funds from the funding
// transaction
case <-cancelChan:
return
case <-f.quit:
// The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup.
return
}
}
}
// handleFundingConfirmation marks a channel as open in the database, and set
// the channelOpeningState markedOpen. In addition it will report the now
// decided short channel ID to the switch, and close the local discovery signal
// for this channel.
func (f *fundingManager) handleFundingConfirmation(
completeChan *channeldb.OpenChannel,
shortChanID lnwire.ShortChannelID) error {
fundingPoint := completeChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint)
// TODO(roasbeef): ideally persistent state update for chan above
// should be abstracted
@ -2009,19 +2019,26 @@ func (f *fundingManager) waitForFundingConfirmation(
// The funding transaction now being confirmed, we add this channel to
// the fundingManager's internal persistent state machine that we use
// to track the remaining process of the channel opening. This is
// useful to resume the opening process in case of restarts.
//
// TODO(halseth): make the two db transactions (MarkChannelAsOpen and
// saveChannelOpeningState) atomic by doing them in the same transaction.
// Needed to be properly fault-tolerant.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, markedOpen,
&shortChanID)
// useful to resume the opening process in case of restarts. We set the
// opening state before we mark the channel opened in the database,
// such that we can receover from one of the db writes failing.
err := f.saveChannelOpeningState(&fundingPoint, markedOpen, &shortChanID)
if err != nil {
fndgLog.Errorf("error setting channel state to markedOpen: %v",
return fmt.Errorf("error setting channel state to markedOpen: %v",
err)
return
}
// Now that the channel has been fully confirmed and we successfully
// saved the opening state, we'll mark it as open within the database.
if err := completeChan.MarkAsOpen(shortChanID); err != nil {
return fmt.Errorf("error setting channel pending flag to false: "+
"%v", err)
}
// Inform the ChannelNotifier that the channel has transitioned from
// pending open to open.
f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint)
// As there might already be an active link in the switch with an
// outdated short chan ID, we'll instruct the switch to load the updated
// short chan id from disk.
@ -2030,12 +2047,6 @@ func (f *fundingManager) waitForFundingConfirmation(
fndgLog.Errorf("unable to report short chan id: %v", err)
}
select {
case confChan <- &shortChanID:
case <-f.quit:
return
}
// Close the discoverySignal channel, indicating to a separate
// goroutine that the channel now is marked as open in the database
// and that it is acceptable to process funding locked messages
@ -2045,41 +2056,6 @@ func (f *fundingManager) waitForFundingConfirmation(
close(discoverySignal)
}
f.localDiscoveryMtx.Unlock()
}
// handleFundingConfirmation is a wrapper method for creating a new
// lnwallet.LightningChannel object, calling sendFundingLocked,
// addToRouterGraph, and annAfterSixConfs. This is called after the funding
// transaction is confirmed.
func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer,
completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(
nil, completeChan, nil,
)
if err != nil {
return err
}
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
fndgLog.Debugf("ChannelID(%v) is now fully confirmed!", chanID)
err = f.sendFundingLocked(peer, completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v", err)
}
err = f.addToRouterGraph(completeChan, shortChanID)
if err != nil {
return fmt.Errorf("failed adding to router graph: %v", err)
}
err = f.annAfterSixConfs(completeChan, shortChanID)
if err != nil {
return fmt.Errorf("failed sending channel announcement: %v",
err)
}
return nil
}
@ -2087,7 +2063,7 @@ func (f *fundingManager) handleFundingConfirmation(peer lnpeer.Peer,
// sendFundingLocked creates and sends the fundingLocked message.
// This should be called after the funding transaction has been confirmed,
// and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
func (f *fundingManager) sendFundingLocked(
completeChan *channeldb.OpenChannel, channel *lnwallet.LightningChannel,
shortChanID *lnwire.ShortChannelID) error {
@ -2118,8 +2094,18 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
// send fundingLocked until we succeed, or the fundingManager is shut
// down.
for {
fndgLog.Debugf("Sending FundingLocked for ChannelID(%v) to "+
"peer %x", chanID, peerKey)
connected := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(peerKey, connected)
var peer lnpeer.Peer
select {
case peer = <-connected:
case <-f.quit:
return ErrFundingManagerShuttingDown
}
fndgLog.Infof("Peer(%x) is online, sending FundingLocked "+
"for ChannelID(%v)", peerKey, chanID)
if err := peer.SendMessage(false, fundingLockedMsg); err == nil {
// Sending succeeded, we can break out and continue the
@ -2129,31 +2115,6 @@ func (f *fundingManager) sendFundingLocked(peer lnpeer.Peer,
fndgLog.Warnf("Unable to send fundingLocked to peer %x: %v. "+
"Will retry when online", peerKey, err)
connected := make(chan lnpeer.Peer, 1)
f.cfg.NotifyWhenOnline(peerKey, connected)
select {
case <-connected:
fndgLog.Infof("Peer(%x) came back online, will retry "+
"sending FundingLocked for ChannelID(%v)",
peerKey, chanID)
// Retry sending.
case <-f.quit:
return ErrFundingManagerShuttingDown
}
}
// As the fundingLocked message is now sent to the peer, the channel is
// moved to the next state of the state machine. It will be moved to the
// last state (actually deleted from the database) after the channel is
// finally announced.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint,
fundingLockedSent, shortChanID)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" fundingLockedSent: %v", err)
}
return nil
@ -2234,17 +2195,6 @@ func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
return ErrFundingManagerShuttingDown
}
// As the channel is now added to the ChannelRouter's topology, the
// channel is moved to the next state of the state machine. It will be
// moved to the last state (actually deleted from the database) after
// the channel is finally announced.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint,
addedToRouterGraph, shortChanID)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" addedToRouterGraph: %v", err)
}
return nil
}
@ -2292,6 +2242,9 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
fndgLog.Debugf("Sending our NodeAnnouncement for "+
"ChannelID(%v) to %x", chanID, pubKey)
// TODO(halseth): make reliable. If the peer is not online this
// will fail, and the opening process will stop. Should instead
// block here, waiting for the peer to come online.
if err := peer.SendMessage(true, &nodeAnn); err != nil {
return fmt.Errorf("unable to send node announcement "+
"to peer %x: %v", pubKey, err)
@ -2386,15 +2339,6 @@ func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
"announced", &fundingPoint, shortChanID)
}
// We delete the channel opening state from our internal database
// as the opening process has succeeded. We can do this because we
// assume the AuthenticatedGossiper queues the announcement messages,
// and persists them in case of a daemon shutdown.
err := f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil {
return fmt.Errorf("error deleting channel state: %v", err)
}
return nil
}

@ -244,7 +244,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
chainNotifier := &mockNotifier{
oneConfChannel: make(chan *chainntnfs.TxConfirmation, 1),
sixConfChannel: make(chan *chainntnfs.TxConfirmation, 1),
epochChan: make(chan *chainntnfs.BlockEpoch, 1),
epochChan: make(chan *chainntnfs.BlockEpoch, 2),
}
sentMessages := make(chan lnwire.Message)
@ -398,7 +398,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
f.cfg.NotifyWhenOnline = func(peer [33]byte,
connectedChan chan<- lnpeer.Peer) {
connectedChan <- testNode
connectedChan <- testNode.remotePeer
}
return testNode, nil
@ -679,6 +679,8 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
}
func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) {
t.Helper()
select {
case <-msgChan:
t.Fatalf("error sent unexpectedly")
@ -688,6 +690,8 @@ func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) {
}
func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) {
t.Helper()
var msg lnwire.Message
select {
case msg = <-msgChan:
@ -748,6 +752,8 @@ func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message,
func assertNumPendingReservations(t *testing.T, node *testNode,
peerPubKey *btcec.PublicKey, expectedNum int) {
t.Helper()
serializedPubKey := newSerializedKey(peerPubKey)
actualNum := len(node.fundingMgr.activeReservations[serializedPubKey])
if actualNum == expectedNum {
@ -760,6 +766,8 @@ func assertNumPendingReservations(t *testing.T, node *testNode,
}
func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) {
t.Helper()
var numPendingChans int
for i := 0; i < testPollNumTries; i++ {
// If this is not the first try, sleep before retrying.
@ -784,6 +792,8 @@ func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum i
}
func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum int) {
t.Helper()
var numPendingChans int
for i := 0; i < 5; i++ {
// If this is not the first try, sleep before retrying.
@ -807,6 +817,7 @@ func assertNumPendingChannelsRemains(t *testing.T, node *testNode, expectedNum i
func assertDatabaseState(t *testing.T, node *testNode,
fundingOutPoint *wire.OutPoint, expectedState channelOpeningState) {
t.Helper()
var state channelOpeningState
var err error
@ -839,18 +850,24 @@ func assertDatabaseState(t *testing.T, node *testNode,
func assertMarkedOpen(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
}
func assertFundingLockedSent(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
assertDatabaseState(t, bob, fundingOutPoint, fundingLockedSent)
}
func assertAddedToRouterGraph(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
assertDatabaseState(t, alice, fundingOutPoint, addedToRouterGraph)
assertDatabaseState(t, bob, fundingOutPoint, addedToRouterGraph)
}
@ -957,6 +974,8 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode,
}
func assertAnnouncementSignatures(t *testing.T, alice, bob *testNode) {
t.Helper()
// After the FundingLocked message is sent and six confirmations have
// been reached, the channel will be announced to the greater network
// by having the nodes exchange announcement signatures.
@ -1013,6 +1032,7 @@ func waitForOpenUpdate(t *testing.T, updateChan chan *lnrpc.OpenStatusUpdate) {
func assertNoChannelState(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
assertErrChannelNotFound(t, alice, fundingOutPoint)
assertErrChannelNotFound(t, bob, fundingOutPoint)
@ -1020,6 +1040,7 @@ func assertNoChannelState(t *testing.T, alice, bob *testNode,
func assertErrChannelNotFound(t *testing.T, node *testNode,
fundingOutPoint *wire.OutPoint) {
t.Helper()
var state channelOpeningState
var err error
@ -1043,6 +1064,8 @@ func assertErrChannelNotFound(t *testing.T, node *testNode,
}
func assertHandleFundingLocked(t *testing.T, alice, bob *testNode) {
t.Helper()
// They should both send the new channel state to their peer.
select {
case c := <-alice.newChannels:
@ -1725,7 +1748,7 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
t.Fatalf("alice did not publish funding tx")
}
// Increase the height to 1 minus the maxWaitNumBlocksFundingConf height
// Increase the height to 1 minus the maxWaitNumBlocksFundingConf height.
alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{
Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1,
}
@ -1734,12 +1757,12 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf - 1,
}
// Assert both and Alice and Bob still have 1 pending channels
// Assert both and Alice and Bob still have 1 pending channels.
assertNumPendingChannelsRemains(t, alice, 1)
assertNumPendingChannelsRemains(t, bob, 1)
// Increase both Alice and Bob to maxWaitNumBlocksFundingConf height
// Increase both Alice and Bob to maxWaitNumBlocksFundingConf height.
alice.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{
Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf,
}
@ -1748,13 +1771,13 @@ func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
Height: fundingBroadcastHeight + maxWaitNumBlocksFundingConf,
}
// Since Alice was the initiator, the channel should not have timed out
// Since Alice was the initiator, the channel should not have timed out.
assertNumPendingChannelsRemains(t, alice, 1)
// Bob should have sent an Error message to Alice.
assertErrorSent(t, bob.msgChan)
// Since Bob was not the initiator, the channel should timeout
// Since Bob was not the initiator, the channel should timeout.
assertNumPendingChannelsBecomes(t, bob, 0)
}