diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index 6a3af2aa..4fb6b142 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -6,7 +6,6 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/clock" - "github.com/lightningnetwork/lnd/routing/route" ) type eventType int @@ -29,74 +28,152 @@ func (e eventType) String() string { return "unknown" } -// channelEvent is a a timestamped event which is observed on a per channel -// basis. -type channelEvent struct { +type event struct { timestamp time.Time eventType eventType } -// chanEventLog stores all events that have occurred over a channel's lifetime. -type chanEventLog struct { - // channelPoint is the outpoint for the channel's funding transaction. - channelPoint wire.OutPoint +// peerLog tracks events for a peer and its channels. If we currently have no +// channels with the peer, it will simply track its current online state. If we +// do have channels open with the peer, it will track the peer's online and +// offline events so that we can calculate uptime for our channels. A single +// event log is used for these online and offline events, and uptime for a +// channel is calculated by examining a subsection of this log. +type peerLog struct { + // online stores whether the peer is currently online. + online bool - // peer is the compressed public key of the peer being monitored. - peer route.Vertex - - // events is a log of timestamped events observed for the channel. - events []*channelEvent + // onlineEvents is a log of timestamped events observed for the peer. + onlineEvents []*event // clock allows creation of deterministic unit tests. clock clock.Clock + // channels contains a set of currently open channels. Channels will be + // added and removed from this map as they are opened and closed. + channels map[wire.OutPoint]*channelInfo +} + +// newPeerLog creates a log for a peer. +func newPeerLog(clock clock.Clock) *peerLog { + return &peerLog{ + clock: clock, + channels: make(map[wire.OutPoint]*channelInfo), + } +} + +// channelInfo contains information about a channel. +type channelInfo struct { // openedAt tracks the first time this channel was seen. This is not // necessarily the time that it confirmed on chain because channel // events are not persisted at present. openedAt time.Time - - // closedAt is the time that the channel was closed. If the channel has - // not been closed yet, it is zero. - closedAt time.Time } -// newEventLog creates an event log for a channel with the openedAt time set. -func newEventLog(channelPoint wire.OutPoint, peer route.Vertex, - clock clock.Clock) *chanEventLog { - - eventlog := &chanEventLog{ - channelPoint: channelPoint, - peer: peer, - clock: clock, - openedAt: clock.Now(), +func newChannelInfo(openedAt time.Time) *channelInfo { + return &channelInfo{ + openedAt: openedAt, } - - return eventlog } -// close sets the closing time for an event log. -func (e *chanEventLog) close() { - e.closedAt = e.clock.Now() -} +// onlineEvent records a peer online or offline event in the log. +func (p *peerLog) onlineEvent(online bool) { + p.online = online -// add appends an event with the given type and current time to the event log. -// The open time for the eventLog will be set to the event's timestamp if it is -// not set yet. -func (e *chanEventLog) add(eventType eventType) { - // If the channel is already closed, return early without adding an - // event. - if !e.closedAt.IsZero() { + // If we have no channels currently open with the peer, we do not want + // to commit resources to tracking their online state beyond a simple + // online boolean, so we exit early. + if p.channelCount() == 0 { return } - // Add the event to the eventLog with the current timestamp. - event := &channelEvent{ - timestamp: e.clock.Now(), + p.addEvent(online, p.clock.Now()) +} + +// addEvent records an online or offline event in our event log. +func (p *peerLog) addEvent(online bool, time time.Time) { + eventType := peerOnlineEvent + if !online { + eventType = peerOfflineEvent + } + + event := &event{ + timestamp: time, eventType: eventType, } - e.events = append(e.events, event) - log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType) + p.onlineEvents = append(p.onlineEvents, event) +} + +// addChannel adds a channel to our log. If we have not tracked any online +// events for our peer yet, we create one with our peer's current online state +// so that we know the state that the peer had at channel start, which is +// required to calculate uptime over the channel's lifetime. +func (p *peerLog) addChannel(channelPoint wire.OutPoint) error { + _, ok := p.channels[channelPoint] + if ok { + return fmt.Errorf("channel: %v already present", channelPoint) + } + + openTime := p.clock.Now() + p.channels[channelPoint] = newChannelInfo(openTime) + + // If we do not have any online events tracked for our peer (which is + // the case when we have no other channels open with the peer), we add + // an event with the peer's current online state so that we know that + // starting state for this peer when a channel was connected (which + // allows us to calculate uptime over the lifetime of the channel). + if len(p.onlineEvents) == 0 { + p.addEvent(p.online, openTime) + } + + return nil +} + +// removeChannel removes a channel from our log. If we have no more channels +// with the peer after removing this one, we clear our list of events. +func (p *peerLog) removeChannel(channelPoint wire.OutPoint) error { + _, ok := p.channels[channelPoint] + if !ok { + return fmt.Errorf("channel: %v not present", channelPoint) + } + + delete(p.channels, channelPoint) + + // If we have no more channels in our event log, we can discard all of + // our online events in memory, since we don't need them anymore. + // TODO(carla): this could be done on a per channel basis. + if p.channelCount() == 0 { + p.onlineEvents = nil + } + + return nil +} + +// channelCount returns the number of channels that we currently have +// with the peer. +func (p *peerLog) channelCount() int { + return len(p.channels) +} + +// channelUptime looks up a channel and returns the amount of time that the +// channel has been monitored for and its uptime over this period. +func (p *peerLog) channelUptime(channelPoint wire.OutPoint) (time.Duration, + time.Duration, error) { + + channel, ok := p.channels[channelPoint] + if !ok { + return 0, 0, ErrChannelNotFound + } + + now := p.clock.Now() + + uptime, err := p.uptime(channel.openedAt, now) + if err != nil { + return 0, 0, err + } + + return now.Sub(channel.openedAt), uptime, nil } // onlinePeriod represents a period of time over which a peer was online. @@ -112,9 +189,9 @@ type onlinePeriod struct { // calculated until the present. This function expects the event log provided // to be ordered by ascending timestamp, and can tolerate multiple consecutive // online or offline events. -func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod { +func (p *peerLog) getOnlinePeriods() []*onlinePeriod { // Return early if there are no events, there are no online periods. - if len(e.events) == 0 { + if len(p.onlineEvents) == 0 { return nil } @@ -123,7 +200,7 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod { // a different type to our own. It is used to determine the // start time of our online periods when we experience an // offline event, and to track our last recorded state. - lastEvent *channelEvent + lastEvent *event onlinePeriods []*onlinePeriod ) @@ -133,7 +210,7 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod { // the online event and the present is not tracked. The type of the most // recent event is tracked using the offline bool so that we can add a // final online period if necessary. - for _, event := range e.events { + for _, event := range p.onlineEvents { switch event.eventType { case peerOnlineEvent: // If our previous event is nil, we just set it and @@ -188,24 +265,20 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod { } // The log ended on an online event, so we need to add a final online - // event. If the channel is closed, this period is until channel - // closure. It it is still open, we calculate it until the present. + // period which terminates at the present. finalEvent := &onlinePeriod{ start: lastEvent.timestamp, - end: e.closedAt, - } - if finalEvent.end.IsZero() { - finalEvent.end = e.clock.Now() + end: p.clock.Now(), } // Add the final online period to the set and return. return append(onlinePeriods, finalEvent) } -// uptime calculates the total uptime we have recorded for a channel over the +// uptime calculates the total uptime we have recorded for a peer over the // inclusive range specified. An error is returned if the end of the range is // before the start or a zero end time is returned. -func (e *chanEventLog) uptime(start, end time.Time) (time.Duration, error) { +func (p *peerLog) uptime(start, end time.Time) (time.Duration, error) { // Error if we are provided with an invalid range to calculate uptime // for. if end.Before(start) { @@ -218,7 +291,7 @@ func (e *chanEventLog) uptime(start, end time.Time) (time.Duration, error) { var uptime time.Duration - for _, p := range e.getOnlinePeriods() { + for _, p := range p.getOnlinePeriods() { // The online period ends before the range we're looking at, so // we can skip over it. if p.end.Before(start) { diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go index 637e3627..c4dd97a8 100644 --- a/chanfitness/chanevent_test.go +++ b/chanfitness/chanevent_test.go @@ -4,55 +4,97 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/clock" "github.com/stretchr/testify/require" ) -// TestAdd tests adding events to an event log. It tests the case where the -// channel is open, and should have an event added, and the case where it is -// closed and the event should not be added. -func TestAdd(t *testing.T) { - tests := []struct { - name string - eventLog *chanEventLog - event eventType - expectedEvents []*channelEvent - }{ - { - name: "Channel open", - eventLog: &chanEventLog{ - clock: clock.NewTestClock(testNow), - }, - event: peerOnlineEvent, - expectedEvents: []*channelEvent{ - { - eventType: peerOnlineEvent, - timestamp: testNow, - }, - }, - }, - { - name: "Channel closed, event not added", - eventLog: &chanEventLog{ - clock: clock.NewTestClock(testNow), - closedAt: testNow, - }, - event: peerOnlineEvent, - expectedEvents: nil, - }, +// TestPeerLog tests the functionality of the peer log struct. +func TestPeerLog(t *testing.T) { + clock := clock.NewTestClock(testNow) + peerLog := newPeerLog(clock) + + require.Zero(t, peerLog.channelCount()) + require.False(t, peerLog.online) + + // Test that looking up an unknown channel fails. + _, _, err := peerLog.channelUptime(wire.OutPoint{Index: 1}) + require.Error(t, err) + + // Add an offline event, since we have no channels, we do not expect + // to have any online periods recorded for our peer. + peerLog.onlineEvent(false) + require.Len(t, peerLog.getOnlinePeriods(), 0) + + // Likewise, if we have an online event, nothing beyond the online state + // of our peer log should change. + peerLog.onlineEvent(true) + require.Len(t, peerLog.getOnlinePeriods(), 0) + + // Add a channel and assert that we have one channel listed. + chan1 := wire.OutPoint{ + Index: 1, } + require.NoError(t, peerLog.addChannel(chan1)) + require.Equal(t, 1, peerLog.channelCount()) - for _, test := range tests { - test := test + // Assert that we can now successfully get our added channel. + _, _, err = peerLog.channelUptime(chan1) + require.NoError(t, err) - t.Run(test.name, func(t *testing.T) { - test.eventLog.add(test.event) + // Bump our test clock's time so that our current time is different to + // channel open time. + now := testNow.Add(time.Hour) + clock.SetTime(now) - require.Equal( - t, test.expectedEvents, test.eventLog.events, - ) - }) + // Now that we have added a channel and an hour has passed, we expect + // our uptime and lifetime to both equal an hour. + lifetime, uptime, err := peerLog.channelUptime(chan1) + require.NoError(t, err) + require.Equal(t, time.Hour, lifetime) + require.Equal(t, time.Hour, uptime) + + // Add an offline event for our peer. + peerLog.onlineEvent(false) + + // Now we add another channel to our store and assert that we now report + // two channels for this peer. + chan2 := wire.OutPoint{ + Index: 2, } + require.NoError(t, peerLog.addChannel(chan2)) + require.Equal(t, 2, peerLog.channelCount()) + + // Progress our time again, so that our peer has now been offline for + // two hours. + now = now.Add(time.Hour * 2) + clock.SetTime(now) + + // Our first channel should report as having been monitored for three + // hours, but only online for one of those hours. + lifetime, uptime, err = peerLog.channelUptime(chan1) + require.NoError(t, err) + require.Equal(t, time.Hour*3, lifetime) + require.Equal(t, time.Hour, uptime) + + // Remove our first channel and check that we can still correctly query + // uptime for the second channel. + require.NoError(t, peerLog.removeChannel(chan1)) + require.Equal(t, 1, peerLog.channelCount()) + + // Our second channel, which was created when our peer was offline, + // should report as having been monitored for two hours, but have zero + // uptime. + lifetime, uptime, err = peerLog.channelUptime(chan2) + require.NoError(t, err) + require.Equal(t, time.Hour*2, lifetime) + require.Equal(t, time.Duration(0), uptime) + + // Finally, remove our second channel and assert that our peer cleans + // up its in memory set of events. + require.NoError(t, peerLog.removeChannel(chan2)) + require.Equal(t, 0, peerLog.channelCount()) + require.Len(t, peerLog.onlineEvents, 0) } // TestGetOnlinePeriod tests the getOnlinePeriod function. It tests the case @@ -62,21 +104,18 @@ func TestGetOnlinePeriod(t *testing.T) { fourHoursAgo := testNow.Add(time.Hour * -4) threeHoursAgo := testNow.Add(time.Hour * -3) twoHoursAgo := testNow.Add(time.Hour * -2) - oneHourAgo := testNow.Add(time.Hour * -1) tests := []struct { name string - events []*channelEvent + events []*event expectedOnline []*onlinePeriod - openedAt time.Time - closedAt time.Time }{ { name: "no events", }, { name: "start on online period", - events: []*channelEvent{ + events: []*event{ { timestamp: threeHoursAgo, eventType: peerOnlineEvent, @@ -95,7 +134,7 @@ func TestGetOnlinePeriod(t *testing.T) { }, { name: "start on offline period", - events: []*channelEvent{ + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOfflineEvent, @@ -103,8 +142,8 @@ func TestGetOnlinePeriod(t *testing.T) { }, }, { - name: "end on an online period, channel not closed", - events: []*channelEvent{ + name: "end on an online period", + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOnlineEvent, @@ -118,24 +157,8 @@ func TestGetOnlinePeriod(t *testing.T) { }, }, { - name: "end on an online period, channel closed", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - }, - expectedOnline: []*onlinePeriod{ - { - start: fourHoursAgo, - end: oneHourAgo, - }, - }, - closedAt: oneHourAgo, - }, - { - name: "duplicate online events, channel not closed", - events: []*channelEvent{ + name: "duplicate online events", + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOnlineEvent, @@ -153,28 +176,8 @@ func TestGetOnlinePeriod(t *testing.T) { }, }, { - name: "duplicate online events, channel closed", - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - { - timestamp: twoHoursAgo, - eventType: peerOnlineEvent, - }, - }, - expectedOnline: []*onlinePeriod{ - { - start: fourHoursAgo, - end: threeHoursAgo, - }, - }, - closedAt: threeHoursAgo, - }, - { - name: "duplicate offline events, channel not closed", - events: []*channelEvent{ + name: "duplicate offline events", + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOfflineEvent, @@ -188,7 +191,7 @@ func TestGetOnlinePeriod(t *testing.T) { }, { name: "duplicate online then offline", - events: []*channelEvent{ + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOnlineEvent, @@ -211,7 +214,7 @@ func TestGetOnlinePeriod(t *testing.T) { }, { name: "duplicate offline then online", - events: []*channelEvent{ + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOfflineEvent, @@ -240,11 +243,9 @@ func TestGetOnlinePeriod(t *testing.T) { t.Run(test.name, func(t *testing.T) { t.Parallel() - score := &chanEventLog{ - events: test.events, - clock: clock.NewTestClock(testNow), - openedAt: test.openedAt, - closedAt: test.closedAt, + score := &peerLog{ + onlineEvents: test.events, + clock: clock.NewTestClock(testNow), } online := score.getOnlinePeriods() @@ -265,18 +266,9 @@ func TestUptime(t *testing.T) { tests := []struct { name string - // opened at is the time the channel was recorded as being open, - // and is never expected to be zero. - openedAt time.Time - - // closed at is the time the channel was recorded as being - // closed, and can have a zero value if the channel is not - // closed. - closedAt time.Time - // events is the set of event log that we are calculating uptime // for. - events []*channelEvent + events []*event // startTime is the beginning of the period that we are // calculating uptime for, it cannot have a zero value. @@ -306,23 +298,8 @@ func TestUptime(t *testing.T) { expectErr: true, }, { - name: "Online event and closed", - openedAt: fourHoursAgo, - closedAt: oneHourAgo, - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOnlineEvent, - }, - }, - startTime: fourHoursAgo, - endTime: testNow, - expectedUptime: time.Hour * 3, - }, - { - name: "Online event and not closed", - openedAt: fourHoursAgo, - events: []*channelEvent{ + name: "online event and no offline", + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOnlineEvent, @@ -333,37 +310,8 @@ func TestUptime(t *testing.T) { expectedUptime: time.Hour * 4, }, { - name: "Offline event and closed", - openedAt: fourHoursAgo, - closedAt: threeHoursAgo, - events: []*channelEvent{ - { - timestamp: fourHoursAgo, - eventType: peerOfflineEvent, - }, - }, - startTime: fourHoursAgo, - endTime: testNow, - }, - { - name: "Online event before close", - openedAt: fourHoursAgo, - closedAt: oneHourAgo, - events: []*channelEvent{ - { - timestamp: twoHoursAgo, - eventType: peerOnlineEvent, - }, - }, - startTime: fourHoursAgo, - endTime: testNow, - expectedUptime: time.Hour, - }, - { - name: "Online then offline event", - openedAt: fourHoursAgo, - closedAt: oneHourAgo, - events: []*channelEvent{ + name: "online then offline event", + events: []*event{ { timestamp: threeHoursAgo, eventType: peerOnlineEvent, @@ -378,10 +326,8 @@ func TestUptime(t *testing.T) { expectedUptime: time.Hour, }, { - name: "Online event before uptime period", - openedAt: fourHoursAgo, - closedAt: oneHourAgo, - events: []*channelEvent{ + name: "online event before uptime period", + events: []*event{ { timestamp: threeHoursAgo, eventType: peerOnlineEvent, @@ -389,12 +335,11 @@ func TestUptime(t *testing.T) { }, startTime: twoHoursAgo, endTime: testNow, - expectedUptime: time.Hour, + expectedUptime: time.Hour * 2, }, { - name: "Offline event after uptime period", - openedAt: fourHoursAgo, - events: []*channelEvent{ + name: "offline event after uptime period", + events: []*event{ { timestamp: fourHoursAgo, eventType: peerOnlineEvent, @@ -409,9 +354,8 @@ func TestUptime(t *testing.T) { expectedUptime: time.Hour * 2, }, { - name: "All events within period", - openedAt: fourHoursAgo, - events: []*channelEvent{ + name: "all events within period", + events: []*event{ { timestamp: twoHoursAgo, eventType: peerOnlineEvent, @@ -422,9 +366,8 @@ func TestUptime(t *testing.T) { expectedUptime: time.Hour, }, { - name: "Multiple online and offline", - openedAt: testNow.Add(time.Hour * -8), - events: []*channelEvent{ + name: "multiple online and offline", + events: []*event{ { timestamp: testNow.Add(time.Hour * -7), eventType: peerOnlineEvent, @@ -456,11 +399,9 @@ func TestUptime(t *testing.T) { test := test t.Run(test.name, func(t *testing.T) { - score := &chanEventLog{ - events: test.events, - clock: clock.NewTestClock(testNow), - openedAt: test.openedAt, - closedAt: test.closedAt, + score := &peerLog{ + onlineEvents: test.events, + clock: clock.NewTestClock(testNow), } uptime, err := score.uptime( diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index bd297fd2..fc05bd21 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -32,6 +32,10 @@ var ( // ErrChannelNotFound is returned when a query is made for a channel // that the event store does not have knowledge of. ErrChannelNotFound = errors.New("channel not found in event store") + + // ErrPeerNotFound is returned when a query is made for a channel + // that has a peer that the event store is not currently tracking. + ErrPeerNotFound = errors.New("peer not found in event store") ) // ChannelEventStore maintains a set of event logs for the node's channels to @@ -39,12 +43,8 @@ var ( type ChannelEventStore struct { cfg *Config - // channels maps channel points to event logs. - channels map[wire.OutPoint]*chanEventLog - - // peers tracks the current online status of peers based on online - // and offline events. - peers map[route.Vertex]bool + // peers tracks all of our currently monitored peers and their channels. + peers map[route.Vertex]peerMonitor // chanInfoRequests serves requests for information about our channel. chanInfoRequests chan channelInfoRequest @@ -77,6 +77,7 @@ type Config struct { } type channelInfoRequest struct { + peer route.Vertex channelPoint wire.OutPoint responseChan chan channelInfoResponse } @@ -92,8 +93,7 @@ type channelInfoResponse struct { func NewChannelEventStore(config *Config) *ChannelEventStore { store := &ChannelEventStore{ cfg: config, - channels: make(map[wire.OutPoint]*chanEventLog), - peers: make(map[route.Vertex]bool), + peers: make(map[route.Vertex]peerMonitor), chanInfoRequests: make(chan channelInfoRequest), quit: make(chan struct{}), } @@ -173,60 +173,56 @@ func (c *ChannelEventStore) Stop() { c.wg.Wait() } -// addChannel adds a new channel to the ChannelEventStore's map of channels with -// an initial peer online state (if the peer is online). If the channel is -// already present in the map, the function returns early. This function should -// be called to add existing channels on startup and when open channel events -// are observed. +// addChannel checks whether we are already tracking a channel's peer, creates a +// new peer log to track it if we are not yet monitoring it, and adds the +// channel. func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, peer route.Vertex) { - // Check for the unexpected case where the channel is already in the - // store. - _, ok := c.channels[channelPoint] - if ok { - log.Errorf("Channel %v duplicated in channel store", - channelPoint) - return + peerMonitor, ok := c.peers[peer] + if !ok { + peerMonitor = newPeerLog(c.cfg.Clock) + c.peers[peer] = peerMonitor } - // Create an event log for the channel. - eventLog := newEventLog(channelPoint, peer, c.cfg.Clock) - - // If the peer is already online, add a peer online event to record - // the starting state of the peer. - if c.peers[peer] { - eventLog.add(peerOnlineEvent) + if err := peerMonitor.addChannel(channelPoint); err != nil { + log.Errorf("could not add channel: %v", err) } - - c.channels[channelPoint] = eventLog } // closeChannel records a closed time for a channel, and returns early is the -// channel is not known to the event store. -func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) { - // Check for the unexpected case where the channel is unknown to the - // store. - eventLog, ok := c.channels[channelPoint] +// channel is not known to the event store. We log warnings (rather than errors) +// when we cannot find a peer/channel because channels that we restore from a +// static channel backup do not have their open notified, so the event store +// never learns about them, but they are closed using the regular flow so we +// will try to remove them on close. At present, we cannot easily distinguish +// between these closes and others. +func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint, + peer route.Vertex) { + + peerMonitor, ok := c.peers[peer] if !ok { - log.Errorf("Close channel %v unknown to store", channelPoint) + log.Warnf("peer not known to store: %v", peer) return } - eventLog.close() + if err := peerMonitor.removeChannel(channelPoint); err != nil { + log.Warnf("could not remove channel: %v", err) + } } -// peerEvent adds a peer online or offline event to all channels we currently -// have open with a peer. -func (c *ChannelEventStore) peerEvent(peer route.Vertex, event eventType) { - // Track current online status of peers in the channelEventStore. - c.peers[peer] = event == peerOnlineEvent - - for _, eventLog := range c.channels { - if eventLog.peer == peer { - eventLog.add(event) - } +// peerEvent creates a peer monitor for a peer if we do not currently have +// one, and adds an online event to it. +func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) { + // If we are not currently tracking events for this peer, add a peer + // log for it. + peerMonitor, ok := c.peers[peer] + if !ok { + peerMonitor = newPeerLog(c.cfg.Clock) + c.peers[peer] = peerMonitor } + + peerMonitor.onlineEvent(online) } // subscriptions abstracts away from subscription clients to allow for mocking. @@ -268,7 +264,19 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // A channel has been closed, we must remove the channel // from the store and record a channel closed event. case channelnotifier.ClosedChannelEvent: - c.closeChannel(event.CloseSummary.ChanPoint) + compressed := event.CloseSummary.RemotePub.SerializeCompressed() + peerKey, err := route.NewVertexFromBytes( + compressed, + ) + if err != nil { + log.Errorf("Could not get vertex "+ + "from: %v", compressed) + continue + } + + c.closeChannel( + event.CloseSummary.ChanPoint, peerKey, + ) } // Process peer online and offline events. @@ -278,13 +286,13 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // and should record an online event for any channels // with that peer. case peernotifier.PeerOnlineEvent: - c.peerEvent(event.PubKey, peerOnlineEvent) + c.peerEvent(event.PubKey, true) // We have lost a connection with our peer, and should // record an offline event for any channels with that // peer. case peernotifier.PeerOfflineEvent: - c.peerEvent(event.PubKey, peerOfflineEvent) + c.peerEvent(event.PubKey, false) } // Serve all requests for channel lifetime. @@ -314,10 +322,11 @@ type ChannelInfo struct { } // GetChanInfo gets all the information we have on a channel in the event store. -func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) ( - *ChannelInfo, error) { +func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint, + peer route.Vertex) (*ChannelInfo, error) { request := channelInfoRequest{ + peer: peer, channelPoint: channelPoint, responseChan: make(chan channelInfoResponse), } @@ -347,26 +356,18 @@ func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) ( func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo, error) { - // Look for the channel in our current set. - channel, ok := c.channels[req.channelPoint] + peerMonitor, ok := c.peers[req.peer] if !ok { - return nil, ErrChannelNotFound + return nil, ErrPeerNotFound } - // If our channel is not closed, we want to calculate uptime until the - // present. - endTime := channel.closedAt - if endTime.IsZero() { - endTime = c.cfg.Clock.Now() - } - - uptime, err := channel.uptime(channel.openedAt, endTime) + lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint) if err != nil { return nil, err } return &ChannelInfo{ - Lifetime: endTime.Sub(channel.openedAt), + Lifetime: lifetime, Uptime: uptime, }, nil } diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 127756f7..771f3bb0 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -107,9 +107,7 @@ func TestMonitorChannelEvents(t *testing.T) { ctx.peerEvent(peer1, true) } - testEventStore(t, gen, 1, map[route.Vertex]bool{ - peer1: true, - }) + testEventStore(t, gen, peer1, 1) }) t.Run("duplicate channel open events", func(t *testing.T) { @@ -119,9 +117,7 @@ func TestMonitorChannelEvents(t *testing.T) { ctx.peerEvent(peer1, true) } - testEventStore(t, gen, 1, map[route.Vertex]bool{ - peer1: true, - }) + testEventStore(t, gen, peer1, 1) }) t.Run("peer online before channel created", func(t *testing.T) { @@ -130,9 +126,7 @@ func TestMonitorChannelEvents(t *testing.T) { ctx.sendChannelOpenedUpdate(pubKey, chan1) } - testEventStore(t, gen, 1, map[route.Vertex]bool{ - peer1: true, - }) + testEventStore(t, gen, peer1, 1) }) t.Run("multiple channels for peer", func(t *testing.T) { @@ -144,9 +138,7 @@ func TestMonitorChannelEvents(t *testing.T) { ctx.sendChannelOpenedUpdate(pubKey, chan2) } - testEventStore(t, gen, 2, map[route.Vertex]bool{ - peer1: false, - }) + testEventStore(t, gen, peer1, 2) }) t.Run("multiple channels for peer, one closed", func(t *testing.T) { @@ -161,18 +153,15 @@ func TestMonitorChannelEvents(t *testing.T) { ctx.peerEvent(peer1, true) } - testEventStore(t, gen, 2, map[route.Vertex]bool{ - peer1: true, - }) + testEventStore(t, gen, peer1, 1) }) } // testEventStore creates a new test contexts, generates a set of events for it -// and tests that it has the number of channels and online state for peers that -// we expect. +// and tests that it has the number of channels we expect. func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), - expectedChannels int, expectedPeers map[route.Vertex]bool) { + peer route.Vertex, expectedChannels int) { testCtx := newChanEventStoreTestCtx(t) testCtx.start() @@ -182,36 +171,12 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx), // Shutdown the store so that we can safely access the maps in our event // store. testCtx.stop() - require.Len(t, testCtx.store.channels, expectedChannels) - require.Equal(t, expectedPeers, testCtx.store.peers) -} -// TestAddChannel tests that channels are added to the event store with -// appropriate timestamps. This test addresses a bug where offline channels -// did not have an opened time set, and checks that an online event is set for -// peers that are online at the time that a channel is opened. -func TestAddChannel(t *testing.T) { - ctx := newChanEventStoreTestCtx(t) - ctx.start() + // Get our peer and check that it has the channels we expect. + monitor, ok := testCtx.store.peers[peer] + require.True(t, ok) - // Create a channel for a peer that is not online yet. - _, _, channel1 := ctx.createChannel() - - // Get a set of values for another channel, but do not create it yet. - // - peer2, pubkey2, channel2 := ctx.newChannel() - ctx.peerEvent(peer2, true) - ctx.sendChannelOpenedUpdate(pubkey2, channel2) - - ctx.stop() - - // Assert that our peer that was offline on connection has no events - // and our peer that was online on connection has one. - require.Len(t, ctx.store.channels[channel1].events, 0) - - chan2Events := ctx.store.channels[channel2].events - require.Len(t, chan2Events, 1) - require.Equal(t, peerOnlineEvent, chan2Events[0].eventType) + require.Equal(t, expectedChannels, monitor.channelCount()) } // TestGetChanInfo tests the GetChanInfo function for the cases where a channel @@ -232,7 +197,7 @@ func TestGetChanInfo(t *testing.T) { // Try to get info for a channel that has not been opened yet, we // expect to get an error. - _, err := ctx.store.GetChanInfo(channel) + _, err := ctx.store.GetChanInfo(channel, peer) require.Equal(t, ErrChannelNotFound, err) // Now we send our store a notification that a channel has been opened. @@ -242,7 +207,7 @@ func TestGetChanInfo(t *testing.T) { // for the channel to be created so that we do not update our time // before the channel open is processed. require.Eventually(t, func() bool { - _, err = ctx.store.GetChanInfo(channel) + _, err = ctx.store.GetChanInfo(channel, peer) return err == nil }, timeout, time.Millisecond*20) @@ -251,7 +216,7 @@ func TestGetChanInfo(t *testing.T) { ctx.clock.SetTime(now) // At this stage our channel has been open and online for an hour. - info, err := ctx.store.GetChanInfo(channel) + info, err := ctx.store.GetChanInfo(channel, peer) require.NoError(t, err) require.Equal(t, time.Hour, info.Lifetime) require.Equal(t, time.Hour, info.Uptime) @@ -262,7 +227,7 @@ func TestGetChanInfo(t *testing.T) { // Since we have not bumped our mocked time, our uptime calculations // should be the same, even though we've just processed an offline // event. - info, err = ctx.store.GetChanInfo(channel) + info, err = ctx.store.GetChanInfo(channel, peer) require.NoError(t, err) require.Equal(t, time.Hour, info.Lifetime) require.Equal(t, time.Hour, info.Uptime) @@ -273,7 +238,7 @@ func TestGetChanInfo(t *testing.T) { now = now.Add(time.Hour) ctx.clock.SetTime(now) - info, err = ctx.store.GetChanInfo(channel) + info, err = ctx.store.GetChanInfo(channel, peer) require.NoError(t, err) require.Equal(t, time.Hour*2, info.Lifetime) require.Equal(t, time.Hour, info.Uptime) diff --git a/chanfitness/interface.go b/chanfitness/interface.go new file mode 100644 index 00000000..2da9302c --- /dev/null +++ b/chanfitness/interface.go @@ -0,0 +1,29 @@ +package chanfitness + +import ( + "time" + + "github.com/btcsuite/btcd/wire" +) + +// peerMonitor is an interface implemented by entities that monitor our peers +// online events and the channels we currently have open with them. +type peerMonitor interface { + // event adds an online or offline event. + onlineEvent(online bool) + + // addChannel adds a new channel. + addChannel(channelPoint wire.OutPoint) error + + // removeChannel removes a channel. + removeChannel(channelPoint wire.OutPoint) error + + // channelCount returns the number of channels that we currently have + // with the peer. + channelCount() int + + // channelUptime looks up a channel and returns the amount of time that + // the channel has been monitored for and its uptime over this period. + channelUptime(channelPoint wire.OutPoint) (time.Duration, + time.Duration, error) +} diff --git a/rpcserver.go b/rpcserver.go index 69d3513f..e1e7cd0d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3536,12 +3536,17 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, return channel, nil } + peer, err := route.NewVertexFromBytes(nodePub.SerializeCompressed()) + if err != nil { + return nil, err + } + // Query the event store for additional information about the channel. // Do not fail if it is not available, because there is a potential // race between a channel being added to our node and the event store // being notified of it. outpoint := dbChannel.FundingOutpoint - info, err := r.server.chanEventStore.GetChanInfo(outpoint) + info, err := r.server.chanEventStore.GetChanInfo(outpoint, peer) switch err { // If the store does not know about the channel, we just log it. case chanfitness.ErrChannelNotFound: