chanfitness: unify requests to store in single chan info struct

We currently query the store for uptime and lifespan individually. As
we add more fields, we will need to add more queries with this design.
This change combines requests into a single channel infor request so
that we do not need to add unnecessary boilerplate going forward.
This commit is contained in:
carla 2020-09-08 13:47:17 +02:00
parent 7930ef7cf4
commit 10f9ba952e
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
3 changed files with 133 additions and 299 deletions

@ -46,11 +46,8 @@ type ChannelEventStore struct {
// and offline events. // and offline events.
peers map[route.Vertex]bool peers map[route.Vertex]bool
// lifespanRequests serves requests for the lifespan of channels. // chanInfoRequests serves requests for information about our channel.
lifespanRequests chan lifespanRequest chanInfoRequests chan channelInfoRequest
// uptimeRequests serves requests for the uptime of channels.
uptimeRequests chan uptimeRequest
quit chan struct{} quit chan struct{}
@ -79,35 +76,13 @@ type Config struct {
Clock clock.Clock Clock clock.Clock
} }
// lifespanRequest contains the channel ID required to query the store for a type channelInfoRequest struct {
// channel's lifespan and a blocking response channel on which the result is
// sent.
type lifespanRequest struct {
channelPoint wire.OutPoint channelPoint wire.OutPoint
responseChan chan lifespanResponse responseChan chan channelInfoResponse
} }
// lifespanResponse contains the response to a lifespanRequest and an error if type channelInfoResponse struct {
// one occurred. info *ChannelInfo
type lifespanResponse struct {
start time.Time
end time.Time
err error
}
// uptimeRequest contains the parameters required to query the store for a
// channel's uptime and a blocking response channel on which the result is sent.
type uptimeRequest struct {
channelPoint wire.OutPoint
startTime time.Time
endTime time.Time
responseChan chan uptimeResponse
}
// uptimeResponse contains the response to an uptimeRequest and an error if one
// occurred.
type uptimeResponse struct {
uptime time.Duration
err error err error
} }
@ -119,8 +94,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
cfg: config, cfg: config,
channels: make(map[wire.OutPoint]*chanEventLog), channels: make(map[wire.OutPoint]*chanEventLog),
peers: make(map[route.Vertex]bool), peers: make(map[route.Vertex]bool),
lifespanRequests: make(chan lifespanRequest), chanInfoRequests: make(chan channelInfoRequest),
uptimeRequests: make(chan uptimeRequest),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -314,35 +288,10 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
} }
// Serve all requests for channel lifetime. // Serve all requests for channel lifetime.
case req := <-c.lifespanRequests: case req := <-c.chanInfoRequests:
var resp lifespanResponse var resp channelInfoResponse
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
resp.start = channel.openedAt
resp.end = channel.closedAt
}
req.responseChan <- resp
// Serve requests for channel uptime.
case req := <-c.uptimeRequests:
var resp uptimeResponse
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
uptime, err := channel.uptime(
req.startTime, req.endTime,
)
resp.uptime = uptime
resp.err = err
}
resp.info, resp.err = c.getChanInfo(req)
req.responseChan <- resp req.responseChan <- resp
// Exit if the store receives the signal to shutdown. // Exit if the store receives the signal to shutdown.
@ -352,65 +301,72 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
} }
} }
// GetLifespan returns the opening and closing time observed for a channel and // ChannelInfo provides the set of information that the event store has recorded
// a boolean to indicate whether the channel is known the the event store. If // for a channel.
// the channel is still open, a zero close time is returned. type ChannelInfo struct {
func (c *ChannelEventStore) GetLifespan( // Lifetime is the total amount of time we have monitored the channel
channelPoint wire.OutPoint) (time.Time, time.Time, error) { // for.
Lifetime time.Duration
request := lifespanRequest{ // Uptime is the total amount of time that the channel peer has been
// observed as online during the monitored lifespan.
Uptime time.Duration
}
// GetChanInfo gets all the information we have on a channel in the event store.
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) (
*ChannelInfo, error) {
request := channelInfoRequest{
channelPoint: channelPoint, channelPoint: channelPoint,
responseChan: make(chan lifespanResponse), responseChan: make(chan channelInfoResponse),
} }
// Send a request for the channel's lifespan to the main event loop, or // Send a request for the channel's information to the main event loop,
// return early with an error if the store has already received a // or return early with an error if the store has already received a
// shutdown signal. // shutdown signal.
select { select {
case c.lifespanRequests <- request: case c.chanInfoRequests <- request:
case <-c.quit: case <-c.quit:
return time.Time{}, time.Time{}, errShuttingDown return nil, errShuttingDown
} }
// Return the response we receive on the response channel or exit early // Return the response we receive on the response channel or exit early
// if the store is instructed to exit. // if the store is instructed to exit.
select { select {
case resp := <-request.responseChan: case resp := <-request.responseChan:
return resp.start, resp.end, resp.err return resp.info, resp.err
case <-c.quit: case <-c.quit:
return time.Time{}, time.Time{}, errShuttingDown return nil, errShuttingDown
} }
} }
// GetUptime returns the uptime of a channel over a period and an error if the // getChanInfo collects channel information for a channel. It gets uptime over
// channel cannot be found or the uptime calculation fails. // the full lifetime of the channel.
func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime, func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
endTime time.Time) (time.Duration, error) { error) {
request := uptimeRequest{ // Look for the channel in our current set.
channelPoint: channelPoint, channel, ok := c.channels[req.channelPoint]
startTime: startTime, if !ok {
endTime: endTime, return nil, ErrChannelNotFound
responseChan: make(chan uptimeResponse),
} }
// Send a request for the channel's uptime to the main event loop, or // If our channel is not closed, we want to calculate uptime until the
// return early with an error if the store has already received a // present.
// shutdown signal. endTime := channel.closedAt
select { if endTime.IsZero() {
case c.uptimeRequests <- request: endTime = c.cfg.Clock.Now()
case <-c.quit:
return 0, errShuttingDown
} }
// Return the response we receive on the response channel or exit early uptime, err := channel.uptime(channel.openedAt, endTime)
// if the store is instructed to exit. if err != nil {
select { return nil, err
case resp := <-request.responseChan:
return resp.uptime, resp.err
case <-c.quit:
return 0, errShuttingDown
} }
return &ChannelInfo{
Lifetime: endTime.Sub(channel.openedAt),
Uptime: uptime,
}, nil
} }

