From a550ca3d64b8af577b5b0c2e6c78618b0d822e93 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 8 Sep 2020 13:47:21 +0200 Subject: [PATCH] multi: store peer flap rate on disk on best effort basis Since we will use peer flap rate to determine how we rate limit, we store this value on disk per peer per channel. This allows us to restart with memory of our peers past behaviour, so we don't give badly behaving peers have a fresh start on restart. Last flap timestamp is stored with our flap count so that we can degrade this all time flap count over time for peers that have not recently flapped. --- chanfitness/chanevent.go | 14 ++- chanfitness/chanevent_test.go | 4 +- chanfitness/chaneventstore.go | 123 +++++++++++++++++++-- chanfitness/chaneventstore_test.go | 45 ++++++++ chanfitness/chaneventstore_testctx_test.go | 87 ++++++++++++++- channeldb/db.go | 7 ++ channeldb/peers.go | 121 ++++++++++++++++++++ channeldb/peers_test.go | 50 +++++++++ server.go | 3 + 9 files changed, 435 insertions(+), 19 deletions(-) create mode 100644 channeldb/peers.go create mode 100644 channeldb/peers_test.go diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index 6338887f..4cfff3d9 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -84,11 +84,17 @@ type peerLog struct { channels map[wire.OutPoint]*channelInfo } -// newPeerLog creates a log for a peer. -func newPeerLog(clock clock.Clock) *peerLog { +// newPeerLog creates a log for a peer, taking its historical flap count and +// last flap time as parameters. These values may be zero/nil if we have no +// record of historical flap count for the peer. +func newPeerLog(clock clock.Clock, flapCount int, + lastFlap *time.Time) *peerLog { + return &peerLog{ - clock: clock, - channels: make(map[wire.OutPoint]*channelInfo), + clock: clock, + flapCount: flapCount, + lastFlap: lastFlap, + channels: make(map[wire.OutPoint]*channelInfo), } } diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go index 03eec515..2ccbb726 100644 --- a/chanfitness/chanevent_test.go +++ b/chanfitness/chanevent_test.go @@ -12,7 +12,7 @@ import ( // TestPeerLog tests the functionality of the peer log struct. func TestPeerLog(t *testing.T) { clock := clock.NewTestClock(testNow) - peerLog := newPeerLog(clock) + peerLog := newPeerLog(clock, 0, nil) // assertFlapCount is a helper that asserts that our peer's flap count // and timestamp is set to expected values. @@ -135,7 +135,7 @@ func TestRateLimitAdd(t *testing.T) { mockedClock := clock.NewTestClock(testNow) // Create a new peer log. - peerLog := newPeerLog(mockedClock) + peerLog := newPeerLog(mockedClock, 0, nil) require.Nil(t, peerLog.stagedEvent) // Create a channel for our peer log, otherwise it will not track online diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index fc05bd21..1f84db2f 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -22,6 +22,13 @@ import ( "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" + "github.com/lightningnetwork/lnd/ticker" +) + +const ( + // FlapCountFlushRate determines how often we write peer total flap + // count to disk. + FlapCountFlushRate = time.Hour ) var ( @@ -74,8 +81,22 @@ type Config struct { // Clock is the time source that the subsystem uses, provided here // for ease of testing. Clock clock.Clock + + // WriteFlapCounts records the flap count for a set of peers on disk. + WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error + + // ReadFlapCount gets the flap count for a peer on disk. + ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error) + + // FlapCountTicker is a ticker which controls how often we flush our + // peer's flap count to disk. + FlapCountTicker ticker.Ticker } +// peerFlapCountMap is the map used to map peers to flap counts, declared here +// to allow shorter function signatures. +type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount + type channelInfoRequest struct { peer route.Vertex channelPoint wire.OutPoint @@ -167,6 +188,8 @@ func (c *ChannelEventStore) Start() error { func (c *ChannelEventStore) Stop() { log.Info("Stopping event store") + c.cfg.FlapCountTicker.Stop() + // Stop the consume goroutine. close(c.quit) @@ -179,10 +202,10 @@ func (c *ChannelEventStore) Stop() { func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, peer route.Vertex) { - peerMonitor, ok := c.peers[peer] - if !ok { - peerMonitor = newPeerLog(c.cfg.Clock) - c.peers[peer] = peerMonitor + peerMonitor, err := c.getPeerMonitor(peer) + if err != nil { + log.Error("could not create monitor: %v", err) + return } if err := peerMonitor.addChannel(channelPoint); err != nil { @@ -190,6 +213,42 @@ func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, } } +// getPeerMonitor tries to get an existing peer monitor from our in memory list, +// and falls back to creating a new monitor if it is not currently known. +func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor, + error) { + + peerMonitor, ok := c.peers[peer] + if ok { + return peerMonitor, nil + } + + var ( + flapCount int + lastFlap *time.Time + ) + + historicalFlap, err := c.cfg.ReadFlapCount(peer) + switch err { + // If we do not have any records for this peer we set a 0 flap count + // and timestamp. + case channeldb.ErrNoPeerBucket: + + case nil: + flapCount = int(historicalFlap.Count) + lastFlap = &historicalFlap.LastFlap + + // Return if we get an unexpected error. + default: + return nil, err + } + + peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap) + c.peers[peer] = peerMonitor + + return peerMonitor, nil +} + // closeChannel records a closed time for a channel, and returns early is the // channel is not known to the event store. We log warnings (rather than errors) // when we cannot find a peer/channel because channels that we restore from a @@ -214,12 +273,10 @@ func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint, // peerEvent creates a peer monitor for a peer if we do not currently have // one, and adds an online event to it. func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) { - // If we are not currently tracking events for this peer, add a peer - // log for it. - peerMonitor, ok := c.peers[peer] - if !ok { - peerMonitor = newPeerLog(c.cfg.Clock) - c.peers[peer] = peerMonitor + peerMonitor, err := c.getPeerMonitor(peer) + if err != nil { + log.Error("could not create monitor: %v", err) + return } peerMonitor.onlineEvent(online) @@ -236,8 +293,22 @@ type subscriptions struct { // the event store with channel and peer events, and serves requests for channel // uptime and lifespan. func (c *ChannelEventStore) consume(subscriptions *subscriptions) { - defer c.wg.Done() - defer subscriptions.cancel() + // Start our flap count ticker. + c.cfg.FlapCountTicker.Resume() + + // On exit, we will cancel our subscriptions and write our most recent + // flap counts to disk. This ensures that we have consistent data in + // the case of a graceful shutdown. If we do not shutdown gracefully, + // our worst case is data from our last flap count tick (1H). + defer func() { + subscriptions.cancel() + + if err := c.recordFlapCount(); err != nil { + log.Errorf("error recording flap on shutdown: %v", err) + } + + c.wg.Done() + }() // Consume events until the channel is closed. for { @@ -302,6 +373,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { resp.info, resp.err = c.getChanInfo(req) req.responseChan <- resp + case <-c.cfg.FlapCountTicker.Ticks(): + if err := c.recordFlapCount(); err != nil { + log.Errorf("could not record flap "+ + "count: %v", err) + } + // Exit if the store receives the signal to shutdown. case <-c.quit: return @@ -371,3 +448,25 @@ func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo, Uptime: uptime, }, nil } + +// recordFlapCount will record our flap count for each peer that we are +// currently tracking, skipping peers that have a 0 flap count. +func (c *ChannelEventStore) recordFlapCount() error { + updates := make(peerFlapCountMap) + + for peer, monitor := range c.peers { + flapCount, lastFlap := monitor.getFlapCount() + if lastFlap == nil { + continue + } + + updates[peer] = &channeldb.FlapCount{ + Count: uint32(flapCount), + LastFlap: *lastFlap, + } + } + + log.Debugf("recording flap count for: %v peers", len(updates)) + + return c.cfg.WriteFlapCount(updates) +} diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 771f3bb0..67e8c35b 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -179,6 +179,51 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), require.Equal(t, expectedChannels, monitor.channelCount()) } +// TestStoreFlapCount tests flushing of flap counts to disk on timer ticks and +// on store shutdown. +func TestStoreFlapCount(t *testing.T) { + testCtx := newChanEventStoreTestCtx(t) + testCtx.start() + + pubkey, _, _ := testCtx.createChannel() + testCtx.peerEvent(pubkey, false) + + // Now, we tick our flap count ticker. We expect our main goroutine to + // flush our tick count to disk. + testCtx.tickFlapCount() + + // Since we just tracked a offline event, we expect a single flap for + // our peer. + expectedUpdate := peerFlapCountMap{ + pubkey: { + Count: 1, + LastFlap: testCtx.clock.Now(), + }, + } + + testCtx.assertFlapCountUpdated() + testCtx.assertFlapCountUpdates(expectedUpdate) + + // Create three events for out peer, online/offline/online. + testCtx.peerEvent(pubkey, true) + testCtx.peerEvent(pubkey, false) + testCtx.peerEvent(pubkey, true) + + // Trigger another write. + testCtx.tickFlapCount() + + // Since we have processed 3 more events for our peer, we update our + // expected online map to have a flap count of 4 for this peer. + expectedUpdate[pubkey] = &channeldb.FlapCount{ + Count: 4, + LastFlap: testCtx.clock.Now(), + } + testCtx.assertFlapCountUpdated() + testCtx.assertFlapCountUpdates(expectedUpdate) + + testCtx.stop() +} + // TestGetChanInfo tests the GetChanInfo function for the cases where a channel // is known and unknown to the store. func TestGetChanInfo(t *testing.T) { diff --git a/chanfitness/chaneventstore_testctx_test.go b/chanfitness/chaneventstore_testctx_test.go index 6a6b8fd7..8b498310 100644 --- a/chanfitness/chaneventstore_testctx_test.go +++ b/chanfitness/chaneventstore_testctx_test.go @@ -14,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" + "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" ) @@ -38,6 +39,17 @@ type chanEventStoreTestCtx struct { // clock is the clock that our test store will use. clock *clock.TestClock + + // flapUpdates stores our most recent set of updates flap counts. + flapUpdates peerFlapCountMap + + // flapCountUpdates is a channel which receives new flap counts. + flapCountUpdates chan peerFlapCountMap + + // stopped is closed when our test context is fully shutdown. It is + // used to prevent calling of functions which can only be called after + // shutdown. + stopped chan struct{} } // newChanEventStoreTestCtx creates a test context which can be used to test @@ -48,6 +60,9 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx { channelSubscription: newMockSubscription(t), peerSubscription: newMockSubscription(t), clock: clock.NewTestClock(testNow), + flapUpdates: make(peerFlapCountMap), + flapCountUpdates: make(chan peerFlapCountMap), + stopped: make(chan struct{}), } cfg := &Config{ @@ -61,6 +76,28 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx { GetOpenChannels: func() ([]*channeldb.OpenChannel, error) { return nil, nil }, + WriteFlapCount: func(updates map[route.Vertex]*channeldb.FlapCount) error { + // Send our whole update map into the test context's + // updates channel. The test will need to assert flap + // count updated or this send will timeout. + select { + case testCtx.flapCountUpdates <- updates: + + case <-time.After(timeout): + t.Fatalf("WriteFlapCount timeout") + } + + return nil + }, + ReadFlapCount: func(peer route.Vertex) (*channeldb.FlapCount, error) { + count, ok := testCtx.flapUpdates[peer] + if !ok { + return nil, channeldb.ErrNoPeerBucket + } + + return count, nil + }, + FlapCountTicker: ticker.NewForce(FlapCountFlushRate), } testCtx.store = NewChannelEventStore(cfg) @@ -75,7 +112,25 @@ func (c *chanEventStoreTestCtx) start() { // stop stops the channel event store's subscribe servers and the store itself. func (c *chanEventStoreTestCtx) stop() { - c.store.Stop() + // On shutdown of our event store, we write flap counts to disk. In our + // test context, this write function is blocked on asserting that the + // update has occurred. We stop our store in a goroutine so that we + // can shut it down and assert that it performs these on-shutdown + // updates. The stopped channel is used to ensure that we do not finish + // our test before this shutdown has completed. + go func() { + c.store.Stop() + close(c.stopped) + }() + + // We write our flap count to disk on shutdown, assert that the most + // recent record that the server has is written on shutdown. Calling + // this assert unblocks the stop function above. We don't check values + // here, so that our tests don't all require providing an expected swap + // count, but at least assert that the write occurred. + c.assertFlapCountUpdated() + + <-c.stopped // Make sure that the cancel function was called for both of our // subscription mocks. @@ -137,6 +192,18 @@ func (c *chanEventStoreTestCtx) closeChannel(channel wire.OutPoint, c.channelSubscription.sendUpdate(update) } +// tickFlapCount forces a tick for our flap count ticker with the current time. +func (c *chanEventStoreTestCtx) tickFlapCount() { + testTicker := c.store.cfg.FlapCountTicker.(*ticker.Force) + + select { + case testTicker.Force <- c.store.cfg.Clock.Now(): + + case <-time.After(timeout): + c.t.Fatalf("could not tick flap count ticker") + } +} + // peerEvent sends a peer online or offline event to the store for the peer // provided. func (c *chanEventStoreTestCtx) peerEvent(peer route.Vertex, online bool) { @@ -165,6 +232,24 @@ func (c *chanEventStoreTestCtx) sendChannelOpenedUpdate(pubkey *btcec.PublicKey, c.channelSubscription.sendUpdate(update) } +// assertFlapCountUpdated asserts that our store has made an attempt to write +// our current set of flap counts to disk and sets this value in our test ctx. +// Note that it does not check the values of the update. +func (c *chanEventStoreTestCtx) assertFlapCountUpdated() { + select { + case c.flapUpdates = <-c.flapCountUpdates: + + case <-time.After(timeout): + c.t.Fatalf("assertFlapCountUpdated timeout") + } +} + +// assertFlapCountUpdates asserts that out current record of flap counts is +// as expected. +func (c *chanEventStoreTestCtx) assertFlapCountUpdates(expected peerFlapCountMap) { + require.Equal(c.t, expected, c.flapUpdates) +} + // mockSubscription is a mock subscription client that blocks on sends into the // updates channel. We use this mock rather than an actual subscribe client // because they do not block, which makes tests race (because we have no way diff --git a/channeldb/db.go b/channeldb/db.go index a4c5c516..983b4fbb 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -164,6 +164,12 @@ var ( number: 17, migration: mig.CreateTLB(closeSummaryBucket), }, + { + // Create a top level bucket which holds information + // about our peers. + number: 18, + migration: mig.CreateTLB(peersBucket), + }, } // Big endian is the preferred byte order, due to cursor scans over @@ -278,6 +284,7 @@ var topLevelBuckets = [][]byte{ invoiceBucket, payAddrIndexBucket, paymentsIndexBucket, + peersBucket, nodeInfoBucket, nodeBucket, edgeBucket, diff --git a/channeldb/peers.go b/channeldb/peers.go new file mode 100644 index 00000000..fabb5361 --- /dev/null +++ b/channeldb/peers.go @@ -0,0 +1,121 @@ +package channeldb + +import ( + "bytes" + "errors" + "fmt" + "time" + + "github.com/btcsuite/btcwallet/walletdb" + "github.com/lightningnetwork/lnd/routing/route" +) + +var ( + // peersBucket is the name of a top level bucket in which we store + // information about our peers. Information for different peers is + // stored in buckets keyed by their public key. + // + // + // peers-bucket + // | + // |-- + // | |--flap-count-key: + // | + // |-- + // | |--flap-count-key: + peersBucket = []byte("peers-bucket") + + // flapCountKey is a key used in the peer pubkey sub-bucket that stores + // the timestamp of a peer's last flap count and its all time flap + // count. + flapCountKey = []byte("flap-count") +) + +var ( + // ErrNoPeerBucket is returned when we try to read entries for a peer + // that is not tracked. + ErrNoPeerBucket = errors.New("peer bucket not found") +) + +// FlapCount contains information about a peer's flap count. +type FlapCount struct { + // Count provides the total flap count for a peer. + Count uint32 + + // LastFlap is the timestamp of the last flap recorded for a peer. + LastFlap time.Time +} + +// WriteFlapCounts writes the flap count for a set of peers to disk, creating a +// bucket for the peer's pubkey if necessary. Note that this function overwrites +// the current value. +func (d *DB) WriteFlapCounts(flapCounts map[route.Vertex]*FlapCount) error { + return d.Update(func(tx walletdb.ReadWriteTx) error { + // Run through our set of flap counts and record them for + // each peer, creating a bucket for the peer pubkey if required. + for peer, flapCount := range flapCounts { + peers := tx.ReadWriteBucket(peersBucket) + + peerBucket, err := peers.CreateBucketIfNotExists( + peer[:], + ) + if err != nil { + return err + } + + var b bytes.Buffer + err = serializeTime(&b, flapCount.LastFlap) + if err != nil { + return err + } + + if err = WriteElement(&b, flapCount.Count); err != nil { + return err + } + + err = peerBucket.Put(flapCountKey, b.Bytes()) + if err != nil { + return err + } + } + + return nil + }) +} + +// ReadFlapCount attempts to read the flap count for a peer, failing if the +// peer is not found or we do not have flap count stored. +func (d *DB) ReadFlapCount(pubkey route.Vertex) (*FlapCount, error) { + var flapCount FlapCount + + if err := d.View(func(tx walletdb.ReadTx) error { + peers := tx.ReadBucket(peersBucket) + + peerBucket := peers.NestedReadBucket(pubkey[:]) + if peerBucket == nil { + return ErrNoPeerBucket + } + + flapBytes := peerBucket.Get(flapCountKey) + if flapBytes == nil { + return fmt.Errorf("flap count not recorded for: %v", + pubkey) + } + + var ( + err error + r = bytes.NewReader(flapBytes) + ) + + flapCount.LastFlap, err = deserializeTime(r) + if err != nil { + return err + } + + return ReadElements(r, &flapCount.Count) + }); err != nil { + return nil, err + } + + return &flapCount, nil +} diff --git a/channeldb/peers_test.go b/channeldb/peers_test.go new file mode 100644 index 00000000..b702c18d --- /dev/null +++ b/channeldb/peers_test.go @@ -0,0 +1,50 @@ +package channeldb + +import ( + "testing" + "time" + + "github.com/lightningnetwork/lnd/routing/route" + "github.com/stretchr/testify/require" +) + +// TestFlapCount tests lookup and writing of flap count to disk. +func TestFlapCount(t *testing.T) { + db, cleanup, err := MakeTestDB() + require.NoError(t, err) + defer cleanup() + + // Try to read flap count for a peer that we have no records for. + _, err = db.ReadFlapCount(testPub) + require.Equal(t, ErrNoPeerBucket, err) + + var ( + testPub2 = route.Vertex{2, 2, 2} + peer1FlapCount = &FlapCount{ + Count: 20, + LastFlap: time.Unix(100, 23), + } + peer2FlapCount = &FlapCount{ + Count: 39, + LastFlap: time.Unix(200, 23), + } + ) + + peers := map[route.Vertex]*FlapCount{ + testPub: peer1FlapCount, + testPub2: peer2FlapCount, + } + + err = db.WriteFlapCounts(peers) + require.NoError(t, err) + + // Lookup flap count for our first pubkey. + count, err := db.ReadFlapCount(testPub) + require.NoError(t, err) + require.Equal(t, peer1FlapCount, count) + + // Lookup our flap count for the second peer. + count, err = db.ReadFlapCount(testPub2) + require.NoError(t, err) + require.Equal(t, peer2FlapCount, count) +} diff --git a/server.go b/server.go index d737a096..648dea10 100644 --- a/server.go +++ b/server.go @@ -1212,6 +1212,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr, }, GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, Clock: clock.NewDefaultClock(), + ReadFlapCount: s.remoteChanDB.ReadFlapCount, + WriteFlapCount: s.remoteChanDB.WriteFlapCounts, + FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate), }) if cfg.WtClient.Active {