From 10f9ba952eee09d0f8a8a42c6267b9deae3207b9 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 8 Sep 2020 13:47:17 +0200 Subject: [PATCH] chanfitness: unify requests to store in single chan info struct We currently query the store for uptime and lifespan individually. As we add more fields, we will need to add more queries with this design. This change combines requests into a single channel infor request so that we do not need to add unnecessary boilerplate going forward. --- chanfitness/chaneventstore.go | 154 +++++++----------- chanfitness/chaneventstore_test.go | 242 ++++++++--------------------- rpcserver.go | 36 ++--- 3 files changed, 133 insertions(+), 299 deletions(-) diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 6ff73ccf..bd297fd2 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -46,11 +46,8 @@ type ChannelEventStore struct { // and offline events. peers map[route.Vertex]bool - // lifespanRequests serves requests for the lifespan of channels. - lifespanRequests chan lifespanRequest - - // uptimeRequests serves requests for the uptime of channels. - uptimeRequests chan uptimeRequest + // chanInfoRequests serves requests for information about our channel. + chanInfoRequests chan channelInfoRequest quit chan struct{} @@ -79,36 +76,14 @@ type Config struct { Clock clock.Clock } -// lifespanRequest contains the channel ID required to query the store for a -// channel's lifespan and a blocking response channel on which the result is -// sent. -type lifespanRequest struct { +type channelInfoRequest struct { channelPoint wire.OutPoint - responseChan chan lifespanResponse + responseChan chan channelInfoResponse } -// lifespanResponse contains the response to a lifespanRequest and an error if -// one occurred. -type lifespanResponse struct { - start time.Time - end time.Time - err error -} - -// uptimeRequest contains the parameters required to query the store for a -// channel's uptime and a blocking response channel on which the result is sent. -type uptimeRequest struct { - channelPoint wire.OutPoint - startTime time.Time - endTime time.Time - responseChan chan uptimeResponse -} - -// uptimeResponse contains the response to an uptimeRequest and an error if one -// occurred. -type uptimeResponse struct { - uptime time.Duration - err error +type channelInfoResponse struct { + info *ChannelInfo + err error } // NewChannelEventStore initializes an event store with the config provided. @@ -119,8 +94,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore { cfg: config, channels: make(map[wire.OutPoint]*chanEventLog), peers: make(map[route.Vertex]bool), - lifespanRequests: make(chan lifespanRequest), - uptimeRequests: make(chan uptimeRequest), + chanInfoRequests: make(chan channelInfoRequest), quit: make(chan struct{}), } @@ -314,35 +288,10 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { } // Serve all requests for channel lifetime. - case req := <-c.lifespanRequests: - var resp lifespanResponse - - channel, ok := c.channels[req.channelPoint] - if !ok { - resp.err = ErrChannelNotFound - } else { - resp.start = channel.openedAt - resp.end = channel.closedAt - } - - req.responseChan <- resp - - // Serve requests for channel uptime. - case req := <-c.uptimeRequests: - var resp uptimeResponse - - channel, ok := c.channels[req.channelPoint] - if !ok { - resp.err = ErrChannelNotFound - } else { - uptime, err := channel.uptime( - req.startTime, req.endTime, - ) - - resp.uptime = uptime - resp.err = err - } + case req := <-c.chanInfoRequests: + var resp channelInfoResponse + resp.info, resp.err = c.getChanInfo(req) req.responseChan <- resp // Exit if the store receives the signal to shutdown. @@ -352,65 +301,72 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { } } -// GetLifespan returns the opening and closing time observed for a channel and -// a boolean to indicate whether the channel is known the the event store. If -// the channel is still open, a zero close time is returned. -func (c *ChannelEventStore) GetLifespan( - channelPoint wire.OutPoint) (time.Time, time.Time, error) { +// ChannelInfo provides the set of information that the event store has recorded +// for a channel. +type ChannelInfo struct { + // Lifetime is the total amount of time we have monitored the channel + // for. + Lifetime time.Duration - request := lifespanRequest{ + // Uptime is the total amount of time that the channel peer has been + // observed as online during the monitored lifespan. + Uptime time.Duration +} + +// GetChanInfo gets all the information we have on a channel in the event store. +func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) ( + *ChannelInfo, error) { + + request := channelInfoRequest{ channelPoint: channelPoint, - responseChan: make(chan lifespanResponse), + responseChan: make(chan channelInfoResponse), } - // Send a request for the channel's lifespan to the main event loop, or - // return early with an error if the store has already received a + // Send a request for the channel's information to the main event loop, + // or return early with an error if the store has already received a // shutdown signal. select { - case c.lifespanRequests <- request: + case c.chanInfoRequests <- request: case <-c.quit: - return time.Time{}, time.Time{}, errShuttingDown + return nil, errShuttingDown } // Return the response we receive on the response channel or exit early // if the store is instructed to exit. select { case resp := <-request.responseChan: - return resp.start, resp.end, resp.err + return resp.info, resp.err case <-c.quit: - return time.Time{}, time.Time{}, errShuttingDown + return nil, errShuttingDown } } -// GetUptime returns the uptime of a channel over a period and an error if the -// channel cannot be found or the uptime calculation fails. -func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime, - endTime time.Time) (time.Duration, error) { +// getChanInfo collects channel information for a channel. It gets uptime over +// the full lifetime of the channel. +func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo, + error) { - request := uptimeRequest{ - channelPoint: channelPoint, - startTime: startTime, - endTime: endTime, - responseChan: make(chan uptimeResponse), + // Look for the channel in our current set. + channel, ok := c.channels[req.channelPoint] + if !ok { + return nil, ErrChannelNotFound } - // Send a request for the channel's uptime to the main event loop, or - // return early with an error if the store has already received a - // shutdown signal. - select { - case c.uptimeRequests <- request: - case <-c.quit: - return 0, errShuttingDown + // If our channel is not closed, we want to calculate uptime until the + // present. + endTime := channel.closedAt + if endTime.IsZero() { + endTime = c.cfg.Clock.Now() } - // Return the response we receive on the response channel or exit early - // if the store is instructed to exit. - select { - case resp := <-request.responseChan: - return resp.uptime, resp.err - - case <-c.quit: - return 0, errShuttingDown + uptime, err := channel.uptime(channel.openedAt, endTime) + if err != nil { + return nil, err } + + return &ChannelInfo{ + Lifetime: endTime.Sub(channel.openedAt), + Uptime: uptime, + }, nil } diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 206ec8ca..127756f7 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -186,181 +186,6 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), require.Equal(t, expectedPeers, testCtx.store.peers) } -// TestGetLifetime tests the GetLifetime function for the cases where a channel -// is known and unknown to the store. -func TestGetLifetime(t *testing.T) { - tests := []struct { - name string - channelFound bool - channelPoint wire.OutPoint - opened time.Time - closed time.Time - expectedError error - }{ - { - name: "Channel found", - channelFound: true, - opened: testNow, - closed: testNow.Add(time.Hour * -1), - expectedError: nil, - }, - { - name: "Channel not found", - expectedError: ErrChannelNotFound, - }, - } - - for _, test := range tests { - test := test - - t.Run(test.name, func(t *testing.T) { - ctx := newChanEventStoreTestCtx(t) - - // Add channel to eventStore if the test indicates that - // it should be present. - if test.channelFound { - ctx.store.channels[test.channelPoint] = - &chanEventLog{ - openedAt: test.opened, - closedAt: test.closed, - } - } - - ctx.start() - - open, close, err := ctx.store.GetLifespan(test.channelPoint) - require.Equal(t, test.expectedError, err) - - require.Equal(t, test.opened, open) - require.Equal(t, test.closed, close) - - ctx.stop() - }) - } -} - -// TestGetUptime tests the getUptime call for channels known to the event store. -// It does not test the trivial case where a channel is unknown to the store, -// because this is simply a zero return if an item is not found in a map. It -// tests the unexpected edge cases where a tracked channel does not have any -// events recorded, and when a zero time is specified for the uptime range. -func TestGetUptime(t *testing.T) { - twoHoursAgo := testNow.Add(time.Hour * -2) - fourHoursAgo := testNow.Add(time.Hour * -4) - - tests := []struct { - name string - - channelPoint wire.OutPoint - - // events is the set of events we expect to find in the channel - // store. - events []*channelEvent - - // openedAt is the time the channel is recorded as open by the - // store. - openedAt time.Time - - // closedAt is the time the channel is recorded as closed by the - // store. If the channel is still open, this value is zero. - closedAt time.Time - - // channelFound is true if we expect to find the channel in the - // store. - channelFound bool - - // startTime specifies the beginning of the uptime range we want - // to calculate. - startTime time.Time - - // endTime specified the end of the uptime range we want to - // calculate. - endTime time.Time - - expectedUptime time.Duration - - expectedError error - }{ - { - name: "No events", - startTime: twoHoursAgo, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "50% Uptime", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - { - timestamp: twoHoursAgo, - eventType: peerOfflineEvent, - }, - }, - openedAt: fourHoursAgo, - expectedUptime: time.Hour * 2, - startTime: fourHoursAgo, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "Zero start time", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - }, - openedAt: fourHoursAgo, - expectedUptime: time.Hour * 4, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "Channel not found", - startTime: twoHoursAgo, - endTime: testNow, - channelFound: false, - expectedError: ErrChannelNotFound, - }, - } - - for _, test := range tests { - test := test - - t.Run(test.name, func(t *testing.T) { - ctx := newChanEventStoreTestCtx(t) - - // If we're supposed to find the channel for this test, - // add events for it to the store. - if test.channelFound { - eventLog := &chanEventLog{ - events: test.events, - clock: clock.NewTestClock(testNow), - openedAt: test.openedAt, - closedAt: test.closedAt, - } - ctx.store.channels[test.channelPoint] = eventLog - } - - ctx.start() - - uptime, err := ctx.store.GetUptime( - test.channelPoint, test.startTime, test.endTime, - ) - require.Equal(t, test.expectedError, err) - require.Equal(t, test.expectedUptime, uptime) - - ctx.stop() - }) - } -} - // TestAddChannel tests that channels are added to the event store with // appropriate timestamps. This test addresses a bug where offline channels // did not have an opened time set, and checks that an online event is set for @@ -388,3 +213,70 @@ func TestAddChannel(t *testing.T) { require.Len(t, chan2Events, 1) require.Equal(t, peerOnlineEvent, chan2Events[0].eventType) } + +// TestGetChanInfo tests the GetChanInfo function for the cases where a channel +// is known and unknown to the store. +func TestGetChanInfo(t *testing.T) { + ctx := newChanEventStoreTestCtx(t) + ctx.start() + + // Make a note of the time that our mocked clock starts on. + now := ctx.clock.Now() + + // Create mock vars for a channel but do not add them to our store yet. + peer, pk, channel := ctx.newChannel() + + // Send an online event for our peer, although we do not yet have an + // open channel. + ctx.peerEvent(peer, true) + + // Try to get info for a channel that has not been opened yet, we + // expect to get an error. + _, err := ctx.store.GetChanInfo(channel) + require.Equal(t, ErrChannelNotFound, err) + + // Now we send our store a notification that a channel has been opened. + ctx.sendChannelOpenedUpdate(pk, channel) + + // Wait for our channel to be recognized by our store. We need to wait + // for the channel to be created so that we do not update our time + // before the channel open is processed. + require.Eventually(t, func() bool { + _, err = ctx.store.GetChanInfo(channel) + return err == nil + }, timeout, time.Millisecond*20) + + // Increment our test clock by an hour. + now = now.Add(time.Hour) + ctx.clock.SetTime(now) + + // At this stage our channel has been open and online for an hour. + info, err := ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + // Now we send a peer offline event for our channel. + ctx.peerEvent(peer, false) + + // Since we have not bumped our mocked time, our uptime calculations + // should be the same, even though we've just processed an offline + // event. + info, err = ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + // Progress our time again. This time, our peer is currently tracked as + // being offline, so we expect our channel info to reflect that the peer + // has been offline for this period. + now = now.Add(time.Hour) + ctx.clock.SetTime(now) + + info, err = ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour*2, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + ctx.stop() +} diff --git a/rpcserver.go b/rpcserver.go index be6e7a8e..69d3513f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3536,42 +3536,28 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, return channel, nil } - // Get the lifespan observed by the channel event store. If the channel is - // not known to the channel event store, return early because we cannot - // calculate any further uptime information. + // Query the event store for additional information about the channel. + // Do not fail if it is not available, because there is a potential + // race between a channel being added to our node and the event store + // being notified of it. outpoint := dbChannel.FundingOutpoint - startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint) + info, err := r.server.chanEventStore.GetChanInfo(outpoint) switch err { + // If the store does not know about the channel, we just log it. case chanfitness.ErrChannelNotFound: rpcsLog.Infof("channel: %v not found by channel event store", outpoint) - return channel, nil + // If we got our channel info, we further populate the channel. case nil: - // If there is no error getting lifespan, continue to uptime - // calculation. + channel.Uptime = int64(info.Uptime.Seconds()) + channel.Lifetime = int64(info.Lifetime.Seconds()) + + // If we get an unexpected error, we return it. default: return nil, err } - // If endTime is zero, the channel is still open, progress endTime to - // the present so we can calculate lifetime. - if endTime.IsZero() { - endTime = time.Now() - } - channel.Lifetime = int64(endTime.Sub(startTime).Seconds()) - - // Once we have successfully obtained channel lifespan, we know that the - // channel is known to the event store, so we can return any non-nil error - // that occurs. - uptime, err := r.server.chanEventStore.GetUptime( - outpoint, startTime, endTime, - ) - if err != nil { - return nil, err - } - channel.Uptime = int64(uptime.Seconds()) - return channel, nil }