multi: store peer flap rate on disk on best effort basis

Since we will use peer flap rate to determine how we rate limit, we
store this value on disk per peer per channel. This allows us to
restart with memory of our peers past behaviour, so we don't give badly
behaving peers have a fresh start on restart. Last flap timestamp is
stored with our flap count so that we can degrade this all time flap
count over time for peers that have not recently flapped.
This commit is contained in:
carla 2020-09-08 13:47:21 +02:00
parent 70bca1f350
commit a550ca3d64
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
9 changed files with 435 additions and 19 deletions

@ -84,11 +84,17 @@ type peerLog struct {
channels map[wire.OutPoint]*channelInfo
}
// newPeerLog creates a log for a peer.
func newPeerLog(clock clock.Clock) *peerLog {
// newPeerLog creates a log for a peer, taking its historical flap count and
// last flap time as parameters. These values may be zero/nil if we have no
// record of historical flap count for the peer.
func newPeerLog(clock clock.Clock, flapCount int,
lastFlap *time.Time) *peerLog {
return &peerLog{
clock: clock,
channels: make(map[wire.OutPoint]*channelInfo),
clock: clock,
flapCount: flapCount,
lastFlap: lastFlap,
channels: make(map[wire.OutPoint]*channelInfo),
}
}

@ -12,7 +12,7 @@ import (
// TestPeerLog tests the functionality of the peer log struct.
func TestPeerLog(t *testing.T) {
clock := clock.NewTestClock(testNow)
peerLog := newPeerLog(clock)
peerLog := newPeerLog(clock, 0, nil)
// assertFlapCount is a helper that asserts that our peer's flap count
// and timestamp is set to expected values.
@ -135,7 +135,7 @@ func TestRateLimitAdd(t *testing.T) {
mockedClock := clock.NewTestClock(testNow)
// Create a new peer log.
peerLog := newPeerLog(mockedClock)
peerLog := newPeerLog(mockedClock, 0, nil)
require.Nil(t, peerLog.stagedEvent)
// Create a channel for our peer log, otherwise it will not track online

@ -22,6 +22,13 @@ import (
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/ticker"
)
const (
// FlapCountFlushRate determines how often we write peer total flap
// count to disk.
FlapCountFlushRate = time.Hour
)
var (
@ -74,8 +81,22 @@ type Config struct {
// Clock is the time source that the subsystem uses, provided here
// for ease of testing.
Clock clock.Clock
// WriteFlapCounts records the flap count for a set of peers on disk.
WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error
// ReadFlapCount gets the flap count for a peer on disk.
ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)
// FlapCountTicker is a ticker which controls how often we flush our
// peer's flap count to disk.
FlapCountTicker ticker.Ticker
}
// peerFlapCountMap is the map used to map peers to flap counts, declared here
// to allow shorter function signatures.
type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount
type channelInfoRequest struct {
peer route.Vertex
channelPoint wire.OutPoint
@ -167,6 +188,8 @@ func (c *ChannelEventStore) Start() error {
func (c *ChannelEventStore) Stop() {
log.Info("Stopping event store")
c.cfg.FlapCountTicker.Stop()
// Stop the consume goroutine.
close(c.quit)
@ -179,10 +202,10 @@ func (c *ChannelEventStore) Stop() {
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
peer route.Vertex) {
peerMonitor, ok := c.peers[peer]
if !ok {
peerMonitor = newPeerLog(c.cfg.Clock)
c.peers[peer] = peerMonitor
peerMonitor, err := c.getPeerMonitor(peer)
if err != nil {
log.Error("could not create monitor: %v", err)
return
}
if err := peerMonitor.addChannel(channelPoint); err != nil {
@ -190,6 +213,42 @@ func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
}
}
// getPeerMonitor tries to get an existing peer monitor from our in memory list,
// and falls back to creating a new monitor if it is not currently known.
func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
error) {
peerMonitor, ok := c.peers[peer]
if ok {
return peerMonitor, nil
}
var (
flapCount int
lastFlap *time.Time
)
historicalFlap, err := c.cfg.ReadFlapCount(peer)
switch err {
// If we do not have any records for this peer we set a 0 flap count
// and timestamp.
case channeldb.ErrNoPeerBucket:
case nil:
flapCount = int(historicalFlap.Count)
lastFlap = &historicalFlap.LastFlap
// Return if we get an unexpected error.
default:
return nil, err
}
peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
c.peers[peer] = peerMonitor
return peerMonitor, nil
}
// closeChannel records a closed time for a channel, and returns early is the
// channel is not known to the event store. We log warnings (rather than errors)
// when we cannot find a peer/channel because channels that we restore from a
@ -214,12 +273,10 @@ func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
// peerEvent creates a peer monitor for a peer if we do not currently have
// one, and adds an online event to it.
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
// If we are not currently tracking events for this peer, add a peer
// log for it.
peerMonitor, ok := c.peers[peer]
if !ok {
peerMonitor = newPeerLog(c.cfg.Clock)
c.peers[peer] = peerMonitor
peerMonitor, err := c.getPeerMonitor(peer)
if err != nil {
log.Error("could not create monitor: %v", err)
return
}
peerMonitor.onlineEvent(online)
@ -236,8 +293,22 @@ type subscriptions struct {
// the event store with channel and peer events, and serves requests for channel
// uptime and lifespan.
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
defer c.wg.Done()
defer subscriptions.cancel()
// Start our flap count ticker.
c.cfg.FlapCountTicker.Resume()
// On exit, we will cancel our subscriptions and write our most recent
// flap counts to disk. This ensures that we have consistent data in
// the case of a graceful shutdown. If we do not shutdown gracefully,
// our worst case is data from our last flap count tick (1H).
defer func() {
subscriptions.cancel()
if err := c.recordFlapCount(); err != nil {
log.Errorf("error recording flap on shutdown: %v", err)
}
c.wg.Done()
}()
// Consume events until the channel is closed.
for {
@ -302,6 +373,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
resp.info, resp.err = c.getChanInfo(req)
req.responseChan <- resp
case <-c.cfg.FlapCountTicker.Ticks():
if err := c.recordFlapCount(); err != nil {
log.Errorf("could not record flap "+
"count: %v", err)
}
// Exit if the store receives the signal to shutdown.
case <-c.quit:
return
@ -371,3 +448,25 @@ func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
Uptime: uptime,
}, nil
}
// recordFlapCount will record our flap count for each peer that we are
// currently tracking, skipping peers that have a 0 flap count.
func (c *ChannelEventStore) recordFlapCount() error {
updates := make(peerFlapCountMap)
for peer, monitor := range c.peers {
flapCount, lastFlap := monitor.getFlapCount()
if lastFlap == nil {
continue
}
updates[peer] = &channeldb.FlapCount{
Count: uint32(flapCount),
LastFlap: *lastFlap,
}
}
log.Debugf("recording flap count for: %v peers", len(updates))
return c.cfg.WriteFlapCount(updates)
}

@ -179,6 +179,51 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
require.Equal(t, expectedChannels, monitor.channelCount())
}
// TestStoreFlapCount tests flushing of flap counts to disk on timer ticks and
// on store shutdown.
func TestStoreFlapCount(t *testing.T) {
testCtx := newChanEventStoreTestCtx(t)
testCtx.start()
pubkey, _, _ := testCtx.createChannel()
testCtx.peerEvent(pubkey, false)
// Now, we tick our flap count ticker. We expect our main goroutine to
// flush our tick count to disk.
testCtx.tickFlapCount()
// Since we just tracked a offline event, we expect a single flap for
// our peer.
expectedUpdate := peerFlapCountMap{
pubkey: {
Count: 1,
LastFlap: testCtx.clock.Now(),
},
}
testCtx.assertFlapCountUpdated()
testCtx.assertFlapCountUpdates(expectedUpdate)
// Create three events for out peer, online/offline/online.
testCtx.peerEvent(pubkey, true)
testCtx.peerEvent(pubkey, false)
testCtx.peerEvent(pubkey, true)
// Trigger another write.
testCtx.tickFlapCount()
// Since we have processed 3 more events for our peer, we update our
// expected online map to have a flap count of 4 for this peer.
expectedUpdate[pubkey] = &channeldb.FlapCount{
Count: 4,
LastFlap: testCtx.clock.Now(),
}
testCtx.assertFlapCountUpdated()
testCtx.assertFlapCountUpdates(expectedUpdate)
testCtx.stop()
}
// TestGetChanInfo tests the GetChanInfo function for the cases where a channel
// is known and unknown to the store.
func TestGetChanInfo(t *testing.T) {

@ -14,6 +14,7 @@ import (
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/subscribe"
"github.com/lightningnetwork/lnd/ticker"
"github.com/stretchr/testify/require"
)
@ -38,6 +39,17 @@ type chanEventStoreTestCtx struct {
// clock is the clock that our test store will use.
clock *clock.TestClock
// flapUpdates stores our most recent set of updates flap counts.
flapUpdates peerFlapCountMap
// flapCountUpdates is a channel which receives new flap counts.
flapCountUpdates chan peerFlapCountMap
// stopped is closed when our test context is fully shutdown. It is
// used to prevent calling of functions which can only be called after
// shutdown.
stopped chan struct{}
}
// newChanEventStoreTestCtx creates a test context which can be used to test
@ -48,6 +60,9 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx {
channelSubscription: newMockSubscription(t),
peerSubscription: newMockSubscription(t),
clock: clock.NewTestClock(testNow),
flapUpdates: make(peerFlapCountMap),
flapCountUpdates: make(chan peerFlapCountMap),
stopped: make(chan struct{}),
}
cfg := &Config{
@ -61,6 +76,28 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx {
GetOpenChannels: func() ([]*channeldb.OpenChannel, error) {
return nil, nil
},
WriteFlapCount: func(updates map[route.Vertex]*channeldb.FlapCount) error {
// Send our whole update map into the test context's
// updates channel. The test will need to assert flap
// count updated or this send will timeout.
select {
case testCtx.flapCountUpdates <- updates:
case <-time.After(timeout):
t.Fatalf("WriteFlapCount timeout")
}
return nil
},
ReadFlapCount: func(peer route.Vertex) (*channeldb.FlapCount, error) {
count, ok := testCtx.flapUpdates[peer]
if !ok {
return nil, channeldb.ErrNoPeerBucket
}
return count, nil
},
FlapCountTicker: ticker.NewForce(FlapCountFlushRate),
}
testCtx.store = NewChannelEventStore(cfg)
@ -75,7 +112,25 @@ func (c *chanEventStoreTestCtx) start() {
// stop stops the channel event store's subscribe servers and the store itself.
func (c *chanEventStoreTestCtx) stop() {
c.store.Stop()
// On shutdown of our event store, we write flap counts to disk. In our
// test context, this write function is blocked on asserting that the
// update has occurred. We stop our store in a goroutine so that we
// can shut it down and assert that it performs these on-shutdown
// updates. The stopped channel is used to ensure that we do not finish
// our test before this shutdown has completed.
go func() {
c.store.Stop()
close(c.stopped)
}()
// We write our flap count to disk on shutdown, assert that the most
// recent record that the server has is written on shutdown. Calling
// this assert unblocks the stop function above. We don't check values
// here, so that our tests don't all require providing an expected swap
// count, but at least assert that the write occurred.
c.assertFlapCountUpdated()
<-c.stopped
// Make sure that the cancel function was called for both of our
// subscription mocks.
@ -137,6 +192,18 @@ func (c *chanEventStoreTestCtx) closeChannel(channel wire.OutPoint,
c.channelSubscription.sendUpdate(update)
}
// tickFlapCount forces a tick for our flap count ticker with the current time.
func (c *chanEventStoreTestCtx) tickFlapCount() {
testTicker := c.store.cfg.FlapCountTicker.(*ticker.Force)
select {
case testTicker.Force <- c.store.cfg.Clock.Now():
case <-time.After(timeout):
c.t.Fatalf("could not tick flap count ticker")
}
}
// peerEvent sends a peer online or offline event to the store for the peer
// provided.
func (c *chanEventStoreTestCtx) peerEvent(peer route.Vertex, online bool) {
@ -165,6 +232,24 @@ func (c *chanEventStoreTestCtx) sendChannelOpenedUpdate(pubkey *btcec.PublicKey,
c.channelSubscription.sendUpdate(update)
}
// assertFlapCountUpdated asserts that our store has made an attempt to write
// our current set of flap counts to disk and sets this value in our test ctx.
// Note that it does not check the values of the update.
func (c *chanEventStoreTestCtx) assertFlapCountUpdated() {
select {
case c.flapUpdates = <-c.flapCountUpdates:
case <-time.After(timeout):
c.t.Fatalf("assertFlapCountUpdated timeout")
}
}
// assertFlapCountUpdates asserts that out current record of flap counts is
// as expected.
func (c *chanEventStoreTestCtx) assertFlapCountUpdates(expected peerFlapCountMap) {
require.Equal(c.t, expected, c.flapUpdates)
}
// mockSubscription is a mock subscription client that blocks on sends into the
// updates channel. We use this mock rather than an actual subscribe client
// because they do not block, which makes tests race (because we have no way

@ -164,6 +164,12 @@ var (
number: 17,
migration: mig.CreateTLB(closeSummaryBucket),
},
{
// Create a top level bucket which holds information
// about our peers.
number: 18,
migration: mig.CreateTLB(peersBucket),
},
}
// Big endian is the preferred byte order, due to cursor scans over
@ -278,6 +284,7 @@ var topLevelBuckets = [][]byte{
invoiceBucket,
payAddrIndexBucket,
paymentsIndexBucket,
peersBucket,
nodeInfoBucket,
nodeBucket,
edgeBucket,

121
channeldb/peers.go Normal file

@ -0,0 +1,121 @@
package channeldb
import (
"bytes"
"errors"
"fmt"
"time"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/routing/route"
)
var (
// peersBucket is the name of a top level bucket in which we store
// information about our peers. Information for different peers is
// stored in buckets keyed by their public key.
//
//
// peers-bucket
// |
// |-- <peer-pubkey>
// | |--flap-count-key: <ts><flap count>
// |
// |-- <peer-pubkey>
// | |--flap-count-key: <ts><flap count>
peersBucket = []byte("peers-bucket")
// flapCountKey is a key used in the peer pubkey sub-bucket that stores
// the timestamp of a peer's last flap count and its all time flap
// count.
flapCountKey = []byte("flap-count")
)
var (
// ErrNoPeerBucket is returned when we try to read entries for a peer
// that is not tracked.
ErrNoPeerBucket = errors.New("peer bucket not found")
)
// FlapCount contains information about a peer's flap count.
type FlapCount struct {
// Count provides the total flap count for a peer.
Count uint32
// LastFlap is the timestamp of the last flap recorded for a peer.
LastFlap time.Time
}
// WriteFlapCounts writes the flap count for a set of peers to disk, creating a
// bucket for the peer's pubkey if necessary. Note that this function overwrites
// the current value.
func (d *DB) WriteFlapCounts(flapCounts map[route.Vertex]*FlapCount) error {
return d.Update(func(tx walletdb.ReadWriteTx) error {
// Run through our set of flap counts and record them for
// each peer, creating a bucket for the peer pubkey if required.
for peer, flapCount := range flapCounts {
peers := tx.ReadWriteBucket(peersBucket)
peerBucket, err := peers.CreateBucketIfNotExists(
peer[:],
)
if err != nil {
return err
}
var b bytes.Buffer
err = serializeTime(&b, flapCount.LastFlap)
if err != nil {
return err
}
if err = WriteElement(&b, flapCount.Count); err != nil {
return err
}
err = peerBucket.Put(flapCountKey, b.Bytes())
if err != nil {
return err
}
}
return nil
})
}
// ReadFlapCount attempts to read the flap count for a peer, failing if the
// peer is not found or we do not have flap count stored.
func (d *DB) ReadFlapCount(pubkey route.Vertex) (*FlapCount, error) {
var flapCount FlapCount
if err := d.View(func(tx walletdb.ReadTx) error {
peers := tx.ReadBucket(peersBucket)
peerBucket := peers.NestedReadBucket(pubkey[:])
if peerBucket == nil {
return ErrNoPeerBucket
}
flapBytes := peerBucket.Get(flapCountKey)
if flapBytes == nil {
return fmt.Errorf("flap count not recorded for: %v",
pubkey)
}
var (
err error
r = bytes.NewReader(flapBytes)
)
flapCount.LastFlap, err = deserializeTime(r)
if err != nil {
return err
}
return ReadElements(r, &flapCount.Count)
}); err != nil {
return nil, err
}
return &flapCount, nil
}

50
channeldb/peers_test.go Normal file

@ -0,0 +1,50 @@
package channeldb
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
// TestFlapCount tests lookup and writing of flap count to disk.
func TestFlapCount(t *testing.T) {
db, cleanup, err := MakeTestDB()
require.NoError(t, err)
defer cleanup()
// Try to read flap count for a peer that we have no records for.
_, err = db.ReadFlapCount(testPub)
require.Equal(t, ErrNoPeerBucket, err)
var (
testPub2 = route.Vertex{2, 2, 2}
peer1FlapCount = &FlapCount{
Count: 20,
LastFlap: time.Unix(100, 23),
}
peer2FlapCount = &FlapCount{
Count: 39,
LastFlap: time.Unix(200, 23),
}
)
peers := map[route.Vertex]*FlapCount{
testPub: peer1FlapCount,
testPub2: peer2FlapCount,
}
err = db.WriteFlapCounts(peers)
require.NoError(t, err)
// Lookup flap count for our first pubkey.
count, err := db.ReadFlapCount(testPub)
require.NoError(t, err)
require.Equal(t, peer1FlapCount, count)
// Lookup our flap count for the second peer.
count, err = db.ReadFlapCount(testPub2)
require.NoError(t, err)
require.Equal(t, peer2FlapCount, count)
}

@ -1212,6 +1212,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
},
GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels,
Clock: clock.NewDefaultClock(),
ReadFlapCount: s.remoteChanDB.ReadFlapCount,
WriteFlapCount: s.remoteChanDB.WriteFlapCounts,
FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
})
if cfg.WtClient.Active {