From 4f9795e8ae5f4779e856b5e7da91a7542b57f879 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 17 Dec 2019 17:36:28 +0200 Subject: [PATCH] chanfitness: switch to query by channel outpoint In this commit, the channelEventStore in the channel fitness subsystem is changed to identify channels by their outpoint rather than short channel id. This change is made made becuase outpoints are the preferred way to expose references over rpc, and easier to perform queries within lnd. --- chanfitness/chanevent.go | 17 +++--- chanfitness/chaneventstore.go | 51 +++++++++--------- chanfitness/chaneventstore_test.go | 84 ++++++++++++++++-------------- rpcserver.go | 8 +-- 4 files changed, 88 insertions(+), 72 deletions(-) diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index b91ada03..f4caf7bf 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -36,8 +37,8 @@ type channelEvent struct { // chanEventLog stores all events that have occurred over a channel's lifetime. type chanEventLog struct { - // id is the uint64 of the short channel ID. - id uint64 + // channelPoint is the outpoint for the channel's funding transaction. + channelPoint wire.OutPoint // peer is the compressed public key of the peer being monitored. peer route.Vertex @@ -59,11 +60,13 @@ type chanEventLog struct { closedAt time.Time } -func newEventLog(id uint64, peer route.Vertex, now func() time.Time) *chanEventLog { +func newEventLog(outpoint wire.OutPoint, peer route.Vertex, + now func() time.Time) *chanEventLog { + return &chanEventLog{ - id: id, - peer: peer, - now: now, + channelPoint: outpoint, + peer: peer, + now: now, } } @@ -95,7 +98,7 @@ func (e *chanEventLog) add(eventType eventType) { e.openedAt = event.timestamp } - log.Debugf("Channel %v recording event: %v", e.id, eventType) + log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType) } // onlinePeriod represents a period of time over which a peer was online. diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 31ac2d56..1853df0b 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/peernotifier" @@ -37,8 +38,8 @@ var ( type ChannelEventStore struct { cfg *Config - // channels maps short channel IDs to event logs. - channels map[uint64]*chanEventLog + // channels maps channel points to event logs. + channels map[wire.OutPoint]*chanEventLog // peers tracks the current online status of peers based on online/offline // events. @@ -76,7 +77,7 @@ type Config struct { // channel's lifespan and a blocking response channel on which the result is // sent. type lifespanRequest struct { - channelID uint64 + channelPoint wire.OutPoint responseChan chan lifespanResponse } @@ -91,7 +92,7 @@ type lifespanResponse struct { // uptimeRequest contains the parameters required to query the store for a // channel's uptime and a blocking response channel on which the result is sent. type uptimeRequest struct { - channelID uint64 + channelPoint wire.OutPoint startTime time.Time endTime time.Time responseChan chan uptimeResponse @@ -110,7 +111,7 @@ type uptimeResponse struct { func NewChannelEventStore(config *Config) *ChannelEventStore { store := &ChannelEventStore{ cfg: config, - channels: make(map[uint64]*chanEventLog), + channels: make(map[wire.OutPoint]*chanEventLog), peers: make(map[route.Vertex]bool), lifespanRequests: make(chan lifespanRequest), uptimeRequests: make(chan uptimeRequest), @@ -167,7 +168,7 @@ func (c *ChannelEventStore) Start() error { // Add existing channels to the channel store with an initial peer // online or offline event. - c.addChannel(ch.ShortChanID().ToUint64(), peerKey) + c.addChannel(ch.FundingOutpoint, peerKey) } // Start a goroutine that consumes events from all subscriptions. @@ -196,15 +197,17 @@ func (c *ChannelEventStore) Stop() { // already present in the map, the function returns early. This function should // be called to add existing channels on startup and when open channel events // are observed. -func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) { +func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, + peer route.Vertex) { + // Check for the unexpected case where the channel is already in the store. - _, ok := c.channels[channelID] + _, ok := c.channels[channelPoint] if ok { - log.Errorf("Channel %v duplicated in channel store", channelID) + log.Errorf("Channel %v duplicated in channel store", channelPoint) return } - eventLog := newEventLog(channelID, peer, time.Now) + eventLog := newEventLog(channelPoint, peer, time.Now) // If the peer is online, add a peer online event to indicate its starting // state. @@ -213,16 +216,16 @@ func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) { eventLog.add(peerOnlineEvent) } - c.channels[channelID] = eventLog + c.channels[channelPoint] = eventLog } // closeChannel records a closed time for a channel, and returns early is the // channel is not known to the event store. -func (c *ChannelEventStore) closeChannel(channelID uint64) { +func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) { // Check for the unexpected case where the channel is unknown to the store. - eventLog, ok := c.channels[channelID] + eventLog, ok := c.channels[channelPoint] if !ok { - log.Errorf("Close channel %v unknown to store", channelID) + log.Errorf("Close channel %v unknown to store", channelPoint) return } @@ -265,8 +268,6 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // A new channel has been opened, we must add the channel to the // store and record a channel open event. case channelnotifier.OpenChannelEvent: - chanID := event.Channel.ShortChanID().ToUint64() - peerKey, err := route.NewVertexFromBytes( event.Channel.IdentityPub.SerializeCompressed(), ) @@ -275,12 +276,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { event.Channel.IdentityPub.SerializeCompressed()) } - c.addChannel(chanID, peerKey) + c.addChannel(event.Channel.FundingOutpoint, peerKey) // A channel has been closed, we must remove the channel from the // store and record a channel closed event. case channelnotifier.ClosedChannelEvent: - c.closeChannel(event.CloseSummary.ShortChanID.ToUint64()) + c.closeChannel(event.CloseSummary.ChanPoint) } // Process peer online and offline events. @@ -301,7 +302,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { case req := <-c.lifespanRequests: var resp lifespanResponse - channel, ok := c.channels[req.channelID] + channel, ok := c.channels[req.channelPoint] if !ok { resp.err = ErrChannelNotFound } else { @@ -315,7 +316,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { case req := <-c.uptimeRequests: var resp uptimeResponse - channel, ok := c.channels[req.channelID] + channel, ok := c.channels[req.channelPoint] if !ok { resp.err = ErrChannelNotFound } else { @@ -336,9 +337,11 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // GetLifespan returns the opening and closing time observed for a channel and // a boolean to indicate whether the channel is known the the event store. If // the channel is still open, a zero close time is returned. -func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) { +func (c *ChannelEventStore) GetLifespan( + channelPoint wire.OutPoint) (time.Time, time.Time, error) { + request := lifespanRequest{ - channelID: chanID, + channelPoint: channelPoint, responseChan: make(chan lifespanResponse), } @@ -364,11 +367,11 @@ func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, er // GetUptime returns the uptime of a channel over a period and an error if the // channel cannot be found or the uptime calculation fails. -func (c *ChannelEventStore) GetUptime(chanID uint64, startTime, +func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime, endTime time.Time) (time.Duration, error) { request := uptimeRequest{ - channelID: chanID, + channelPoint: channelPoint, startTime: startTime, endTime: endTime, responseChan: make(chan uptimeResponse), diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 6bce3c8c..ce52c736 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -6,9 +6,10 @@ import ( "time" "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" - "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" @@ -74,13 +75,11 @@ func TestStartStoreError(t *testing.T) { } } -// TestMonitorChannelEvents tests the store's handling of channel and peer -// events. It tests for the unexpected cases where we receive a channel open for -// an already known channel and but does not test for closing an unknown channel -// because it would require custom logic in the test to prevent iterating -// through an eventLog which does not exist. This test does not test handling -// of uptime and lifespan requests, as they are tested in their own tests. -func TestMonitorChannelEvents(t *testing.T) { +// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel +// outpoint for testing. +func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex, + wire.OutPoint) { + privKey, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("Error getting pubkey: %v", err) @@ -93,11 +92,20 @@ func TestMonitorChannelEvents(t *testing.T) { t.Fatalf("Could not create vertex: %v", err) } - shortID := lnwire.ShortChannelID{ - BlockHeight: 1234, - TxIndex: 2, - TxPosition: 2, + return privKey.PubKey(), pubKey, wire.OutPoint{ + Hash: [chainhash.HashSize]byte{1, 2, 3}, + Index: 0, } +} + +// TestMonitorChannelEvents tests the store's handling of channel and peer +// events. It tests for the unexpected cases where we receive a channel open for +// an already known channel and but does not test for closing an unknown channel +// because it would require custom logic in the test to prevent iterating +// through an eventLog which does not exist. This test does not test handling +// of uptime and lifespan requests, as they are tested in their own tests. +func TestMonitorChannelEvents(t *testing.T) { + pubKey, vertex, chanPoint := getTestChannel(t) tests := []struct { name string @@ -118,13 +126,13 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} }, expectedEvents: []eventType{peerOnlineEvent}, }, @@ -134,19 +142,19 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} // Add a duplicate channel open event. channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } }, @@ -156,13 +164,13 @@ func TestMonitorChannelEvents(t *testing.T) { name: "Channel opened, peer already online", generateEvents: func(channelEvents, peerEvents chan<- interface{}) { // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } }, @@ -175,18 +183,18 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex} // Add a close channel event. channelEvents <- channelnotifier.ClosedChannelEvent{ CloseSummary: &channeldb.ChannelCloseSummary{ - ShortChanID: shortID, + ChanPoint: chanPoint, }, } }, @@ -198,20 +206,20 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a close channel event. channelEvents <- channelnotifier.ClosedChannelEvent{ CloseSummary: &channeldb.ChannelCloseSummary{ - ShortChanID: shortID, + ChanPoint: chanPoint, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex} }, }, } @@ -242,7 +250,7 @@ func TestMonitorChannelEvents(t *testing.T) { // Retrieve the eventLog for the channel and check that its // contents are as expected. - eventLog, ok := store.channels[shortID.ToUint64()] + eventLog, ok := store.channels[chanPoint] if !ok { t.Fatalf("Expected to find event store") } @@ -265,7 +273,7 @@ func TestGetLifetime(t *testing.T) { tests := []struct { name string channelFound bool - chanID uint64 + channelPoint wire.OutPoint opened time.Time closed time.Time expectedError error @@ -304,13 +312,13 @@ func TestGetLifetime(t *testing.T) { // Add channel to eventStore if the test indicates that it should // be present. if test.channelFound { - store.channels[test.chanID] = &chanEventLog{ + store.channels[test.channelPoint] = &chanEventLog{ openedAt: test.opened, closedAt: test.closed, } } - open, close, err := store.GetLifespan(test.chanID) + open, close, err := store.GetLifespan(test.channelPoint) if test.expectedError != err { t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } @@ -341,7 +349,7 @@ func TestGetUptime(t *testing.T) { tests := []struct { name string - chanID uint64 + channelPoint wire.OutPoint // events is the set of events we expect to find in the channel store. events []*channelEvent @@ -437,7 +445,7 @@ func TestGetUptime(t *testing.T) { // Add the channel to the store if it is intended to be found. if test.channelFound { - store.channels[test.chanID] = &chanEventLog{ + store.channels[test.channelPoint] = &chanEventLog{ events: test.events, now: func() time.Time { return now }, openedAt: test.openedAt, @@ -445,7 +453,7 @@ func TestGetUptime(t *testing.T) { } } - uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime) + uptime, err := store.GetUptime(test.channelPoint, test.startTime, test.endTime) if test.expectedError != err { t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } diff --git a/rpcserver.go b/rpcserver.go index 120418c7..6cb2e955 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2852,14 +2852,16 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, channel.UnsettledBalance += channel.PendingHtlcs[i].Amount } + outpoint := dbChannel.FundingOutpoint + // Get the lifespan observed by the channel event store. If the channel is // not known to the channel event store, return early because we cannot // calculate any further uptime information. - startTime, endTime, err := r.server.chanEventStore.GetLifespan(channel.ChanId) + startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint) switch err { case chanfitness.ErrChannelNotFound: rpcsLog.Infof("channel: %v not found by channel event store", - channel.ChanId) + outpoint) return channel, nil case nil: @@ -2880,7 +2882,7 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, // channel is known to the event store, so we can return any non-nil error // that occurs. uptime, err := r.server.chanEventStore.GetUptime( - channel.ChanId, startTime, endTime, + outpoint, startTime, endTime, ) if err != nil { return nil, err