diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go
index 6338887f..4cfff3d9 100644
--- a/chanfitness/chanevent.go
+++ b/chanfitness/chanevent.go
@@ -84,11 +84,17 @@ type peerLog struct {
 	channels map[wire.OutPoint]*channelInfo
 }
 
-// newPeerLog creates a log for a peer.
-func newPeerLog(clock clock.Clock) *peerLog {
+// newPeerLog creates a log for a peer, taking its historical flap count and
+// last flap time as parameters. These values may be zero/nil if we have no
+// record of historical flap count for the peer.
+func newPeerLog(clock clock.Clock, flapCount int,
+	lastFlap *time.Time) *peerLog {
+
 	return &peerLog{
-		clock:    clock,
-		channels: make(map[wire.OutPoint]*channelInfo),
+		clock:     clock,
+		flapCount: flapCount,
+		lastFlap:  lastFlap,
+		channels:  make(map[wire.OutPoint]*channelInfo),
 	}
 }
diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go
index 03eec515..2ccbb726 100644
--- a/chanfitness/chanevent_test.go
+++ b/chanfitness/chanevent_test.go
@@ -12,7 +12,7 @@ import (
 // TestPeerLog tests the functionality of the peer log struct.
 func TestPeerLog(t *testing.T) {
 	clock := clock.NewTestClock(testNow)
-	peerLog := newPeerLog(clock)
+	peerLog := newPeerLog(clock, 0, nil)
 
 	// assertFlapCount is a helper that asserts that our peer's flap count
 	// and timestamp is set to expected values.
@@ -135,7 +135,7 @@ func TestRateLimitAdd(t *testing.T) {
 	mockedClock := clock.NewTestClock(testNow)
 
 	// Create a new peer log.
-	peerLog := newPeerLog(mockedClock)
+	peerLog := newPeerLog(mockedClock, 0, nil)
 	require.Nil(t, peerLog.stagedEvent)
 
 	// Create a channel for our peer log, otherwise it will not track online
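The constructor change above is what lets the event store seed a peer's in-memory log from persisted state. As an illustration of how the seeded values surface again, here is a hypothetical test (not part of this patch) that would live in package chanfitness; it assumes getFlapCount returns the (int, *time.Time) pair that recordFlapCount consumes further down:

package chanfitness

import (
	"testing"
	"time"

	"github.com/lightningnetwork/lnd/clock"
	"github.com/stretchr/testify/require"
)

// TestSeededPeerLog is illustrative only: a log created with historical
// values reports them straight back, so a restart does not reset a
// peer's all-time flap count.
func TestSeededPeerLog(t *testing.T) {
	lastFlap := testNow.Add(-time.Hour)
	peerLog := newPeerLog(clock.NewTestClock(testNow), 20, &lastFlap)

	flapCount, ts := peerLog.getFlapCount()
	require.Equal(t, 20, flapCount)
	require.Equal(t, &lastFlap, ts)
}

Passing a *time.Time rather than a zero time.Time keeps "never flapped" distinguishable from "flapped at the zero time", which recordFlapCount relies on when it skips peers with a nil timestamp.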
diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go
index fc05bd21..1f84db2f 100644
--- a/chanfitness/chaneventstore.go
+++ b/chanfitness/chaneventstore.go
@@ -22,6 +22,13 @@ import (
 	"github.com/lightningnetwork/lnd/peernotifier"
 	"github.com/lightningnetwork/lnd/routing/route"
 	"github.com/lightningnetwork/lnd/subscribe"
+	"github.com/lightningnetwork/lnd/ticker"
+)
+
+const (
+	// FlapCountFlushRate determines how often we write peer total flap
+	// count to disk.
+	FlapCountFlushRate = time.Hour
 )
 
 var (
@@ -74,8 +81,22 @@ type Config struct {
 	// Clock is the time source that the subsystem uses, provided here
 	// for ease of testing.
 	Clock clock.Clock
+
+	// WriteFlapCount records the flap count for a set of peers on disk.
+	WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error
+
+	// ReadFlapCount gets the flap count for a peer on disk.
+	ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error)
+
+	// FlapCountTicker is a ticker which controls how often we flush our
+	// peers' flap counts to disk.
+	FlapCountTicker ticker.Ticker
 }
 
+// peerFlapCountMap maps peers to flap counts. It is declared here to allow
+// shorter function signatures.
+type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount
+
 type channelInfoRequest struct {
 	peer         route.Vertex
 	channelPoint wire.OutPoint
@@ -167,6 +188,8 @@ func (c *ChannelEventStore) Start() error {
 func (c *ChannelEventStore) Stop() {
 	log.Info("Stopping event store")
 
+	c.cfg.FlapCountTicker.Stop()
+
 	// Stop the consume goroutine.
 	close(c.quit)
 
@@ -179,10 +202,10 @@ func (c *ChannelEventStore) Stop() {
 func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
 	peer route.Vertex) {
 
-	peerMonitor, ok := c.peers[peer]
-	if !ok {
-		peerMonitor = newPeerLog(c.cfg.Clock)
-		c.peers[peer] = peerMonitor
+	peerMonitor, err := c.getPeerMonitor(peer)
+	if err != nil {
+		log.Errorf("could not create monitor: %v", err)
+		return
 	}
 
 	if err := peerMonitor.addChannel(channelPoint); err != nil {
@@ -190,6 +213,42 @@
 	}
 }
 
+// getPeerMonitor tries to get an existing peer monitor from our in-memory
+// list, falling back to creating a new monitor if it is not currently known.
+func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor,
+	error) {
+
+	peerMonitor, ok := c.peers[peer]
+	if ok {
+		return peerMonitor, nil
+	}
+
+	var (
+		flapCount int
+		lastFlap  *time.Time
+	)
+
+	historicalFlap, err := c.cfg.ReadFlapCount(peer)
+	switch err {
+	// If we do not have any records for this peer we set a 0 flap count
+	// and timestamp.
+	case channeldb.ErrNoPeerBucket:
+
+	case nil:
+		flapCount = int(historicalFlap.Count)
+		lastFlap = &historicalFlap.LastFlap
+
+	// Return if we get an unexpected error.
+	default:
+		return nil, err
+	}
+
+	peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap)
+	c.peers[peer] = peerMonitor
+
+	return peerMonitor, nil
+}
+
 // closeChannel records a closed time for a channel, and returns early if the
 // channel is not known to the event store. We log warnings (rather than errors)
 // when we cannot find a peer/channel because channels that we restore from a
@@ -214,12 +273,10 @@
 // peerEvent creates a peer monitor for a peer if we do not currently have
 // one, and adds an online event to it.
 func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
-	// If we are not currently tracking events for this peer, add a peer
-	// log for it.
-	peerMonitor, ok := c.peers[peer]
-	if !ok {
-		peerMonitor = newPeerLog(c.cfg.Clock)
-		c.peers[peer] = peerMonitor
+	peerMonitor, err := c.getPeerMonitor(peer)
+	if err != nil {
+		log.Errorf("could not create monitor: %v", err)
+		return
 	}
 
 	peerMonitor.onlineEvent(online)
@@ -236,8 +293,22 @@ type subscriptions struct {
 // the event store with channel and peer events, and serves requests for channel
 // uptime and lifespan.
 func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
-	defer c.wg.Done()
-	defer subscriptions.cancel()
+	// Start our flap count ticker.
+	c.cfg.FlapCountTicker.Resume()
+
+	// On exit, we will cancel our subscriptions and write our most recent
+	// flap counts to disk. This ensures that we have consistent data in
+	// the case of a graceful shutdown. If we do not shut down gracefully,
+	// our worst case is data from our last flap count tick (1H).
+	defer func() {
+		subscriptions.cancel()
+
+		if err := c.recordFlapCount(); err != nil {
+			log.Errorf("error recording flap on shutdown: %v", err)
+		}
+
+		c.wg.Done()
+	}()
 
 	// Consume events until the channel is closed.
 	for {
@@ -302,6 +373,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
 			resp.info, resp.err = c.getChanInfo(req)
 			req.responseChan <- resp
 
+		case <-c.cfg.FlapCountTicker.Ticks():
+			if err := c.recordFlapCount(); err != nil {
+				log.Errorf("could not record flap "+
+					"count: %v", err)
+			}
+
 		// Exit if the store receives the signal to shutdown.
 		case <-c.quit:
 			return
@@ -371,3 +448,25 @@ func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
 		Uptime:   uptime,
 	}, nil
 }
+
+// recordFlapCount will record our flap count for each peer that we are
+// currently tracking, skipping peers that have a 0 flap count.
+func (c *ChannelEventStore) recordFlapCount() error {
+	updates := make(peerFlapCountMap)
+
+	for peer, monitor := range c.peers {
+		flapCount, lastFlap := monitor.getFlapCount()
+		if lastFlap == nil {
+			continue
+		}
+
+		updates[peer] = &channeldb.FlapCount{
+			Count:    uint32(flapCount),
+			LastFlap: *lastFlap,
+		}
+	}
+
+	log.Debugf("recording flap count for: %v peers", len(updates))
+
+	return c.cfg.WriteFlapCount(updates)
+}
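Two things are worth noting in chaneventstore.go: getPeerMonitor is a read-through cache (disk is consulted only on first sight of a peer, with ErrNoPeerBucket mapped to zero state), and consume flushes on every tick plus once more in its deferred cleanup. The flush pattern, reduced to a self-contained runnable sketch with illustrative names (this is not lnd code):

package main

import (
	"fmt"
	"time"
)

// flushLoop is the shape of consume's flush logic: flush on every tick,
// and flush once more on the way out so a graceful shutdown never loses
// the last partial interval.
func flushLoop(ticks <-chan time.Time, quit <-chan struct{},
	flush func() error) {

	defer func() {
		if err := flush(); err != nil {
			fmt.Println("shutdown flush:", err)
		}
	}()

	for {
		select {
		case <-ticks:
			if err := flush(); err != nil {
				fmt.Println("periodic flush:", err)
			}

		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	done := make(chan struct{})

	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()

	go func() {
		flushLoop(tick.C, quit, func() error {
			fmt.Println("flushed")
			return nil
		})
		close(done)
	}()

	time.Sleep(35 * time.Millisecond)
	close(quit)
	<-done
}

Because the deferred flush runs on any exit from consume, a graceful shutdown loses nothing; an unclean one loses at most one FlapCountFlushRate interval, as the comment in the diff notes.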
diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go
index 771f3bb0..67e8c35b 100644
--- a/chanfitness/chaneventstore_test.go
+++ b/chanfitness/chaneventstore_test.go
@@ -179,6 +179,51 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
 	require.Equal(t, expectedChannels, monitor.channelCount())
 }
 
+// TestStoreFlapCount tests flushing of flap counts to disk on timer ticks and
+// on store shutdown.
+func TestStoreFlapCount(t *testing.T) {
+	testCtx := newChanEventStoreTestCtx(t)
+	testCtx.start()
+
+	pubkey, _, _ := testCtx.createChannel()
+	testCtx.peerEvent(pubkey, false)
+
+	// Now, we tick our flap count ticker. We expect our main goroutine to
+	// flush our flap count to disk.
+	testCtx.tickFlapCount()
+
+	// Since we just tracked an offline event, we expect a single flap for
+	// our peer.
+	expectedUpdate := peerFlapCountMap{
+		pubkey: {
+			Count:    1,
+			LastFlap: testCtx.clock.Now(),
+		},
+	}
+
+	testCtx.assertFlapCountUpdated()
+	testCtx.assertFlapCountUpdates(expectedUpdate)
+
+	// Create three events for our peer: online/offline/online.
+	testCtx.peerEvent(pubkey, true)
+	testCtx.peerEvent(pubkey, false)
+	testCtx.peerEvent(pubkey, true)
+
+	// Trigger another write.
+	testCtx.tickFlapCount()
+
+	// Since we have processed 3 more events for our peer, we update our
+	// expected flap count map to have a flap count of 4 for this peer.
+	expectedUpdate[pubkey] = &channeldb.FlapCount{
+		Count:    4,
+		LastFlap: testCtx.clock.Now(),
+	}
+	testCtx.assertFlapCountUpdated()
+	testCtx.assertFlapCountUpdates(expectedUpdate)
+
+	testCtx.stop()
+}
+
 // TestGetChanInfo tests the GetChanInfo function for the cases where a channel
 // is known and unknown to the store.
 func TestGetChanInfo(t *testing.T) {
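The expected counts follow from one flap per peer event: the initial offline event yields a count of 1, and the later online/offline/online burst brings the total to 4. The test can trigger flushes deterministically because the context (below) installs ticker.NewForce, which never fires on its own. A hypothetical standalone test sketching that mechanism; it assumes, as tickFlapCount below does, that a resumed Force ticker surfaces sends on its Force channel through Ticks():

package chanfitness

import (
	"testing"
	"time"

	"github.com/lightningnetwork/lnd/ticker"
)

// TestForceTick is illustrative only: a forced ticker fires exactly when
// a test pushes a time into it, never on its own schedule.
func TestForceTick(t *testing.T) {
	forceTick := ticker.NewForce(time.Hour)
	forceTick.Resume()
	defer forceTick.Stop()

	// Consume ticks the way the store's consume loop does.
	received := make(chan time.Time, 1)
	go func() {
		received <- <-forceTick.Ticks()
	}()

	// Push a tick; without this send, Ticks() would never fire.
	sent := time.Unix(100, 0)
	select {
	case forceTick.Force <- sent:
	case <-time.After(time.Second):
		t.Fatal("tick was not consumed")
	}

	if got := <-received; !got.Equal(sent) {
		t.Fatalf("got tick %v, want %v", got, sent)
	}
}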
diff --git a/chanfitness/chaneventstore_testctx_test.go b/chanfitness/chaneventstore_testctx_test.go
index 6a6b8fd7..8b498310 100644
--- a/chanfitness/chaneventstore_testctx_test.go
+++ b/chanfitness/chaneventstore_testctx_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/lightningnetwork/lnd/peernotifier"
 	"github.com/lightningnetwork/lnd/routing/route"
 	"github.com/lightningnetwork/lnd/subscribe"
+	"github.com/lightningnetwork/lnd/ticker"
 	"github.com/stretchr/testify/require"
 )
 
@@ -38,6 +39,17 @@ type chanEventStoreTestCtx struct {
 
 	// clock is the clock that our test store will use.
 	clock *clock.TestClock
+
+	// flapUpdates stores our most recent set of flap count updates.
+	flapUpdates peerFlapCountMap
+
+	// flapCountUpdates is a channel which receives new flap counts.
+	flapCountUpdates chan peerFlapCountMap
+
+	// stopped is closed when our test context is fully shut down. It is
+	// used to prevent calling of functions which can only be called after
+	// shutdown.
+	stopped chan struct{}
 }
 
 // newChanEventStoreTestCtx creates a test context which can be used to test
@@ -48,6 +60,9 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx {
 		channelSubscription: newMockSubscription(t),
 		peerSubscription:    newMockSubscription(t),
 		clock:               clock.NewTestClock(testNow),
+		flapUpdates:         make(peerFlapCountMap),
+		flapCountUpdates:    make(chan peerFlapCountMap),
+		stopped:             make(chan struct{}),
 	}
 
 	cfg := &Config{
@@ -61,6 +76,28 @@ func newChanEventStoreTestCtx(t *testing.T) *chanEventStoreTestCtx {
 		GetOpenChannels: func() ([]*channeldb.OpenChannel, error) {
 			return nil, nil
 		},
+		WriteFlapCount: func(updates map[route.Vertex]*channeldb.FlapCount) error {
+			// Send our whole update map into the test context's
+			// updates channel. The test will need to assert flap
+			// count updated or this send will time out.
+			select {
+			case testCtx.flapCountUpdates <- updates:
+
+			case <-time.After(timeout):
+				t.Fatalf("WriteFlapCount timeout")
+			}
+
+			return nil
+		},
+		ReadFlapCount: func(peer route.Vertex) (*channeldb.FlapCount, error) {
+			count, ok := testCtx.flapUpdates[peer]
+			if !ok {
+				return nil, channeldb.ErrNoPeerBucket
+			}
+
+			return count, nil
+		},
+		FlapCountTicker: ticker.NewForce(FlapCountFlushRate),
 	}
 
 	testCtx.store = NewChannelEventStore(cfg)
@@ -75,7 +112,25 @@ func (c *chanEventStoreTestCtx) start() {
 
 // stop stops the channel event store's subscribe servers and the store itself.
 func (c *chanEventStoreTestCtx) stop() {
-	c.store.Stop()
+	// On shutdown of our event store, we write flap counts to disk. In our
+	// test context, this write function is blocked on asserting that the
+	// update has occurred. We stop our store in a goroutine so that we
+	// can shut it down and assert that it performs these on-shutdown
+	// updates. The stopped channel is used to ensure that we do not finish
+	// our test before this shutdown has completed.
+	go func() {
+		c.store.Stop()
+		close(c.stopped)
+	}()
+
+	// We write our flap count to disk on shutdown, so we assert that the
+	// most recent record that the server has is written on shutdown.
+	// Calling this assert unblocks the stop function above. We don't check
+	// values here, so that our tests don't all require providing an
+	// expected flap count, but at least assert that the write occurred.
+	c.assertFlapCountUpdated()
+
+	<-c.stopped
 
 	// Make sure that the cancel function was called for both of our
 	// subscription mocks.
@@ -137,6 +192,18 @@ func (c *chanEventStoreTestCtx) closeChannel(channel wire.OutPoint,
 	c.channelSubscription.sendUpdate(update)
 }
 
+// tickFlapCount forces a tick for our flap count ticker with the current time.
+func (c *chanEventStoreTestCtx) tickFlapCount() {
+	testTicker := c.store.cfg.FlapCountTicker.(*ticker.Force)
+
+	select {
+	case testTicker.Force <- c.store.cfg.Clock.Now():
+
+	case <-time.After(timeout):
+		c.t.Fatalf("could not tick flap count ticker")
+	}
+}
+
 // peerEvent sends a peer online or offline event to the store for the peer
 // provided.
 func (c *chanEventStoreTestCtx) peerEvent(peer route.Vertex, online bool) {
@@ -165,6 +232,24 @@ func (c *chanEventStoreTestCtx) sendChannelOpenedUpdate(pubkey *btcec.PublicKey,
 	c.channelSubscription.sendUpdate(update)
 }
 
+// assertFlapCountUpdated asserts that our store has made an attempt to write
+// our current set of flap counts to disk, and records the written set in our
+// test ctx. Note that it does not check the values of the update.
+func (c *chanEventStoreTestCtx) assertFlapCountUpdated() {
+	select {
+	case c.flapUpdates = <-c.flapCountUpdates:
+
+	case <-time.After(timeout):
+		c.t.Fatalf("assertFlapCountUpdated timeout")
+	}
+}
+
+// assertFlapCountUpdates asserts that our current record of flap counts is
+// as expected.
+func (c *chanEventStoreTestCtx) assertFlapCountUpdates(expected peerFlapCountMap) {
+	require.Equal(c.t, expected, c.flapUpdates)
+}
+
 // mockSubscription is a mock subscription client that blocks on sends into the
 // updates channel. We use this mock rather than an actual subscribe client
 // because they do not block, which makes tests race (because we have no way
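The goroutine dance in stop() exists because Stop() and the test sit on opposite ends of an unbuffered channel: the store's final WriteFlapCount blocks until the test consumes it. The same pattern in isolation, with illustrative names (not lnd code):

package main

import "fmt"

// store mimics the event store: Stop blocks until its final flush is
// consumed by whoever is on the other side of the channel.
type store struct {
	flush chan string
}

func (s *store) Stop() {
	s.flush <- "final flap counts"
}

func main() {
	s := &store{flush: make(chan string)}
	stopped := make(chan struct{})

	// Stop in a goroutine, as the test context's stop() does, so that
	// the main goroutine is free to consume the shutdown flush.
	go func() {
		s.Stop()
		close(stopped)
	}()

	// Consuming the flush is what unblocks Stop, just as calling
	// assertFlapCountUpdated unblocks the store's shutdown write.
	fmt.Println("got:", <-s.flush)
	<-stopped
}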
diff --git a/channeldb/db.go b/channeldb/db.go
index a4c5c516..983b4fbb 100644
--- a/channeldb/db.go
+++ b/channeldb/db.go
@@ -164,6 +164,12 @@ var (
 			number:    17,
 			migration: mig.CreateTLB(closeSummaryBucket),
 		},
+		{
+			// Create a top level bucket which holds information
+			// about our peers.
+			number:    18,
+			migration: mig.CreateTLB(peersBucket),
+		},
 	}
 
 	// Big endian is the preferred byte order, due to cursor scans over
@@ -278,6 +284,7 @@ var topLevelBuckets = [][]byte{
 	invoiceBucket,
 	payAddrIndexBucket,
 	paymentsIndexBucket,
+	peersBucket,
 	nodeInfoBucket,
 	nodeBucket,
 	edgeBucket,
diff --git a/channeldb/peers.go b/channeldb/peers.go
new file mode 100644
index 00000000..fabb5361
--- /dev/null
+++ b/channeldb/peers.go
@@ -0,0 +1,121 @@
+package channeldb
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/btcsuite/btcwallet/walletdb"
+	"github.com/lightningnetwork/lnd/routing/route"
+)
+
+var (
+	// peersBucket is the name of a top level bucket in which we store
+	// information about our peers. Information for different peers is
+	// stored in buckets keyed by their public key.
+	//
+	//
+	// peers-bucket
+	//      |
+	//      |-- <peer-pubkey>
+	//      |       |--flap-count-key: <flap count>
+	//      |
+	//      |-- <peer-pubkey>
+	//      |       |--flap-count-key: <flap count>
+	peersBucket = []byte("peers-bucket")
+
+	// flapCountKey is a key used in the peer pubkey sub-bucket that
+	// stores the timestamp of a peer's last flap and its all time flap
+	// count.
+	flapCountKey = []byte("flap-count")
+)
+
+var (
+	// ErrNoPeerBucket is returned when we try to read entries for a peer
+	// that is not tracked.
+	ErrNoPeerBucket = errors.New("peer bucket not found")
+)
+
+// FlapCount contains information about a peer's flap count.
+type FlapCount struct {
+	// Count provides the total flap count for a peer.
+	Count uint32
+
+	// LastFlap is the timestamp of the last flap recorded for a peer.
+	LastFlap time.Time
+}
+
+// WriteFlapCounts writes the flap count for a set of peers to disk, creating a
+// bucket for the peer's pubkey if necessary. Note that this function overwrites
+// the current value.
+func (d *DB) WriteFlapCounts(flapCounts map[route.Vertex]*FlapCount) error {
+	return d.Update(func(tx walletdb.ReadWriteTx) error {
+		// Run through our set of flap counts and record them for
+		// each peer, creating a bucket for the peer pubkey if required.
+		for peer, flapCount := range flapCounts {
+			peers := tx.ReadWriteBucket(peersBucket)
+
+			peerBucket, err := peers.CreateBucketIfNotExists(
+				peer[:],
+			)
+			if err != nil {
+				return err
+			}
+
+			var b bytes.Buffer
+			err = serializeTime(&b, flapCount.LastFlap)
+			if err != nil {
+				return err
+			}
+
+			if err = WriteElement(&b, flapCount.Count); err != nil {
+				return err
+			}
+
+			err = peerBucket.Put(flapCountKey, b.Bytes())
+			if err != nil {
+				return err
+			}
+		}
+
+		return nil
+	})
+}
+
+// ReadFlapCount attempts to read the flap count for a peer, failing if the
+// peer is not found or we do not have flap count stored.
+func (d *DB) ReadFlapCount(pubkey route.Vertex) (*FlapCount, error) {
+	var flapCount FlapCount
+
+	if err := d.View(func(tx walletdb.ReadTx) error {
+		peers := tx.ReadBucket(peersBucket)
+
+		peerBucket := peers.NestedReadBucket(pubkey[:])
+		if peerBucket == nil {
+			return ErrNoPeerBucket
+		}
+
+		flapBytes := peerBucket.Get(flapCountKey)
+		if flapBytes == nil {
+			return fmt.Errorf("flap count not recorded for: %v",
+				pubkey)
+		}
+
+		var (
+			err error
+			r   = bytes.NewReader(flapBytes)
+		)
+
+		flapCount.LastFlap, err = deserializeTime(r)
+		if err != nil {
+			return err
+		}
+
+		return ReadElements(r, &flapCount.Count)
+	}); err != nil {
+		return nil, err
+	}
+
+	return &flapCount, nil
+}
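The value stored under the flap-count key is the serialized timestamp followed by the count. serializeTime and WriteElement are pre-existing channeldb helpers whose exact encoding this patch simply reuses; purely as a guide to the layout, here is a standalone equivalent under the assumption of an 8-byte big-endian unix-nano timestamp and a big-endian uint32 count:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"time"
)

// encodeFlapCount mirrors (but does not reuse) the layout written by
// WriteFlapCounts: the last-flap timestamp first, then the uint32 count.
// The unix-nano/big-endian details are assumptions about the helpers.
func encodeFlapCount(lastFlap time.Time, count uint32) ([]byte, error) {
	var b bytes.Buffer

	err := binary.Write(&b, binary.BigEndian, lastFlap.UnixNano())
	if err != nil {
		return nil, err
	}

	if err := binary.Write(&b, binary.BigEndian, count); err != nil {
		return nil, err
	}

	return b.Bytes(), nil
}

func main() {
	raw, err := encodeFlapCount(time.Unix(100, 23), 20)
	if err != nil {
		panic(err)
	}

	// 12 bytes total: an 8 byte timestamp and a 4 byte count.
	fmt.Printf("%d bytes: %x\n", len(raw), raw)
}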
diff --git a/channeldb/peers_test.go b/channeldb/peers_test.go
new file mode 100644
index 00000000..b702c18d
--- /dev/null
+++ b/channeldb/peers_test.go
@@ -0,0 +1,50 @@
+package channeldb
+
+import (
+	"testing"
+	"time"
+
+	"github.com/lightningnetwork/lnd/routing/route"
+	"github.com/stretchr/testify/require"
+)
+
+// TestFlapCount tests lookup and writing of flap count to disk.
+func TestFlapCount(t *testing.T) {
+	db, cleanup, err := MakeTestDB()
+	require.NoError(t, err)
+	defer cleanup()
+
+	// Try to read flap count for a peer that we have no records for.
+	_, err = db.ReadFlapCount(testPub)
+	require.Equal(t, ErrNoPeerBucket, err)
+
+	var (
+		testPub2       = route.Vertex{2, 2, 2}
+		peer1FlapCount = &FlapCount{
+			Count:    20,
+			LastFlap: time.Unix(100, 23),
+		}
+		peer2FlapCount = &FlapCount{
+			Count:    39,
+			LastFlap: time.Unix(200, 23),
+		}
+	)
+
+	peers := map[route.Vertex]*FlapCount{
+		testPub:  peer1FlapCount,
+		testPub2: peer2FlapCount,
+	}
+
+	err = db.WriteFlapCounts(peers)
+	require.NoError(t, err)
+
+	// Lookup flap count for our first pubkey.
+	count, err := db.ReadFlapCount(testPub)
+	require.NoError(t, err)
+	require.Equal(t, peer1FlapCount, count)
+
+	// Lookup our flap count for the second peer.
+	count, err = db.ReadFlapCount(testPub2)
+	require.NoError(t, err)
+	require.Equal(t, peer2FlapCount, count)
+}
diff --git a/server.go b/server.go
index d737a096..648dea10 100644
--- a/server.go
+++ b/server.go
@@ -1212,6 +1212,9 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
 		},
 		GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels,
 		Clock:           clock.NewDefaultClock(),
+		ReadFlapCount:   s.remoteChanDB.ReadFlapCount,
+		WriteFlapCount:  s.remoteChanDB.WriteFlapCounts,
+		FlapCountTicker: ticker.New(chanfitness.FlapCountFlushRate),
 	})
 
 	if cfg.WtClient.Active {
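With the server wiring above, any component holding the *channeldb.DB can consume the new API. A short hypothetical helper (only the functions and error added in this patch are real) showing the same ErrNoPeerBucket fallback that getPeerMonitor uses:

package peerstats

import (
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/routing/route"
)

// flapCountOrZero treats a missing peer bucket as a zero flap count,
// mirroring the fallback in chanfitness's getPeerMonitor.
func flapCountOrZero(db *channeldb.DB, peer route.Vertex) (uint32, error) {
	record, err := db.ReadFlapCount(peer)
	switch err {
	case channeldb.ErrNoPeerBucket:
		return 0, nil

	case nil:
		return record.Count, nil

	default:
		return 0, err
	}
}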