diff --git a/fundingmanager.go b/fundingmanager.go index 9e29de16..41177d58 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -70,10 +70,29 @@ type reservationWithCtx struct { chanAmt btcutil.Amount + lastUpdated time.Time + updates chan *lnrpc.OpenStatusUpdate err chan error } +// isLocked checks the reservation's timestamp to determine whether it is locked. +func (r *reservationWithCtx) isLocked() bool { + // The time zero value represents a locked reservation. + return r.lastUpdated.IsZero() +} + +// lock locks the reservation from zombie pruning by setting its timestamp to the +// zero value. +func (r *reservationWithCtx) lock() { + r.lastUpdated = time.Time{} +} + +// updateTimestamp updates the reservation's timestamp with the current time. +func (r *reservationWithCtx) updateTimestamp() { + r.lastUpdated = time.Now() +} + // initFundingMsg is sent by an outside subsystem to the funding manager in // order to kick off a funding workflow with a specified target peer. The // original request which defines the parameters of the funding workflow are @@ -266,6 +285,14 @@ type fundingConfig struct { // discovered short channel ID of a formerly pending channel to outside // sub-systems. ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error + + // ZombieSweeperInterval is the periodic time interval in which the zombie + // sweeper is run. + ZombieSweeperInterval time.Duration + + // ReservationTimeout is the length of idle time that must pass before a + // reservation is considered a zombie. + ReservationTimeout time.Duration } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -307,7 +334,7 @@ type fundingManager struct { signedReservations map[lnwire.ChannelID][32]byte // resMtx guards both of the maps above to ensure that all access is - // goroutine stafe. + // goroutine safe. resMtx sync.RWMutex // fundingMsgs is a channel which receives wrapped wire messages @@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, func (f *fundingManager) reservationCoordinator() { defer f.wg.Done() + zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval) + defer zombieSweepTicker.Stop() + for { select { @@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() { case req := <-f.fundingRequests: f.handleInitFundingMsg(req) + case <-zombieSweepTicker.C: + f.pruneZombieReservations() + case req := <-f.queries: switch msg := req.(type) { case *pendingChansReq: @@ -941,20 +974,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { amt, msg.PushAmount) // Once the reservation has been created successfully, we add it to - // this peers map of pending reservations to track this particular + // this peer's map of pending reservations to track this particular // reservation until either abort or completion. f.resMtx.Lock() if _, ok := f.activeReservations[peerIDKey]; !ok { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ reservation: reservation, chanAmt: amt, err: make(chan error, 1), peerAddress: fmsg.peerAddress, } + f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the fundingOpenMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt) @@ -1063,6 +1100,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { return } + // Update the timestamp once the fundingAcceptMsg has been handled. + defer resCtx.updateTimestamp() + fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:]) // We'll also specify the responder's preference for the number of @@ -1304,6 +1344,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.localDiscoverySignals[channelID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // At this point we have sent our last funding message to the + // initiating peer before the funding transaction will be broadcast. + // The only thing left to do before we can delete this reservation + // is wait for the funding transaction. Lock the reservation so it + // is not pruned by the zombie sweeper. + resCtx.lock() + // With this last message, our job as the responder is now complete. // We'll wait for the funding transaction to reach the specified number // of confirmations, then start normal operations. @@ -1454,6 +1501,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }, } + // At this point we have broadcast the funding transaction and done all + // necessary processing. The only thing left to do before we can delete + // this reservation is wait for the funding transaction. Lock the + // reservation so it is not pruned by the zombie sweeper. + resCtx.lock() + f.wg.Add(1) go func() { defer f.wg.Done() @@ -2454,15 +2507,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ chanAmt: capacity, reservation: reservation, peerAddress: msg.peerAddress, updates: msg.updates, err: msg.err, } + f.activeReservations[peerIDKey][chanID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the initFundingMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity) @@ -2544,7 +2601,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { } } -// processErrorGeneric sends a message to the fundingManager allowing it to +// processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, peerAddress *lnwire.NetAddress) { @@ -2556,7 +2613,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, } } -// handleErrorGenericMsg process the error which was received from remote peer, +// handleErrorMsg processes the error which was received from remote peer, // depending on the type of error we should do different clean up steps and // inform the user about it. func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { @@ -2589,7 +2646,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { lnErr.ToGrpcCode(), string(protocolErr.Data), ) } else { - // Otherwise, we'll attempt tto display just the error code + // Otherwise, we'll attempt to display just the error code // itself. resCtx.err <- grpc.Errorf( lnErr.ToGrpcCode(), lnErr.String(), @@ -2602,7 +2659,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { } } -// cancelReservationCtx do all needed work in order to securely cancel the +// pruneZombieReservations loops through all pending reservations and fails the +// funding flow for any reservations that have not been updated since the +// ReservationTimeout and are not locked waiting for the funding transaction. +func (f *fundingManager) pruneZombieReservations() { + zombieReservations := make(pendingChannels) + + f.resMtx.RLock() + for _, pendingReservations := range f.activeReservations { + for pendingChanID, resCtx := range pendingReservations { + if resCtx.isLocked() { + continue + } + + if time.Since(resCtx.lastUpdated) > f.cfg.ReservationTimeout { + zombieReservations[pendingChanID] = resCtx + } + } + } + f.resMtx.RUnlock() + + for pendingChanID, resCtx := range zombieReservations { + err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ + "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + fndgLog.Warnf(err.Error()) + f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + } +} + +// cancelReservationCtx does all needed work in order to securely cancel the // reservation. func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, pendingChanID [32]byte) (*reservationWithCtx, error) { diff --git a/lnd.go b/lnd.go index b2685f90..9b2f5088 100644 --- a/lnd.go +++ b/lnd.go @@ -414,6 +414,8 @@ func lndMain() error { // channel bandwidth. return uint16(lnwallet.MaxHTLCNumber / 2) }, + ZombieSweeperInterval: 1 * time.Minute, + ReservationTimeout: 10 * time.Minute, }) if err != nil { return err