diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 6ff73ccf..bd297fd2 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -46,11 +46,8 @@ type ChannelEventStore struct { // and offline events. peers map[route.Vertex]bool - // lifespanRequests serves requests for the lifespan of channels. - lifespanRequests chan lifespanRequest - - // uptimeRequests serves requests for the uptime of channels. - uptimeRequests chan uptimeRequest + // chanInfoRequests serves requests for information about our channel. + chanInfoRequests chan channelInfoRequest quit chan struct{} @@ -79,36 +76,14 @@ type Config struct { Clock clock.Clock } -// lifespanRequest contains the channel ID required to query the store for a -// channel's lifespan and a blocking response channel on which the result is -// sent. -type lifespanRequest struct { +type channelInfoRequest struct { channelPoint wire.OutPoint - responseChan chan lifespanResponse + responseChan chan channelInfoResponse } -// lifespanResponse contains the response to a lifespanRequest and an error if -// one occurred. -type lifespanResponse struct { - start time.Time - end time.Time - err error -} - -// uptimeRequest contains the parameters required to query the store for a -// channel's uptime and a blocking response channel on which the result is sent. -type uptimeRequest struct { - channelPoint wire.OutPoint - startTime time.Time - endTime time.Time - responseChan chan uptimeResponse -} - -// uptimeResponse contains the response to an uptimeRequest and an error if one -// occurred. -type uptimeResponse struct { - uptime time.Duration - err error +type channelInfoResponse struct { + info *ChannelInfo + err error } // NewChannelEventStore initializes an event store with the config provided. @@ -119,8 +94,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore { cfg: config, channels: make(map[wire.OutPoint]*chanEventLog), peers: make(map[route.Vertex]bool), - lifespanRequests: make(chan lifespanRequest), - uptimeRequests: make(chan uptimeRequest), + chanInfoRequests: make(chan channelInfoRequest), quit: make(chan struct{}), } @@ -314,35 +288,10 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { } // Serve all requests for channel lifetime. - case req := <-c.lifespanRequests: - var resp lifespanResponse - - channel, ok := c.channels[req.channelPoint] - if !ok { - resp.err = ErrChannelNotFound - } else { - resp.start = channel.openedAt - resp.end = channel.closedAt - } - - req.responseChan <- resp - - // Serve requests for channel uptime. - case req := <-c.uptimeRequests: - var resp uptimeResponse - - channel, ok := c.channels[req.channelPoint] - if !ok { - resp.err = ErrChannelNotFound - } else { - uptime, err := channel.uptime( - req.startTime, req.endTime, - ) - - resp.uptime = uptime - resp.err = err - } + case req := <-c.chanInfoRequests: + var resp channelInfoResponse + resp.info, resp.err = c.getChanInfo(req) req.responseChan <- resp // Exit if the store receives the signal to shutdown. @@ -352,65 +301,72 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { } } -// GetLifespan returns the opening and closing time observed for a channel and -// a boolean to indicate whether the channel is known the the event store. If -// the channel is still open, a zero close time is returned. -func (c *ChannelEventStore) GetLifespan( - channelPoint wire.OutPoint) (time.Time, time.Time, error) { +// ChannelInfo provides the set of information that the event store has recorded +// for a channel. +type ChannelInfo struct { + // Lifetime is the total amount of time we have monitored the channel + // for. + Lifetime time.Duration - request := lifespanRequest{ + // Uptime is the total amount of time that the channel peer has been + // observed as online during the monitored lifespan. + Uptime time.Duration +} + +// GetChanInfo gets all the information we have on a channel in the event store. +func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) ( + *ChannelInfo, error) { + + request := channelInfoRequest{ channelPoint: channelPoint, - responseChan: make(chan lifespanResponse), + responseChan: make(chan channelInfoResponse), } - // Send a request for the channel's lifespan to the main event loop, or - // return early with an error if the store has already received a + // Send a request for the channel's information to the main event loop, + // or return early with an error if the store has already received a // shutdown signal. select { - case c.lifespanRequests <- request: + case c.chanInfoRequests <- request: case <-c.quit: - return time.Time{}, time.Time{}, errShuttingDown + return nil, errShuttingDown } // Return the response we receive on the response channel or exit early // if the store is instructed to exit. select { case resp := <-request.responseChan: - return resp.start, resp.end, resp.err + return resp.info, resp.err case <-c.quit: - return time.Time{}, time.Time{}, errShuttingDown + return nil, errShuttingDown } } -// GetUptime returns the uptime of a channel over a period and an error if the -// channel cannot be found or the uptime calculation fails. -func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime, - endTime time.Time) (time.Duration, error) { +// getChanInfo collects channel information for a channel. It gets uptime over +// the full lifetime of the channel. +func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo, + error) { - request := uptimeRequest{ - channelPoint: channelPoint, - startTime: startTime, - endTime: endTime, - responseChan: make(chan uptimeResponse), + // Look for the channel in our current set. + channel, ok := c.channels[req.channelPoint] + if !ok { + return nil, ErrChannelNotFound } - // Send a request for the channel's uptime to the main event loop, or - // return early with an error if the store has already received a - // shutdown signal. - select { - case c.uptimeRequests <- request: - case <-c.quit: - return 0, errShuttingDown + // If our channel is not closed, we want to calculate uptime until the + // present. + endTime := channel.closedAt + if endTime.IsZero() { + endTime = c.cfg.Clock.Now() } - // Return the response we receive on the response channel or exit early - // if the store is instructed to exit. - select { - case resp := <-request.responseChan: - return resp.uptime, resp.err - - case <-c.quit: - return 0, errShuttingDown + uptime, err := channel.uptime(channel.openedAt, endTime) + if err != nil { + return nil, err } + + return &ChannelInfo{ + Lifetime: endTime.Sub(channel.openedAt), + Uptime: uptime, + }, nil } diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 206ec8ca..127756f7 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -186,181 +186,6 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), require.Equal(t, expectedPeers, testCtx.store.peers) } -// TestGetLifetime tests the GetLifetime function for the cases where a channel -// is known and unknown to the store. -func TestGetLifetime(t *testing.T) { - tests := []struct { - name string - channelFound bool - channelPoint wire.OutPoint - opened time.Time - closed time.Time - expectedError error - }{ - { - name: "Channel found", - channelFound: true, - opened: testNow, - closed: testNow.Add(time.Hour * -1), - expectedError: nil, - }, - { - name: "Channel not found", - expectedError: ErrChannelNotFound, - }, - } - - for _, test := range tests { - test := test - - t.Run(test.name, func(t *testing.T) { - ctx := newChanEventStoreTestCtx(t) - - // Add channel to eventStore if the test indicates that - // it should be present. - if test.channelFound { - ctx.store.channels[test.channelPoint] = - &chanEventLog{ - openedAt: test.opened, - closedAt: test.closed, - } - } - - ctx.start() - - open, close, err := ctx.store.GetLifespan(test.channelPoint) - require.Equal(t, test.expectedError, err) - - require.Equal(t, test.opened, open) - require.Equal(t, test.closed, close) - - ctx.stop() - }) - } -} - -// TestGetUptime tests the getUptime call for channels known to the event store. -// It does not test the trivial case where a channel is unknown to the store, -// because this is simply a zero return if an item is not found in a map. It -// tests the unexpected edge cases where a tracked channel does not have any -// events recorded, and when a zero time is specified for the uptime range. -func TestGetUptime(t *testing.T) { - twoHoursAgo := testNow.Add(time.Hour * -2) - fourHoursAgo := testNow.Add(time.Hour * -4) - - tests := []struct { - name string - - channelPoint wire.OutPoint - - // events is the set of events we expect to find in the channel - // store. - events []*channelEvent - - // openedAt is the time the channel is recorded as open by the - // store. - openedAt time.Time - - // closedAt is the time the channel is recorded as closed by the - // store. If the channel is still open, this value is zero. - closedAt time.Time - - // channelFound is true if we expect to find the channel in the - // store. - channelFound bool - - // startTime specifies the beginning of the uptime range we want - // to calculate. - startTime time.Time - - // endTime specified the end of the uptime range we want to - // calculate. - endTime time.Time - - expectedUptime time.Duration - - expectedError error - }{ - { - name: "No events", - startTime: twoHoursAgo, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "50% Uptime", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - { - timestamp: twoHoursAgo, - eventType: peerOfflineEvent, - }, - }, - openedAt: fourHoursAgo, - expectedUptime: time.Hour * 2, - startTime: fourHoursAgo, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "Zero start time", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - }, - openedAt: fourHoursAgo, - expectedUptime: time.Hour * 4, - endTime: testNow, - channelFound: true, - expectedError: nil, - }, - { - name: "Channel not found", - startTime: twoHoursAgo, - endTime: testNow, - channelFound: false, - expectedError: ErrChannelNotFound, - }, - } - - for _, test := range tests { - test := test - - t.Run(test.name, func(t *testing.T) { - ctx := newChanEventStoreTestCtx(t) - - // If we're supposed to find the channel for this test, - // add events for it to the store. - if test.channelFound { - eventLog := &chanEventLog{ - events: test.events, - clock: clock.NewTestClock(testNow), - openedAt: test.openedAt, - closedAt: test.closedAt, - } - ctx.store.channels[test.channelPoint] = eventLog - } - - ctx.start() - - uptime, err := ctx.store.GetUptime( - test.channelPoint, test.startTime, test.endTime, - ) - require.Equal(t, test.expectedError, err) - require.Equal(t, test.expectedUptime, uptime) - - ctx.stop() - }) - } -} - // TestAddChannel tests that channels are added to the event store with // appropriate timestamps. This test addresses a bug where offline channels // did not have an opened time set, and checks that an online event is set for @@ -388,3 +213,70 @@ func TestAddChannel(t *testing.T) { require.Len(t, chan2Events, 1) require.Equal(t, peerOnlineEvent, chan2Events[0].eventType) } + +// TestGetChanInfo tests the GetChanInfo function for the cases where a channel +// is known and unknown to the store. +func TestGetChanInfo(t *testing.T) { + ctx := newChanEventStoreTestCtx(t) + ctx.start() + + // Make a note of the time that our mocked clock starts on. + now := ctx.clock.Now() + + // Create mock vars for a channel but do not add them to our store yet. + peer, pk, channel := ctx.newChannel() + + // Send an online event for our peer, although we do not yet have an + // open channel. + ctx.peerEvent(peer, true) + + // Try to get info for a channel that has not been opened yet, we + // expect to get an error. + _, err := ctx.store.GetChanInfo(channel) + require.Equal(t, ErrChannelNotFound, err) + + // Now we send our store a notification that a channel has been opened. + ctx.sendChannelOpenedUpdate(pk, channel) + + // Wait for our channel to be recognized by our store. We need to wait + // for the channel to be created so that we do not update our time + // before the channel open is processed. + require.Eventually(t, func() bool { + _, err = ctx.store.GetChanInfo(channel) + return err == nil + }, timeout, time.Millisecond*20) + + // Increment our test clock by an hour. + now = now.Add(time.Hour) + ctx.clock.SetTime(now) + + // At this stage our channel has been open and online for an hour. + info, err := ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + // Now we send a peer offline event for our channel. + ctx.peerEvent(peer, false) + + // Since we have not bumped our mocked time, our uptime calculations + // should be the same, even though we've just processed an offline + // event. + info, err = ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + // Progress our time again. This time, our peer is currently tracked as + // being offline, so we expect our channel info to reflect that the peer + // has been offline for this period. + now = now.Add(time.Hour) + ctx.clock.SetTime(now) + + info, err = ctx.store.GetChanInfo(channel) + require.NoError(t, err) + require.Equal(t, time.Hour*2, info.Lifetime) + require.Equal(t, time.Hour, info.Uptime) + + ctx.stop() +} diff --git a/rpcserver.go b/rpcserver.go index be6e7a8e..69d3513f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3536,42 +3536,28 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, return channel, nil } - // Get the lifespan observed by the channel event store. If the channel is - // not known to the channel event store, return early because we cannot - // calculate any further uptime information. + // Query the event store for additional information about the channel. + // Do not fail if it is not available, because there is a potential + // race between a channel being added to our node and the event store + // being notified of it. outpoint := dbChannel.FundingOutpoint - startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint) + info, err := r.server.chanEventStore.GetChanInfo(outpoint) switch err { + // If the store does not know about the channel, we just log it. case chanfitness.ErrChannelNotFound: rpcsLog.Infof("channel: %v not found by channel event store", outpoint) - return channel, nil + // If we got our channel info, we further populate the channel. case nil: - // If there is no error getting lifespan, continue to uptime - // calculation. + channel.Uptime = int64(info.Uptime.Seconds()) + channel.Lifetime = int64(info.Lifetime.Seconds()) + + // If we get an unexpected error, we return it. default: return nil, err } - // If endTime is zero, the channel is still open, progress endTime to - // the present so we can calculate lifetime. - if endTime.IsZero() { - endTime = time.Now() - } - channel.Lifetime = int64(endTime.Sub(startTime).Seconds()) - - // Once we have successfully obtained channel lifespan, we know that the - // channel is known to the event store, so we can return any non-nil error - // that occurs. - uptime, err := r.server.chanEventStore.GetUptime( - outpoint, startTime, endTime, - ) - if err != nil { - return nil, err - } - channel.Uptime = int64(uptime.Seconds()) - return channel, nil }