htlcswitch/link: replace batch ticker with...

resumable ticker.Ticker interface
This commit is contained in:
Conner Fromknecht 2018-08-01 12:42:38 -07:00
parent 83353b44bd
commit 1c456a5144
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/ticker"
) )
func init() { func init() {
@ -99,41 +100,6 @@ func ExpectedFee(f ForwardingPolicy,
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
} }
// Ticker is an interface used to wrap a time.Ticker in a struct, making
// mocking it easier.
type Ticker interface {
Start() <-chan time.Time
Stop()
}
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
type BatchTicker struct {
duration time.Duration
ticker *time.Ticker
}
// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker.
func NewBatchTicker(d time.Duration) *BatchTicker {
return &BatchTicker{
duration: d,
}
}
// Start returns the tick channel for the underlying time.Ticker.
func (t *BatchTicker) Start() <-chan time.Time {
t.ticker = time.NewTicker(t.duration)
return t.ticker.C
}
// Stop stops the underlying time.Ticker.
func (t *BatchTicker) Stop() {
if t.ticker != nil {
t.ticker.Stop()
t.ticker = nil
}
}
// ChannelLinkConfig defines the configuration for the channel link. ALL // ChannelLinkConfig defines the configuration for the channel link. ALL
// elements within the configuration MUST be non-nil for channel link to carry // elements within the configuration MUST be non-nil for channel link to carry
// out its duties. // out its duties.
@ -241,13 +207,13 @@ type ChannelLinkConfig struct {
// flush out. By batching updates into a single commit, we attempt to // flush out. By batching updates into a single commit, we attempt to
// increase throughput by maximizing the number of updates coalesced // increase throughput by maximizing the number of updates coalesced
// into a single commit. // into a single commit.
BatchTicker Ticker BatchTicker ticker.Ticker
// FwdPkgGCTicker is the ticker determining the frequency at which // FwdPkgGCTicker is the ticker determining the frequency at which
// garbage collection of forwarding packages occurs. We use a // garbage collection of forwarding packages occurs. We use a
// time-based approach, as opposed to block epochs, as to not hinder // time-based approach, as opposed to block epochs, as to not hinder
// syncing. // syncing.
FwdPkgGCTicker Ticker FwdPkgGCTicker ticker.Ticker
// BatchSize is the max size of a batch of updates done to the link // BatchSize is the max size of a batch of updates done to the link
// before we do a state update. // before we do a state update.
@ -757,12 +723,12 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
func (l *channelLink) fwdPkgGarbager() { func (l *channelLink) fwdPkgGarbager() {
defer l.wg.Done() defer l.wg.Done()
fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start() l.cfg.FwdPkgGCTicker.Resume()
defer l.cfg.FwdPkgGCTicker.Stop() defer l.cfg.FwdPkgGCTicker.Stop()
for { for {
select { select {
case <-fwdPkgGcTick: case <-l.cfg.FwdPkgGCTicker.Ticks():
fwdPkgs, err := l.channel.LoadFwdPkgs() fwdPkgs, err := l.channel.LoadFwdPkgs()
if err != nil { if err != nil {
l.warnf("unable to load fwdpkgs for gc: %v", err) l.warnf("unable to load fwdpkgs for gc: %v", err)
@ -908,13 +874,6 @@ func (l *channelLink) htlcManager() {
l.wg.Add(1) l.wg.Add(1)
go l.fwdPkgGarbager() go l.fwdPkgGarbager()
// We'll only need the batch ticker if we have outgoing updates that are
// not covered by our last signature. This value will be nil unless a
// downstream packet forces the batchCounter to be positive. After the
// batch is cleared, it will return to nil to prevent wasteful CPU time
// caused by the batch ticker waking up the htlcManager needlessly.
var maybeBatchTick <-chan time.Time
out: out:
for { for {
// We must always check if we failed at some point processing // We must always check if we failed at some point processing
@ -995,13 +954,12 @@ out:
break out break out
} }
case <-maybeBatchTick: case <-l.cfg.BatchTicker.Ticks():
// If the current batch is empty, then we have no work // If the current batch is empty, then we have no work
// here. We also disable the batch ticker from waking up // here. We also disable the batch ticker from waking up
// the htlcManager while the batch is empty. // the htlcManager while the batch is empty.
if l.batchCounter == 0 { if l.batchCounter == 0 {
l.cfg.BatchTicker.Stop() l.cfg.BatchTicker.Pause()
maybeBatchTick = nil
continue continue
} }
@ -1030,8 +988,8 @@ out:
// If the downstream packet resulted in a non-empty // If the downstream packet resulted in a non-empty
// batch, reinstate the batch ticker so that it can be // batch, reinstate the batch ticker so that it can be
// cleared. // cleared.
if l.batchCounter > 0 && maybeBatchTick == nil { if l.batchCounter > 0 {
maybeBatchTick = l.cfg.BatchTicker.Start() l.cfg.BatchTicker.Resume()
} }
// A message from the switch was just received. This indicates // A message from the switch was just received. This indicates
@ -1059,8 +1017,8 @@ out:
// If the downstream packet resulted in a non-empty // If the downstream packet resulted in a non-empty
// batch, reinstate the batch ticker so that it can be // batch, reinstate the batch ticker so that it can be
// cleared. // cleared.
if l.batchCounter > 0 && maybeBatchTick == nil { if l.batchCounter > 0 {
maybeBatchTick = l.cfg.BatchTicker.Start() l.cfg.BatchTicker.Resume()
} }
// A message from the connected peer was just received. This // A message from the connected peer was just received. This