diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 0c0d1b66..bdfaadfc 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -83,6 +83,34 @@ func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSa return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 } +// Ticker is an interface used to wrap a time.Ticker in a struct, +// making mocking it easier. +type Ticker interface { + Start() <-chan time.Time + Stop() +} + +// BatchTicker implements the Ticker interface, and wraps a time.Ticker. +type BatchTicker struct { + ticker *time.Ticker +} + +// NewBatchTicker returns a new BatchTicker that wraps the passed +// time.Ticker. +func NewBatchTicker(t *time.Ticker) *BatchTicker { + return &BatchTicker{t} +} + +// Start returns the tick channel for the underlying time.Ticker. +func (t *BatchTicker) Start() <-chan time.Time { + return t.ticker.C +} + +// Stop stops the underlying time.Ticker. +func (t *BatchTicker) Stop() { + t.ticker.Stop() +} + // ChannelLinkConfig defines the configuration for the channel link. ALL // elements within the configuration MUST be non-nil for channel link to carry // out its duties. @@ -167,6 +195,17 @@ type ChannelLinkConfig struct { // reestablishment message to the remote peer. It should be done if our // clients have been restarted, or remote peer have been reconnected. SyncStates bool + + // BatchTicker is the ticker that determines the interval that we'll + // use to check the batch to see if there're any updates we should + // flush out. By batching updates into a single commit, we attempt + // to increase throughput by maximizing the number of updates + // coalesced into a single commit. + BatchTicker Ticker + + // BatchSize is the max size of a batch of updates done to the link + // before we do a state update. + BatchSize uint32 } // channelLink is the service which drives a channel's commitment update @@ -588,8 +627,8 @@ func (l *channelLink) htlcManager() { } } - batchTimer := time.NewTicker(50 * time.Millisecond) - defer batchTimer.Stop() + batchTick := l.cfg.BatchTicker.Start() + defer l.cfg.BatchTicker.Stop() // TODO(roasbeef): fail chan in case of protocol violation out: @@ -667,7 +706,7 @@ out: break out } - case <-batchTimer.C: + case <-batchTick: // If the current batch is empty, then we have no work // here. if l.batchCounter == 0 { @@ -899,7 +938,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // If this newly added update exceeds the min batch size for adds, or // this is a settle request, then initiate an update. - if l.batchCounter >= 10 || isSettle { + if l.batchCounter >= l.cfg.BatchSize || isSettle { if err := l.updateCommitTx(); err != nil { l.fail("unable to update commitment: %v", err) return