diff --git a/channeldb/channel.go b/channeldb/channel.go index 5d943a11..0a5e47b2 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -2409,16 +2409,24 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { }) } -// RemoveFwdPkg atomically removes a forwarding package specified by the remote -// commitment height. +// RemoveFwdPkgs atomically removes forwarding packages specified by the remote +// commitment heights. If one of the intermediate RemovePkg calls fails, then the +// later packages won't be removed. // // NOTE: This method should only be called on packages marked FwdStateCompleted. -func (c *OpenChannel) RemoveFwdPkg(height uint64) error { +func (c *OpenChannel) RemoveFwdPkgs(heights ...uint64) error { c.Lock() defer c.Unlock() return kvdb.Update(c.Db, func(tx kvdb.RwTx) error { - return c.Packager.RemovePkg(tx, height) + for _, height := range heights { + err := c.Packager.RemovePkg(tx, height) + if err != nil { + return err + } + } + + return nil }) } diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 3931af34..974e3cc1 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -776,7 +776,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { l.log.Debugf("removing completed fwd pkg for height=%d", fwdPkg.Height) - err := l.channel.RemoveFwdPkg(fwdPkg.Height) + err := l.channel.RemoveFwdPkgs(fwdPkg.Height) if err != nil { l.log.Errorf("unable to remove fwd pkg for height=%d: "+ "%v", fwdPkg.Height, err) @@ -843,35 +843,51 @@ func (l *channelLink) fwdPkgGarbager() { l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() + if err := l.loadAndRemove(); err != nil { + l.log.Warnf("unable to run initial fwd pkgs gc: %v", err) + } + for { select { case <-l.cfg.FwdPkgGCTicker.Ticks(): - fwdPkgs, err := l.channel.LoadFwdPkgs() - if err != nil { - l.log.Warnf("unable to load fwdpkgs for gc: %v", + if err := l.loadAndRemove(); err != nil { + l.log.Warnf("unable to remove fwd pkgs: %v", err) continue } - - // TODO(conner): batch removal of forward packages. - for _, fwdPkg := range fwdPkgs { - if fwdPkg.State != channeldb.FwdStateCompleted { - continue - } - - err = l.channel.RemoveFwdPkg(fwdPkg.Height) - if err != nil { - l.log.Warnf("unable to remove fwd pkg "+ - "for height=%d: %v", - fwdPkg.Height, err) - } - } case <-l.quit: return } } } +// loadAndRemove loads all the channels forwarding packages and determines if +// they can be removed. It is called once before the FwdPkgGCTicker ticks so that +// a longer tick interval can be used. +func (l *channelLink) loadAndRemove() error { + fwdPkgs, err := l.channel.LoadFwdPkgs() + if err != nil { + return err + } + + var removeHeights []uint64 + for _, fwdPkg := range fwdPkgs { + if fwdPkg.State != channeldb.FwdStateCompleted { + continue + } + + removeHeights = append(removeHeights, fwdPkg.Height) + } + + // If removeHeights is empty, return early so we don't use a db + // transaction. + if len(removeHeights) == 0 { + return nil + } + + return l.channel.RemoveFwdPkgs(removeHeights...) +} + // htlcManager is the primary goroutine which drives a channel's commitment // update state-machine in response to messages received via several channels. // This goroutine reads messages from the upstream (remote) peer, and also from diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 563fe8d1..94e26805 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -4850,9 +4850,9 @@ func (lc *LightningChannel) SetFwdFilter(height uint64, return lc.channelState.SetFwdFilter(height, fwdFilter) } -// RemoveFwdPkg permanently deletes the forwarding package at the given height. -func (lc *LightningChannel) RemoveFwdPkg(height uint64) error { - return lc.channelState.RemoveFwdPkg(height) +// RemoveFwdPkgs permanently deletes the forwarding package at the given heights. +func (lc *LightningChannel) RemoveFwdPkgs(heights ...uint64) error { + return lc.channelState.RemoveFwdPkgs(heights...) } // NextRevocationKey returns the commitment point for the _next_ commitment diff --git a/peer/brontide.go b/peer/brontide.go index e1ecaf88..cc637207 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -588,7 +588,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, OnChannelFailure: onChannelFailure, SyncStates: syncStates, BatchTicker: ticker.New(50 * time.Millisecond), - FwdPkgGCTicker: ticker.New(time.Minute), + FwdPkgGCTicker: ticker.New(time.Hour), PendingCommitTicker: ticker.New(time.Minute), BatchSize: 10, UnsafeReplay: p.cfg.UnsafeReplay,