From f8fe26fb06801d15446cd3e576bdf922588402d6 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 12:20:46 -0700 Subject: [PATCH 01/11] ticker/ticker: adds resumable Ticker interface --- ticker/ticker.go | 123 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 ticker/ticker.go diff --git a/ticker/ticker.go b/ticker/ticker.go new file mode 100644 index 00000000..29e0c04b --- /dev/null +++ b/ticker/ticker.go @@ -0,0 +1,123 @@ +package ticker + +import "time" + +// Ticker defines a resumable ticker interface, whose activity can be toggled to +// free up resources during periods of inactivity. +// +// Example of resuming ticker: +// +// ticker.Resume() // can remove to keep inactive at first +// defer ticker.Stop() +// for { +// select { +// case <-ticker.Tick(): +// if shouldGoInactive { +// ticker.Pause() +// continue +// } +// ... +// +// case <-otherEvent: +// ... +// if shouldGoActive { +// ticker.Resume() +// } +// } +// +// NOTE: ONE DOES NOT SIMPLY assume that Tickers are safe for concurrent access. +type Ticker interface { + // Ticks returns a read-only channel delivering ticks according to a + // prescribed interval. The value returned does not need to be the same + // channel, and may be nil. + // + // NOTE: Callers should assume that reads from Ticks() are stale after + // any invocations of Resume, Pause, or Stop. + Ticks() <-chan time.Time + + // Resume starts or resumes the underlying ticker, such that Ticks() + // will fire at regular intervals. After calling Resume, Ticks() should + // minimally send ticks at the prescribed interval. + // + // NOTE: It MUST be safe to call Resume at any time, and more than once + // successively. + Resume() + + // Pause suspends the underlying ticker, such that Ticks() stops + // signaling at regular intervals. After calling Pause, the ticker + // should not send any ticks scheduled with the chosen interval. Forced + // ticks are still permissible, as in the case of the Mock Ticker. + // + // NOTE: It MUST be safe to call Pause at any time, and more than once + // successively. + Pause() + + // Stop suspends the underlying ticker, such that Ticks() stops + // signaling at regular intervals, and permanently frees up any + // remaining resources. + // + // NOTE: The behavior of a Ticker is undefined after calling Stop. + Stop() +} + +// ticker is the production implementation of the resumable Ticker interface. +// This allows various components to toggle their need for tick events, which +// may vary depending on system load. +type ticker struct { + // interval is the desired duration between ticks when active. + interval time.Duration + + // ticker is the ephemeral, underlying time.Ticker. We keep a reference + // to this ticker so that it can be stopped and cleaned up on Pause or + // Stop. + ticker *time.Ticker +} + +// New returns a new ticker that signals with the given interval when not +// paused. The ticker starts off inactive. +func New(interval time.Duration) Ticker { + return &ticker{ + interval: interval, + } +} + +// Ticks returns a receive-only channel that delivers times at the ticker's +// prescribed interval. This method returns nil when the ticker is paused. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Ticks() <-chan time.Time { + if t.ticker == nil { + return nil + } + return t.ticker.C +} + +// Resumes starts underlying time.Ticker and causes the ticker to begin +// delivering scheduled events. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Resume() { + if t.ticker == nil { + t.ticker = time.NewTicker(t.interval) + } +} + +// Pause suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Pause() { + if t.ticker != nil { + t.ticker.Stop() + t.ticker = nil + } +} + +// Stop suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals, and permanently frees up any resources. For this +// implementation, this is equivalent to Pause. +// +// NOTE: Part of the Ticker interface. +func (t *ticker) Stop() { + t.Pause() +} From eeab500de710cfbffdfb478cab8ae78a6e23015d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 2 Aug 2018 02:14:16 -0700 Subject: [PATCH 02/11] ticker/mock: adds debug Mock Ticker This commit adds a Mock Ticker, implementing the Ticker interface. The ticker abides by the behavioral requirements of the production implementation, but also includes the ability to force feed ticks while the Ticker is paused. This commit also places the mock.go file behind the debug build flag. This allows us to import the MockTicker into our unit tests. --- ticker/mock.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 ticker/mock.go diff --git a/ticker/mock.go b/ticker/mock.go new file mode 100644 index 00000000..2fae9e51 --- /dev/null +++ b/ticker/mock.go @@ -0,0 +1,104 @@ +// +build debug + +package ticker + +import ( + "sync" + "sync/atomic" + "time" +) + +// Mock implements the Ticker interface, and provides a method of +// force-feeding ticks, even while paused. +type Mock struct { + isActive uint32 // used atomically + + // Force is used to force-feed a ticks into the ticker. Useful for + // debugging when trying to wake an event. + Force chan time.Time + + ticker <-chan time.Time + skip chan struct{} + + wg sync.WaitGroup + quit chan struct{} +} + +// MockNew returns a Mock Ticker, used for testing and debugging. It supports +// the ability to force-feed events that get output by the +func MockNew(interval time.Duration) *Mock { + m := &Mock{ + ticker: time.NewTicker(interval).C, + Force: make(chan time.Time), + skip: make(chan struct{}), + quit: make(chan struct{}), + } + + // Proxy the real ticks to our Force channel if we are active. + m.wg.Add(1) + go func() { + defer m.wg.Done() + for { + select { + case t := <-m.ticker: + if atomic.LoadUint32(&m.isActive) == 0 { + continue + } + + select { + case m.Force <- t: + case <-m.skip: + case <-m.quit: + return + } + + case <-m.quit: + return + } + } + }() + + return m +} + +// Ticks returns a receive-only channel that delivers times at the ticker's +// prescribed interval when active. Force-fed ticks can be delivered at any +// time. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Ticks() <-chan time.Time { + return m.Force +} + +// Resumes starts underlying time.Ticker and causes the ticker to begin +// delivering scheduled events. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Resume() { + atomic.StoreUint32(&m.isActive, 1) +} + +// Pause suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Pause() { + atomic.StoreUint32(&m.isActive, 0) + + // If the ticker fired and read isActive as true, it may still send the + // tick. We'll try to send on the skip channel to drop it. + select { + case m.skip <- struct{}{}: + default: + } +} + +// Stop suspends the underlying ticker, such that Ticks() stops signaling at +// regular intervals, and permanently frees up any resources. +// +// NOTE: Part of the Ticker interface. +func (m *Mock) Stop() { + m.Pause() + close(m.quit) + m.wg.Wait() +} From 83353b44bdc0ceca073254b1316e711e86b2ee5c Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 12:40:32 -0700 Subject: [PATCH 03/11] ticker/ticker_test: adds Ticker interface tests --- ticker/ticker_test.go | 99 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 ticker/ticker_test.go diff --git a/ticker/ticker_test.go b/ticker/ticker_test.go new file mode 100644 index 00000000..2abd0543 --- /dev/null +++ b/ticker/ticker_test.go @@ -0,0 +1,99 @@ +package ticker_test + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/ticker" +) + +const interval = 50 * time.Millisecond +const numActiveTicks = 3 + +var tickers = []struct { + name string + ticker ticker.Ticker +}{ + { + "default ticker", + ticker.New(interval), + }, + { + "mock ticker", + ticker.MockNew(interval), + }, +} + +// TestTickers verifies that both our production and mock tickers exhibit the +// same principle behaviors when accessed via the ticker.Ticker interface +// methods. +func TestInterfaceTickers(t *testing.T) { + for _, test := range tickers { + t.Run(test.name, func(t *testing.T) { + testTicker(t, test.ticker) + }) + } +} + +// testTicker asserts the behavior of a freshly initialized ticker.Ticker. +func testTicker(t *testing.T, ticker ticker.Ticker) { + // Newly initialized ticker should start off inactive. + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked before calling Resume") + case <-time.After(2 * interval): + } + + // Resume, ticker should be active and start sending ticks. + ticker.Resume() + + for i := 0; i < numActiveTicks; i++ { + select { + case <-ticker.Ticks(): + case <-time.After(2 * interval): + t.Fatalf( + "ticker should have ticked after calling Resume", + ) + } + } + + // Pause, check that ticker is inactive and sends no ticks. + ticker.Pause() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Pause") + case <-time.After(2 * interval): + } + + // Pause again, expect same behavior as after first invocation. + ticker.Pause() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Pause again") + case <-time.After(2 * interval): + } + + // Resume again, should result in normal active behavior. + ticker.Resume() + + for i := 0; i < numActiveTicks; i++ { + select { + case <-ticker.Ticks(): + case <-time.After(2 * interval): + t.Fatalf( + "ticker should have ticked after calling Resume", + ) + } + } + + // Stop the ticker altogether, should render it inactive. + ticker.Stop() + + select { + case <-ticker.Ticks(): + t.Fatalf("ticker should not have ticked after calling Stop") + case <-time.After(2 * interval): + } +} From 1c456a51443292728155890c4e9f4eae83c17d3f Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 12:42:38 -0700 Subject: [PATCH 04/11] htlcswitch/link: replace batch ticker with... resumable ticker.Ticker interface --- htlcswitch/link.go | 64 ++++++++-------------------------------------- 1 file changed, 11 insertions(+), 53 deletions(-) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index b75f3af5..2f09d99b 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) func init() { @@ -99,41 +100,6 @@ func ExpectedFee(f ForwardingPolicy, return f.BaseFee + (htlcAmt*f.FeeRate)/1000000 } -// Ticker is an interface used to wrap a time.Ticker in a struct, making -// mocking it easier. -type Ticker interface { - Start() <-chan time.Time - - Stop() -} - -// BatchTicker implements the Ticker interface, and wraps a time.Ticker. -type BatchTicker struct { - duration time.Duration - ticker *time.Ticker -} - -// NewBatchTicker returns a new BatchTicker that wraps the passed time.Ticker. -func NewBatchTicker(d time.Duration) *BatchTicker { - return &BatchTicker{ - duration: d, - } -} - -// Start returns the tick channel for the underlying time.Ticker. -func (t *BatchTicker) Start() <-chan time.Time { - t.ticker = time.NewTicker(t.duration) - return t.ticker.C -} - -// Stop stops the underlying time.Ticker. -func (t *BatchTicker) Stop() { - if t.ticker != nil { - t.ticker.Stop() - t.ticker = nil - } -} - // ChannelLinkConfig defines the configuration for the channel link. ALL // elements within the configuration MUST be non-nil for channel link to carry // out its duties. @@ -241,13 +207,13 @@ type ChannelLinkConfig struct { // flush out. By batching updates into a single commit, we attempt to // increase throughput by maximizing the number of updates coalesced // into a single commit. - BatchTicker Ticker + BatchTicker ticker.Ticker // FwdPkgGCTicker is the ticker determining the frequency at which // garbage collection of forwarding packages occurs. We use a // time-based approach, as opposed to block epochs, as to not hinder // syncing. - FwdPkgGCTicker Ticker + FwdPkgGCTicker ticker.Ticker // BatchSize is the max size of a batch of updates done to the link // before we do a state update. @@ -757,12 +723,12 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { func (l *channelLink) fwdPkgGarbager() { defer l.wg.Done() - fwdPkgGcTick := l.cfg.FwdPkgGCTicker.Start() + l.cfg.FwdPkgGCTicker.Resume() defer l.cfg.FwdPkgGCTicker.Stop() for { select { - case <-fwdPkgGcTick: + case <-l.cfg.FwdPkgGCTicker.Ticks(): fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { l.warnf("unable to load fwdpkgs for gc: %v", err) @@ -908,13 +874,6 @@ func (l *channelLink) htlcManager() { l.wg.Add(1) go l.fwdPkgGarbager() - // We'll only need the batch ticker if we have outgoing updates that are - // not covered by our last signature. This value will be nil unless a - // downstream packet forces the batchCounter to be positive. After the - // batch is cleared, it will return to nil to prevent wasteful CPU time - // caused by the batch ticker waking up the htlcManager needlessly. - var maybeBatchTick <-chan time.Time - out: for { // We must always check if we failed at some point processing @@ -995,13 +954,12 @@ out: break out } - case <-maybeBatchTick: + case <-l.cfg.BatchTicker.Ticks(): // If the current batch is empty, then we have no work // here. We also disable the batch ticker from waking up // the htlcManager while the batch is empty. if l.batchCounter == 0 { - l.cfg.BatchTicker.Stop() - maybeBatchTick = nil + l.cfg.BatchTicker.Pause() continue } @@ -1030,8 +988,8 @@ out: // If the downstream packet resulted in a non-empty // batch, reinstate the batch ticker so that it can be // cleared. - if l.batchCounter > 0 && maybeBatchTick == nil { - maybeBatchTick = l.cfg.BatchTicker.Start() + if l.batchCounter > 0 { + l.cfg.BatchTicker.Resume() } // A message from the switch was just received. This indicates @@ -1059,8 +1017,8 @@ out: // If the downstream packet resulted in a non-empty // batch, reinstate the batch ticker so that it can be // cleared. - if l.batchCounter > 0 && maybeBatchTick == nil { - maybeBatchTick = l.cfg.BatchTicker.Start() + if l.batchCounter > 0 { + l.cfg.BatchTicker.Resume() } // A message from the connected peer was just received. This From e3253a4f3bef61001890453335de2fe67bfebcf2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 31 Jul 2018 17:01:19 -0700 Subject: [PATCH 05/11] htlcswitch/switch: add log/fwd event tickers to Config --- htlcswitch/switch.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d1473ec7..e2ee14b0 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -21,6 +21,17 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" +) + +const ( + // DefaultFwdEventInterval is the duration between attempts to flush + // pending forwarding events to disk. + DefaultFwdEventInterval = 15 * time.Second + + // DefaultLogInterval is the duration between attempts to log statistics + // about forwarding events. + DefaultLogInterval = 10 * time.Second ) var ( @@ -146,6 +157,14 @@ type Config struct { // Notifier is an instance of a chain notifier that we'll use to signal // the switch when a new block has arrived. Notifier chainntnfs.ChainNotifier + + // FwdEventTicker is a signal that instructs the htlcswitch to flush any + // pending forwarding events. + FwdEventTicker ticker.Ticker + + // LogEventTicker is a signal instructing the htlcswitch to log + // aggregate stats about it's forwarding during the last interval. + LogEventTicker ticker.Ticker } // Switch is the central messaging bus for all incoming/outgoing HTLCs. @@ -1390,13 +1409,13 @@ func (s *Switch) htlcForwarder() { totalSatSent btcutil.Amount totalSatRecv btcutil.Amount ) - logTicker := time.NewTicker(10 * time.Second) - defer logTicker.Stop() + s.cfg.LogEventTicker.Resume() + defer s.cfg.LogEventTicker.Stop() // Every 15 seconds, we'll flush out the forwarding events that // occurred during that period. - fwdEventTicker := time.NewTicker(15 * time.Second) - defer fwdEventTicker.Stop() + s.cfg.FwdEventTicker.Resume() + defer s.cfg.FwdEventTicker.Stop() out: for { @@ -1474,7 +1493,7 @@ out: // When this time ticks, then it indicates that we should // collect all the forwarding events since the last internal, // and write them out to our log. - case <-fwdEventTicker.C: + case <-s.cfg.FwdEventTicker.Ticks(): s.wg.Add(1) go func() { defer s.wg.Done() @@ -1488,7 +1507,7 @@ out: // The log ticker has fired, so we'll calculate some forwarding // stats for the last 10 seconds to display within the logs to // users. - case <-logTicker.C: + case <-s.cfg.LogEventTicker.Ticks(): // First, we'll collate the current running tally of // our forwarding stats. prevSatSent := totalSatSent From 2651136806bc2c3b8dd70228a1989414597dff32 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 14:19:04 -0700 Subject: [PATCH 06/11] htlcswitch/link_test: replace mockTicker with ticker.Mock --- htlcswitch/link_test.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index bd4dc597..95efe152 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) const ( @@ -1474,8 +1475,9 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( return nil, nil, nil, nil, nil, nil, err } - t := make(chan time.Time) - ticker := &mockTicker{t} + // Instantiate with a long interval, so that we can precisely control + // the firing via force feeding. + bticker := ticker.MockNew(time.Hour) aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, Peer: alicePeer, @@ -1497,8 +1499,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: ticker, - FwdPkgGCTicker: NewBatchTicker(5 * time.Second), + BatchTicker: bticker, + FwdPkgGCTicker: ticker.MockNew(5 * time.Second), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -1528,7 +1530,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) ( defer bobChannel.Stop() } - return aliceLink, bobChannel, t, start, cleanUp, restore, nil + return aliceLink, bobChannel, bticker.Force, start, cleanUp, restore, nil } func assertLinkBandwidth(t *testing.T, link ChannelLink, @@ -3865,8 +3867,9 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, } } - t := make(chan time.Time) - ticker := &mockTicker{t} + // Instantiate with a long interval, so that we can precisely control + // the firing via force feeding. + bticker := ticker.MockNew(time.Hour) aliceCfg := ChannelLinkConfig{ FwrdingPolicy: globalPolicy, Peer: alicePeer, @@ -3888,8 +3891,8 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, }, Registry: invoiceRegistry, ChainEvents: &contractcourt.ChainEventSubscription{}, - BatchTicker: ticker, - FwdPkgGCTicker: NewBatchTicker(5 * time.Second), + BatchTicker: bticker, + FwdPkgGCTicker: ticker.New(5 * time.Second), // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // to not trigger commit updates automatically during tests. BatchSize: 10000, @@ -3920,7 +3923,7 @@ func restartLink(aliceChannel *lnwallet.LightningChannel, aliceSwitch *Switch, defer aliceLink.Stop() } - return aliceLink, t, cleanUp, nil + return aliceLink, bticker.Force, cleanUp, nil } // gnerateHtlc generates a simple payment from Bob to Alice. From 6a709526d8999817ee2ea963b0bc5937e9a89490 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 2 Aug 2018 02:14:55 -0700 Subject: [PATCH 07/11] htlcswitch/mock: remove mockTicker, use ticker.MockNew --- htlcswitch/mock.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 9d3a2938..20472b6a 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -26,6 +26,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) type mockPreimageCache struct { @@ -143,7 +144,9 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) { return nil, nil }, - Notifier: &mockNotifier{}, + Notifier: &mockNotifier{}, + FwdEventTicker: ticker.MockNew(DefaultFwdEventInterval), + LogEventTicker: ticker.MockNew(DefaultLogInterval), } return New(cfg, startingHeight) @@ -807,14 +810,3 @@ func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, Spend: make(chan *chainntnfs.SpendDetail), }, nil } - -type mockTicker struct { - ticker <-chan time.Time -} - -func (m *mockTicker) Start() <-chan time.Time { - return m.ticker -} - -func (m *mockTicker) Stop() { -} From c90ec1959541128d21ca2f223128903477870a7e Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 14:19:29 -0700 Subject: [PATCH 08/11] htlcswitch/test_utils: repalce mockTicker with new MockTicker --- htlcswitch/test_utils.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 46bf718f..7500be1a 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -918,8 +919,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -961,8 +962,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -1004,8 +1005,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, @@ -1047,8 +1048,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, ChainEvents: &contractcourt.ChainEventSubscription{}, SyncStates: true, BatchSize: 10, - BatchTicker: &mockTicker{time.NewTicker(batchTimeout).C}, - FwdPkgGCTicker: &mockTicker{time.NewTicker(fwdPkgTimeout).C}, + BatchTicker: ticker.MockNew(batchTimeout), + FwdPkgGCTicker: ticker.MockNew(fwdPkgTimeout), MinFeeUpdateTimeout: minFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, From 2762546eebef46b2b938e8cb54dadc6f85429730 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Aug 2018 14:48:32 -0700 Subject: [PATCH 09/11] peer: replace NewBatchTicker with ticker.New --- peer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/peer.go b/peer.go index ffc4cd28..646382c3 100644 --- a/peer.go +++ b/peer.go @@ -25,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -417,7 +418,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { ) if err != nil { lnChan.Stop() - return err + return fmt.Errorf("unable to add link %v to switch: %v", + chanPoint, err) } p.activeChanMtx.Lock() @@ -545,8 +547,8 @@ func (p *peer) addLink(chanPoint *wire.OutPoint, }, OnChannelFailure: onChannelFailure, SyncStates: syncStates, - BatchTicker: htlcswitch.NewBatchTicker(50 * time.Millisecond), - FwdPkgGCTicker: htlcswitch.NewBatchTicker(time.Minute), + BatchTicker: ticker.New(50 * time.Millisecond), + FwdPkgGCTicker: ticker.New(time.Minute), BatchSize: 10, UnsafeReplay: cfg.UnsafeReplay, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, @@ -576,7 +578,7 @@ func (p *peer) Disconnect(reason error) { return } - peerLog.Tracef("Disconnecting %s, reason: %v", p, reason) + peerLog.Debugf("Disconnecting %s, reason: %v", p, reason) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() From d5d01dd575bd231a5e5b4bdb4a789fab3847337a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 31 Jul 2018 21:01:19 -0700 Subject: [PATCH 10/11] server: configure switch w/ default log/fwd tickers --- server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server.go b/server.go index 233fe2d5..3c0f925d 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/tor" ) @@ -322,6 +323,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, ExtractErrorEncrypter: s.sphinx.ExtractErrorEncrypter, FetchLastChannelUpdate: fetchLastChanUpdate(s, serializedPubKey), Notifier: s.cc.chainNotifier, + FwdEventTicker: ticker.New( + htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New( + htlcswitch.DefaultLogInterval), }, uint32(currentHeight)) if err != nil { return nil, err From 8cc46812019abeb67619de8845b9f3225755df81 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 31 Jul 2018 21:01:45 -0700 Subject: [PATCH 11/11] test_utils: init switch with default fwd/log tickers --- test_utils.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test_utils.go b/test_utils.go index 0de7f0f8..34aef8b9 100644 --- a/test_utils.go +++ b/test_utils.go @@ -22,6 +22,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" + "github.com/lightningnetwork/lnd/ticker" ) var ( @@ -351,6 +352,10 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, DB: dbAlice, SwitchPackager: channeldb.NewSwitchPackager(), Notifier: notifier, + FwdEventTicker: ticker.New( + htlcswitch.DefaultFwdEventInterval), + LogEventTicker: ticker.New( + htlcswitch.DefaultLogInterval), }, uint32(currentHeight)) if err != nil { return nil, nil, nil, nil, err