@ -186,181 +186,6 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
require.Equal(t, expectedPeers, testCtx.store.peers) require.Equal(t, expectedPeers, testCtx.store.peers)
} }
// TestGetLifetime tests the GetLifetime function for the cases where a channel
// is known and unknown to the store.
func TestGetLifetime(t *testing.T) {
tests := []struct {
name string
channelFound bool
channelPoint wire.OutPoint
opened time.Time
closed time.Time
expectedError error
}{
{
name: "Channel found",
channelFound: true,
opened: testNow,
closed: testNow.Add(time.Hour * -1),
expectedError: nil,
},
{
name: "Channel not found",
expectedError: ErrChannelNotFound,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
ctx := newChanEventStoreTestCtx(t)
// Add channel to eventStore if the test indicates that
// it should be present.
if test.channelFound {
ctx.store.channels[test.channelPoint] =
&chanEventLog{
openedAt: test.opened,
closedAt: test.closed,
}
}
ctx.start()
open, close, err := ctx.store.GetLifespan(test.channelPoint)
require.Equal(t, test.expectedError, err)
require.Equal(t, test.opened, open)
require.Equal(t, test.closed, close)
ctx.stop()
})
}
}
// TestGetUptime tests the getUptime call for channels known to the event store.
// It does not test the trivial case where a channel is unknown to the store,
// because this is simply a zero return if an item is not found in a map. It
// tests the unexpected edge cases where a tracked channel does not have any
// events recorded, and when a zero time is specified for the uptime range.
func TestGetUptime(t *testing.T) {
twoHoursAgo := testNow.Add(time.Hour * -2)
fourHoursAgo := testNow.Add(time.Hour * -4)
tests := []struct {
name string
channelPoint wire.OutPoint
// events is the set of events we expect to find in the channel
// store.
events []*channelEvent
// openedAt is the time the channel is recorded as open by the
// store.
openedAt time.Time
// closedAt is the time the channel is recorded as closed by the
// store. If the channel is still open, this value is zero.
closedAt time.Time
// channelFound is true if we expect to find the channel in the
// store.
channelFound bool
// startTime specifies the beginning of the uptime range we want
// to calculate.
startTime time.Time
// endTime specified the end of the uptime range we want to
// calculate.
endTime time.Time
expectedUptime time.Duration
expectedError error
}{
{
name: "No events",
startTime: twoHoursAgo,
endTime: testNow,
channelFound: true,
expectedError: nil,
},
{
name: "50% Uptime",
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
},
{
timestamp: twoHoursAgo,
eventType: peerOfflineEvent,
},
},
openedAt: fourHoursAgo,
expectedUptime: time.Hour * 2,
startTime: fourHoursAgo,
endTime: testNow,
channelFound: true,
expectedError: nil,
},
{
name: "Zero start time",
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
},
},
openedAt: fourHoursAgo,
expectedUptime: time.Hour * 4,
endTime: testNow,
channelFound: true,
expectedError: nil,
},
{
name: "Channel not found",
startTime: twoHoursAgo,
endTime: testNow,
channelFound: false,
expectedError: ErrChannelNotFound,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
ctx := newChanEventStoreTestCtx(t)
// If we're supposed to find the channel for this test,
// add events for it to the store.
if test.channelFound {
eventLog := &chanEventLog{
events: test.events,
clock: clock.NewTestClock(testNow),
openedAt: test.openedAt,
closedAt: test.closedAt,
}
ctx.store.channels[test.channelPoint] = eventLog
}
ctx.start()
uptime, err := ctx.store.GetUptime(
test.channelPoint, test.startTime, test.endTime,
)
require.Equal(t, test.expectedError, err)
require.Equal(t, test.expectedUptime, uptime)
ctx.stop()
})
}
}
// TestAddChannel tests that channels are added to the event store with // TestAddChannel tests that channels are added to the event store with
// appropriate timestamps. This test addresses a bug where offline channels // appropriate timestamps. This test addresses a bug where offline channels
// did not have an opened time set, and checks that an online event is set for // did not have an opened time set, and checks that an online event is set for
@ -388,3 +213,70 @@ func TestAddChannel(t *testing.T) {
require.Len(t, chan2Events, 1) require.Len(t, chan2Events, 1)
require.Equal(t, peerOnlineEvent, chan2Events[0].eventType) require.Equal(t, peerOnlineEvent, chan2Events[0].eventType)
} }
// TestGetChanInfo tests the GetChanInfo function for the cases where a channel
// is known and unknown to the store.
func TestGetChanInfo(t *testing.T) {
ctx := newChanEventStoreTestCtx(t)
ctx.start()
// Make a note of the time that our mocked clock starts on.
now := ctx.clock.Now()
// Create mock vars for a channel but do not add them to our store yet.
peer, pk, channel := ctx.newChannel()
// Send an online event for our peer, although we do not yet have an
// open channel.
ctx.peerEvent(peer, true)
// Try to get info for a channel that has not been opened yet, we
// expect to get an error.
_, err := ctx.store.GetChanInfo(channel)
require.Equal(t, ErrChannelNotFound, err)
// Now we send our store a notification that a channel has been opened.
ctx.sendChannelOpenedUpdate(pk, channel)
// Wait for our channel to be recognized by our store. We need to wait
// for the channel to be created so that we do not update our time
// before the channel open is processed.
require.Eventually(t, func() bool {
_, err = ctx.store.GetChanInfo(channel)
return err == nil
}, timeout, time.Millisecond*20)
// Increment our test clock by an hour.
now = now.Add(time.Hour)
ctx.clock.SetTime(now)
// At this stage our channel has been open and online for an hour.
info, err := ctx.store.GetChanInfo(channel)
require.NoError(t, err)
require.Equal(t, time.Hour, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)
// Now we send a peer offline event for our channel.
ctx.peerEvent(peer, false)
// Since we have not bumped our mocked time, our uptime calculations
// should be the same, even though we've just processed an offline
// event.
info, err = ctx.store.GetChanInfo(channel)
require.NoError(t, err)
require.Equal(t, time.Hour, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)
// Progress our time again. This time, our peer is currently tracked as
// being offline, so we expect our channel info to reflect that the peer
// has been offline for this period.
now = now.Add(time.Hour)
ctx.clock.SetTime(now)
info, err = ctx.store.GetChanInfo(channel)
require.NoError(t, err)
require.Equal(t, time.Hour*2, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)
ctx.stop()
}

