Merge pull request #3799 from carlaKC/chanfitness-bugfix
Bugfix: Chanfitness open time for offline peers
This commit is contained in:
commit
76dfd6326d
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/routing/route"
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,8 +37,8 @@ type channelEvent struct {
|
|||||||
|
|
||||||
// chanEventLog stores all events that have occurred over a channel's lifetime.
|
// chanEventLog stores all events that have occurred over a channel's lifetime.
|
||||||
type chanEventLog struct {
|
type chanEventLog struct {
|
||||||
// id is the uint64 of the short channel ID.
|
// channelPoint is the outpoint for the channel's funding transaction.
|
||||||
id uint64
|
channelPoint wire.OutPoint
|
||||||
|
|
||||||
// peer is the compressed public key of the peer being monitored.
|
// peer is the compressed public key of the peer being monitored.
|
||||||
peer route.Vertex
|
peer route.Vertex
|
||||||
@ -59,12 +60,18 @@ type chanEventLog struct {
|
|||||||
closedAt time.Time
|
closedAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEventLog(id uint64, peer route.Vertex, now func() time.Time) *chanEventLog {
|
// newEventLog creates an event log for a channel with the openedAt time set.
|
||||||
return &chanEventLog{
|
func newEventLog(channelPoint wire.OutPoint, peer route.Vertex,
|
||||||
id: id,
|
now func() time.Time) *chanEventLog {
|
||||||
peer: peer,
|
|
||||||
now: now,
|
eventlog := &chanEventLog{
|
||||||
|
channelPoint: channelPoint,
|
||||||
|
peer: peer,
|
||||||
|
now: now,
|
||||||
|
openedAt: now(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return eventlog
|
||||||
}
|
}
|
||||||
|
|
||||||
// close sets the closing time for an event log.
|
// close sets the closing time for an event log.
|
||||||
@ -88,14 +95,7 @@ func (e *chanEventLog) add(eventType eventType) {
|
|||||||
}
|
}
|
||||||
e.events = append(e.events, event)
|
e.events = append(e.events, event)
|
||||||
|
|
||||||
// If the eventLog does not have an opened time set, set it to the timestamp
|
log.Debugf("Channel %v recording event: %v", e.channelPoint, eventType)
|
||||||
// of the event. This has the effect of setting the eventLog's open time to
|
|
||||||
// the timestamp of the first event added.
|
|
||||||
if e.openedAt.IsZero() {
|
|
||||||
e.openedAt = event.timestamp
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debugf("Channel %v recording event: %v", e.id, eventType)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// onlinePeriod represents a period of time over which a peer was online.
|
// onlinePeriod represents a period of time over which a peer was online.
|
||||||
|
@ -12,10 +12,10 @@ package chanfitness
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
"github.com/lightningnetwork/lnd/peernotifier"
|
"github.com/lightningnetwork/lnd/peernotifier"
|
||||||
@ -23,17 +23,23 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/subscribe"
|
"github.com/lightningnetwork/lnd/subscribe"
|
||||||
)
|
)
|
||||||
|
|
||||||
// errShuttingDown is returned when the store cannot respond to a query because
|
var (
|
||||||
// it has received the shutdown signal.
|
// errShuttingDown is returned when the store cannot respond to a query because
|
||||||
var errShuttingDown = errors.New("channel event store shutting down")
|
// it has received the shutdown signal.
|
||||||
|
errShuttingDown = errors.New("channel event store shutting down")
|
||||||
|
|
||||||
|
// ErrChannelNotFound is returned when a query is made for a channel that
|
||||||
|
// the event store does not have knowledge of.
|
||||||
|
ErrChannelNotFound = errors.New("channel not found in event store")
|
||||||
|
)
|
||||||
|
|
||||||
// ChannelEventStore maintains a set of event logs for the node's channels to
|
// ChannelEventStore maintains a set of event logs for the node's channels to
|
||||||
// provide insight into the performance and health of channels.
|
// provide insight into the performance and health of channels.
|
||||||
type ChannelEventStore struct {
|
type ChannelEventStore struct {
|
||||||
cfg *Config
|
cfg *Config
|
||||||
|
|
||||||
// channels maps short channel IDs to event logs.
|
// channels maps channel points to event logs.
|
||||||
channels map[uint64]*chanEventLog
|
channels map[wire.OutPoint]*chanEventLog
|
||||||
|
|
||||||
// peers tracks the current online status of peers based on online/offline
|
// peers tracks the current online status of peers based on online/offline
|
||||||
// events.
|
// events.
|
||||||
@ -71,7 +77,7 @@ type Config struct {
|
|||||||
// channel's lifespan and a blocking response channel on which the result is
|
// channel's lifespan and a blocking response channel on which the result is
|
||||||
// sent.
|
// sent.
|
||||||
type lifespanRequest struct {
|
type lifespanRequest struct {
|
||||||
channelID uint64
|
channelPoint wire.OutPoint
|
||||||
responseChan chan lifespanResponse
|
responseChan chan lifespanResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,7 +92,7 @@ type lifespanResponse struct {
|
|||||||
// uptimeRequest contains the parameters required to query the store for a
|
// uptimeRequest contains the parameters required to query the store for a
|
||||||
// channel's uptime and a blocking response channel on which the result is sent.
|
// channel's uptime and a blocking response channel on which the result is sent.
|
||||||
type uptimeRequest struct {
|
type uptimeRequest struct {
|
||||||
channelID uint64
|
channelPoint wire.OutPoint
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
endTime time.Time
|
endTime time.Time
|
||||||
responseChan chan uptimeResponse
|
responseChan chan uptimeResponse
|
||||||
@ -105,7 +111,7 @@ type uptimeResponse struct {
|
|||||||
func NewChannelEventStore(config *Config) *ChannelEventStore {
|
func NewChannelEventStore(config *Config) *ChannelEventStore {
|
||||||
store := &ChannelEventStore{
|
store := &ChannelEventStore{
|
||||||
cfg: config,
|
cfg: config,
|
||||||
channels: make(map[uint64]*chanEventLog),
|
channels: make(map[wire.OutPoint]*chanEventLog),
|
||||||
peers: make(map[route.Vertex]bool),
|
peers: make(map[route.Vertex]bool),
|
||||||
lifespanRequests: make(chan lifespanRequest),
|
lifespanRequests: make(chan lifespanRequest),
|
||||||
uptimeRequests: make(chan uptimeRequest),
|
uptimeRequests: make(chan uptimeRequest),
|
||||||
@ -162,7 +168,7 @@ func (c *ChannelEventStore) Start() error {
|
|||||||
|
|
||||||
// Add existing channels to the channel store with an initial peer
|
// Add existing channels to the channel store with an initial peer
|
||||||
// online or offline event.
|
// online or offline event.
|
||||||
c.addChannel(ch.ShortChanID().ToUint64(), peerKey)
|
c.addChannel(ch.FundingOutpoint, peerKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a goroutine that consumes events from all subscriptions.
|
// Start a goroutine that consumes events from all subscriptions.
|
||||||
@ -191,33 +197,29 @@ func (c *ChannelEventStore) Stop() {
|
|||||||
// already present in the map, the function returns early. This function should
|
// already present in the map, the function returns early. This function should
|
||||||
// be called to add existing channels on startup and when open channel events
|
// be called to add existing channels on startup and when open channel events
|
||||||
// are observed.
|
// are observed.
|
||||||
func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
|
func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint,
|
||||||
|
peer route.Vertex) {
|
||||||
|
|
||||||
// Check for the unexpected case where the channel is already in the store.
|
// Check for the unexpected case where the channel is already in the store.
|
||||||
_, ok := c.channels[channelID]
|
_, ok := c.channels[channelPoint]
|
||||||
if ok {
|
if ok {
|
||||||
log.Errorf("Channel %v duplicated in channel store", channelID)
|
log.Errorf("Channel %v duplicated in channel store", channelPoint)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
eventLog := newEventLog(channelID, peer, time.Now)
|
// Create an event log for the channel.
|
||||||
|
eventLog := newEventLog(channelPoint, peer, time.Now)
|
||||||
|
|
||||||
// If the peer is online, add a peer online event to indicate its starting
|
c.channels[channelPoint] = eventLog
|
||||||
// state.
|
|
||||||
online := c.peers[peer]
|
|
||||||
if online {
|
|
||||||
eventLog.add(peerOnlineEvent)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.channels[channelID] = eventLog
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// closeChannel records a closed time for a channel, and returns early is the
|
// closeChannel records a closed time for a channel, and returns early is the
|
||||||
// channel is not known to the event store.
|
// channel is not known to the event store.
|
||||||
func (c *ChannelEventStore) closeChannel(channelID uint64) {
|
func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint) {
|
||||||
// Check for the unexpected case where the channel is unknown to the store.
|
// Check for the unexpected case where the channel is unknown to the store.
|
||||||
eventLog, ok := c.channels[channelID]
|
eventLog, ok := c.channels[channelPoint]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Errorf("Close channel %v unknown to store", channelID)
|
log.Errorf("Close channel %v unknown to store", channelPoint)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -260,8 +262,6 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
|||||||
// A new channel has been opened, we must add the channel to the
|
// A new channel has been opened, we must add the channel to the
|
||||||
// store and record a channel open event.
|
// store and record a channel open event.
|
||||||
case channelnotifier.OpenChannelEvent:
|
case channelnotifier.OpenChannelEvent:
|
||||||
chanID := event.Channel.ShortChanID().ToUint64()
|
|
||||||
|
|
||||||
peerKey, err := route.NewVertexFromBytes(
|
peerKey, err := route.NewVertexFromBytes(
|
||||||
event.Channel.IdentityPub.SerializeCompressed(),
|
event.Channel.IdentityPub.SerializeCompressed(),
|
||||||
)
|
)
|
||||||
@ -270,12 +270,12 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
|||||||
event.Channel.IdentityPub.SerializeCompressed())
|
event.Channel.IdentityPub.SerializeCompressed())
|
||||||
}
|
}
|
||||||
|
|
||||||
c.addChannel(chanID, peerKey)
|
c.addChannel(event.Channel.FundingOutpoint, peerKey)
|
||||||
|
|
||||||
// A channel has been closed, we must remove the channel from the
|
// A channel has been closed, we must remove the channel from the
|
||||||
// store and record a channel closed event.
|
// store and record a channel closed event.
|
||||||
case channelnotifier.ClosedChannelEvent:
|
case channelnotifier.ClosedChannelEvent:
|
||||||
c.closeChannel(event.CloseSummary.ShortChanID.ToUint64())
|
c.closeChannel(event.CloseSummary.ChanPoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process peer online and offline events.
|
// Process peer online and offline events.
|
||||||
@ -296,9 +296,9 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
|||||||
case req := <-c.lifespanRequests:
|
case req := <-c.lifespanRequests:
|
||||||
var resp lifespanResponse
|
var resp lifespanResponse
|
||||||
|
|
||||||
channel, ok := c.channels[req.channelID]
|
channel, ok := c.channels[req.channelPoint]
|
||||||
if !ok {
|
if !ok {
|
||||||
resp.err = fmt.Errorf("channel %v not found", req.channelID)
|
resp.err = ErrChannelNotFound
|
||||||
} else {
|
} else {
|
||||||
resp.start = channel.openedAt
|
resp.start = channel.openedAt
|
||||||
resp.end = channel.closedAt
|
resp.end = channel.closedAt
|
||||||
@ -310,9 +310,9 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
|||||||
case req := <-c.uptimeRequests:
|
case req := <-c.uptimeRequests:
|
||||||
var resp uptimeResponse
|
var resp uptimeResponse
|
||||||
|
|
||||||
channel, ok := c.channels[req.channelID]
|
channel, ok := c.channels[req.channelPoint]
|
||||||
if !ok {
|
if !ok {
|
||||||
resp.err = fmt.Errorf("channel %v not found", req.channelID)
|
resp.err = ErrChannelNotFound
|
||||||
} else {
|
} else {
|
||||||
uptime, err := channel.uptime(req.startTime, req.endTime)
|
uptime, err := channel.uptime(req.startTime, req.endTime)
|
||||||
resp.uptime = uptime
|
resp.uptime = uptime
|
||||||
@ -331,9 +331,11 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
|||||||
// GetLifespan returns the opening and closing time observed for a channel and
|
// GetLifespan returns the opening and closing time observed for a channel and
|
||||||
// a boolean to indicate whether the channel is known the the event store. If
|
// a boolean to indicate whether the channel is known the the event store. If
|
||||||
// the channel is still open, a zero close time is returned.
|
// the channel is still open, a zero close time is returned.
|
||||||
func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) {
|
func (c *ChannelEventStore) GetLifespan(
|
||||||
|
channelPoint wire.OutPoint) (time.Time, time.Time, error) {
|
||||||
|
|
||||||
request := lifespanRequest{
|
request := lifespanRequest{
|
||||||
channelID: chanID,
|
channelPoint: channelPoint,
|
||||||
responseChan: make(chan lifespanResponse),
|
responseChan: make(chan lifespanResponse),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -359,11 +361,11 @@ func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, er
|
|||||||
|
|
||||||
// GetUptime returns the uptime of a channel over a period and an error if the
|
// GetUptime returns the uptime of a channel over a period and an error if the
|
||||||
// channel cannot be found or the uptime calculation fails.
|
// channel cannot be found or the uptime calculation fails.
|
||||||
func (c *ChannelEventStore) GetUptime(chanID uint64, startTime,
|
func (c *ChannelEventStore) GetUptime(channelPoint wire.OutPoint, startTime,
|
||||||
endTime time.Time) (time.Duration, error) {
|
endTime time.Time) (time.Duration, error) {
|
||||||
|
|
||||||
request := uptimeRequest{
|
request := uptimeRequest{
|
||||||
channelID: chanID,
|
channelPoint: channelPoint,
|
||||||
startTime: startTime,
|
startTime: startTime,
|
||||||
endTime: endTime,
|
endTime: endTime,
|
||||||
responseChan: make(chan uptimeResponse),
|
responseChan: make(chan uptimeResponse),
|
||||||
|
@ -6,9 +6,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
|
||||||
"github.com/lightningnetwork/lnd/peernotifier"
|
"github.com/lightningnetwork/lnd/peernotifier"
|
||||||
"github.com/lightningnetwork/lnd/routing/route"
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
"github.com/lightningnetwork/lnd/subscribe"
|
"github.com/lightningnetwork/lnd/subscribe"
|
||||||
@ -74,13 +75,11 @@ func TestStartStoreError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestMonitorChannelEvents tests the store's handling of channel and peer
|
// getTestChannel returns a non-zero peer pubKey, serialized pubKey and channel
|
||||||
// events. It tests for the unexpected cases where we receive a channel open for
|
// outpoint for testing.
|
||||||
// an already known channel and but does not test for closing an unknown channel
|
func getTestChannel(t *testing.T) (*btcec.PublicKey, route.Vertex,
|
||||||
// because it would require custom logic in the test to prevent iterating
|
wire.OutPoint) {
|
||||||
// through an eventLog which does not exist. This test does not test handling
|
|
||||||
// of uptime and lifespan requests, as they are tested in their own tests.
|
|
||||||
func TestMonitorChannelEvents(t *testing.T) {
|
|
||||||
privKey, err := btcec.NewPrivateKey(btcec.S256())
|
privKey, err := btcec.NewPrivateKey(btcec.S256())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error getting pubkey: %v", err)
|
t.Fatalf("Error getting pubkey: %v", err)
|
||||||
@ -93,11 +92,20 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
t.Fatalf("Could not create vertex: %v", err)
|
t.Fatalf("Could not create vertex: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shortID := lnwire.ShortChannelID{
|
return privKey.PubKey(), pubKey, wire.OutPoint{
|
||||||
BlockHeight: 1234,
|
Hash: [chainhash.HashSize]byte{1, 2, 3},
|
||||||
TxIndex: 2,
|
Index: 0,
|
||||||
TxPosition: 2,
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMonitorChannelEvents tests the store's handling of channel and peer
|
||||||
|
// events. It tests for the unexpected cases where we receive a channel open for
|
||||||
|
// an already known channel and but does not test for closing an unknown channel
|
||||||
|
// because it would require custom logic in the test to prevent iterating
|
||||||
|
// through an eventLog which does not exist. This test does not test handling
|
||||||
|
// of uptime and lifespan requests, as they are tested in their own tests.
|
||||||
|
func TestMonitorChannelEvents(t *testing.T) {
|
||||||
|
pubKey, vertex, chanPoint := getTestChannel(t)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -118,13 +126,13 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
// Add an open channel event
|
// Add an open channel event
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a peer online event.
|
// Add a peer online event.
|
||||||
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
|
||||||
},
|
},
|
||||||
expectedEvents: []eventType{peerOnlineEvent},
|
expectedEvents: []eventType{peerOnlineEvent},
|
||||||
},
|
},
|
||||||
@ -134,19 +142,19 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
// Add an open channel event
|
// Add an open channel event
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a peer online event.
|
// Add a peer online event.
|
||||||
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
|
||||||
|
|
||||||
// Add a duplicate channel open event.
|
// Add a duplicate channel open event.
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -156,13 +164,13 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
name: "Channel opened, peer already online",
|
name: "Channel opened, peer already online",
|
||||||
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
// Add a peer online event.
|
// Add a peer online event.
|
||||||
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: vertex}
|
||||||
|
|
||||||
// Add an open channel event
|
// Add an open channel event
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -175,18 +183,18 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
// Add an open channel event
|
// Add an open channel event
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a peer online event.
|
// Add a peer online event.
|
||||||
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
|
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex}
|
||||||
|
|
||||||
// Add a close channel event.
|
// Add a close channel event.
|
||||||
channelEvents <- channelnotifier.ClosedChannelEvent{
|
channelEvents <- channelnotifier.ClosedChannelEvent{
|
||||||
CloseSummary: &channeldb.ChannelCloseSummary{
|
CloseSummary: &channeldb.ChannelCloseSummary{
|
||||||
ShortChanID: shortID,
|
ChanPoint: chanPoint,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -198,20 +206,20 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
// Add an open channel event
|
// Add an open channel event
|
||||||
channelEvents <- channelnotifier.OpenChannelEvent{
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
Channel: &channeldb.OpenChannel{
|
Channel: &channeldb.OpenChannel{
|
||||||
ShortChannelID: shortID,
|
FundingOutpoint: chanPoint,
|
||||||
IdentityPub: privKey.PubKey(),
|
IdentityPub: pubKey,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a close channel event.
|
// Add a close channel event.
|
||||||
channelEvents <- channelnotifier.ClosedChannelEvent{
|
channelEvents <- channelnotifier.ClosedChannelEvent{
|
||||||
CloseSummary: &channeldb.ChannelCloseSummary{
|
CloseSummary: &channeldb.ChannelCloseSummary{
|
||||||
ShortChanID: shortID,
|
ChanPoint: chanPoint,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a peer online event.
|
// Add a peer online event.
|
||||||
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
|
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: vertex}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -242,7 +250,7 @@ func TestMonitorChannelEvents(t *testing.T) {
|
|||||||
|
|
||||||
// Retrieve the eventLog for the channel and check that its
|
// Retrieve the eventLog for the channel and check that its
|
||||||
// contents are as expected.
|
// contents are as expected.
|
||||||
eventLog, ok := store.channels[shortID.ToUint64()]
|
eventLog, ok := store.channels[chanPoint]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.Fatalf("Expected to find event store")
|
t.Fatalf("Expected to find event store")
|
||||||
}
|
}
|
||||||
@ -263,23 +271,23 @@ func TestGetLifetime(t *testing.T) {
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
channelFound bool
|
channelFound bool
|
||||||
chanID uint64
|
channelPoint wire.OutPoint
|
||||||
opened time.Time
|
opened time.Time
|
||||||
closed time.Time
|
closed time.Time
|
||||||
expectErr bool
|
expectedError error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "Channel found",
|
name: "Channel found",
|
||||||
channelFound: true,
|
channelFound: true,
|
||||||
opened: now,
|
opened: now,
|
||||||
closed: now.Add(time.Hour * -1),
|
closed: now.Add(time.Hour * -1),
|
||||||
expectErr: false,
|
expectedError: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Channel not found",
|
name: "Channel not found",
|
||||||
expectErr: true,
|
expectedError: ErrChannelNotFound,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -304,18 +312,15 @@ func TestGetLifetime(t *testing.T) {
|
|||||||
// Add channel to eventStore if the test indicates that it should
|
// Add channel to eventStore if the test indicates that it should
|
||||||
// be present.
|
// be present.
|
||||||
if test.channelFound {
|
if test.channelFound {
|
||||||
store.channels[test.chanID] = &chanEventLog{
|
store.channels[test.channelPoint] = &chanEventLog{
|
||||||
openedAt: test.opened,
|
openedAt: test.opened,
|
||||||
closedAt: test.closed,
|
closedAt: test.closed,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
open, close, err := store.GetLifespan(test.chanID)
|
open, close, err := store.GetLifespan(test.channelPoint)
|
||||||
if test.expectErr && err == nil {
|
if test.expectedError != err {
|
||||||
t.Fatal("Expected an error, got nil")
|
t.Fatalf("Expected: %v, got: %v", test.expectedError, err)
|
||||||
}
|
|
||||||
if !test.expectErr && err != nil {
|
|
||||||
t.Fatalf("Expected no error, got: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if open != test.opened {
|
if open != test.opened {
|
||||||
@ -344,7 +349,7 @@ func TestGetUptime(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
|
||||||
chanID uint64
|
channelPoint wire.OutPoint
|
||||||
|
|
||||||
// events is the set of events we expect to find in the channel store.
|
// events is the set of events we expect to find in the channel store.
|
||||||
events []*channelEvent
|
events []*channelEvent
|
||||||
@ -367,13 +372,15 @@ func TestGetUptime(t *testing.T) {
|
|||||||
endTime time.Time
|
endTime time.Time
|
||||||
|
|
||||||
expectedUptime time.Duration
|
expectedUptime time.Duration
|
||||||
expectErr bool
|
|
||||||
|
expectedError error
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "No events",
|
name: "No events",
|
||||||
startTime: twoHoursAgo,
|
startTime: twoHoursAgo,
|
||||||
endTime: now,
|
endTime: now,
|
||||||
channelFound: true,
|
channelFound: true,
|
||||||
|
expectedError: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "50% Uptime",
|
name: "50% Uptime",
|
||||||
@ -392,6 +399,7 @@ func TestGetUptime(t *testing.T) {
|
|||||||
startTime: fourHoursAgo,
|
startTime: fourHoursAgo,
|
||||||
endTime: now,
|
endTime: now,
|
||||||
channelFound: true,
|
channelFound: true,
|
||||||
|
expectedError: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Zero start time",
|
name: "Zero start time",
|
||||||
@ -405,13 +413,14 @@ func TestGetUptime(t *testing.T) {
|
|||||||
expectedUptime: time.Hour * 4,
|
expectedUptime: time.Hour * 4,
|
||||||
endTime: now,
|
endTime: now,
|
||||||
channelFound: true,
|
channelFound: true,
|
||||||
|
expectedError: nil,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Channel not found",
|
name: "Channel not found",
|
||||||
startTime: twoHoursAgo,
|
startTime: twoHoursAgo,
|
||||||
endTime: now,
|
endTime: now,
|
||||||
channelFound: false,
|
channelFound: false,
|
||||||
expectErr: true,
|
expectedError: ErrChannelNotFound,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -436,7 +445,7 @@ func TestGetUptime(t *testing.T) {
|
|||||||
|
|
||||||
// Add the channel to the store if it is intended to be found.
|
// Add the channel to the store if it is intended to be found.
|
||||||
if test.channelFound {
|
if test.channelFound {
|
||||||
store.channels[test.chanID] = &chanEventLog{
|
store.channels[test.channelPoint] = &chanEventLog{
|
||||||
events: test.events,
|
events: test.events,
|
||||||
now: func() time.Time { return now },
|
now: func() time.Time { return now },
|
||||||
openedAt: test.openedAt,
|
openedAt: test.openedAt,
|
||||||
@ -444,12 +453,9 @@ func TestGetUptime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime)
|
uptime, err := store.GetUptime(test.channelPoint, test.startTime, test.endTime)
|
||||||
if test.expectErr && err == nil {
|
if test.expectedError != err {
|
||||||
t.Fatal("Expected an error, got nil")
|
t.Fatalf("Expected: %v, got: %v", test.expectedError, err)
|
||||||
}
|
|
||||||
if !test.expectErr && err != nil {
|
|
||||||
t.Fatalf("Expcted no error, got: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if uptime != test.expectedUptime {
|
if uptime != test.expectedUptime {
|
||||||
@ -460,3 +466,26 @@ func TestGetUptime(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestAddChannel tests that channels are added to the event store with
|
||||||
|
// appropriate timestamps. This test addresses a bug where offline channels
|
||||||
|
// did not have an opened time set.
|
||||||
|
func TestAddChannel(t *testing.T) {
|
||||||
|
_, vertex, chanPoint := getTestChannel(t)
|
||||||
|
|
||||||
|
store := NewChannelEventStore(&Config{})
|
||||||
|
|
||||||
|
// Add channel to the store.
|
||||||
|
store.addChannel(chanPoint, vertex)
|
||||||
|
|
||||||
|
// Check that the eventlog is successfully added.
|
||||||
|
eventlog, ok := store.channels[chanPoint]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("channel should be in store")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure that open time is always set.
|
||||||
|
if eventlog.openedAt.IsZero() {
|
||||||
|
t.Fatalf("channel should have opened at set")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
88
rpcserver.go
88
rpcserver.go
@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/build"
|
"github.com/lightningnetwork/lnd/build"
|
||||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||||
"github.com/lightningnetwork/lnd/chanbackup"
|
"github.com/lightningnetwork/lnd/chanbackup"
|
||||||
|
"github.com/lightningnetwork/lnd/chanfitness"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
"github.com/lightningnetwork/lnd/contractcourt"
|
"github.com/lightningnetwork/lnd/contractcourt"
|
||||||
@ -2754,7 +2755,10 @@ func (r *rpcServer) ListChannels(ctx context.Context,
|
|||||||
// Next, we'll determine whether we should add this channel to
|
// Next, we'll determine whether we should add this channel to
|
||||||
// our list depending on the type of channels requested to us.
|
// our list depending on the type of channels requested to us.
|
||||||
isActive := peerOnline && linkActive
|
isActive := peerOnline && linkActive
|
||||||
channel := createRPCOpenChannel(r, graph, dbChannel, isActive)
|
channel, err := createRPCOpenChannel(r, graph, dbChannel, isActive)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// We'll only skip returning this channel if we were requested
|
// We'll only skip returning this channel if we were requested
|
||||||
// for a specific kind and this channel doesn't satisfy it.
|
// for a specific kind and this channel doesn't satisfy it.
|
||||||
@ -2777,7 +2781,7 @@ func (r *rpcServer) ListChannels(ctx context.Context,
|
|||||||
|
|
||||||
// createRPCOpenChannel creates an *lnrpc.Channel from the *channeldb.Channel.
|
// createRPCOpenChannel creates an *lnrpc.Channel from the *channeldb.Channel.
|
||||||
func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
||||||
dbChannel *channeldb.OpenChannel, isActive bool) *lnrpc.Channel {
|
dbChannel *channeldb.OpenChannel, isActive bool) (*lnrpc.Channel, error) {
|
||||||
|
|
||||||
nodePub := dbChannel.IdentityPub
|
nodePub := dbChannel.IdentityPub
|
||||||
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
|
nodeID := hex.EncodeToString(nodePub.SerializeCompressed())
|
||||||
@ -2812,43 +2816,12 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
|||||||
}
|
}
|
||||||
externalCommitFee := dbChannel.Capacity - sumOutputs
|
externalCommitFee := dbChannel.Capacity - sumOutputs
|
||||||
|
|
||||||
chanID := dbChannel.ShortChannelID.ToUint64()
|
|
||||||
|
|
||||||
var (
|
|
||||||
uptime time.Duration
|
|
||||||
lifespan time.Duration
|
|
||||||
)
|
|
||||||
|
|
||||||
// Get the lifespan observed by the channel event store.
|
|
||||||
startTime, endTime, err := r.server.chanEventStore.GetLifespan(chanID)
|
|
||||||
if err != nil {
|
|
||||||
// If the channel cannot be found, log an error and do not perform
|
|
||||||
// further calculations for uptime and lifespan.
|
|
||||||
rpcsLog.Warnf("GetLifespan %v error: %v", chanID, err)
|
|
||||||
} else {
|
|
||||||
// If endTime is zero, the channel is still open, progress endTime to
|
|
||||||
// the present so we can calculate lifespan.
|
|
||||||
if endTime.IsZero() {
|
|
||||||
endTime = time.Now()
|
|
||||||
}
|
|
||||||
lifespan = endTime.Sub(startTime)
|
|
||||||
|
|
||||||
uptime, err = r.server.chanEventStore.GetUptime(
|
|
||||||
chanID,
|
|
||||||
startTime,
|
|
||||||
endTime,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
rpcsLog.Warnf("GetUptime %v error: %v", chanID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
channel := &lnrpc.Channel{
|
channel := &lnrpc.Channel{
|
||||||
Active: isActive,
|
Active: isActive,
|
||||||
Private: !isPublic,
|
Private: !isPublic,
|
||||||
RemotePubkey: nodeID,
|
RemotePubkey: nodeID,
|
||||||
ChannelPoint: chanPoint.String(),
|
ChannelPoint: chanPoint.String(),
|
||||||
ChanId: chanID,
|
ChanId: dbChannel.ShortChannelID.ToUint64(),
|
||||||
Capacity: int64(dbChannel.Capacity),
|
Capacity: int64(dbChannel.Capacity),
|
||||||
LocalBalance: int64(localBalance.ToSatoshis()),
|
LocalBalance: int64(localBalance.ToSatoshis()),
|
||||||
RemoteBalance: int64(remoteBalance.ToSatoshis()),
|
RemoteBalance: int64(remoteBalance.ToSatoshis()),
|
||||||
@ -2865,8 +2838,6 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
|||||||
LocalChanReserveSat: int64(dbChannel.LocalChanCfg.ChanReserve),
|
LocalChanReserveSat: int64(dbChannel.LocalChanCfg.ChanReserve),
|
||||||
RemoteChanReserveSat: int64(dbChannel.RemoteChanCfg.ChanReserve),
|
RemoteChanReserveSat: int64(dbChannel.RemoteChanCfg.ChanReserve),
|
||||||
StaticRemoteKey: dbChannel.ChanType.IsTweakless(),
|
StaticRemoteKey: dbChannel.ChanType.IsTweakless(),
|
||||||
Lifetime: int64(lifespan.Seconds()),
|
|
||||||
Uptime: int64(uptime.Seconds()),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, htlc := range localCommit.Htlcs {
|
for i, htlc := range localCommit.Htlcs {
|
||||||
@ -2883,7 +2854,44 @@ func createRPCOpenChannel(r *rpcServer, graph *channeldb.ChannelGraph,
|
|||||||
channel.UnsettledBalance += channel.PendingHtlcs[i].Amount
|
channel.UnsettledBalance += channel.PendingHtlcs[i].Amount
|
||||||
}
|
}
|
||||||
|
|
||||||
return channel
|
outpoint := dbChannel.FundingOutpoint
|
||||||
|
|
||||||
|
// Get the lifespan observed by the channel event store. If the channel is
|
||||||
|
// not known to the channel event store, return early because we cannot
|
||||||
|
// calculate any further uptime information.
|
||||||
|
startTime, endTime, err := r.server.chanEventStore.GetLifespan(outpoint)
|
||||||
|
switch err {
|
||||||
|
case chanfitness.ErrChannelNotFound:
|
||||||
|
rpcsLog.Infof("channel: %v not found by channel event store",
|
||||||
|
outpoint)
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
|
case nil:
|
||||||
|
// If there is no error getting lifespan, continue to uptime
|
||||||
|
// calculation.
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If endTime is zero, the channel is still open, progress endTime to
|
||||||
|
// the present so we can calculate lifetime.
|
||||||
|
if endTime.IsZero() {
|
||||||
|
endTime = time.Now()
|
||||||
|
}
|
||||||
|
channel.Lifetime = int64(endTime.Sub(startTime).Seconds())
|
||||||
|
|
||||||
|
// Once we have successfully obtained channel lifespan, we know that the
|
||||||
|
// channel is known to the event store, so we can return any non-nil error
|
||||||
|
// that occurs.
|
||||||
|
uptime, err := r.server.chanEventStore.GetUptime(
|
||||||
|
outpoint, startTime, endTime,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
channel.Uptime = int64(uptime.Seconds())
|
||||||
|
|
||||||
|
return channel, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// createRPCClosedChannel creates an *lnrpc.ClosedChannelSummary from a
|
// createRPCClosedChannel creates an *lnrpc.ClosedChannelSummary from a
|
||||||
@ -2949,8 +2957,12 @@ func (r *rpcServer) SubscribeChannelEvents(req *lnrpc.ChannelEventSubscription,
|
|||||||
var update *lnrpc.ChannelEventUpdate
|
var update *lnrpc.ChannelEventUpdate
|
||||||
switch event := e.(type) {
|
switch event := e.(type) {
|
||||||
case channelnotifier.OpenChannelEvent:
|
case channelnotifier.OpenChannelEvent:
|
||||||
channel := createRPCOpenChannel(r, graph,
|
channel, err := createRPCOpenChannel(r, graph,
|
||||||
event.Channel, true)
|
event.Channel, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
update = &lnrpc.ChannelEventUpdate{
|
update = &lnrpc.ChannelEventUpdate{
|
||||||
Type: lnrpc.ChannelEventUpdate_OPEN_CHANNEL,
|
Type: lnrpc.ChannelEventUpdate_OPEN_CHANNEL,
|
||||||
Channel: &lnrpc.ChannelEventUpdate_OpenChannel{
|
Channel: &lnrpc.ChannelEventUpdate_OpenChannel{
|
||||||
|
Loading…
Reference in New Issue
Block a user