diff --git a/htlcswitch/link.go b/htlcswitch/link.go index df33da84..8697624c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) func init() { @@ -94,41 +95,6 @@ func ExpectedFee(f ForwardingPolicy, return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 } -// Ticker is an interface used to wrap a time.Ticker in a struct, making -// mocking it easier. -type Ticker interface { - Start() <-chan time.Time - - Stop() -} - -// BatchTicker implements the Ticker interface, and wraps a time.Ticker. -type BatchTicker struct { - duration time.Duration - ticker *time.Ticker -} - -// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker. -func NewBatchTicker(d time.Duration) *BatchTicker { - return &BatchTicker{ - duration: d, - } -} - -// Start returns the tick channel for the underlying time.Ticker. -func (t *BatchTicker) Start() <-chan time.Time { - t.ticker = time.NewTicker(t.duration) - return t.ticker.C -} - -// Stop stops the underlying time.Ticker. -func (t *BatchTicker) Stop() { - if t.ticker != nil { - t.ticker.Stop() - t.ticker = nil - } -} - // ChannelLinkConfig defines the configuration for the channel link. ALL // elements within the configuration MUST be non-nil for channel link to carry // out its duties. @@ -236,13 +202,13 @@ type ChannelLinkConfig struct { // flush out. By batching updates into a single commit, we attempt to // increase throughput by maximizing the number of updates coalesced // into a single commit. - BatchTicker Ticker + BatchTicker ticker.Ticker // FwdPkgGCTicker is the ticker determining the frequency at which // garbage collection of forwarding packages occurs. We use a // time-based approach, as opposed to block epochs, as to not hinder // syncing. - FwdPkgGCTicker Ticker + FwdPkgGCTicker ticker.Ticker // BatchSize is the max size of a batch of updates done to the link // before we do a state update. @@ -739,12 +705,12 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { func (l *channelLink) fwdPkgGarbager() { defer l.wg.Done() - fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start() + l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() for { select { - case <-fwdPkgGcTick: + case <-l.cfg.FwdPkgGCTicker.Ticks(): fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { l.warnf("unable to load fwdpkgs for gc: %v", err) @@ -890,13 +856,6 @@ func (l *channelLink) htlcManager() { l.wg.Add(1) go l.fwdPkgGarbager() - // We'll only need the batch ticker if we have outgoing updates that are - // not covered by our last signature. This value will be nil unless a - // downstream packet forces the batchCounter to be positive. After the - // batch is cleared, it will return to nil to prevent wasteful CPU time - // caused by the batch ticker waking up the htlcManager needlessly. - var maybeBatchTick <-chan time.Time - out: for { // We must always check if we failed at some point processing @@ -977,13 +936,12 @@ out: break out } - case <-maybeBatchTick: + case <-l.cfg.BatchTicker.Ticks(): // If the current batch is empty, then we have no work // here. We also disable the batch ticker from waking up // the htlcManager while the batch is empty. if l.batchCounter == 0 { - l.cfg.BatchTicker.Stop() - maybeBatchTick = nil + l.cfg.BatchTicker.Pause() continue } @@ -1012,8 +970,8 @@ out: // If the downstream packet resulted in a non-empty // batch, reinstate the batch ticker so that it can be // cleared. - if l.batchCounter > 0 && maybeBatchTick == nil { - maybeBatchTick = l.cfg.BatchTicker.Start() + if l.batchCounter > 0 { + l.cfg.BatchTicker.Resume() } // A message from the switch was just received. This indicates @@ -1041,8 +999,8 @@ out: // If the downstream packet resulted in a non-empty // batch, reinstate the batch ticker so that it can be // cleared. - if l.batchCounter > 0 && maybeBatchTick == nil { - maybeBatchTick = l.cfg.BatchTicker.Start() + if l.batchCounter > 0 { + l.cfg.BatchTicker.Resume() } // A message from the connected peer was just received. This diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 24b80530..540dd937 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) const ( @@ -1474,8 +1475,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( return nil, nil, nil, nil, nil, nil, err } - t := make(chan time.Time) - ticker := &mockTicker{t} + // Instantiate with a long interval, so that we can precisely control + // the firing via force feeding. + bticker := ticker.MockNew(time.Hour) aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, Peer: alicePeer, @@ -1497,8 +1499,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: ticker, - FwdPkgGCTicker: NewBatchTicker(5 * time.Second), + BatchTicker: bticker, + FwdPkgGCTicker: ticker.MockNew(5 * time.Second), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -1528,7 +1530,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( defer bobChannel.Stop() } - return aliceLink, bobChannel, t, start, cleanUp, restore, nil + return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -3846,8 +3848,9 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, } } - t := make(chan time.Time) - ticker := &mockTicker{t} + // Instantiate with a long interval, so that we can precisely control + // the firing via force feeding. + bticker := ticker.MockNew(time.Hour) aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, Peer: alicePeer, @@ -3869,8 +3872,8 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: ticker, - FwdPkgGCTicker: NewBatchTicker(5 * time.Second), + BatchTicker: bticker, + FwdPkgGCTicker: ticker.New(5 * time.Second), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -3901,7 +3904,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, defer aliceLink.Stop() } - return aliceLink, t, cleanUp, nil + return aliceLink, bticker.Force, cleanUp, nil } // gnerateHtlc generates a simple payment from Bob to Alice. diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 3dcfa876..00795c51 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -26,6 +26,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) type mockPreimageCache struct { @@ -145,7 +146,9 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { return nil, nil }, - Notifier: &mockNotifier{}, + Notifier: &mockNotifier{}, + FwdEventTicker: ticker.MockNew(DefaultFwdEventInterval), + LogEventTicker: ticker.MockNew(DefaultLogInterval), } return New(cfg, startingHeight) @@ -809,14 +812,3 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, Spend: make(chan *chainntnfs.SpendDetail), }, nil } - -type mockTicker struct { - ticker <-chan time.Time -} - -func (m *mockTicker) Start() <-chan time.Time { - return m.ticker -} - -func (m *mockTicker) Stop() { -} diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d1473ec7..e2ee14b0 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -21,6 +21,17 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" +) + +const ( + // DefaultFwdEventInterval is the duration between attempts to flush + // pending forwarding events to disk. + DefaultFwdEventInterval = 15 * time.Second + + // DefaultLogInterval is the duration between attempts to log statistics + // about forwarding events. + DefaultLogInterval = 10 * time.Second ) var ( @@ -146,6 +157,14 @@ type Config struct { // Notifier is an instance of a chain notifier that we'll use to signal // the switch when a new block has arrived. Notifier chainntnfs.ChainNotifier + + // FwdEventTicker is a signal that instructs the htlcswitch to flush any + // pending forwarding events. + FwdEventTicker ticker.Ticker + + // LogEventTicker is a signal instructing the htlcswitch to log + // aggregate stats about it's forwarding during the last interval. + LogEventTicker ticker.Ticker } // Switch is the central messaging bus for all incoming/outgoing HTLCs. @@ -1390,13 +1409,13 @@ func (s *Switch) htlcForwarder() { totalSatSent btcutil.Amount totalSatRecv btcutil.Amount ) - logTicker := time.NewTicker(10 * time.Second) - defer logTicker.Stop() + s.cfg.LogEventTicker.Resume() + defer s.cfg.LogEventTicker.Stop() // Every 15 seconds, we'll flush out the forwarding events that // occurred during that period. - fwdEventTicker := time.NewTicker(15 * time.Second) - defer fwdEventTicker.Stop() + s.cfg.FwdEventTicker.Resume() + defer s.cfg.FwdEventTicker.Stop() out: for { @@ -1474,7 +1493,7 @@ out: // When this time ticks, then it indicates that we should // collect all the forwarding events since the last internal, // and write them out to our log. - case <-fwdEventTicker.C: + case <-s.cfg.FwdEventTicker.Ticks(): s.wg.Add(1) go func() { defer s.wg.Done() @@ -1488,7 +1507,7 @@ out: // The log ticker has fired, so we'll calculate some forwarding // stats for the last 10 seconds to display within the logs to // users. - case <-logTicker.C: + case <-s.cfg.LogEventTicker.Ticks(): // First, we'll collate the current running tally of // our forwarding stats. prevSatSent := totalSatSent diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 7838e129..1e7c8182 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -915,8 +916,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -958,8 +959,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -1001,8 +1002,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -1044,8 +1045,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, diff --git a/peer.go b/peer.go index 09a17291..a9911dda 100644 --- a/peer.go +++ b/peer.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -417,7 +418,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { ) if err != nil { lnChan.Stop() - return err + return fmt.Errorf("unable to add link %v to switch: %v", + chanPoint, err) } p.activeChanMtx.Lock() @@ -545,8 +547,8 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, }, OnChannelFailure: onChannelFailure, SyncStates: syncStates, - BatchTicker: htlcswitch.NewBatchTicker(50 * time.Millisecond), - FwdPkgGCTicker: htlcswitch.NewBatchTicker(time.Minute), + BatchTicker: ticker.New(50 * time.Millisecond), + FwdPkgGCTicker: ticker.New(time.Minute), BatchSize: 10, UnsafeReplay: cfg.UnsafeReplay, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, @@ -576,7 +578,7 @@ func (p *peer) Disconnect(reason error) { return } - peerLog.Tracef("Disconnecting %s, reason: %v", p, reason) + peerLog.Debugf("Disconnecting %s, reason: %v", p, reason) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() diff --git a/server.go b/server.go index 76d54512..a86276a3 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/tor" ) @@ -322,6 +323,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey), Notifier: s.cc.chainNotifier, + FwdEventTicker: ticker.New( + htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New( + htlcswitch.DefaultLogInterval), }, uint32(currentHeight)) if err != nil { return nil, err diff --git a/test_utils.go b/test_utils.go index f6338bfb..6f4b1383 100644 --- a/test_utils.go +++ b/test_utils.go @@ -22,6 +22,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -350,6 +351,10 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, DB: dbAlice, SwitchPackager: channeldb.NewSwitchPackager(), Notifier: notifier, + FwdEventTicker: ticker.New( + htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New( + htlcswitch.DefaultLogInterval), }, uint32(currentHeight)) if err != nil { return nil, nil, nil, nil, err diff --git a/ticker/mock.go b/ticker/mock.go new file mode 100644 index 00000000..2fae9e51 --- /dev/null +++ b/ticker/mock.go @@ -0,0 +1,104 @@ +// +build debug + +package ticker + +import ( + "sync" + "sync/atomic" + "time" +) + +// Mock implements the Ticker interface, and provides a method of +// force-feeding ticks, even while paused. +type Mock struct { + isActive uint32 // used atomically + + // Force is used to force-feed a ticks into the ticker. Useful for + // debugging when trying to wake an event. + Force chan time.Time + + ticker <-chan time.Time + skip chan struct{} + + wg sync.WaitGroup + quit chan struct{} +} + +// MockNew returns a Mock Ticker, used for testing and debugging. It supports +// the ability to force-feed events that get output by the +func MockNew(interval time.Duration) *Mock { + m := &Mock{ + ticker: time.NewTicker(interval).C, + Force: make(chan time.Time), + skip: make(chan struct{}), + quit: make(chan struct{}), + } + + // Proxy the real ticks to our Force channel if we are active. + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + select { + case t := <-m.ticker: + if atomic.LoadUint32(&m.isActive) == 0 { + continue + } + + select { + case m.Force <- t: + case <-m.skip: + case <-m.quit: + return + } + + case <-m.quit: + return + } + } + }() + + return m +} + +// Ticks returns a receive-only channel that delivers times at the ticker's +// prescribed interval when active. Force-fed ticks can be delivered at any +// time. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Ticks() <-chan time.Time { + return m.Force +} + +// Resumes starts underlying time.Ticker and causes the ticker to begin +// delivering scheduled events. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Resume() { + atomic.StoreUint32(&m.isActive, 1) +} + +// Pause suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Pause() { + atomic.StoreUint32(&m.isActive, 0) + + // If the ticker fired and read isActive as true, it may still send the + // tick. We'll try to send on the skip channel to drop it. + select { + case m.skip <- struct{}{}: + default: + } +} + +// Stop suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals, and permanently frees up any resources. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Stop() { + m.Pause() + close(m.quit) + m.wg.Wait() +} diff --git a/ticker/ticker.go b/ticker/ticker.go new file mode 100644 index 00000000..29e0c04b --- /dev/null +++ b/ticker/ticker.go @@ -0,0 +1,123 @@ +package ticker + +import "time" + +// Ticker defines a resumable ticker interface, whose activity can be toggled to +// free up resources during periods of inactivity. +// +// Example of resuming ticker: +// +// ticker.Resume() // can remove to keep inactive at first +// defer ticker.Stop() +// for { +// select { +// case <-ticker.Tick(): +// if shouldGoInactive { +// ticker.Pause() +// continue +// } +// ... +// +// case <-otherEvent: +// ... +// if shouldGoActive { +// ticker.Resume() +// } +// } +// +// NOTE: ONE DOES NOT SIMPLY assume that Tickers are safe for concurrent access. +type Ticker interface { + // Ticks returns a read-only channel delivering ticks according to a + // prescribed interval. The value returned does not need to be the same + // channel, and may be nil. + // + // NOTE: Callers should assume that reads from Ticks() are stale after + // any invocations of Resume, Pause, or Stop. + Ticks() <-chan time.Time + + // Resume starts or resumes the underlying ticker, such that Ticks() + // will fire at regular intervals. After calling Resume, Ticks() should + // minimally send ticks at the prescribed interval. + // + // NOTE: It MUST be safe to call Resume at any time, and more than once + // successively. + Resume() + + // Pause suspends the underlying ticker, such that Ticks() stops + // signaling at regular intervals. After calling Pause, the ticker + // should not send any ticks scheduled with the chosen interval. Forced + // ticks are still permissible, as in the case of the Mock Ticker. + // + // NOTE: It MUST be safe to call Pause at any time, and more than once + // successively. + Pause() + + // Stop suspends the underlying ticker, such that Ticks() stops + // signaling at regular intervals, and permanently frees up any + // remaining resources. + // + // NOTE: The behavior of a Ticker is undefined after calling Stop. + Stop() +} + +// ticker is the production implementation of the resumable Ticker interface. +// This allows various components to toggle their need for tick events, which +// may vary depending on system load. +type ticker struct { + // interval is the desired duration between ticks when active. + interval time.Duration + + // ticker is the ephemeral, underlying time.Ticker. We keep a reference + // to this ticker so that it can be stopped and cleaned up on Pause or + // Stop. + ticker *time.Ticker +} + +// New returns a new ticker that signals with the given interval when not +// paused. The ticker starts off inactive. +func New(interval time.Duration) Ticker { + return &ticker{ + interval: interval, + } +} + +// Ticks returns a receive-only channel that delivers times at the ticker's +// prescribed interval. This method returns nil when the ticker is paused. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Ticks() <-chan time.Time { + if t.ticker == nil { + return nil + } + return t.ticker.C +} + +// Resumes starts underlying time.Ticker and causes the ticker to begin +// delivering scheduled events. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Resume() { + if t.ticker == nil { + t.ticker = time.NewTicker(t.interval) + } +} + +// Pause suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Pause() { + if t.ticker != nil { + t.ticker.Stop() + t.ticker = nil + } +} + +// Stop suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals, and permanently frees up any resources. For this +// implementation, this is equivalent to Pause. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Stop() { + t.Pause() +} diff --git a/ticker/ticker_test.go b/ticker/ticker_test.go new file mode 100644 index 00000000..2abd0543 --- /dev/null +++ b/ticker/ticker_test.go @@ -0,0 +1,99 @@ +package ticker_test + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/ticker" +) + +const interval = 50 * time.Millisecond +const numActiveTicks = 3 + +var tickers = []struct { + name string + ticker ticker.Ticker +}{ + { + "default ticker", + ticker.New(interval), + }, + { + "mock ticker", + ticker.MockNew(interval), + }, +} + +// TestTickers verifies that both our production and mock tickers exhibit the +// same principle behaviors when accessed via the ticker.Ticker interface +// methods. +func TestInterfaceTickers(t *testing.T) { + for _, test := range tickers { + t.Run(test.name, func(t *testing.T) { + testTicker(t, test.ticker) + }) + } +} + +// testTicker asserts the behavior of a freshly initialized ticker.Ticker. +func testTicker(t *testing.T, ticker ticker.Ticker) { + // Newly initialized ticker should start off inactive. + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked before calling Resume") + case <-time.After(2 * interval): + } + + // Resume, ticker should be active and start sending ticks. + ticker.Resume() + + for i := 0; i < numActiveTicks; i++ { + select { + case <-ticker.Ticks(): + case <-time.After(2 * interval): + t.Fatalf( + "ticker should have ticked after calling Resume", + ) + } + } + + // Pause, check that ticker is inactive and sends no ticks. + ticker.Pause() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Pause") + case <-time.After(2 * interval): + } + + // Pause again, expect same behavior as after first invocation. + ticker.Pause() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Pause again") + case <-time.After(2 * interval): + } + + // Resume again, should result in normal active behavior. + ticker.Resume() + + for i := 0; i < numActiveTicks; i++ { + select { + case <-ticker.Ticks(): + case <-time.After(2 * interval): + t.Fatalf( + "ticker should have ticked after calling Resume", + ) + } + } + + // Stop the ticker altogether, should render it inactive. + ticker.Stop() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Stop") + case <-time.After(2 * interval): + } +}