channellink: make BatchTicker and BatchSize configurable
This commit introduces a new Ticker interface, that can be used to control when the batch timer should tick. This is done to be able to more easily control the ticker during tests. The batch timer is wrapped in the new BatchTicker struct, and made part of the config together with BatchSize.
This commit is contained in:
parent
bba2ff1871
commit
9b7b3fa3b6
@ -83,6 +83,34 @@ func ExpectedFee(f ForwardingPolicy, htlcAmt lnwire.MilliSatoshi) lnwire.MilliSa
|
||||
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
|
||||
}
|
||||
|
||||
// Ticker is an interface used to wrap a time.Ticker in a struct,
|
||||
// making mocking it easier.
|
||||
type Ticker interface {
|
||||
Start() <-chan time.Time
|
||||
Stop()
|
||||
}
|
||||
|
||||
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
|
||||
type BatchTicker struct {
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
// NewBatchTicker returns a new BatchTicker that wraps the passed
|
||||
// time.Ticker.
|
||||
func NewBatchTicker(t *time.Ticker) *BatchTicker {
|
||||
return &BatchTicker{t}
|
||||
}
|
||||
|
||||
// Start returns the tick channel for the underlying time.Ticker.
|
||||
func (t *BatchTicker) Start() <-chan time.Time {
|
||||
return t.ticker.C
|
||||
}
|
||||
|
||||
// Stop stops the underlying time.Ticker.
|
||||
func (t *BatchTicker) Stop() {
|
||||
t.ticker.Stop()
|
||||
}
|
||||
|
||||
// ChannelLinkConfig defines the configuration for the channel link. ALL
|
||||
// elements within the configuration MUST be non-nil for channel link to carry
|
||||
// out its duties.
|
||||
@ -167,6 +195,17 @@ type ChannelLinkConfig struct {
|
||||
// reestablishment message to the remote peer. It should be done if our
|
||||
// clients have been restarted, or remote peer have been reconnected.
|
||||
SyncStates bool
|
||||
|
||||
// BatchTicker is the ticker that determines the interval that we'll
|
||||
// use to check the batch to see if there're any updates we should
|
||||
// flush out. By batching updates into a single commit, we attempt
|
||||
// to increase throughput by maximizing the number of updates
|
||||
// coalesced into a single commit.
|
||||
BatchTicker Ticker
|
||||
|
||||
// BatchSize is the max size of a batch of updates done to the link
|
||||
// before we do a state update.
|
||||
BatchSize uint32
|
||||
}
|
||||
|
||||
// channelLink is the service which drives a channel's commitment update
|
||||
@ -588,8 +627,8 @@ func (l *channelLink) htlcManager() {
|
||||
}
|
||||
}
|
||||
|
||||
batchTimer := time.NewTicker(50 * time.Millisecond)
|
||||
defer batchTimer.Stop()
|
||||
batchTick := l.cfg.BatchTicker.Start()
|
||||
defer l.cfg.BatchTicker.Stop()
|
||||
|
||||
// TODO(roasbeef): fail chan in case of protocol violation
|
||||
out:
|
||||
@ -667,7 +706,7 @@ out:
|
||||
break out
|
||||
}
|
||||
|
||||
case <-batchTimer.C:
|
||||
case <-batchTick:
|
||||
// If the current batch is empty, then we have no work
|
||||
// here.
|
||||
if l.batchCounter == 0 {
|
||||
@ -899,7 +938,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||
|
||||
// If this newly added update exceeds the min batch size for adds, or
|
||||
// this is a settle request, then initiate an update.
|
||||
if l.batchCounter >= 10 || isSettle {
|
||||
if l.batchCounter >= l.cfg.BatchSize || isSettle {
|
||||
if err := l.updateCommitTx(); err != nil {
|
||||
l.fail("unable to update commitment: %v", err)
|
||||
return
|
||||
|
Loading…
Reference in New Issue
Block a user