funding: implement reservation zombie sweeper

In earlier revisions (since squashed into this commit), zombie
reservations were cleaned up individually when they timed out.
However, this made the code more complex: each reservation had
its own timer, and that timer would have had to be cancelled
whenever the reservation itself was cancelled, which would have
been harder to maintain. With this commit, zombie reservations
are instead cleaned up by a zombie sweeper that is triggered by
a ticker, which makes the code more maintainable.
This commit is contained in:
PaddyQuinn 2018-03-12 21:58:51 -04:00
parent 5410725306
commit 75e45b830b
2 changed files with 95 additions and 8 deletions

View File

@ -70,10 +70,29 @@ type reservationWithCtx struct {
chanAmt btcutil.Amount
lastUpdated time.Time
updates chan *lnrpc.OpenStatusUpdate
err chan error
}
// isLocked reports whether the reservation is currently locked. A reservation
// counts as locked exactly when its lastUpdated timestamp is the zero
// time.Time value.
func (r *reservationWithCtx) isLocked() bool {
	// The zero timestamp doubles as the "locked" marker.
	stamp := r.lastUpdated
	return stamp.IsZero()
}
// lock marks the reservation as locked by resetting its lastUpdated timestamp
// to the zero time.Time value. isLocked treats that value as "locked", which
// keeps the reservation from being pruned as a zombie.
func (r *reservationWithCtx) lock() {
	var zeroTime time.Time
	r.lastUpdated = zeroTime
}
// updateTimestamp records the current wall-clock time as the reservation's
// lastUpdated timestamp.
func (r *reservationWithCtx) updateTimestamp() {
	now := time.Now()
	r.lastUpdated = now
}
// initFundingMsg is sent by an outside subsystem to the funding manager in
// order to kick off a funding workflow with a specified target peer. The
// original request which defines the parameters of the funding workflow are
@ -266,6 +285,14 @@ type fundingConfig struct {
// discovered short channel ID of a formerly pending channel to outside
// sub-systems.
ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error
// ZombieSweeperInterval is the periodic time interval in which the zombie
// sweeper is run.
ZombieSweeperInterval time.Duration
// ReservationTimeout is the length of idle time that must pass before a
// reservation is considered a zombie.
ReservationTimeout time.Duration
}
// fundingManager acts as an orchestrator/bridge between the wallet's
@ -307,7 +334,7 @@ type fundingManager struct {
signedReservations map[lnwire.ChannelID][32]byte
// resMtx guards both of the maps above to ensure that all access is
// goroutine stafe.
// goroutine safe.
resMtx sync.RWMutex
// fundingMsgs is a channel which receives wrapped wire messages
@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
func (f *fundingManager) reservationCoordinator() {
defer f.wg.Done()
zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval)
defer zombieSweepTicker.Stop()
for {
select {
@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() {
case req := <-f.fundingRequests:
f.handleInitFundingMsg(req)
case <-zombieSweepTicker.C:
f.pruneZombieReservations()
case req := <-f.queries:
switch msg := req.(type) {
case *pendingChansReq:
@ -941,20 +974,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
amt, msg.PushAmount)
// Once the reservation has been created successfully, we add it to
// this peers map of pending reservations to track this particular
// this peer's map of pending reservations to track this particular
// reservation until either abort or completion.
f.resMtx.Lock()
if _, ok := f.activeReservations[peerIDKey]; !ok {
f.activeReservations[peerIDKey] = make(pendingChannels)
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{
resCtx := &reservationWithCtx{
reservation: reservation,
chanAmt: amt,
err: make(chan error, 1),
peerAddress: fmsg.peerAddress,
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock()
// Update the timestamp once the fundingOpenMsg has been handled.
defer resCtx.updateTimestamp()
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
// delay we require given the total amount of funds within the channel.
remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt)
@ -1063,6 +1100,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
return
}
// Update the timestamp once the fundingAcceptMsg has been handled.
defer resCtx.updateTimestamp()
fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:])
// We'll also specify the responder's preference for the number of
@ -1304,6 +1344,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
f.localDiscoverySignals[channelID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
// At this point we have sent our last funding message to the
// initiating peer before the funding transaction will be broadcast.
// The only thing left to do before we can delete this reservation
// is wait for the funding transaction. Lock the reservation so it
// is not pruned by the zombie sweeper.
resCtx.lock()
// With this last message, our job as the responder is now complete.
// We'll wait for the funding transaction to reach the specified number
// of confirmations, then start normal operations.
@ -1454,6 +1501,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
},
}
// At this point we have broadcast the funding transaction and done all
// necessary processing. The only thing left to do before we can delete
// this reservation is wait for the funding transaction. Lock the
// reservation so it is not pruned by the zombie sweeper.
resCtx.lock()
f.wg.Add(1)
go func() {
defer f.wg.Done()
@ -2454,15 +2507,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
f.activeReservations[peerIDKey] = make(pendingChannels)
}
f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{
resCtx := &reservationWithCtx{
chanAmt: capacity,
reservation: reservation,
peerAddress: msg.peerAddress,
updates: msg.updates,
err: msg.err,
}
f.activeReservations[peerIDKey][chanID] = resCtx
f.resMtx.Unlock()
// Update the timestamp once the initFundingMsg has been handled.
defer resCtx.updateTimestamp()
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
// delay we require given the total amount of funds within the channel.
remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity)
@ -2544,7 +2601,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
}
}
// processErrorGeneric sends a message to the fundingManager allowing it to
// processFundingError sends a message to the fundingManager allowing it to
// process the generic error that occurred.
func (f *fundingManager) processFundingError(err *lnwire.Error,
peerAddress *lnwire.NetAddress) {
@ -2556,7 +2613,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error,
}
}
// handleErrorGenericMsg process the error which was received from remote peer,
// handleErrorMsg processes the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and
// inform the user about it.
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
@ -2589,7 +2646,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
lnErr.ToGrpcCode(), string(protocolErr.Data),
)
} else {
// Otherwise, we'll attempt tto display just the error code
// Otherwise, we'll attempt to display just the error code
// itself.
resCtx.err <- grpc.Errorf(
lnErr.ToGrpcCode(), lnErr.String(),
@ -2602,7 +2659,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
}
}
// cancelReservationCtx do all needed work in order to securely cancel the
// pruneZombieReservations loops through all pending reservations and fails the
// funding flow for any reservation that is not locked and has not been updated
// within the configured ReservationTimeout.
//
// Zombies are collected while holding the read lock and are only failed once
// the lock has been released.
// NOTE(review): presumably failFundingFlow acquires resMtx itself while
// cancelling the reservation -- confirm.
//
// activeReservations is a two-level map (peer first, then pending channel ID),
// so the same pending channel ID may appear under more than one peer. Zombies
// are therefore grouped per peer instead of being flattened into a single map
// keyed by pending channel ID, which would silently drop all but one of the
// colliding reservations from the current sweep.
func (f *fundingManager) pruneZombieReservations() {
	var zombiesByPeer []pendingChannels

	f.resMtx.RLock()
	for _, pendingReservations := range f.activeReservations {
		var zombies pendingChannels
		for pendingChanID, resCtx := range pendingReservations {
			// Locked reservations are exempt from pruning.
			if resCtx.isLocked() {
				continue
			}

			// Still within the allowed idle window.
			if time.Since(resCtx.lastUpdated) <= f.cfg.ReservationTimeout {
				continue
			}

			// Allocate lazily: most peers have no zombies on a sweep.
			if zombies == nil {
				zombies = make(pendingChannels)
			}
			zombies[pendingChanID] = resCtx
		}

		if zombies != nil {
			zombiesByPeer = append(zombiesByPeer, zombies)
		}
	}
	f.resMtx.RUnlock()

	for _, zombies := range zombiesByPeer {
		for pendingChanID, resCtx := range zombies {
			err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+
				"chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:])

			// Pass the error as an argument rather than as the format
			// string so stray '%' characters are never interpreted.
			fndgLog.Warnf("%v", err)
			f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err)
		}
	}
}
// cancelReservationCtx does all needed work in order to securely cancel the
// reservation.
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
pendingChanID [32]byte) (*reservationWithCtx, error) {

2
lnd.go
View File

@ -414,6 +414,8 @@ func lndMain() error {
// channel bandwidth.
return uint16(lnwallet.MaxHTLCNumber / 2)
},
ZombieSweeperInterval: 1 * time.Minute,
ReservationTimeout: 10 * time.Minute,
})
if err != nil {
return err