Merge pull request #4547 from Crypt-iQ/fwdpkg_cleanup_0819

htlcswitch: reduce db contention when removing forwarding packages
This commit is contained in:
Conner Fromknecht 2020-09-02 16:58:28 -07:00 committed by GitHub
commit 7c91f744ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 26 deletions

@ -2409,16 +2409,24 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error {
})
}
// RemoveFwdPkg atomically removes a forwarding package specified by the remote
// commitment height.
// RemoveFwdPkgs atomically removes forwarding packages specified by the remote
// commitment heights. If one of the intermediate RemovePkg calls fails, then the
// later packages won't be removed.
//
// NOTE: This method should only be called on packages marked FwdStateCompleted.
func (c *OpenChannel) RemoveFwdPkg(height uint64) error {
func (c *OpenChannel) RemoveFwdPkgs(heights ...uint64) error {
c.Lock()
defer c.Unlock()
return kvdb.Update(c.Db, func(tx kvdb.RwTx) error {
return c.Packager.RemovePkg(tx, height)
for _, height := range heights {
err := c.Packager.RemovePkg(tx, height)
if err != nil {
return err
}
}
return nil
})
}

@ -776,7 +776,7 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error {
l.log.Debugf("removing completed fwd pkg for height=%d",
fwdPkg.Height)
err := l.channel.RemoveFwdPkg(fwdPkg.Height)
err := l.channel.RemoveFwdPkgs(fwdPkg.Height)
if err != nil {
l.log.Errorf("unable to remove fwd pkg for height=%d: "+
"%v", fwdPkg.Height, err)
@ -843,35 +843,51 @@ func (l *channelLink) fwdPkgGarbager() {
l.cfg.FwdPkgGCTicker.Resume()
defer l.cfg.FwdPkgGCTicker.Stop()
if err := l.loadAndRemove(); err != nil {
l.log.Warnf("unable to run initial fwd pkgs gc: %v", err)
}
for {
select {
case <-l.cfg.FwdPkgGCTicker.Ticks():
fwdPkgs, err := l.channel.LoadFwdPkgs()
if err != nil {
l.log.Warnf("unable to load fwdpkgs for gc: %v",
if err := l.loadAndRemove(); err != nil {
l.log.Warnf("unable to remove fwd pkgs: %v",
err)
continue
}
// TODO(conner): batch removal of forward packages.
for _, fwdPkg := range fwdPkgs {
if fwdPkg.State != channeldb.FwdStateCompleted {
continue
}
err = l.channel.RemoveFwdPkg(fwdPkg.Height)
if err != nil {
l.log.Warnf("unable to remove fwd pkg "+
"for height=%d: %v",
fwdPkg.Height, err)
}
}
case <-l.quit:
return
}
}
}
// loadAndRemove loads all the channels forwarding packages and determines if
// they can be removed. It is called once before the FwdPkgGCTicker ticks so that
// a longer tick interval can be used.
func (l *channelLink) loadAndRemove() error {
fwdPkgs, err := l.channel.LoadFwdPkgs()
if err != nil {
return err
}
var removeHeights []uint64
for _, fwdPkg := range fwdPkgs {
if fwdPkg.State != channeldb.FwdStateCompleted {
continue
}
removeHeights = append(removeHeights, fwdPkg.Height)
}
// If removeHeights is empty, return early so we don't use a db
// transaction.
if len(removeHeights) == 0 {
return nil
}
return l.channel.RemoveFwdPkgs(removeHeights...)
}
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// This goroutine reads messages from the upstream (remote) peer, and also from

@ -4850,9 +4850,9 @@ func (lc *LightningChannel) SetFwdFilter(height uint64,
return lc.channelState.SetFwdFilter(height, fwdFilter)
}
// RemoveFwdPkg permanently deletes the forwarding package at the given height.
func (lc *LightningChannel) RemoveFwdPkg(height uint64) error {
return lc.channelState.RemoveFwdPkg(height)
// RemoveFwdPkgs permanently deletes the forwarding package at the given heights.
func (lc *LightningChannel) RemoveFwdPkgs(heights ...uint64) error {
return lc.channelState.RemoveFwdPkgs(heights...)
}
// NextRevocationKey returns the commitment point for the _next_ commitment

@ -588,7 +588,7 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint,
OnChannelFailure: onChannelFailure,
SyncStates: syncStates,
BatchTicker: ticker.New(50 * time.Millisecond),
FwdPkgGCTicker: ticker.New(time.Minute),
FwdPkgGCTicker: ticker.New(time.Hour),
PendingCommitTicker: ticker.New(time.Minute),
BatchSize: 10,
UnsafeReplay: p.cfg.UnsafeReplay,