chanfitness: switch to query by channel outpoint

In this commit, the channelEventStore in the channel
fitness subsystem is changed to identify channels
by their outpoint rather than short channel id. This
change is made made becuase outpoints are the preferred
way to expose references over rpc, and easier to perform
queries within lnd.
This commit is contained in:
carla 2019-12-17 17:36:28 +02:00
parent 47e700ba9e
commit 4f9795e8ae
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
4 changed files with 88 additions and 72 deletions

@ -4,6 +4,7 @@ import (
"fmt"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/routing/route"
)
@ -36,8 +37,8 @@ type channelEvent struct {
// chanEventLog stores all events that have occurred over a channel's lifetime.
type chanEventLog struct {
// id is the uint64 of the short channel ID.
id uint64
// channelPoint is the outpoint for the channel's funding transaction.
channelPoint wire.OutPoint
// peer is the compressed public key of the peer being monitored.
peer route.Vertex
@ -59,11 +60,13 @@ type chanEventLog struct {
closedAt time.Time
}
func newEventLog(id uint64, peer route.Vertex, now func() time.Time) *chanEventLog {
func newEventLog(outpoint wire.OutPoint, peer route.Vertex,
now func() time.Time) *chanEventLog {
return &chanEventLog{
id: id,
peer: peer,
now: now,
channelPoint: outpoint,
peer: peer,
now: now,
}
}
@ -95,7 +98,7 @@ func (e *chanEventLog) add(eventType eventType) {
e.openedAt = event.timestamp
}
log.Debugf("Channel %v recording event: %v", e.id, eventType)
log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType)
}
// onlinePeriod represents a period of time over which a peer was online.

@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/peernotifier"
@ -37,8 +38,8 @@ var (
type ChannelEventStore struct {
cfg *Config
// channels maps short channel IDs to event logs.
channels map[uint64]*chanEventLog
// channels maps channel points to event logs.
channels map[wire.OutPoint]*chanEventLog
// peers tracks the current online status of peers based on online/offline
// events.
@ -76,7 +77,7 @@ type Config struct {
// channel's lifespan and a blocking response channel on which the result is
// sent.
type lifespanRequest struct {
channelID uint64
channelPoint wire.OutPoint
responseChan chan lifespanResponse
}
@ -91,7 +92,7 @@ type lifespanResponse struct {
// uptimeRequest contains the parameters required to query the store for a
// channel's uptime and a blocking response channel on which the result is sent.
type uptimeRequest struct {
channelID uint64
channelPoint wire.OutPoint
startTime time.Time
endTime time.Time
responseChan chan uptimeResponse
@ -110,7 +111,7 @@ type uptimeResponse struct {
func NewChannelEventStore(config *Config) *ChannelEventStore {
store := &ChannelEventStore{
cfg: config,
channels: make(map[uint64]*chanEventLog),
channels: make(map[wire.OutPoint]*chanEventLog),
peers: make(map[route.Vertex]bool),
lifespanRequests: make(chan lifespanRequest),
uptimeRequests: make(chan uptimeRequest),
@ -167,7 +168,7 @@ func (c *ChannelEventStore) Start() error {
// Add existing channels to the channel store with an initial peer
// online or offline event.
c.addChannel(ch.ShortChanID().ToUint64(), peerKey)
c.addChannel(ch.FundingOutpoint, peerKey)
}
// Start a goroutine that consumes events from all subscriptions.
@ -196,15 +197,17 @@ func (c *ChannelEventStore) Stop() {
// already present in the map, the function returns early. This function should
// be called to add existing channels on startup and when open channel events
// are observed.
func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
peer route.Vertex) {
// Check for the unexpected case where the channel is already in the store.
_, ok := c.channels[channelID]
_, ok := c.channels[channelPoint]
if ok {
log.Errorf("Channel %v duplicated in channel store", channelID)
log.Errorf("Channel %v duplicated in channel store", channelPoint)
return
}
eventLog := newEventLog(channelID, peer, time.Now)
eventLog := newEventLog(channelPoint, peer, time.Now)
// If the peer is online, add a peer online event to indicate its starting
// state.
@ -213,16 +216,16 @@ func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
eventLog.add(peerOnlineEvent)
}
c.channels[channelID] = eventLog
c.channels[channelPoint] = eventLog
}
// closeChannel records a closed time for a channel, and returns early is the
// channel is not known to the event store.
func (c *ChannelEventStore) closeChannel(channelID uint64) {
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) {
// Check for the unexpected case where the channel is unknown to the store.
eventLog, ok := c.channels[channelID]
eventLog, ok := c.channels[channelPoint]
if !ok {
log.Errorf("Close channel %v unknown to store", channelID)
log.Errorf("Close channel %v unknown to store", channelPoint)
return
}
@ -265,8 +268,6 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// A new channel has been opened, we must add the channel to the
// store and record a channel open event.
case channelnotifier.OpenChannelEvent:
chanID := event.Channel.ShortChanID().ToUint64()
peerKey, err := route.NewVertexFromBytes(
event.Channel.IdentityPub.SerializeCompressed(),
)
@ -275,12 +276,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
event.Channel.IdentityPub.SerializeCompressed())
}
c.addChannel(chanID, peerKey)
c.addChannel(event.Channel.FundingOutpoint, peerKey)
// A channel has been closed, we must remove the channel from the
// store and record a channel closed event.
case channelnotifier.ClosedChannelEvent:
c.closeChannel(event.CloseSummary.ShortChanID.ToUint64())
c.closeChannel(event.CloseSummary.ChanPoint)
}
// Process peer online and offline events.
@ -301,7 +302,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
case req := <-c.lifespanRequests:
var resp lifespanResponse
channel, ok := c.channels[req.channelID]
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
@ -315,7 +316,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
case req := <-c.uptimeRequests:
var resp uptimeResponse
channel, ok := c.channels[req.channelID]
channel, ok := c.channels[req.channelPoint]
if !ok {
resp.err = ErrChannelNotFound
} else {
@ -336,9 +337,11 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
// GetLifespan returns the opening and closing time observed for a channel and
// a boolean to indicate whether the channel is known the the event store. If
// the channel is still open, a zero close time is returned.
func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) {
func (c *ChannelEventStore) GetLifespan(
channelPoint wire.OutPoint) (time.Time, time.Time, error) {
request := lifespanRequest{
channelID: chanID,
channelPoint: channelPoint,
responseChan: make(chan lifespanResponse),
}
@ -364,11 +367,11 @@ func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, er
// GetUptime returns the uptime of a channel over a period and an error if the
// channel cannot be found or the uptime calculation fails.
func (c *ChannelEventStore) GetUptime(chanID uint64, startTime,
func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime,
endTime time.Time) (time.Duration, error) {
request := uptimeRequest{
channelID: chanID,
channelPoint: channelPoint,
startTime: startTime,
endTime: endTime,
responseChan: make(chan uptimeResponse),

@ -6,9 +6,10 @@ import (
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/subscribe"
@ -74,13 +75,11 @@ func TestStartStoreError(t *testing.T) {
}
}
// TestMonitorChannelEvents tests the store's handling of channel and peer
// events. It tests for the unexpected cases where we receive a channel open for
// an already known channel and but does not test for closing an unknown channel
// because it would require custom logic in the test to prevent iterating
// through an eventLog which does not exist. This test does not test handling
// of uptime and lifespan requests, as they are tested in their own tests.
func TestMonitorChannelEvents(t *testing.T) {
// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel
// outpoint for testing.
func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex,
wire.OutPoint) {
privKey, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("Error getting pubkey: %v", err)
@ -93,11 +92,20 @@ func TestMonitorChannelEvents(t *testing.T) {
t.Fatalf("Could not create vertex: %v", err)
}
shortID := lnwire.ShortChannelID{
BlockHeight: 1234,
TxIndex: 2,
TxPosition: 2,
return privKey.PubKey(), pubKey, wire.OutPoint{
Hash: [chainhash.HashSize]byte{1, 2, 3},
Index: 0,
}
}
// TestMonitorChannelEvents tests the store's handling of channel and peer
// events. It tests for the unexpected cases where we receive a channel open for
// an already known channel and but does not test for closing an unknown channel
// because it would require custom logic in the test to prevent iterating
// through an eventLog which does not exist. This test does not test handling
// of uptime and lifespan requests, as they are tested in their own tests.
func TestMonitorChannelEvents(t *testing.T) {
pubKey, vertex, chanPoint := getTestChannel(t)
tests := []struct {
name string
@ -118,13 +126,13 @@ func TestMonitorChannelEvents(t *testing.T) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
},
expectedEvents: []eventType{peerOnlineEvent},
},
@ -134,19 +142,19 @@ func TestMonitorChannelEvents(t *testing.T) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
// Add a duplicate channel open event.
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
},
@ -156,13 +164,13 @@ func TestMonitorChannelEvents(t *testing.T) {
name: "Channel opened, peer already online",
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
// Add a peer online event.
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
},
@ -175,18 +183,18 @@ func TestMonitorChannelEvents(t *testing.T) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex}
// Add a close channel event.
channelEvents <- channelnotifier.ClosedChannelEvent{
CloseSummary: &channeldb.ChannelCloseSummary{
ShortChanID: shortID,
ChanPoint: chanPoint,
},
}
},
@ -198,20 +206,20 @@ func TestMonitorChannelEvents(t *testing.T) {
// Add an open channel event
channelEvents <- channelnotifier.OpenChannelEvent{
Channel: &channeldb.OpenChannel{
ShortChannelID: shortID,
IdentityPub: privKey.PubKey(),
FundingOutpoint: chanPoint,
IdentityPub: pubKey,
},
}
// Add a close channel event.
channelEvents <- channelnotifier.ClosedChannelEvent{
CloseSummary: &channeldb.ChannelCloseSummary{
ShortChanID: shortID,
ChanPoint: chanPoint,
},
}
// Add a peer online event.
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex}
},
},
}
@ -242,7 +250,7 @@ func TestMonitorChannelEvents(t *testing.T) {
// Retrieve the eventLog for the channel and check that its
// contents are as expected.
eventLog, ok := store.channels[shortID.ToUint64()]
eventLog, ok := store.channels[chanPoint]
if !ok {
t.Fatalf("Expected to find event store")
}
@ -265,7 +273,7 @@ func TestGetLifetime(t *testing.T) {
tests := []struct {
name string
channelFound bool
chanID uint64
channelPoint wire.OutPoint
opened time.Time
closed time.Time
expectedError error
@ -304,13 +312,13 @@ func TestGetLifetime(t *testing.T) {
// Add channel to eventStore if the test indicates that it should
// be present.
if test.channelFound {
store.channels[test.chanID] = &chanEventLog{
store.channels[test.channelPoint] = &chanEventLog{
openedAt: test.opened,
closedAt: test.closed,
}
}
open, close, err := store.GetLifespan(test.chanID)
open, close, err := store.GetLifespan(test.channelPoint)
if test.expectedError != err {
t.Fatalf("Expected: %v, got: %v", test.expectedError, err)
}
@ -341,7 +349,7 @@ func TestGetUptime(t *testing.T) {
tests := []struct {
name string
chanID uint64
channelPoint wire.OutPoint
// events is the set of events we expect to find in the channel store.
events []*channelEvent
@ -437,7 +445,7 @@ func TestGetUptime(t *testing.T) {
// Add the channel to the store if it is intended to be found.
if test.channelFound {
store.channels[test.chanID] = &chanEventLog{
store.channels[test.channelPoint] = &chanEventLog{
events: test.events,
now: func() time.Time { return now },
openedAt: test.openedAt,
@ -445,7 +453,7 @@ func TestGetUptime(t *testing.T) {
}
}
uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime)
uptime, err := store.GetUptime(test.channelPoint, test.startTime, test.endTime)
if test.expectedError != err {
t.Fatalf("Expected: %v, got: %v", test.expectedError, err)
}

@ -2852,14 +2852,16 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
channel.UnsettledBalance += channel.PendingHtlcs[i].Amount
}
outpoint := dbChannel.FundingOutpoint
// Get the lifespan observed by the channel event store. If the channel is
// not known to the channel event store, return early because we cannot
// calculate any further uptime information.
startTime, endTime, err := r.server.chanEventStore.GetLifespan(channel.ChanId)
startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint)
switch err {
case chanfitness.ErrChannelNotFound:
rpcsLog.Infof("channel: %v not found by channel event store",
channel.ChanId)
outpoint)
return channel, nil
case nil:
@ -2880,7 +2882,7 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
// channel is known to the event store, so we can return any non-nil error
// that occurs.
uptime, err := r.server.chanEventStore.GetUptime(
channel.ChanId, startTime, endTime,
outpoint, startTime, endTime,
)
if err != nil {
return nil, err