Merge pull request #1668 from cfromknecht/interface-tickers
Ticker Package
This commit is contained in:
commit
d3b1b9aa98
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnpeer"
|
"github.com/lightningnetwork/lnd/lnpeer"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -94,41 +95,6 @@ func ExpectedFee(f ForwardingPolicy,
|
|||||||
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
|
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ticker is an interface used to wrap a time.Ticker in a struct, making
|
|
||||||
// mocking it easier.
|
|
||||||
type Ticker interface {
|
|
||||||
Start() <-chan time.Time
|
|
||||||
|
|
||||||
Stop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// BatchTicker implements the Ticker interface, and wraps a time.Ticker.
|
|
||||||
type BatchTicker struct {
|
|
||||||
duration time.Duration
|
|
||||||
ticker *time.Ticker
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker.
|
|
||||||
func NewBatchTicker(d time.Duration) *BatchTicker {
|
|
||||||
return &BatchTicker{
|
|
||||||
duration: d,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start returns the tick channel for the underlying time.Ticker.
|
|
||||||
func (t *BatchTicker) Start() <-chan time.Time {
|
|
||||||
t.ticker = time.NewTicker(t.duration)
|
|
||||||
return t.ticker.C
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop stops the underlying time.Ticker.
|
|
||||||
func (t *BatchTicker) Stop() {
|
|
||||||
if t.ticker != nil {
|
|
||||||
t.ticker.Stop()
|
|
||||||
t.ticker = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ChannelLinkConfig defines the configuration for the channel link. ALL
|
// ChannelLinkConfig defines the configuration for the channel link. ALL
|
||||||
// elements within the configuration MUST be non-nil for channel link to carry
|
// elements within the configuration MUST be non-nil for channel link to carry
|
||||||
// out its duties.
|
// out its duties.
|
||||||
@ -236,13 +202,13 @@ type ChannelLinkConfig struct {
|
|||||||
// flush out. By batching updates into a single commit, we attempt to
|
// flush out. By batching updates into a single commit, we attempt to
|
||||||
// increase throughput by maximizing the number of updates coalesced
|
// increase throughput by maximizing the number of updates coalesced
|
||||||
// into a single commit.
|
// into a single commit.
|
||||||
BatchTicker Ticker
|
BatchTicker ticker.Ticker
|
||||||
|
|
||||||
// FwdPkgGCTicker is the ticker determining the frequency at which
|
// FwdPkgGCTicker is the ticker determining the frequency at which
|
||||||
// garbage collection of forwarding packages occurs. We use a
|
// garbage collection of forwarding packages occurs. We use a
|
||||||
// time-based approach, as opposed to block epochs, as to not hinder
|
// time-based approach, as opposed to block epochs, as to not hinder
|
||||||
// syncing.
|
// syncing.
|
||||||
FwdPkgGCTicker Ticker
|
FwdPkgGCTicker ticker.Ticker
|
||||||
|
|
||||||
// BatchSize is the max size of a batch of updates done to the link
|
// BatchSize is the max size of a batch of updates done to the link
|
||||||
// before we do a state update.
|
// before we do a state update.
|
||||||
@ -739,12 +705,12 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
|||||||
func (l *channelLink) fwdPkgGarbager() {
|
func (l *channelLink) fwdPkgGarbager() {
|
||||||
defer l.wg.Done()
|
defer l.wg.Done()
|
||||||
|
|
||||||
fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start()
|
l.cfg.FwdPkgGCTicker.Resume()
|
||||||
defer l.cfg.FwdPkgGCTicker.Stop()
|
defer l.cfg.FwdPkgGCTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-fwdPkgGcTick:
|
case <-l.cfg.FwdPkgGCTicker.Ticks():
|
||||||
fwdPkgs, err := l.channel.LoadFwdPkgs()
|
fwdPkgs, err := l.channel.LoadFwdPkgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.warnf("unable to load fwdpkgs for gc: %v", err)
|
l.warnf("unable to load fwdpkgs for gc: %v", err)
|
||||||
@ -890,13 +856,6 @@ func (l *channelLink) htlcManager() {
|
|||||||
l.wg.Add(1)
|
l.wg.Add(1)
|
||||||
go l.fwdPkgGarbager()
|
go l.fwdPkgGarbager()
|
||||||
|
|
||||||
// We'll only need the batch ticker if we have outgoing updates that are
|
|
||||||
// not covered by our last signature. This value will be nil unless a
|
|
||||||
// downstream packet forces the batchCounter to be positive. After the
|
|
||||||
// batch is cleared, it will return to nil to prevent wasteful CPU time
|
|
||||||
// caused by the batch ticker waking up the htlcManager needlessly.
|
|
||||||
var maybeBatchTick <-chan time.Time
|
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
// We must always check if we failed at some point processing
|
// We must always check if we failed at some point processing
|
||||||
@ -977,13 +936,12 @@ out:
|
|||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-maybeBatchTick:
|
case <-l.cfg.BatchTicker.Ticks():
|
||||||
// If the current batch is empty, then we have no work
|
// If the current batch is empty, then we have no work
|
||||||
// here. We also disable the batch ticker from waking up
|
// here. We also disable the batch ticker from waking up
|
||||||
// the htlcManager while the batch is empty.
|
// the htlcManager while the batch is empty.
|
||||||
if l.batchCounter == 0 {
|
if l.batchCounter == 0 {
|
||||||
l.cfg.BatchTicker.Stop()
|
l.cfg.BatchTicker.Pause()
|
||||||
maybeBatchTick = nil
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1012,8 +970,8 @@ out:
|
|||||||
// If the downstream packet resulted in a non-empty
|
// If the downstream packet resulted in a non-empty
|
||||||
// batch, reinstate the batch ticker so that it can be
|
// batch, reinstate the batch ticker so that it can be
|
||||||
// cleared.
|
// cleared.
|
||||||
if l.batchCounter > 0 && maybeBatchTick == nil {
|
if l.batchCounter > 0 {
|
||||||
maybeBatchTick = l.cfg.BatchTicker.Start()
|
l.cfg.BatchTicker.Resume()
|
||||||
}
|
}
|
||||||
|
|
||||||
// A message from the switch was just received. This indicates
|
// A message from the switch was just received. This indicates
|
||||||
@ -1041,8 +999,8 @@ out:
|
|||||||
// If the downstream packet resulted in a non-empty
|
// If the downstream packet resulted in a non-empty
|
||||||
// batch, reinstate the batch ticker so that it can be
|
// batch, reinstate the batch ticker so that it can be
|
||||||
// cleared.
|
// cleared.
|
||||||
if l.batchCounter > 0 && maybeBatchTick == nil {
|
if l.batchCounter > 0 {
|
||||||
maybeBatchTick = l.cfg.BatchTicker.Start()
|
l.cfg.BatchTicker.Resume()
|
||||||
}
|
}
|
||||||
|
|
||||||
// A message from the connected peer was just received. This
|
// A message from the connected peer was just received. This
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnpeer"
|
"github.com/lightningnetwork/lnd/lnpeer"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -1474,8 +1475,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
|
|||||||
return nil, nil, nil, nil, nil, nil, err
|
return nil, nil, nil, nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
t := make(chan time.Time)
|
// Instantiate with a long interval, so that we can precisely control
|
||||||
ticker := &mockTicker{t}
|
// the firing via force feeding.
|
||||||
|
bticker := ticker.MockNew(time.Hour)
|
||||||
aliceCfg := ChannelLinkConfig{
|
aliceCfg := ChannelLinkConfig{
|
||||||
FwrdingPolicy: globalPolicy,
|
FwrdingPolicy: globalPolicy,
|
||||||
Peer: alicePeer,
|
Peer: alicePeer,
|
||||||
@ -1497,8 +1499,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
|
|||||||
},
|
},
|
||||||
Registry: invoiceRegistry,
|
Registry: invoiceRegistry,
|
||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
BatchTicker: ticker,
|
BatchTicker: bticker,
|
||||||
FwdPkgGCTicker: NewBatchTicker(5 * time.Second),
|
FwdPkgGCTicker: ticker.MockNew(5 * time.Second),
|
||||||
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
||||||
// to not trigger commit updates automatically during tests.
|
// to not trigger commit updates automatically during tests.
|
||||||
BatchSize: 10000,
|
BatchSize: 10000,
|
||||||
@ -1528,7 +1530,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
|
|||||||
defer bobChannel.Stop()
|
defer bobChannel.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
return aliceLink, bobChannel, t, start, cleanUp, restore, nil
|
return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertLinkBandwidth(t *testing.T, link ChannelLink,
|
func assertLinkBandwidth(t *testing.T, link ChannelLink,
|
||||||
@ -3846,8 +3848,9 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t := make(chan time.Time)
|
// Instantiate with a long interval, so that we can precisely control
|
||||||
ticker := &mockTicker{t}
|
// the firing via force feeding.
|
||||||
|
bticker := ticker.MockNew(time.Hour)
|
||||||
aliceCfg := ChannelLinkConfig{
|
aliceCfg := ChannelLinkConfig{
|
||||||
FwrdingPolicy: globalPolicy,
|
FwrdingPolicy: globalPolicy,
|
||||||
Peer: alicePeer,
|
Peer: alicePeer,
|
||||||
@ -3869,8 +3872,8 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
|
|||||||
},
|
},
|
||||||
Registry: invoiceRegistry,
|
Registry: invoiceRegistry,
|
||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
BatchTicker: ticker,
|
BatchTicker: bticker,
|
||||||
FwdPkgGCTicker: NewBatchTicker(5 * time.Second),
|
FwdPkgGCTicker: ticker.New(5 * time.Second),
|
||||||
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
|
||||||
// to not trigger commit updates automatically during tests.
|
// to not trigger commit updates automatically during tests.
|
||||||
BatchSize: 10000,
|
BatchSize: 10000,
|
||||||
@ -3901,7 +3904,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch,
|
|||||||
defer aliceLink.Stop()
|
defer aliceLink.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
return aliceLink, t, cleanUp, nil
|
return aliceLink, bticker.Force, cleanUp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// gnerateHtlc generates a simple payment from Bob to Alice.
|
// gnerateHtlc generates a simple payment from Bob to Alice.
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnpeer"
|
"github.com/lightningnetwork/lnd/lnpeer"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockPreimageCache struct {
|
type mockPreimageCache struct {
|
||||||
@ -145,7 +146,9 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
|
|||||||
FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
|
FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
},
|
},
|
||||||
Notifier: &mockNotifier{},
|
Notifier: &mockNotifier{},
|
||||||
|
FwdEventTicker: ticker.MockNew(DefaultFwdEventInterval),
|
||||||
|
LogEventTicker: ticker.MockNew(DefaultLogInterval),
|
||||||
}
|
}
|
||||||
|
|
||||||
return New(cfg, startingHeight)
|
return New(cfg, startingHeight)
|
||||||
@ -809,14 +812,3 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
|
|||||||
Spend: make(chan *chainntnfs.SpendDetail),
|
Spend: make(chan *chainntnfs.SpendDetail),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockTicker struct {
|
|
||||||
ticker <-chan time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockTicker) Start() <-chan time.Time {
|
|
||||||
return m.ticker
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *mockTicker) Stop() {
|
|
||||||
}
|
|
||||||
|
@ -21,6 +21,17 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultFwdEventInterval is the duration between attempts to flush
|
||||||
|
// pending forwarding events to disk.
|
||||||
|
DefaultFwdEventInterval = 15 * time.Second
|
||||||
|
|
||||||
|
// DefaultLogInterval is the duration between attempts to log statistics
|
||||||
|
// about forwarding events.
|
||||||
|
DefaultLogInterval = 10 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -146,6 +157,14 @@ type Config struct {
|
|||||||
// Notifier is an instance of a chain notifier that we'll use to signal
|
// Notifier is an instance of a chain notifier that we'll use to signal
|
||||||
// the switch when a new block has arrived.
|
// the switch when a new block has arrived.
|
||||||
Notifier chainntnfs.ChainNotifier
|
Notifier chainntnfs.ChainNotifier
|
||||||
|
|
||||||
|
// FwdEventTicker is a signal that instructs the htlcswitch to flush any
|
||||||
|
// pending forwarding events.
|
||||||
|
FwdEventTicker ticker.Ticker
|
||||||
|
|
||||||
|
// LogEventTicker is a signal instructing the htlcswitch to log
|
||||||
|
// aggregate stats about it's forwarding during the last interval.
|
||||||
|
LogEventTicker ticker.Ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
|
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
|
||||||
@ -1390,13 +1409,13 @@ func (s *Switch) htlcForwarder() {
|
|||||||
totalSatSent btcutil.Amount
|
totalSatSent btcutil.Amount
|
||||||
totalSatRecv btcutil.Amount
|
totalSatRecv btcutil.Amount
|
||||||
)
|
)
|
||||||
logTicker := time.NewTicker(10 * time.Second)
|
s.cfg.LogEventTicker.Resume()
|
||||||
defer logTicker.Stop()
|
defer s.cfg.LogEventTicker.Stop()
|
||||||
|
|
||||||
// Every 15 seconds, we'll flush out the forwarding events that
|
// Every 15 seconds, we'll flush out the forwarding events that
|
||||||
// occurred during that period.
|
// occurred during that period.
|
||||||
fwdEventTicker := time.NewTicker(15 * time.Second)
|
s.cfg.FwdEventTicker.Resume()
|
||||||
defer fwdEventTicker.Stop()
|
defer s.cfg.FwdEventTicker.Stop()
|
||||||
|
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
@ -1474,7 +1493,7 @@ out:
|
|||||||
// When this time ticks, then it indicates that we should
|
// When this time ticks, then it indicates that we should
|
||||||
// collect all the forwarding events since the last internal,
|
// collect all the forwarding events since the last internal,
|
||||||
// and write them out to our log.
|
// and write them out to our log.
|
||||||
case <-fwdEventTicker.C:
|
case <-s.cfg.FwdEventTicker.Ticks():
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
@ -1488,7 +1507,7 @@ out:
|
|||||||
// The log ticker has fired, so we'll calculate some forwarding
|
// The log ticker has fired, so we'll calculate some forwarding
|
||||||
// stats for the last 10 seconds to display within the logs to
|
// stats for the last 10 seconds to display within the logs to
|
||||||
// users.
|
// users.
|
||||||
case <-logTicker.C:
|
case <-s.cfg.LogEventTicker.Ticks():
|
||||||
// First, we'll collate the current running tally of
|
// First, we'll collate the current running tally of
|
||||||
// our forwarding stats.
|
// our forwarding stats.
|
||||||
prevSatSent := totalSatSent
|
prevSatSent := totalSatSent
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/shachain"
|
"github.com/lightningnetwork/lnd/shachain"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -915,8 +916,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
|
|||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
SyncStates: true,
|
SyncStates: true,
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
|
BatchTicker: ticker.MockNew(batchTimeout),
|
||||||
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
|
FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout),
|
||||||
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
||||||
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
||||||
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
||||||
@ -958,8 +959,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
|
|||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
SyncStates: true,
|
SyncStates: true,
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
|
BatchTicker: ticker.MockNew(batchTimeout),
|
||||||
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
|
FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout),
|
||||||
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
||||||
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
||||||
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
||||||
@ -1001,8 +1002,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
|
|||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
SyncStates: true,
|
SyncStates: true,
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
|
BatchTicker: ticker.MockNew(batchTimeout),
|
||||||
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
|
FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout),
|
||||||
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
||||||
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
||||||
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
||||||
@ -1044,8 +1045,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
|
|||||||
ChainEvents: &contractcourt.ChainEventSubscription{},
|
ChainEvents: &contractcourt.ChainEventSubscription{},
|
||||||
SyncStates: true,
|
SyncStates: true,
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C},
|
BatchTicker: ticker.MockNew(batchTimeout),
|
||||||
FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C},
|
FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout),
|
||||||
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
MinFeeUpdateTimeout: minFeeUpdateTimeout,
|
||||||
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
|
||||||
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},
|
||||||
|
10
peer.go
10
peer.go
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnrpc"
|
"github.com/lightningnetwork/lnd/lnrpc"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -417,7 +418,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
|
|||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lnChan.Stop()
|
lnChan.Stop()
|
||||||
return err
|
return fmt.Errorf("unable to add link %v to switch: %v",
|
||||||
|
chanPoint, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.activeChanMtx.Lock()
|
p.activeChanMtx.Lock()
|
||||||
@ -545,8 +547,8 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
|
|||||||
},
|
},
|
||||||
OnChannelFailure: onChannelFailure,
|
OnChannelFailure: onChannelFailure,
|
||||||
SyncStates: syncStates,
|
SyncStates: syncStates,
|
||||||
BatchTicker: htlcswitch.NewBatchTicker(50 * time.Millisecond),
|
BatchTicker: ticker.New(50 * time.Millisecond),
|
||||||
FwdPkgGCTicker: htlcswitch.NewBatchTicker(time.Minute),
|
FwdPkgGCTicker: ticker.New(time.Minute),
|
||||||
BatchSize: 10,
|
BatchSize: 10,
|
||||||
UnsafeReplay: cfg.UnsafeReplay,
|
UnsafeReplay: cfg.UnsafeReplay,
|
||||||
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
|
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,
|
||||||
@ -576,7 +578,7 @@ func (p *peer) Disconnect(reason error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
peerLog.Tracef("Disconnecting %s, reason: %v", p, reason)
|
peerLog.Debugf("Disconnecting %s, reason: %v", p, reason)
|
||||||
|
|
||||||
// Ensure that the TCP connection is properly closed before continuing.
|
// Ensure that the TCP connection is properly closed before continuing.
|
||||||
p.conn.Close()
|
p.conn.Close()
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/nat"
|
"github.com/lightningnetwork/lnd/nat"
|
||||||
"github.com/lightningnetwork/lnd/routing"
|
"github.com/lightningnetwork/lnd/routing"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
"github.com/lightningnetwork/lnd/tor"
|
"github.com/lightningnetwork/lnd/tor"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -322,6 +323,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
|||||||
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
|
ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter,
|
||||||
FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey),
|
FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey),
|
||||||
Notifier: s.cc.chainNotifier,
|
Notifier: s.cc.chainNotifier,
|
||||||
|
FwdEventTicker: ticker.New(
|
||||||
|
htlcswitch.DefaultFwdEventInterval),
|
||||||
|
LogEventTicker: ticker.New(
|
||||||
|
htlcswitch.DefaultLogInterval),
|
||||||
}, uint32(currentHeight))
|
}, uint32(currentHeight))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/shachain"
|
"github.com/lightningnetwork/lnd/shachain"
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -350,6 +351,10 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
|
|||||||
DB: dbAlice,
|
DB: dbAlice,
|
||||||
SwitchPackager: channeldb.NewSwitchPackager(),
|
SwitchPackager: channeldb.NewSwitchPackager(),
|
||||||
Notifier: notifier,
|
Notifier: notifier,
|
||||||
|
FwdEventTicker: ticker.New(
|
||||||
|
htlcswitch.DefaultFwdEventInterval),
|
||||||
|
LogEventTicker: ticker.New(
|
||||||
|
htlcswitch.DefaultLogInterval),
|
||||||
}, uint32(currentHeight))
|
}, uint32(currentHeight))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, nil, err
|
return nil, nil, nil, nil, err
|
||||||
|
104
ticker/mock.go
Normal file
104
ticker/mock.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
// +build debug
|
||||||
|
|
||||||
|
package ticker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Mock implements the Ticker interface, and provides a method of
|
||||||
|
// force-feeding ticks, even while paused.
|
||||||
|
type Mock struct {
|
||||||
|
isActive uint32 // used atomically
|
||||||
|
|
||||||
|
// Force is used to force-feed a ticks into the ticker. Useful for
|
||||||
|
// debugging when trying to wake an event.
|
||||||
|
Force chan time.Time
|
||||||
|
|
||||||
|
ticker <-chan time.Time
|
||||||
|
skip chan struct{}
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
quit chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockNew returns a Mock Ticker, used for testing and debugging. It supports
|
||||||
|
// the ability to force-feed events that get output by the
|
||||||
|
func MockNew(interval time.Duration) *Mock {
|
||||||
|
m := &Mock{
|
||||||
|
ticker: time.NewTicker(interval).C,
|
||||||
|
Force: make(chan time.Time),
|
||||||
|
skip: make(chan struct{}),
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxy the real ticks to our Force channel if we are active.
|
||||||
|
m.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer m.wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case t := <-m.ticker:
|
||||||
|
if atomic.LoadUint32(&m.isActive) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case m.Force <- t:
|
||||||
|
case <-m.skip:
|
||||||
|
case <-m.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-m.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ticks returns a receive-only channel that delivers times at the ticker's
|
||||||
|
// prescribed interval when active. Force-fed ticks can be delivered at any
|
||||||
|
// time.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (m *Mock) Ticks() <-chan time.Time {
|
||||||
|
return m.Force
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resumes starts underlying time.Ticker and causes the ticker to begin
|
||||||
|
// delivering scheduled events.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (m *Mock) Resume() {
|
||||||
|
atomic.StoreUint32(&m.isActive, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause suspends the underlying ticker, such that Ticks() stops signaling at
|
||||||
|
// regular intervals.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (m *Mock) Pause() {
|
||||||
|
atomic.StoreUint32(&m.isActive, 0)
|
||||||
|
|
||||||
|
// If the ticker fired and read isActive as true, it may still send the
|
||||||
|
// tick. We'll try to send on the skip channel to drop it.
|
||||||
|
select {
|
||||||
|
case m.skip <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop suspends the underlying ticker, such that Ticks() stops signaling at
|
||||||
|
// regular intervals, and permanently frees up any resources.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (m *Mock) Stop() {
|
||||||
|
m.Pause()
|
||||||
|
close(m.quit)
|
||||||
|
m.wg.Wait()
|
||||||
|
}
|
123
ticker/ticker.go
Normal file
123
ticker/ticker.go
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
package ticker
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// Ticker defines a resumable ticker interface, whose activity can be toggled to
|
||||||
|
// free up resources during periods of inactivity.
|
||||||
|
//
|
||||||
|
// Example of resuming ticker:
|
||||||
|
//
|
||||||
|
// ticker.Resume() // can remove to keep inactive at first
|
||||||
|
// defer ticker.Stop()
|
||||||
|
// for {
|
||||||
|
// select {
|
||||||
|
// case <-ticker.Tick():
|
||||||
|
// if shouldGoInactive {
|
||||||
|
// ticker.Pause()
|
||||||
|
// continue
|
||||||
|
// }
|
||||||
|
// ...
|
||||||
|
//
|
||||||
|
// case <-otherEvent:
|
||||||
|
// ...
|
||||||
|
// if shouldGoActive {
|
||||||
|
// ticker.Resume()
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// NOTE: ONE DOES NOT SIMPLY assume that Tickers are safe for concurrent access.
|
||||||
|
type Ticker interface {
|
||||||
|
// Ticks returns a read-only channel delivering ticks according to a
|
||||||
|
// prescribed interval. The value returned does not need to be the same
|
||||||
|
// channel, and may be nil.
|
||||||
|
//
|
||||||
|
// NOTE: Callers should assume that reads from Ticks() are stale after
|
||||||
|
// any invocations of Resume, Pause, or Stop.
|
||||||
|
Ticks() <-chan time.Time
|
||||||
|
|
||||||
|
// Resume starts or resumes the underlying ticker, such that Ticks()
|
||||||
|
// will fire at regular intervals. After calling Resume, Ticks() should
|
||||||
|
// minimally send ticks at the prescribed interval.
|
||||||
|
//
|
||||||
|
// NOTE: It MUST be safe to call Resume at any time, and more than once
|
||||||
|
// successively.
|
||||||
|
Resume()
|
||||||
|
|
||||||
|
// Pause suspends the underlying ticker, such that Ticks() stops
|
||||||
|
// signaling at regular intervals. After calling Pause, the ticker
|
||||||
|
// should not send any ticks scheduled with the chosen interval. Forced
|
||||||
|
// ticks are still permissible, as in the case of the Mock Ticker.
|
||||||
|
//
|
||||||
|
// NOTE: It MUST be safe to call Pause at any time, and more than once
|
||||||
|
// successively.
|
||||||
|
Pause()
|
||||||
|
|
||||||
|
// Stop suspends the underlying ticker, such that Ticks() stops
|
||||||
|
// signaling at regular intervals, and permanently frees up any
|
||||||
|
// remaining resources.
|
||||||
|
//
|
||||||
|
// NOTE: The behavior of a Ticker is undefined after calling Stop.
|
||||||
|
Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ticker is the production implementation of the resumable Ticker interface.
|
||||||
|
// This allows various components to toggle their need for tick events, which
|
||||||
|
// may vary depending on system load.
|
||||||
|
type ticker struct {
|
||||||
|
// interval is the desired duration between ticks when active.
|
||||||
|
interval time.Duration
|
||||||
|
|
||||||
|
// ticker is the ephemeral, underlying time.Ticker. We keep a reference
|
||||||
|
// to this ticker so that it can be stopped and cleaned up on Pause or
|
||||||
|
// Stop.
|
||||||
|
ticker *time.Ticker
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a new ticker that signals with the given interval when not
|
||||||
|
// paused. The ticker starts off inactive.
|
||||||
|
func New(interval time.Duration) Ticker {
|
||||||
|
return &ticker{
|
||||||
|
interval: interval,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ticks returns a receive-only channel that delivers times at the ticker's
|
||||||
|
// prescribed interval. This method returns nil when the ticker is paused.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (t *ticker) Ticks() <-chan time.Time {
|
||||||
|
if t.ticker == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return t.ticker.C
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resumes starts underlying time.Ticker and causes the ticker to begin
|
||||||
|
// delivering scheduled events.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (t *ticker) Resume() {
|
||||||
|
if t.ticker == nil {
|
||||||
|
t.ticker = time.NewTicker(t.interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause suspends the underlying ticker, such that Ticks() stops signaling at
|
||||||
|
// regular intervals.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (t *ticker) Pause() {
|
||||||
|
if t.ticker != nil {
|
||||||
|
t.ticker.Stop()
|
||||||
|
t.ticker = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop suspends the underlying ticker, such that Ticks() stops signaling at
|
||||||
|
// regular intervals, and permanently frees up any resources. For this
|
||||||
|
// implementation, this is equivalent to Pause.
|
||||||
|
//
|
||||||
|
// NOTE: Part of the Ticker interface.
|
||||||
|
func (t *ticker) Stop() {
|
||||||
|
t.Pause()
|
||||||
|
}
|
99
ticker/ticker_test.go
Normal file
99
ticker/ticker_test.go
Normal file
@ -0,0 +1,99 @@
|
|||||||
|
package ticker_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/ticker"
|
||||||
|
)
|
||||||
|
|
||||||
|
const interval = 50 * time.Millisecond
|
||||||
|
const numActiveTicks = 3
|
||||||
|
|
||||||
|
var tickers = []struct {
|
||||||
|
name string
|
||||||
|
ticker ticker.Ticker
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"default ticker",
|
||||||
|
ticker.New(interval),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"mock ticker",
|
||||||
|
ticker.MockNew(interval),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTickers verifies that both our production and mock tickers exhibit the
|
||||||
|
// same principle behaviors when accessed via the ticker.Ticker interface
|
||||||
|
// methods.
|
||||||
|
func TestInterfaceTickers(t *testing.T) {
|
||||||
|
for _, test := range tickers {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
testTicker(t, test.ticker)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// testTicker asserts the behavior of a freshly initialized ticker.Ticker.
|
||||||
|
func testTicker(t *testing.T, ticker ticker.Ticker) {
|
||||||
|
// Newly initialized ticker should start off inactive.
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
t.Fatalf("ticker should not have ticked before calling Resume")
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resume, ticker should be active and start sending ticks.
|
||||||
|
ticker.Resume()
|
||||||
|
|
||||||
|
for i := 0; i < numActiveTicks; i++ {
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
t.Fatalf(
|
||||||
|
"ticker should have ticked after calling Resume",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause, check that ticker is inactive and sends no ticks.
|
||||||
|
ticker.Pause()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
t.Fatalf("ticker should not have ticked after calling Pause")
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause again, expect same behavior as after first invocation.
|
||||||
|
ticker.Pause()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
t.Fatalf("ticker should not have ticked after calling Pause again")
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resume again, should result in normal active behavior.
|
||||||
|
ticker.Resume()
|
||||||
|
|
||||||
|
for i := 0; i < numActiveTicks; i++ {
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
t.Fatalf(
|
||||||
|
"ticker should have ticked after calling Resume",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop the ticker altogether, should render it inactive.
|
||||||
|
ticker.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ticker.Ticks():
|
||||||
|
t.Fatalf("ticker should not have ticked after calling Stop")
|
||||||
|
case <-time.After(2 * interval):
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user