@ -3536,42 +3536,28 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
return channel, nil return channel, nil
} }
// Get the lifespan observed by the channel event store. If the channel is // Query the event store for additional information about the channel.
// not known to the channel event store, return early because we cannot // Do not fail if it is not available, because there is a potential
// calculate any further uptime information. // race between a channel being added to our node and the event store
// being notified of it.
outpoint := dbChannel.FundingOutpoint outpoint := dbChannel.FundingOutpoint
startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint) info, err := r.server.chanEventStore.GetChanInfo(outpoint)
switch err { switch err {
// If the store does not know about the channel, we just log it.
case chanfitness.ErrChannelNotFound: case chanfitness.ErrChannelNotFound:
rpcsLog.Infof("channel: %v not found by channel event store", rpcsLog.Infof("channel: %v not found by channel event store",
outpoint) outpoint)
return channel, nil // If we got our channel info, we further populate the channel.
case nil: case nil:
// If there is no error getting lifespan, continue to uptime channel.Uptime = int64(info.Uptime.Seconds())
// calculation. channel.Lifetime = int64(info.Lifetime.Seconds())
// If we get an unexpected error, we return it.
default: default:
return nil, err return nil, err
} }
// If endTime is zero, the channel is still open, progress endTime to
// the present so we can calculate lifetime.
if endTime.IsZero() {
endTime = time.Now()
}
channel.Lifetime = int64(endTime.Sub(startTime).Seconds())
// Once we have successfully obtained channel lifespan, we know that the
// channel is known to the event store, so we can return any non-nil error
// that occurs.
uptime, err := r.server.chanEventStore.GetUptime(
outpoint, startTime, endTime,
)
if err != nil {
return nil, err
}
channel.Uptime = int64(uptime.Seconds())
return channel, nil return channel, nil
} }