From ec12463a950d77216e878812222d19ed5061f926 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 17 Dec 2019 17:36:21 +0200 Subject: [PATCH 1/4] chanfitness: define and export ErrChannelNotFound --- chanfitness/chaneventstore.go | 17 +++++--- chanfitness/chaneventstore_test.go | 64 +++++++++++++++--------------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index a4339765..31ac2d56 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -12,7 +12,6 @@ package chanfitness import ( "errors" - "fmt" "sync" "time" @@ -23,9 +22,15 @@ import ( "github.com/lightningnetwork/lnd/subscribe" ) -// errShuttingDown is returned when the store cannot respond to a query because -// it has received the shutdown signal. -var errShuttingDown = errors.New("channel event store shutting down") +var ( + // errShuttingDown is returned when the store cannot respond to a query because + // it has received the shutdown signal. + errShuttingDown = errors.New("channel event store shutting down") + + // ErrChannelNotFound is returned when a query is made for a channel that + // the event store does not have knowledge of. + ErrChannelNotFound = errors.New("channel not found in event store") +) // ChannelEventStore maintains a set of event logs for the node's channels to // provide insight into the performance and health of channels. @@ -298,7 +303,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { channel, ok := c.channels[req.channelID] if !ok { - resp.err = fmt.Errorf("channel %v not found", req.channelID) + resp.err = ErrChannelNotFound } else { resp.start = channel.openedAt resp.end = channel.closedAt @@ -312,7 +317,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { channel, ok := c.channels[req.channelID] if !ok { - resp.err = fmt.Errorf("channel %v not found", req.channelID) + resp.err = ErrChannelNotFound } else { uptime, err := channel.uptime(req.startTime, req.endTime) resp.uptime = uptime diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index a30f7d15..6bce3c8c 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -263,23 +263,23 @@ func TestGetLifetime(t *testing.T) { now := time.Now() tests := []struct { - name string - channelFound bool - chanID uint64 - opened time.Time - closed time.Time - expectErr bool + name string + channelFound bool + chanID uint64 + opened time.Time + closed time.Time + expectedError error }{ { - name: "Channel found", - channelFound: true, - opened: now, - closed: now.Add(time.Hour * -1), - expectErr: false, + name: "Channel found", + channelFound: true, + opened: now, + closed: now.Add(time.Hour * -1), + expectedError: nil, }, { - name: "Channel not found", - expectErr: true, + name: "Channel not found", + expectedError: ErrChannelNotFound, }, } @@ -311,11 +311,8 @@ func TestGetLifetime(t *testing.T) { } open, close, err := store.GetLifespan(test.chanID) - if test.expectErr && err == nil { - t.Fatal("Expected an error, got nil") - } - if !test.expectErr && err != nil { - t.Fatalf("Expected no error, got: %v", err) + if test.expectedError != err { + t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } if open != test.opened { @@ -367,13 +364,15 @@ func TestGetUptime(t *testing.T) { endTime time.Time expectedUptime time.Duration - expectErr bool + + expectedError error }{ { - name: "No events", - startTime: twoHoursAgo, - endTime: now, - channelFound: true, + name: "No events", + startTime: twoHoursAgo, + endTime: now, + channelFound: true, + expectedError: nil, }, { name: "50% Uptime", @@ -392,6 +391,7 @@ func TestGetUptime(t *testing.T) { startTime: fourHoursAgo, endTime: now, channelFound: true, + expectedError: nil, }, { name: "Zero start time", @@ -405,13 +405,14 @@ func TestGetUptime(t *testing.T) { expectedUptime: time.Hour * 4, endTime: now, channelFound: true, + expectedError: nil, }, { - name: "Channel not found", - startTime: twoHoursAgo, - endTime: now, - channelFound: false, - expectErr: true, + name: "Channel not found", + startTime: twoHoursAgo, + endTime: now, + channelFound: false, + expectedError: ErrChannelNotFound, }, } @@ -445,11 +446,8 @@ func TestGetUptime(t *testing.T) { } uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime) - if test.expectErr && err == nil { - t.Fatal("Expected an error, got nil") - } - if !test.expectErr && err != nil { - t.Fatalf("Expcted no error, got: %v", err) + if test.expectedError != err { + t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } if uptime != test.expectedUptime { From 47e700ba9ec0aa68999317cb5143d2c30426200e Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 17 Dec 2019 17:36:27 +0200 Subject: [PATCH 2/4] rpcserver: return uptime errors for known channels Upgrade logging of channel uptime/lifespan query errors to returning the error, with the exception of the case where the channel is known to the store. This error is logged, due to the potential for race conditions between the channel event store being notified of a new open channel and the rpc channel subscription creating a rpc channel struct (which queries for uptime). --- rpcserver.go | 86 +++++++++++++++++++++++++++++----------------------- 1 file changed, 48 insertions(+), 38 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 97367db3..120418c7 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -32,6 +32,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chanacceptor" "github.com/lightningnetwork/lnd/chanbackup" + "github.com/lightningnetwork/lnd/chanfitness" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" @@ -2752,7 +2753,10 @@ func (r *rpcServer) ListChannels(ctx context.Context, // Next, we'll determine whether we should add this channel to // our list depending on the type of channels requested to us. isActive := peerOnline && linkActive - channel := createRPCOpenChannel(r, graph, dbChannel, isActive) + channel, err := createRPCOpenChannel(r, graph, dbChannel, isActive) + if err != nil { + return nil, err + } // We'll only skip returning this channel if we were requested // for a specific kind and this channel doesn't satisfy it. @@ -2775,7 +2779,7 @@ func (r *rpcServer) ListChannels(ctx context.Context, // createRPCOpenChannel creates an *lnrpc.Channel from the *channeldb.Channel. func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, - dbChannel *channeldb.OpenChannel, isActive bool) *lnrpc.Channel { + dbChannel *channeldb.OpenChannel, isActive bool) (*lnrpc.Channel, error) { nodePub := dbChannel.IdentityPub nodeID := hex.EncodeToString(nodePub.SerializeCompressed()) @@ -2810,43 +2814,12 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, } externalCommitFee := dbChannel.Capacity - sumOutputs - chanID := dbChannel.ShortChannelID.ToUint64() - - var ( - uptime time.Duration - lifespan time.Duration - ) - - // Get the lifespan observed by the channel event store. - startTime, endTime, err := r.server.chanEventStore.GetLifespan(chanID) - if err != nil { - // If the channel cannot be found, log an error and do not perform - // further calculations for uptime and lifespan. - rpcsLog.Warnf("GetLifespan %v error: %v", chanID, err) - } else { - // If endTime is zero, the channel is still open, progress endTime to - // the present so we can calculate lifespan. - if endTime.IsZero() { - endTime = time.Now() - } - lifespan = endTime.Sub(startTime) - - uptime, err = r.server.chanEventStore.GetUptime( - chanID, - startTime, - endTime, - ) - if err != nil { - rpcsLog.Warnf("GetUptime %v error: %v", chanID, err) - } - } - channel := &lnrpc.Channel{ Active: isActive, Private: !isPublic, RemotePubkey: nodeID, ChannelPoint: chanPoint.String(), - ChanId: chanID, + ChanId: dbChannel.ShortChannelID.ToUint64(), Capacity: int64(dbChannel.Capacity), LocalBalance: int64(localBalance.ToSatoshis()), RemoteBalance: int64(remoteBalance.ToSatoshis()), @@ -2863,8 +2836,6 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, LocalChanReserveSat: int64(dbChannel.LocalChanCfg.ChanReserve), RemoteChanReserveSat: int64(dbChannel.RemoteChanCfg.ChanReserve), StaticRemoteKey: dbChannel.ChanType.IsTweakless(), - Lifetime: int64(lifespan.Seconds()), - Uptime: int64(uptime.Seconds()), } for i, htlc := range localCommit.Htlcs { @@ -2881,7 +2852,42 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, channel.UnsettledBalance += channel.PendingHtlcs[i].Amount } - return channel + // Get the lifespan observed by the channel event store. If the channel is + // not known to the channel event store, return early because we cannot + // calculate any further uptime information. + startTime, endTime, err := r.server.chanEventStore.GetLifespan(channel.ChanId) + switch err { + case chanfitness.ErrChannelNotFound: + rpcsLog.Infof("channel: %v not found by channel event store", + channel.ChanId) + + return channel, nil + case nil: + // If there is no error getting lifespan, continue to uptime + // calculation. + default: + return nil, err + } + + // If endTime is zero, the channel is still open, progress endTime to + // the present so we can calculate lifetime. + if endTime.IsZero() { + endTime = time.Now() + } + channel.Lifetime = int64(endTime.Sub(startTime).Seconds()) + + // Once we have successfully obtained channel lifespan, we know that the + // channel is known to the event store, so we can return any non-nil error + // that occurs. + uptime, err := r.server.chanEventStore.GetUptime( + channel.ChanId, startTime, endTime, + ) + if err != nil { + return nil, err + } + channel.Uptime = int64(uptime.Seconds()) + + return channel, nil } // createRPCClosedChannel creates an *lnrpc.ClosedChannelSummary from a @@ -2947,8 +2953,12 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription, var update *lnrpc.ChannelEventUpdate switch event := e.(type) { case channelnotifier.OpenChannelEvent: - channel := createRPCOpenChannel(r, graph, + channel, err := createRPCOpenChannel(r, graph, event.Channel, true) + if err != nil { + return err + } + update = &lnrpc.ChannelEventUpdate{ Type: lnrpc.ChannelEventUpdate_OPEN_CHANNEL, Channel: &lnrpc.ChannelEventUpdate_OpenChannel{ From 4f9795e8ae5f4779e856b5e7da91a7542b57f879 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 17 Dec 2019 17:36:28 +0200 Subject: [PATCH 3/4] chanfitness: switch to query by channel outpoint In this commit, the channelEventStore in the channel fitness subsystem is changed to identify channels by their outpoint rather than short channel id. This change is made made becuase outpoints are the preferred way to expose references over rpc, and easier to perform queries within lnd. --- chanfitness/chanevent.go | 17 +++--- chanfitness/chaneventstore.go | 51 +++++++++--------- chanfitness/chaneventstore_test.go | 84 ++++++++++++++++-------------- rpcserver.go | 8 +-- 4 files changed, 88 insertions(+), 72 deletions(-) diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index b91ada03..f4caf7bf 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -36,8 +37,8 @@ type channelEvent struct { // chanEventLog stores all events that have occurred over a channel's lifetime. type chanEventLog struct { - // id is the uint64 of the short channel ID. - id uint64 + // channelPoint is the outpoint for the channel's funding transaction. + channelPoint wire.OutPoint // peer is the compressed public key of the peer being monitored. peer route.Vertex @@ -59,11 +60,13 @@ type chanEventLog struct { closedAt time.Time } -func newEventLog(id uint64, peer route.Vertex, now func() time.Time) *chanEventLog { +func newEventLog(outpoint wire.OutPoint, peer route.Vertex, + now func() time.Time) *chanEventLog { + return &chanEventLog{ - id: id, - peer: peer, - now: now, + channelPoint: outpoint, + peer: peer, + now: now, } } @@ -95,7 +98,7 @@ func (e *chanEventLog) add(eventType eventType) { e.openedAt = event.timestamp } - log.Debugf("Channel %v recording event: %v", e.id, eventType) + log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType) } // onlinePeriod represents a period of time over which a peer was online. diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 31ac2d56..1853df0b 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/peernotifier" @@ -37,8 +38,8 @@ var ( type ChannelEventStore struct { cfg *Config - // channels maps short channel IDs to event logs. - channels map[uint64]*chanEventLog + // channels maps channel points to event logs. + channels map[wire.OutPoint]*chanEventLog // peers tracks the current online status of peers based on online/offline // events. @@ -76,7 +77,7 @@ type Config struct { // channel's lifespan and a blocking response channel on which the result is // sent. type lifespanRequest struct { - channelID uint64 + channelPoint wire.OutPoint responseChan chan lifespanResponse } @@ -91,7 +92,7 @@ type lifespanResponse struct { // uptimeRequest contains the parameters required to query the store for a // channel's uptime and a blocking response channel on which the result is sent. type uptimeRequest struct { - channelID uint64 + channelPoint wire.OutPoint startTime time.Time endTime time.Time responseChan chan uptimeResponse @@ -110,7 +111,7 @@ type uptimeResponse struct { func NewChannelEventStore(config *Config) *ChannelEventStore { store := &ChannelEventStore{ cfg: config, - channels: make(map[uint64]*chanEventLog), + channels: make(map[wire.OutPoint]*chanEventLog), peers: make(map[route.Vertex]bool), lifespanRequests: make(chan lifespanRequest), uptimeRequests: make(chan uptimeRequest), @@ -167,7 +168,7 @@ func (c *ChannelEventStore) Start() error { // Add existing channels to the channel store with an initial peer // online or offline event. - c.addChannel(ch.ShortChanID().ToUint64(), peerKey) + c.addChannel(ch.FundingOutpoint, peerKey) } // Start a goroutine that consumes events from all subscriptions. @@ -196,15 +197,17 @@ func (c *ChannelEventStore) Stop() { // already present in the map, the function returns early. This function should // be called to add existing channels on startup and when open channel events // are observed. -func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) { +func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, + peer route.Vertex) { + // Check for the unexpected case where the channel is already in the store. - _, ok := c.channels[channelID] + _, ok := c.channels[channelPoint] if ok { - log.Errorf("Channel %v duplicated in channel store", channelID) + log.Errorf("Channel %v duplicated in channel store", channelPoint) return } - eventLog := newEventLog(channelID, peer, time.Now) + eventLog := newEventLog(channelPoint, peer, time.Now) // If the peer is online, add a peer online event to indicate its starting // state. @@ -213,16 +216,16 @@ func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) { eventLog.add(peerOnlineEvent) } - c.channels[channelID] = eventLog + c.channels[channelPoint] = eventLog } // closeChannel records a closed time for a channel, and returns early is the // channel is not known to the event store. -func (c *ChannelEventStore) closeChannel(channelID uint64) { +func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) { // Check for the unexpected case where the channel is unknown to the store. - eventLog, ok := c.channels[channelID] + eventLog, ok := c.channels[channelPoint] if !ok { - log.Errorf("Close channel %v unknown to store", channelID) + log.Errorf("Close channel %v unknown to store", channelPoint) return } @@ -265,8 +268,6 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // A new channel has been opened, we must add the channel to the // store and record a channel open event. case channelnotifier.OpenChannelEvent: - chanID := event.Channel.ShortChanID().ToUint64() - peerKey, err := route.NewVertexFromBytes( event.Channel.IdentityPub.SerializeCompressed(), ) @@ -275,12 +276,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { event.Channel.IdentityPub.SerializeCompressed()) } - c.addChannel(chanID, peerKey) + c.addChannel(event.Channel.FundingOutpoint, peerKey) // A channel has been closed, we must remove the channel from the // store and record a channel closed event. case channelnotifier.ClosedChannelEvent: - c.closeChannel(event.CloseSummary.ShortChanID.ToUint64()) + c.closeChannel(event.CloseSummary.ChanPoint) } // Process peer online and offline events. @@ -301,7 +302,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { case req := <-c.lifespanRequests: var resp lifespanResponse - channel, ok := c.channels[req.channelID] + channel, ok := c.channels[req.channelPoint] if !ok { resp.err = ErrChannelNotFound } else { @@ -315,7 +316,7 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { case req := <-c.uptimeRequests: var resp uptimeResponse - channel, ok := c.channels[req.channelID] + channel, ok := c.channels[req.channelPoint] if !ok { resp.err = ErrChannelNotFound } else { @@ -336,9 +337,11 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // GetLifespan returns the opening and closing time observed for a channel and // a boolean to indicate whether the channel is known the the event store. If // the channel is still open, a zero close time is returned. -func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) { +func (c *ChannelEventStore) GetLifespan( + channelPoint wire.OutPoint) (time.Time, time.Time, error) { + request := lifespanRequest{ - channelID: chanID, + channelPoint: channelPoint, responseChan: make(chan lifespanResponse), } @@ -364,11 +367,11 @@ func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, er // GetUptime returns the uptime of a channel over a period and an error if the // channel cannot be found or the uptime calculation fails. -func (c *ChannelEventStore) GetUptime(chanID uint64, startTime, +func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime, endTime time.Time) (time.Duration, error) { request := uptimeRequest{ - channelID: chanID, + channelPoint: channelPoint, startTime: startTime, endTime: endTime, responseChan: make(chan uptimeResponse), diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index 6bce3c8c..ce52c736 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -6,9 +6,10 @@ import ( "time" "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" - "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" @@ -74,13 +75,11 @@ func TestStartStoreError(t *testing.T) { } } -// TestMonitorChannelEvents tests the store's handling of channel and peer -// events. It tests for the unexpected cases where we receive a channel open for -// an already known channel and but does not test for closing an unknown channel -// because it would require custom logic in the test to prevent iterating -// through an eventLog which does not exist. This test does not test handling -// of uptime and lifespan requests, as they are tested in their own tests. -func TestMonitorChannelEvents(t *testing.T) { +// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel +// outpoint for testing. +func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex, + wire.OutPoint) { + privKey, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { t.Fatalf("Error getting pubkey: %v", err) @@ -93,11 +92,20 @@ func TestMonitorChannelEvents(t *testing.T) { t.Fatalf("Could not create vertex: %v", err) } - shortID := lnwire.ShortChannelID{ - BlockHeight: 1234, - TxIndex: 2, - TxPosition: 2, + return privKey.PubKey(), pubKey, wire.OutPoint{ + Hash: [chainhash.HashSize]byte{1, 2, 3}, + Index: 0, } +} + +// TestMonitorChannelEvents tests the store's handling of channel and peer +// events. It tests for the unexpected cases where we receive a channel open for +// an already known channel and but does not test for closing an unknown channel +// because it would require custom logic in the test to prevent iterating +// through an eventLog which does not exist. This test does not test handling +// of uptime and lifespan requests, as they are tested in their own tests. +func TestMonitorChannelEvents(t *testing.T) { + pubKey, vertex, chanPoint := getTestChannel(t) tests := []struct { name string @@ -118,13 +126,13 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} }, expectedEvents: []eventType{peerOnlineEvent}, }, @@ -134,19 +142,19 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} // Add a duplicate channel open event. channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } }, @@ -156,13 +164,13 @@ func TestMonitorChannelEvents(t *testing.T) { name: "Channel opened, peer already online", generateEvents: func(channelEvents, peerEvents chan<- interface{}) { // Add a peer online event. - peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex} // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } }, @@ -175,18 +183,18 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex} // Add a close channel event. channelEvents <- channelnotifier.ClosedChannelEvent{ CloseSummary: &channeldb.ChannelCloseSummary{ - ShortChanID: shortID, + ChanPoint: chanPoint, }, } }, @@ -198,20 +206,20 @@ func TestMonitorChannelEvents(t *testing.T) { // Add an open channel event channelEvents <- channelnotifier.OpenChannelEvent{ Channel: &channeldb.OpenChannel{ - ShortChannelID: shortID, - IdentityPub: privKey.PubKey(), + FundingOutpoint: chanPoint, + IdentityPub: pubKey, }, } // Add a close channel event. channelEvents <- channelnotifier.ClosedChannelEvent{ CloseSummary: &channeldb.ChannelCloseSummary{ - ShortChanID: shortID, + ChanPoint: chanPoint, }, } // Add a peer online event. - peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey} + peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex} }, }, } @@ -242,7 +250,7 @@ func TestMonitorChannelEvents(t *testing.T) { // Retrieve the eventLog for the channel and check that its // contents are as expected. - eventLog, ok := store.channels[shortID.ToUint64()] + eventLog, ok := store.channels[chanPoint] if !ok { t.Fatalf("Expected to find event store") } @@ -265,7 +273,7 @@ func TestGetLifetime(t *testing.T) { tests := []struct { name string channelFound bool - chanID uint64 + channelPoint wire.OutPoint opened time.Time closed time.Time expectedError error @@ -304,13 +312,13 @@ func TestGetLifetime(t *testing.T) { // Add channel to eventStore if the test indicates that it should // be present. if test.channelFound { - store.channels[test.chanID] = &chanEventLog{ + store.channels[test.channelPoint] = &chanEventLog{ openedAt: test.opened, closedAt: test.closed, } } - open, close, err := store.GetLifespan(test.chanID) + open, close, err := store.GetLifespan(test.channelPoint) if test.expectedError != err { t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } @@ -341,7 +349,7 @@ func TestGetUptime(t *testing.T) { tests := []struct { name string - chanID uint64 + channelPoint wire.OutPoint // events is the set of events we expect to find in the channel store. events []*channelEvent @@ -437,7 +445,7 @@ func TestGetUptime(t *testing.T) { // Add the channel to the store if it is intended to be found. if test.channelFound { - store.channels[test.chanID] = &chanEventLog{ + store.channels[test.channelPoint] = &chanEventLog{ events: test.events, now: func() time.Time { return now }, openedAt: test.openedAt, @@ -445,7 +453,7 @@ func TestGetUptime(t *testing.T) { } } - uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime) + uptime, err := store.GetUptime(test.channelPoint, test.startTime, test.endTime) if test.expectedError != err { t.Fatalf("Expected: %v, got: %v", test.expectedError, err) } diff --git a/rpcserver.go b/rpcserver.go index 120418c7..6cb2e955 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2852,14 +2852,16 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, channel.UnsettledBalance += channel.PendingHtlcs[i].Amount } + outpoint := dbChannel.FundingOutpoint + // Get the lifespan observed by the channel event store. If the channel is // not known to the channel event store, return early because we cannot // calculate any further uptime information. - startTime, endTime, err := r.server.chanEventStore.GetLifespan(channel.ChanId) + startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint) switch err { case chanfitness.ErrChannelNotFound: rpcsLog.Infof("channel: %v not found by channel event store", - channel.ChanId) + outpoint) return channel, nil case nil: @@ -2880,7 +2882,7 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph, // channel is known to the event store, so we can return any non-nil error // that occurs. uptime, err := r.server.chanEventStore.GetUptime( - channel.ChanId, startTime, endTime, + outpoint, startTime, endTime, ) if err != nil { return nil, err From eebc23a8adbb0bad5aa1b1acb58bd561c2c1aae7 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 17 Dec 2019 17:36:28 +0200 Subject: [PATCH 4/4] chanfitness: set open time for channels with offline peers This commit addresses a bug in the channel event store where the opened time of a channel event log was not set for peers that were offline on startup. Previously, opened time was set to the time of the first event in the event log. This worked for online peers, because the eventlog was created with an initial online event. However, offline peers had no inital event so had no open time set. This commit simplifies the creation of an event log by removing the initial event and setting open time for all event logs. This has the effect of potentially introducing a gap between opened time for a log and the first peer online event for peers with channels that exist at startup if a peer takes time to reconnect. However, the cost of this is less than the benefit of reducing the bug-prone custom code path that was previously in place. --- chanfitness/chanevent.go | 17 +++++++---------- chanfitness/chaneventstore.go | 8 +------- chanfitness/chaneventstore_test.go | 23 +++++++++++++++++++++++ 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index f4caf7bf..53048ee3 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -60,14 +60,18 @@ type chanEventLog struct { closedAt time.Time } -func newEventLog(outpoint wire.OutPoint, peer route.Vertex, +// newEventLog creates an event log for a channel with the openedAt time set. +func newEventLog(channelPoint wire.OutPoint, peer route.Vertex, now func() time.Time) *chanEventLog { - return &chanEventLog{ - channelPoint: outpoint, + eventlog := &chanEventLog{ + channelPoint: channelPoint, peer: peer, now: now, + openedAt: now(), } + + return eventlog } // close sets the closing time for an event log. @@ -91,13 +95,6 @@ func (e *chanEventLog) add(eventType eventType) { } e.events = append(e.events, event) - // If the eventLog does not have an opened time set, set it to the timestamp - // of the event. This has the effect of setting the eventLog's open time to - // the timestamp of the first event added. - if e.openedAt.IsZero() { - e.openedAt = event.timestamp - } - log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType) } diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 1853df0b..69107808 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -207,15 +207,9 @@ func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, return } + // Create an event log for the channel. eventLog := newEventLog(channelPoint, peer, time.Now) - // If the peer is online, add a peer online event to indicate its starting - // state. - online := c.peers[peer] - if online { - eventLog.add(peerOnlineEvent) - } - c.channels[channelPoint] = eventLog } diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go index ce52c736..36a74990 100644 --- a/chanfitness/chaneventstore_test.go +++ b/chanfitness/chaneventstore_test.go @@ -466,3 +466,26 @@ func TestGetUptime(t *testing.T) { }) } } + +// TestAddChannel tests that channels are added to the event store with +// appropriate timestamps. This test addresses a bug where offline channels +// did not have an opened time set. +func TestAddChannel(t *testing.T) { + _, vertex, chanPoint := getTestChannel(t) + + store := NewChannelEventStore(&Config{}) + + // Add channel to the store. + store.addChannel(chanPoint, vertex) + + // Check that the eventlog is successfully added. + eventlog, ok := store.channels[chanPoint] + if !ok { + t.Fatalf("channel should be in store") + } + + // Ensure that open time is always set. + if eventlog.openedAt.IsZero() { + t.Fatalf("channel should have opened at set") + } +}