From 70bca1f350c4bac858053c2fe40e106a4bfecd8c Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 8 Sep 2020 13:47:20 +0200 Subject: [PATCH] chanfitness: add flap count based rate limiting To prevent flapping peers from endlessly dos-ing us with online and offline events, we rate limit the number of events we will store per period using their flap rate to determine how often we will add their events to our in memory list of online events. Since we are tracking online events, we need to track the aggregate change over the rate limited period, otherwise we will lose track of a peer's current state. For example, if we store an online event, then do not store the subsequent offline event, we will believe that the peer is online when they actually aren't. To address this, we "stage" a single event which keeps track of all the events that occurred while we were rate limiting the peer. At the end of the rate limting period, we will store the last state for that peer, thereby ensureing that we maintain our record of their most recent state. --- chanfitness/chanevent.go | 67 ++++++++++++++++++++++++--- chanfitness/chanevent_test.go | 82 ++++++++++++++++++++++++++++++++++ chanfitness/rate_limit.go | 37 +++++++++++++++ chanfitness/rate_limit_test.go | 51 +++++++++++++++++++++ 4 files changed, 232 insertions(+), 5 deletions(-) create mode 100644 chanfitness/rate_limit.go create mode 100644 chanfitness/rate_limit_test.go diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index 30e856ee..6338887f 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -43,9 +43,31 @@ type peerLog struct { // online stores whether the peer is currently online. online bool - // onlineEvents is a log of timestamped events observed for the peer. + // onlineEvents is a log of timestamped events observed for the peer + // that we have committed to allocating memory to. onlineEvents []*event + // stagedEvent represents an event that is pending addition to the + // events list. It has not yet been added because we rate limit the + // frequency that we store events at. We need to store this value + // in the log (rather than just ignore events) so that we can flush the + // aggregate outcome to our event log once the rate limiting period has + // ended. + // + // Take the following example: + // - Peer online event recorded + // - Peer offline event, not recorded due to rate limit + // - No more events, we incorrectly believe our peer to be online + // Instead of skipping events, we stage the most recent event during the + // rate limited period so that we know what happened (on aggregate) + // while we were rate limiting events. + // + // Note that we currently only store offline/online events so we can + // use this field to track our online state. With the addition of other + // event types, we need to only stage online/offline events, or split + // them out. + stagedEvent *event + // flapCount is the number of times this peer has been observed as // going offline. flapCount int @@ -105,7 +127,8 @@ func (p *peerLog) onlineEvent(online bool) { p.addEvent(online, eventTime) } -// addEvent records an online or offline event in our event log. +// addEvent records an online or offline event in our event log. and increments +// the peer's flap count. func (p *peerLog) addEvent(online bool, time time.Time) { eventType := peerOnlineEvent if !online { @@ -117,7 +140,26 @@ func (p *peerLog) addEvent(online bool, time time.Time) { eventType: eventType, } - p.onlineEvents = append(p.onlineEvents, event) + // If we have no staged events, we can just stage this event and return. + if p.stagedEvent == nil { + p.stagedEvent = event + return + } + + // We get the amount of time we require between events according to + // peer flap count. + aggregation := getRateLimit(p.flapCount) + nextRecordTime := p.stagedEvent.timestamp.Add(aggregation) + flushEvent := nextRecordTime.Before(event.timestamp) + + // If enough time has passed since our last staged event, we add our + // event to our in-memory list. + if flushEvent { + p.onlineEvents = append(p.onlineEvents, p.stagedEvent) + } + + // Finally, we replace our staged event with the new event we received. + p.stagedEvent = event } // addChannel adds a channel to our log. If we have not tracked any online @@ -160,6 +202,7 @@ func (p *peerLog) removeChannel(channelPoint wire.OutPoint) error { // TODO(carla): this could be done on a per channel basis. if p.channelCount() == 0 { p.onlineEvents = nil + p.stagedEvent = nil } return nil @@ -197,6 +240,18 @@ func (p *peerLog) getFlapCount() (int, *time.Time) { return p.flapCount, p.lastFlap } +// listEvents returns all of the events that our event log has tracked, +// including events that are staged for addition to our set of events but have +// not yet been committed to (because we rate limit and store only the aggregate +// outcome over a period). +func (p *peerLog) listEvents() []*event { + if p.stagedEvent == nil { + return p.onlineEvents + } + + return append(p.onlineEvents, p.stagedEvent) +} + // onlinePeriod represents a period of time over which a peer was online. type onlinePeriod struct { start, end time.Time @@ -211,8 +266,10 @@ type onlinePeriod struct { // to be ordered by ascending timestamp, and can tolerate multiple consecutive // online or offline events. func (p *peerLog) getOnlinePeriods() []*onlinePeriod { + events := p.listEvents() + // Return early if there are no events, there are no online periods. - if len(p.onlineEvents) == 0 { + if len(events) == 0 { return nil } @@ -231,7 +288,7 @@ func (p *peerLog) getOnlinePeriods() []*onlinePeriod { // the online event and the present is not tracked. The type of the most // recent event is tracked using the offline bool so that we can add a // final online period if necessary. - for _, event := range p.onlineEvents { + for _, event := range events { switch event.eventType { case peerOnlineEvent: // If our previous event is nil, we just set it and diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go index 7d75841d..03eec515 100644 --- a/chanfitness/chanevent_test.go +++ b/chanfitness/chanevent_test.go @@ -122,6 +122,88 @@ func TestPeerLog(t *testing.T) { require.Equal(t, 0, peerLog.channelCount()) require.Len(t, peerLog.onlineEvents, 0) assertFlapCount(3, &lastFlap) + + require.Len(t, peerLog.listEvents(), 0) + require.Nil(t, peerLog.stagedEvent) +} + +// TestRateLimitAdd tests the addition of events to the event log with rate +// limiting in place. +func TestRateLimitAdd(t *testing.T) { + // Create a mock clock specifically for this test so that we can + // progress time without affecting the other tests. + mockedClock := clock.NewTestClock(testNow) + + // Create a new peer log. + peerLog := newPeerLog(mockedClock) + require.Nil(t, peerLog.stagedEvent) + + // Create a channel for our peer log, otherwise it will not track online + // events. + require.NoError(t, peerLog.addChannel(wire.OutPoint{})) + + // First, we add an event to the event log. Since we have no previous + // events, we expect this event to staged immediately. + peerEvent := &event{ + timestamp: testNow, + eventType: peerOfflineEvent, + } + + peerLog.onlineEvent(false) + require.Equal(t, peerEvent, peerLog.stagedEvent) + + // We immediately add another event to our event log. We expect our + // staged event to be replaced with this new event, because insufficient + // time has passed since our last event. + peerEvent = &event{ + timestamp: testNow, + eventType: peerOnlineEvent, + } + + peerLog.onlineEvent(true) + require.Equal(t, peerEvent, peerLog.stagedEvent) + + // We get the amount of time that we need to pass before we record an + // event from our rate limiting tiers. We then progress our test clock + // to just after this point. + delta := getRateLimit(peerLog.flapCount) + newNow := testNow.Add(delta + 1) + mockedClock.SetTime(newNow) + + // Now, when we add an event, we expect our staged event to be added + // to our events list and for our new event to be staged. + newEvent := &event{ + timestamp: newNow, + eventType: peerOfflineEvent, + } + peerLog.onlineEvent(false) + + require.Equal(t, []*event{peerEvent}, peerLog.onlineEvents) + require.Equal(t, newEvent, peerLog.stagedEvent) + + // Now, we test the case where we add many events to our log. We expect + // our set of events to be untouched, but for our staged event to be + // updated. + nextEvent := &event{ + timestamp: newNow, + eventType: peerOnlineEvent, + } + + for i := 0; i < 5; i++ { + // We flip the kind of event for each type so that we can check + // that our staged event is definitely changing each time. + if i%2 == 0 { + nextEvent.eventType = peerOfflineEvent + } else { + nextEvent.eventType = peerOnlineEvent + } + + online := nextEvent.eventType == peerOnlineEvent + + peerLog.onlineEvent(online) + require.Equal(t, []*event{peerEvent}, peerLog.onlineEvents) + require.Equal(t, nextEvent, peerLog.stagedEvent) + } } // TestGetOnlinePeriod tests the getOnlinePeriod function. It tests the case diff --git a/chanfitness/rate_limit.go b/chanfitness/rate_limit.go new file mode 100644 index 00000000..3fa5a325 --- /dev/null +++ b/chanfitness/rate_limit.go @@ -0,0 +1,37 @@ +package chanfitness + +import "time" + +// rateLimitScale is the number of events we allow per rate limited tier. +// Increasing this value makes our rate limiting more lenient, decreasing it +// makes us less lenient. +const rateLimitScale = 200 + +// rateLimits is the set of rate limit tiers we apply to our peers based on +// their flap count. A peer can be placed in their tier by dividing their flap +// count by the rateLimitScale and returning the value at that index. +var rateLimits = []time.Duration{ + time.Second, + time.Second * 5, + time.Second * 30, + time.Minute, + time.Minute * 30, + time.Hour, +} + +// getRateLimit returns the value of the rate limited tier that we are on based +// on current flap count. If a peer's flap count exceeds the top tier, we just +// return our highest tier. +func getRateLimit(flapCount int) time.Duration { + // Figure out the tier we fall into based on our current flap count. + tier := flapCount / rateLimitScale + + // If we have more events than our number of tiers, we just use the + // last tier + tierLen := len(rateLimits) + if tier >= tierLen { + tier = tierLen - 1 + } + + return rateLimits[tier] +} diff --git a/chanfitness/rate_limit_test.go b/chanfitness/rate_limit_test.go new file mode 100644 index 00000000..0a9aa0e3 --- /dev/null +++ b/chanfitness/rate_limit_test.go @@ -0,0 +1,51 @@ +package chanfitness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestGetRateLimit tests getting of our rate limit using the current constants. +// It creates test cases that are relative to our constants so that they +// can be adjusted without breaking the unit test. +func TestGetRateLimit(t *testing.T) { + tests := []struct { + name string + flapCount int + rateLimit time.Duration + }{ + { + name: "zero flaps", + flapCount: 0, + rateLimit: rateLimits[0], + }, + { + name: "middle tier", + flapCount: rateLimitScale * (len(rateLimits) / 2), + rateLimit: rateLimits[len(rateLimits)/2], + }, + { + name: "last tier", + flapCount: rateLimitScale * (len(rateLimits) - 1), + rateLimit: rateLimits[len(rateLimits)-1], + }, + { + name: "beyond last tier", + flapCount: rateLimitScale * (len(rateLimits) * 2), + rateLimit: rateLimits[len(rateLimits)-1], + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + limit := getRateLimit(test.flapCount) + require.Equal(t, test.rateLimit, limit) + }) + } +}