chanfitness: refactor to store channels by peer

When dealing with online events, we actually need to track our events
by peer, not by channel. All we need to track channels is to have a
set of online events for a peer which at least contain those events.
This change refactors chanfitness to track by peer.
This commit is contained in:
carla 2020-09-08 13:47:18 +02:00
parent 10f9ba952e
commit e05b4a8e2e
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
6 changed files with 358 additions and 344 deletions

@ -6,7 +6,6 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/routing/route"
)
type eventType int
@ -29,74 +28,152 @@ func (e eventType) String() string {
return "unknown"
}
// channelEvent is a a timestamped event which is observed on a per channel
// basis.
type channelEvent struct {
type event struct {
timestamp time.Time
eventType eventType
}
// chanEventLog stores all events that have occurred over a channel's lifetime.
type chanEventLog struct {
// channelPoint is the outpoint for the channel's funding transaction.
channelPoint wire.OutPoint
// peerLog tracks events for a peer and its channels. If we currently have no
// channels with the peer, it will simply track its current online state. If we
// do have channels open with the peer, it will track the peer's online and
// offline events so that we can calculate uptime for our channels. A single
// event log is used for these online and offline events, and uptime for a
// channel is calculated by examining a subsection of this log.
type peerLog struct {
// online stores whether the peer is currently online.
online bool
// peer is the compressed public key of the peer being monitored.
peer route.Vertex
// events is a log of timestamped events observed for the channel.
events []*channelEvent
// onlineEvents is a log of timestamped events observed for the peer.
onlineEvents []*event
// clock allows creation of deterministic unit tests.
clock clock.Clock
// channels contains a set of currently open channels. Channels will be
// added and removed from this map as they are opened and closed.
channels map[wire.OutPoint]*channelInfo
}
// newPeerLog creates a log for a peer.
func newPeerLog(clock clock.Clock) *peerLog {
return &peerLog{
clock: clock,
channels: make(map[wire.OutPoint]*channelInfo),
}
}
// channelInfo contains information about a channel.
type channelInfo struct {
// openedAt tracks the first time this channel was seen. This is not
// necessarily the time that it confirmed on chain because channel
// events are not persisted at present.
openedAt time.Time
// closedAt is the time that the channel was closed. If the channel has
// not been closed yet, it is zero.
closedAt time.Time
}
// newEventLog creates an event log for a channel with the openedAt time set.
func newEventLog(channelPoint wire.OutPoint, peer route.Vertex,
clock clock.Clock) *chanEventLog {
eventlog := &chanEventLog{
channelPoint: channelPoint,
peer: peer,
clock: clock,
openedAt: clock.Now(),
func newChannelInfo(openedAt time.Time) *channelInfo {
return &channelInfo{
openedAt: openedAt,
}
return eventlog
}
// close sets the closing time for an event log.
func (e *chanEventLog) close() {
e.closedAt = e.clock.Now()
}
// onlineEvent records a peer online or offline event in the log.
func (p *peerLog) onlineEvent(online bool) {
p.online = online
// add appends an event with the given type and current time to the event log.
// The open time for the eventLog will be set to the event's timestamp if it is
// not set yet.
func (e *chanEventLog) add(eventType eventType) {
// If the channel is already closed, return early without adding an
// event.
if !e.closedAt.IsZero() {
// If we have no channels currently open with the peer, we do not want
// to commit resources to tracking their online state beyond a simple
// online boolean, so we exit early.
if p.channelCount() == 0 {
return
}
// Add the event to the eventLog with the current timestamp.
event := &channelEvent{
timestamp: e.clock.Now(),
p.addEvent(online, p.clock.Now())
}
// addEvent records an online or offline event in our event log.
func (p *peerLog) addEvent(online bool, time time.Time) {
eventType := peerOnlineEvent
if !online {
eventType = peerOfflineEvent
}
event := &event{
timestamp: time,
eventType: eventType,
}
e.events = append(e.events, event)
log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType)
p.onlineEvents = append(p.onlineEvents, event)
}
// addChannel adds a channel to our log. If we have not tracked any online
// events for our peer yet, we create one with our peer's current online state
// so that we know the state that the peer had at channel start, which is
// required to calculate uptime over the channel's lifetime.
func (p *peerLog) addChannel(channelPoint wire.OutPoint) error {
_, ok := p.channels[channelPoint]
if ok {
return fmt.Errorf("channel: %v already present", channelPoint)
}
openTime := p.clock.Now()
p.channels[channelPoint] = newChannelInfo(openTime)
// If we do not have any online events tracked for our peer (which is
// the case when we have no other channels open with the peer), we add
// an event with the peer's current online state so that we know that
// starting state for this peer when a channel was connected (which
// allows us to calculate uptime over the lifetime of the channel).
if len(p.onlineEvents) == 0 {
p.addEvent(p.online, openTime)
}
return nil
}
// removeChannel removes a channel from our log. If we have no more channels
// with the peer after removing this one, we clear our list of events.
func (p *peerLog) removeChannel(channelPoint wire.OutPoint) error {
_, ok := p.channels[channelPoint]
if !ok {
return fmt.Errorf("channel: %v not present", channelPoint)
}
delete(p.channels, channelPoint)
// If we have no more channels in our event log, we can discard all of
// our online events in memory, since we don't need them anymore.
// TODO(carla): this could be done on a per channel basis.
if p.channelCount() == 0 {
p.onlineEvents = nil
}
return nil
}
// channelCount returns the number of channels that we currently have
// with the peer.
func (p *peerLog) channelCount() int {
return len(p.channels)
}
// channelUptime looks up a channel and returns the amount of time that the
// channel has been monitored for and its uptime over this period.
func (p *peerLog) channelUptime(channelPoint wire.OutPoint) (time.Duration,
time.Duration, error) {
channel, ok := p.channels[channelPoint]
if !ok {
return 0, 0, ErrChannelNotFound
}
now := p.clock.Now()
uptime, err := p.uptime(channel.openedAt, now)
if err != nil {
return 0, 0, err
}
return now.Sub(channel.openedAt), uptime, nil
}
// onlinePeriod represents a period of time over which a peer was online.
@ -112,9 +189,9 @@ type onlinePeriod struct {
// calculated until the present. This function expects the event log provided
// to be ordered by ascending timestamp, and can tolerate multiple consecutive
// online or offline events.
func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod {
func (p *peerLog) getOnlinePeriods() []*onlinePeriod {
// Return early if there are no events, there are no online periods.
if len(e.events) == 0 {
if len(p.onlineEvents) == 0 {
return nil
}
@ -123,7 +200,7 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod {
// a different type to our own. It is used to determine the
// start time of our online periods when we experience an
// offline event, and to track our last recorded state.
lastEvent *channelEvent
lastEvent *event
onlinePeriods []*onlinePeriod
)
@ -133,7 +210,7 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod {
// the online event and the present is not tracked. The type of the most
// recent event is tracked using the offline bool so that we can add a
// final online period if necessary.
for _, event := range e.events {
for _, event := range p.onlineEvents {
switch event.eventType {
case peerOnlineEvent:
// If our previous event is nil, we just set it and
@ -188,24 +265,20 @@ func (e *chanEventLog) getOnlinePeriods() []*onlinePeriod {
}
// The log ended on an online event, so we need to add a final online
// event. If the channel is closed, this period is until channel
// closure. It it is still open, we calculate it until the present.
// period which terminates at the present.
finalEvent := &onlinePeriod{
start: lastEvent.timestamp,
end: e.closedAt,
}
if finalEvent.end.IsZero() {
finalEvent.end = e.clock.Now()
end: p.clock.Now(),
}
// Add the final online period to the set and return.
return append(onlinePeriods, finalEvent)
}
// uptime calculates the total uptime we have recorded for a channel over the
// uptime calculates the total uptime we have recorded for a peer over the
// inclusive range specified. An error is returned if the end of the range is
// before the start or a zero end time is returned.
func (e *chanEventLog) uptime(start, end time.Time) (time.Duration, error) {
func (p *peerLog) uptime(start, end time.Time) (time.Duration, error) {
// Error if we are provided with an invalid range to calculate uptime
// for.
if end.Before(start) {
@ -218,7 +291,7 @@ func (e *chanEventLog) uptime(start, end time.Time) (time.Duration, error) {
var uptime time.Duration
for _, p := range e.getOnlinePeriods() {
for _, p := range p.getOnlinePeriods() {
// The online period ends before the range we're looking at, so
// we can skip over it.
if p.end.Before(start) {

@ -4,55 +4,97 @@ import (
"testing"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/clock"
"github.com/stretchr/testify/require"
)
// TestAdd tests adding events to an event log. It tests the case where the
// channel is open, and should have an event added, and the case where it is
// closed and the event should not be added.
func TestAdd(t *testing.T) {
tests := []struct {
name string
eventLog *chanEventLog
event eventType
expectedEvents []*channelEvent
}{
{
name: "Channel open",
eventLog: &chanEventLog{
clock: clock.NewTestClock(testNow),
},
event: peerOnlineEvent,
expectedEvents: []*channelEvent{
{
eventType: peerOnlineEvent,
timestamp: testNow,
},
},
},
{
name: "Channel closed, event not added",
eventLog: &chanEventLog{
clock: clock.NewTestClock(testNow),
closedAt: testNow,
},
event: peerOnlineEvent,
expectedEvents: nil,
},
// TestPeerLog tests the functionality of the peer log struct.
func TestPeerLog(t *testing.T) {
clock := clock.NewTestClock(testNow)
peerLog := newPeerLog(clock)
require.Zero(t, peerLog.channelCount())
require.False(t, peerLog.online)
// Test that looking up an unknown channel fails.
_, _, err := peerLog.channelUptime(wire.OutPoint{Index: 1})
require.Error(t, err)
// Add an offline event, since we have no channels, we do not expect
// to have any online periods recorded for our peer.
peerLog.onlineEvent(false)
require.Len(t, peerLog.getOnlinePeriods(), 0)
// Likewise, if we have an online event, nothing beyond the online state
// of our peer log should change.
peerLog.onlineEvent(true)
require.Len(t, peerLog.getOnlinePeriods(), 0)
// Add a channel and assert that we have one channel listed.
chan1 := wire.OutPoint{
Index: 1,
}
require.NoError(t, peerLog.addChannel(chan1))
require.Equal(t, 1, peerLog.channelCount())
for _, test := range tests {
test := test
// Assert that we can now successfully get our added channel.
_, _, err = peerLog.channelUptime(chan1)
require.NoError(t, err)
t.Run(test.name, func(t *testing.T) {
test.eventLog.add(test.event)
// Bump our test clock's time so that our current time is different to
// channel open time.
now := testNow.Add(time.Hour)
clock.SetTime(now)
require.Equal(
t, test.expectedEvents, test.eventLog.events,
)
})
// Now that we have added a channel and an hour has passed, we expect
// our uptime and lifetime to both equal an hour.
lifetime, uptime, err := peerLog.channelUptime(chan1)
require.NoError(t, err)
require.Equal(t, time.Hour, lifetime)
require.Equal(t, time.Hour, uptime)
// Add an offline event for our peer.
peerLog.onlineEvent(false)
// Now we add another channel to our store and assert that we now report
// two channels for this peer.
chan2 := wire.OutPoint{
Index: 2,
}
require.NoError(t, peerLog.addChannel(chan2))
require.Equal(t, 2, peerLog.channelCount())
// Progress our time again, so that our peer has now been offline for
// two hours.
now = now.Add(time.Hour * 2)
clock.SetTime(now)
// Our first channel should report as having been monitored for three
// hours, but only online for one of those hours.
lifetime, uptime, err = peerLog.channelUptime(chan1)
require.NoError(t, err)
require.Equal(t, time.Hour*3, lifetime)
require.Equal(t, time.Hour, uptime)
// Remove our first channel and check that we can still correctly query
// uptime for the second channel.
require.NoError(t, peerLog.removeChannel(chan1))
require.Equal(t, 1, peerLog.channelCount())
// Our second channel, which was created when our peer was offline,
// should report as having been monitored for two hours, but have zero
// uptime.
lifetime, uptime, err = peerLog.channelUptime(chan2)
require.NoError(t, err)
require.Equal(t, time.Hour*2, lifetime)
require.Equal(t, time.Duration(0), uptime)
// Finally, remove our second channel and assert that our peer cleans
// up its in memory set of events.
require.NoError(t, peerLog.removeChannel(chan2))
require.Equal(t, 0, peerLog.channelCount())
require.Len(t, peerLog.onlineEvents, 0)
}
// TestGetOnlinePeriod tests the getOnlinePeriod function. It tests the case
@ -62,21 +104,18 @@ func TestGetOnlinePeriod(t *testing.T) {
fourHoursAgo := testNow.Add(time.Hour * -4)
threeHoursAgo := testNow.Add(time.Hour * -3)
twoHoursAgo := testNow.Add(time.Hour * -2)
oneHourAgo := testNow.Add(time.Hour * -1)
tests := []struct {
name string
events []*channelEvent
events []*event
expectedOnline []*onlinePeriod
openedAt time.Time
closedAt time.Time
}{
{
name: "no events",
},
{
name: "start on online period",
events: []*channelEvent{
events: []*event{
{
timestamp: threeHoursAgo,
eventType: peerOnlineEvent,
@ -95,7 +134,7 @@ func TestGetOnlinePeriod(t *testing.T) {
},
{
name: "start on offline period",
events: []*channelEvent{
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOfflineEvent,
@ -103,8 +142,8 @@ func TestGetOnlinePeriod(t *testing.T) {
},
},
{
name: "end on an online period, channel not closed",
events: []*channelEvent{
name: "end on an online period",
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
@ -118,24 +157,8 @@ func TestGetOnlinePeriod(t *testing.T) {
},
},
{
name: "end on an online period, channel closed",
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
},
},
expectedOnline: []*onlinePeriod{
{
start: fourHoursAgo,
end: oneHourAgo,
},
},
closedAt: oneHourAgo,
},
{
name: "duplicate online events, channel not closed",
events: []*channelEvent{
name: "duplicate online events",
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
@ -153,28 +176,8 @@ func TestGetOnlinePeriod(t *testing.T) {
},
},
{
name: "duplicate online events, channel closed",
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
},
{
timestamp: twoHoursAgo,
eventType: peerOnlineEvent,
},
},
expectedOnline: []*onlinePeriod{
{
start: fourHoursAgo,
end: threeHoursAgo,
},
},
closedAt: threeHoursAgo,
},
{
name: "duplicate offline events, channel not closed",
events: []*channelEvent{
name: "duplicate offline events",
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOfflineEvent,
@ -188,7 +191,7 @@ func TestGetOnlinePeriod(t *testing.T) {
},
{
name: "duplicate online then offline",
events: []*channelEvent{
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
@ -211,7 +214,7 @@ func TestGetOnlinePeriod(t *testing.T) {
},
{
name: "duplicate offline then online",
events: []*channelEvent{
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOfflineEvent,
@ -240,11 +243,9 @@ func TestGetOnlinePeriod(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
score := &chanEventLog{
events: test.events,
clock: clock.NewTestClock(testNow),
openedAt: test.openedAt,
closedAt: test.closedAt,
score := &peerLog{
onlineEvents: test.events,
clock: clock.NewTestClock(testNow),
}
online := score.getOnlinePeriods()
@ -265,18 +266,9 @@ func TestUptime(t *testing.T) {
tests := []struct {
name string
// opened at is the time the channel was recorded as being open,
// and is never expected to be zero.
openedAt time.Time
// closed at is the time the channel was recorded as being
// closed, and can have a zero value if the channel is not
// closed.
closedAt time.Time
// events is the set of event log that we are calculating uptime
// for.
events []*channelEvent
events []*event
// startTime is the beginning of the period that we are
// calculating uptime for, it cannot have a zero value.
@ -306,23 +298,8 @@ func TestUptime(t *testing.T) {
expectErr: true,
},
{
name: "Online event and closed",
openedAt: fourHoursAgo,
closedAt: oneHourAgo,
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
},
},
startTime: fourHoursAgo,
endTime: testNow,
expectedUptime: time.Hour * 3,
},
{
name: "Online event and not closed",
openedAt: fourHoursAgo,
events: []*channelEvent{
name: "online event and no offline",
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
@ -333,37 +310,8 @@ func TestUptime(t *testing.T) {
expectedUptime: time.Hour * 4,
},
{
name: "Offline event and closed",
openedAt: fourHoursAgo,
closedAt: threeHoursAgo,
events: []*channelEvent{
{
timestamp: fourHoursAgo,
eventType: peerOfflineEvent,
},
},
startTime: fourHoursAgo,
endTime: testNow,
},
{
name: "Online event before close",
openedAt: fourHoursAgo,
closedAt: oneHourAgo,
events: []*channelEvent{
{
timestamp: twoHoursAgo,
eventType: peerOnlineEvent,
},
},
startTime: fourHoursAgo,
endTime: testNow,
expectedUptime: time.Hour,
},
{
name: "Online then offline event",
openedAt: fourHoursAgo,
closedAt: oneHourAgo,
events: []*channelEvent{
name: "online then offline event",
events: []*event{
{
timestamp: threeHoursAgo,
eventType: peerOnlineEvent,
@ -378,10 +326,8 @@ func TestUptime(t *testing.T) {
expectedUptime: time.Hour,
},
{
name: "Online event before uptime period",
openedAt: fourHoursAgo,
closedAt: oneHourAgo,
events: []*channelEvent{
name: "online event before uptime period",
events: []*event{
{
timestamp: threeHoursAgo,
eventType: peerOnlineEvent,
@ -389,12 +335,11 @@ func TestUptime(t *testing.T) {
},
startTime: twoHoursAgo,
endTime: testNow,
expectedUptime: time.Hour,
expectedUptime: time.Hour * 2,
},
{
name: "Offline event after uptime period",
openedAt: fourHoursAgo,
events: []*channelEvent{
name: "offline event after uptime period",
events: []*event{
{
timestamp: fourHoursAgo,
eventType: peerOnlineEvent,
@ -409,9 +354,8 @@ func TestUptime(t *testing.T) {
expectedUptime: time.Hour * 2,
},
{
name: "All events within period",
openedAt: fourHoursAgo,
events: []*channelEvent{
name: "all events within period",
events: []*event{
{
timestamp: twoHoursAgo,
eventType: peerOnlineEvent,
@ -422,9 +366,8 @@ func TestUptime(t *testing.T) {
expectedUptime: time.Hour,
},
{
name: "Multiple online and offline",
openedAt: testNow.Add(time.Hour * -8),
events: []*channelEvent{
name: "multiple online and offline",
events: []*event{
{
timestamp: testNow.Add(time.Hour * -7),
eventType: peerOnlineEvent,
@ -456,11 +399,9 @@ func TestUptime(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
score := &chanEventLog{
events: test.events,
clock: clock.NewTestClock(testNow),
openedAt: test.openedAt,
closedAt: test.closedAt,
score := &peerLog{
onlineEvents: test.events,
clock: clock.NewTestClock(testNow),
}
uptime, err := score.uptime(

@ -32,6 +32,10 @@ var (
// ErrChannelNotFound is returned when a query is made for a channel
// that the event store does not have knowledge of.
ErrChannelNotFound = errors.New("channel not found in event store")
// ErrPeerNotFound is returned when a query is made for a channel
// that has a peer that the event store is not currently tracking.
ErrPeerNotFound = errors.New("peer not found in event store")
)
// ChannelEventStore maintains a set of event logs for the node's channels to
@ -39,12 +43,8 @@ var (
type ChannelEventStore struct {
cfg *Config
// channels maps channel points to event logs.
channels map[wire.OutPoint]*chanEventLog
// peers tracks the current online status of peers based on online
// and offline events.
peers map[route.Vertex]bool
// peers tracks all of our currently monitored peers and their channels.
peers map[route.Vertex]peerMonitor
// chanInfoRequests serves requests for information about our channel.
chanInfoRequests chan channelInfoRequest
@ -77,6 +77,7 @@ type Config struct {
}
type channelInfoRequest struct {
peer route.Vertex
channelPoint wire.OutPoint
responseChan chan channelInfoResponse
}
@ -92,8 +93,7 @@ type channelInfoResponse struct {
func NewChannelEventStore(config *Config) *ChannelEventStore {
store := &ChannelEventStore{
cfg: config,
channels: make(map[wire.OutPoint]*chanEventLog),
peers: make(map[route.Vertex]bool),
peers: make(map[route.Vertex]peerMonitor),
chanInfoRequests: make(chan channelInfoRequest),
quit: make(chan struct{}),
}
@ -173,60 +173,56 @@ func (c *ChannelEventStore) Stop() {
c.wg.Wait()
}
// addChannel adds a new channel to the ChannelEventStore's map of channels with
// an initial peer online state (if the peer is online). If the channel is
// already present in the map, the function returns early. This function should
// be called to add existing channels on startup and when open channel events
// are observed.
// addChannel checks whether we are already tracking a channel's peer, creates a
// new peer log to track it if we are not yet monitoring it, and adds the
// channel.
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
peer route.Vertex) {
// Check for the unexpected case where the channel is already in the
// store.
_, ok := c.channels[channelPoint]
if ok {
log.Errorf("Channel %v duplicated in channel store",
channelPoint)
return
peerMonitor, ok := c.peers[peer]
if !ok {
peerMonitor = newPeerLog(c.cfg.Clock)
c.peers[peer] = peerMonitor
}
// Create an event log for the channel.
eventLog := newEventLog(channelPoint, peer, c.cfg.Clock)
// If the peer is already online, add a peer online event to record
// the starting state of the peer.
if c.peers[peer] {
eventLog.add(peerOnlineEvent)
if err := peerMonitor.addChannel(channelPoint); err != nil {
log.Errorf("could not add channel: %v", err)
}
c.channels[channelPoint] = eventLog
}
// closeChannel records a closed time for a channel, and returns early is the
// channel is not known to the event store.
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) {
// Check for the unexpected case where the channel is unknown to the
// store.
eventLog, ok := c.channels[channelPoint]
// channel is not known to the event store. We log warnings (rather than errors)
// when we cannot find a peer/channel because channels that we restore from a
// static channel backup do not have their open notified, so the event store
// never learns about them, but they are closed using the regular flow so we
// will try to remove them on close. At present, we cannot easily distinguish
// between these closes and others.
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint,
peer route.Vertex) {
peerMonitor, ok := c.peers[peer]
if !ok {
log.Errorf("Close channel %v unknown to store", channelPoint)
log.Warnf("peer not known to store: %v", peer)
return
}
eventLog.close()
if err := peerMonitor.removeChannel(channelPoint); err != nil {
log.Warnf("could not remove channel: %v", err)
}
}
// peerEvent adds a peer online or offline event to all channels we currently
// have open with a peer.
func (c *ChannelEventStore) peerEvent(peer route.Vertex, event eventType) {
// Track current online status of peers in the channelEventStore.
c.peers[peer] = event == peerOnlineEvent
for _, eventLog := range c.channels {
if eventLog.peer == peer {
eventLog.add(event)
}
// peerEvent creates a peer monitor for a peer if we do not currently have
// one, and adds an online event to it.
func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) {
// If we are not currently tracking events for this peer, add a peer
// log for it.
peerMonitor, ok := c.peers[peer]
if !ok {
peerMonitor = newPeerLog(c.cfg.Clock)
c.peers[peer] = peerMonitor
}
peerMonitor.onlineEvent(online)
}
// subscriptions abstracts away from subscription clients to allow for mocking.
@ -268,7 +264,19 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// A channel has been closed, we must remove the channel
// from the store and record a channel closed event.
case channelnotifier.ClosedChannelEvent:
c.closeChannel(event.CloseSummary.ChanPoint)
compressed := event.CloseSummary.RemotePub.SerializeCompressed()
peerKey, err := route.NewVertexFromBytes(
compressed,
)
if err != nil {
log.Errorf("Could not get vertex "+
"from: %v", compressed)
continue
}
c.closeChannel(
event.CloseSummary.ChanPoint, peerKey,
)
}
// Process peer online and offline events.
@ -278,13 +286,13 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// and should record an online event for any channels
// with that peer.
case peernotifier.PeerOnlineEvent:
c.peerEvent(event.PubKey, peerOnlineEvent)
c.peerEvent(event.PubKey, true)
// We have lost a connection with our peer, and should
// record an offline event for any channels with that
// peer.
case peernotifier.PeerOfflineEvent:
c.peerEvent(event.PubKey, peerOfflineEvent)
c.peerEvent(event.PubKey, false)
}
// Serve all requests for channel lifetime.
@ -314,10 +322,11 @@ type ChannelInfo struct {
}
// GetChanInfo gets all the information we have on a channel in the event store.
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) (
*ChannelInfo, error) {
func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint,
peer route.Vertex) (*ChannelInfo, error) {
request := channelInfoRequest{
peer: peer,
channelPoint: channelPoint,
responseChan: make(chan channelInfoResponse),
}
@ -347,26 +356,18 @@ func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint) (
func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
error) {
// Look for the channel in our current set.
channel, ok := c.channels[req.channelPoint]
peerMonitor, ok := c.peers[req.peer]
if !ok {
return nil, ErrChannelNotFound
return nil, ErrPeerNotFound
}
// If our channel is not closed, we want to calculate uptime until the
// present.
endTime := channel.closedAt
if endTime.IsZero() {
endTime = c.cfg.Clock.Now()
}
uptime, err := channel.uptime(channel.openedAt, endTime)
lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint)
if err != nil {
return nil, err
}
return &ChannelInfo{
Lifetime: endTime.Sub(channel.openedAt),
Lifetime: lifetime,
Uptime: uptime,
}, nil
}

@ -107,9 +107,7 @@ func TestMonitorChannelEvents(t *testing.T) {
ctx.peerEvent(peer1, true)
}
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
testEventStore(t, gen, peer1, 1)
})
t.Run("duplicate channel open events", func(t *testing.T) {
@ -119,9 +117,7 @@ func TestMonitorChannelEvents(t *testing.T) {
ctx.peerEvent(peer1, true)
}
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
testEventStore(t, gen, peer1, 1)
})
t.Run("peer online before channel created", func(t *testing.T) {
@ -130,9 +126,7 @@ func TestMonitorChannelEvents(t *testing.T) {
ctx.sendChannelOpenedUpdate(pubKey, chan1)
}
testEventStore(t, gen, 1, map[route.Vertex]bool{
peer1: true,
})
testEventStore(t, gen, peer1, 1)
})
t.Run("multiple channels for peer", func(t *testing.T) {
@ -144,9 +138,7 @@ func TestMonitorChannelEvents(t *testing.T) {
ctx.sendChannelOpenedUpdate(pubKey, chan2)
}
testEventStore(t, gen, 2, map[route.Vertex]bool{
peer1: false,
})
testEventStore(t, gen, peer1, 2)
})
t.Run("multiple channels for peer, one closed", func(t *testing.T) {
@ -161,18 +153,15 @@ func TestMonitorChannelEvents(t *testing.T) {
ctx.peerEvent(peer1, true)
}
testEventStore(t, gen, 2, map[route.Vertex]bool{
peer1: true,
})
testEventStore(t, gen, peer1, 1)
})
}
// testEventStore creates a new test contexts, generates a set of events for it
// and tests that it has the number of channels and online state for peers that
// we expect.
// and tests that it has the number of channels we expect.
func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
expectedChannels int, expectedPeers map[route.Vertex]bool) {
peer route.Vertex, expectedChannels int) {
testCtx := newChanEventStoreTestCtx(t)
testCtx.start()
@ -182,36 +171,12 @@ func testEventStore(t *testing.T, generateEvents func(*chanEventStoreTestCtx),
// Shutdown the store so that we can safely access the maps in our event
// store.
testCtx.stop()
require.Len(t, testCtx.store.channels, expectedChannels)
require.Equal(t, expectedPeers, testCtx.store.peers)
}
// TestAddChannel tests that channels are added to the event store with
// appropriate timestamps. This test addresses a bug where offline channels
// did not have an opened time set, and checks that an online event is set for
// peers that are online at the time that a channel is opened.
func TestAddChannel(t *testing.T) {
ctx := newChanEventStoreTestCtx(t)
ctx.start()
// Get our peer and check that it has the channels we expect.
monitor, ok := testCtx.store.peers[peer]
require.True(t, ok)
// Create a channel for a peer that is not online yet.
_, _, channel1 := ctx.createChannel()
// Get a set of values for another channel, but do not create it yet.
//
peer2, pubkey2, channel2 := ctx.newChannel()
ctx.peerEvent(peer2, true)
ctx.sendChannelOpenedUpdate(pubkey2, channel2)
ctx.stop()
// Assert that our peer that was offline on connection has no events
// and our peer that was online on connection has one.
require.Len(t, ctx.store.channels[channel1].events, 0)
chan2Events := ctx.store.channels[channel2].events
require.Len(t, chan2Events, 1)
require.Equal(t, peerOnlineEvent, chan2Events[0].eventType)
require.Equal(t, expectedChannels, monitor.channelCount())
}
// TestGetChanInfo tests the GetChanInfo function for the cases where a channel
@ -232,7 +197,7 @@ func TestGetChanInfo(t *testing.T) {
// Try to get info for a channel that has not been opened yet, we
// expect to get an error.
_, err := ctx.store.GetChanInfo(channel)
_, err := ctx.store.GetChanInfo(channel, peer)
require.Equal(t, ErrChannelNotFound, err)
// Now we send our store a notification that a channel has been opened.
@ -242,7 +207,7 @@ func TestGetChanInfo(t *testing.T) {
// for the channel to be created so that we do not update our time
// before the channel open is processed.
require.Eventually(t, func() bool {
_, err = ctx.store.GetChanInfo(channel)
_, err = ctx.store.GetChanInfo(channel, peer)
return err == nil
}, timeout, time.Millisecond*20)
@ -251,7 +216,7 @@ func TestGetChanInfo(t *testing.T) {
ctx.clock.SetTime(now)
// At this stage our channel has been open and online for an hour.
info, err := ctx.store.GetChanInfo(channel)
info, err := ctx.store.GetChanInfo(channel, peer)
require.NoError(t, err)
require.Equal(t, time.Hour, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)
@ -262,7 +227,7 @@ func TestGetChanInfo(t *testing.T) {
// Since we have not bumped our mocked time, our uptime calculations
// should be the same, even though we've just processed an offline
// event.
info, err = ctx.store.GetChanInfo(channel)
info, err = ctx.store.GetChanInfo(channel, peer)
require.NoError(t, err)
require.Equal(t, time.Hour, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)
@ -273,7 +238,7 @@ func TestGetChanInfo(t *testing.T) {
now = now.Add(time.Hour)
ctx.clock.SetTime(now)
info, err = ctx.store.GetChanInfo(channel)
info, err = ctx.store.GetChanInfo(channel, peer)
require.NoError(t, err)
require.Equal(t, time.Hour*2, info.Lifetime)
require.Equal(t, time.Hour, info.Uptime)

29
chanfitness/interface.go Normal file

@ -0,0 +1,29 @@
package chanfitness
import (
"time"
"github.com/btcsuite/btcd/wire"
)
// peerMonitor is an interface implemented by entities that monitor our peers
// online events and the channels we currently have open with them.
type peerMonitor interface {
// event adds an online or offline event.
onlineEvent(online bool)
// addChannel adds a new channel.
addChannel(channelPoint wire.OutPoint) error
// removeChannel removes a channel.
removeChannel(channelPoint wire.OutPoint) error
// channelCount returns the number of channels that we currently have
// with the peer.
channelCount() int
// channelUptime looks up a channel and returns the amount of time that
// the channel has been monitored for and its uptime over this period.
channelUptime(channelPoint wire.OutPoint) (time.Duration,
time.Duration, error)
}

@ -3536,12 +3536,17 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
return channel, nil
}
peer, err := route.NewVertexFromBytes(nodePub.SerializeCompressed())
if err != nil {
return nil, err
}
// Query the event store for additional information about the channel.
// Do not fail if it is not available, because there is a potential
// race between a channel being added to our node and the event store
// being notified of it.
outpoint := dbChannel.FundingOutpoint
info, err := r.server.chanEventStore.GetChanInfo(outpoint)
info, err := r.server.chanEventStore.GetChanInfo(outpoint, peer)
switch err {
// If the store does not know about the channel, we just log it.
case chanfitness.ErrChannelNotFound: