chanfitness: Add channel event store
This commit adds a channel event store to the channel fitness package which is used to manage tracking of a node's channels. It adds tracking for channel open/closed and peer online/offline events for all channels that a node has open. Events are consumed from channelNotifier and peerNotifier event subscriptions. If either of these subscriptions is cancelled, channel scoring stops, because both subscriptions are expected to run until node shutdown. Two functions are exposed to allow external callers to get uptime information about a channel. GetLifespan returns the period over which the channel has been monitored. GetUptime returns the channel's uptime over a specified period. Callers can use these functions to get the channel's remote peer uptime over its entire lifetime, or a subset of that period.
This commit is contained in:
parent
744876003d
commit
1e86589bee
@ -55,6 +55,9 @@ linters:
|
|||||||
# the linter.
|
# the linter.
|
||||||
- prealloc
|
- prealloc
|
||||||
|
|
||||||
|
# Init functions are used by loggers throughout the codebase.
|
||||||
|
- gochecknoinits
|
||||||
|
|
||||||
issues:
|
issues:
|
||||||
# Only show newly introduced problems.
|
# Only show newly introduced problems.
|
||||||
new-from-rev: 01f696afce2f9c0d4ed854edefa3846891d01d8a
|
new-from-rev: 01f696afce2f9c0d4ed854edefa3846891d01d8a
|
||||||
|
390
chanfitness/chaneventstore.go
Normal file
390
chanfitness/chaneventstore.go
Normal file
@ -0,0 +1,390 @@
|
|||||||
|
// Package chanfitness monitors the behaviour of channels to provide insight
|
||||||
|
// into the health and performance of a channel. This is achieved by maintaining
|
||||||
|
// an event store which tracks events for each channel.
|
||||||
|
//
|
||||||
|
// Lifespan: the period that the channel has been known to the scoring system.
|
||||||
|
// Note that lifespan may not equal the channel's full lifetime because data is
|
||||||
|
// not currently persisted.
|
||||||
|
//
|
||||||
|
// Uptime: the total time within a given period that the channel's remote peer
|
||||||
|
// has been online.
|
||||||
|
package chanfitness
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
|
"github.com/lightningnetwork/lnd/peernotifier"
|
||||||
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
|
"github.com/lightningnetwork/lnd/subscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// errShuttingDown is returned when the store cannot respond to a query because
|
||||||
|
// it has received the shutdown signal.
|
||||||
|
var errShuttingDown = errors.New("channel event store shutting down")
|
||||||
|
|
||||||
|
// ChannelEventStore maintains a set of event logs for the node's channels to
|
||||||
|
// provide insight into the performance and health of channels.
|
||||||
|
type ChannelEventStore struct {
|
||||||
|
cfg *Config
|
||||||
|
|
||||||
|
// channels maps short channel IDs to event logs.
|
||||||
|
channels map[uint64]*chanEventLog
|
||||||
|
|
||||||
|
// peers tracks the current online status of peers based on online/offline
|
||||||
|
// events.
|
||||||
|
peers map[route.Vertex]bool
|
||||||
|
|
||||||
|
// lifespanRequests serves requests for the lifespan of channels.
|
||||||
|
lifespanRequests chan lifespanRequest
|
||||||
|
|
||||||
|
// uptimeRequests serves requests for the uptime of channels.
|
||||||
|
uptimeRequests chan uptimeRequest
|
||||||
|
|
||||||
|
quit chan struct{}
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config provides the event store with functions required to monitor channel
|
||||||
|
// activity. All elements of the config must be non-nil for the event store to
|
||||||
|
// operate.
|
||||||
|
type Config struct {
|
||||||
|
// SubscribeChannelEvents provides a subscription client which provides a
|
||||||
|
// stream of channel events.
|
||||||
|
SubscribeChannelEvents func() (*subscribe.Client, error)
|
||||||
|
|
||||||
|
// SubscribePeerEvents provides a subscription client which provides a
|
||||||
|
// stream of peer online/offline events.
|
||||||
|
SubscribePeerEvents func() (*subscribe.Client, error)
|
||||||
|
|
||||||
|
// GetOpenChannels provides a list of existing open channels which is used
|
||||||
|
// to populate the ChannelEventStore with a set of channels on startup.
|
||||||
|
GetOpenChannels func() ([]*channeldb.OpenChannel, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// lifespanRequest contains the channel ID required to query the store for a
|
||||||
|
// channel's lifespan and a blocking response channel on which the result is
|
||||||
|
// sent.
|
||||||
|
type lifespanRequest struct {
|
||||||
|
channelID uint64
|
||||||
|
responseChan chan lifespanResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
// lifespanResponse contains the response to a lifespanRequest and an error if
|
||||||
|
// one occurred.
|
||||||
|
type lifespanResponse struct {
|
||||||
|
start time.Time
|
||||||
|
end time.Time
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// uptimeRequest contains the parameters required to query the store for a
|
||||||
|
// channel's uptime and a blocking response channel on which the result is sent.
|
||||||
|
type uptimeRequest struct {
|
||||||
|
channelID uint64
|
||||||
|
startTime time.Time
|
||||||
|
endTime time.Time
|
||||||
|
responseChan chan uptimeResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
// uptimeResponse contains the response to an uptimeRequest and an error if one
|
||||||
|
// occurred.
|
||||||
|
type uptimeResponse struct {
|
||||||
|
uptime time.Duration
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChannelEventStore initializes an event store with the config provided.
|
||||||
|
// Note that this function does not start the main event loop, Start() must be
|
||||||
|
// called.
|
||||||
|
func NewChannelEventStore(config *Config) *ChannelEventStore {
|
||||||
|
store := &ChannelEventStore{
|
||||||
|
cfg: config,
|
||||||
|
channels: make(map[uint64]*chanEventLog),
|
||||||
|
peers: make(map[route.Vertex]bool),
|
||||||
|
lifespanRequests: make(chan lifespanRequest),
|
||||||
|
uptimeRequests: make(chan uptimeRequest),
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
return store
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start adds all existing open channels to the event store and starts the main
|
||||||
|
// loop which records channel and peer events, and serves requests for
|
||||||
|
// information from the store. If this function fails, it cancels its existing
|
||||||
|
// subscriptions and returns an error.
|
||||||
|
func (c *ChannelEventStore) Start() error {
|
||||||
|
// Create a subscription to channel events.
|
||||||
|
channelClient, err := c.cfg.SubscribeChannelEvents()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a subscription to peer events. If an error occurs, cancel the
|
||||||
|
// existing subscription to channel events and return.
|
||||||
|
peerClient, err := c.cfg.SubscribePeerEvents()
|
||||||
|
if err != nil {
|
||||||
|
channelClient.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// cancel should be called to cancel all subscriptions if an error occurs.
|
||||||
|
cancel := func() {
|
||||||
|
channelClient.Cancel()
|
||||||
|
peerClient.Cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the existing set of channels to the event store. This is required
|
||||||
|
// because channel events will not be triggered for channels that exist
|
||||||
|
// at startup time.
|
||||||
|
channels, err := c.cfg.GetOpenChannels()
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Adding %v channels to event store", len(channels))
|
||||||
|
|
||||||
|
for _, ch := range channels {
|
||||||
|
peerKey, err := route.NewVertexFromBytes(
|
||||||
|
ch.IdentityPub.SerializeCompressed(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add existing channels to the channel store with an initial peer
|
||||||
|
// online or offline event.
|
||||||
|
c.addChannel(ch.ShortChanID().ToUint64(), peerKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start a goroutine that consumes events from all subscriptions.
|
||||||
|
c.wg.Add(1)
|
||||||
|
go c.consume(&subscriptions{
|
||||||
|
channelUpdates: channelClient.Updates(),
|
||||||
|
peerUpdates: peerClient.Updates(),
|
||||||
|
cancel: cancel,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop terminates all goroutines started by the event store.
|
||||||
|
func (c *ChannelEventStore) Stop() {
|
||||||
|
log.Info("Stopping event store")
|
||||||
|
|
||||||
|
// Stop the consume goroutine.
|
||||||
|
close(c.quit)
|
||||||
|
|
||||||
|
c.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// addChannel adds a new channel to the ChannelEventStore's map of channels with
|
||||||
|
// an initial peer online state (if the peer is online). If the channel is
|
||||||
|
// already present in the map, the function returns early. This function should
|
||||||
|
// be called to add existing channels on startup and when open channel events
|
||||||
|
// are observed.
|
||||||
|
func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) {
|
||||||
|
// Check for the unexpected case where the channel is already in the store.
|
||||||
|
_, ok := c.channels[channelID]
|
||||||
|
if ok {
|
||||||
|
log.Errorf("Channel %v duplicated in channel store", channelID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
eventLog := newEventLog(channelID, peer, time.Now)
|
||||||
|
|
||||||
|
// If the peer is online, add a peer online event to indicate its starting
|
||||||
|
// state.
|
||||||
|
online := c.peers[peer]
|
||||||
|
if online {
|
||||||
|
eventLog.add(peerOnlineEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.channels[channelID] = eventLog
|
||||||
|
}
|
||||||
|
|
||||||
|
// closeChannel records a closed time for a channel, and returns early is the
|
||||||
|
// channel is not known to the event store.
|
||||||
|
func (c *ChannelEventStore) closeChannel(channelID uint64) {
|
||||||
|
// Check for the unexpected case where the channel is unknown to the store.
|
||||||
|
eventLog, ok := c.channels[channelID]
|
||||||
|
if !ok {
|
||||||
|
log.Errorf("Close channel %v unknown to store", channelID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
eventLog.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// peerEvent adds a peer online or offline event to all channels we currently
|
||||||
|
// have open with a peer.
|
||||||
|
func (c *ChannelEventStore) peerEvent(peer route.Vertex, event eventType) {
|
||||||
|
// Track current online status of peers in the channelEventStore.
|
||||||
|
c.peers[peer] = event == peerOnlineEvent
|
||||||
|
|
||||||
|
for _, eventLog := range c.channels {
|
||||||
|
if eventLog.peer == peer {
|
||||||
|
eventLog.add(event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscriptions abstracts away from subscription clients to allow for mocking.
|
||||||
|
type subscriptions struct {
|
||||||
|
channelUpdates <-chan interface{}
|
||||||
|
peerUpdates <-chan interface{}
|
||||||
|
cancel func()
|
||||||
|
}
|
||||||
|
|
||||||
|
// consume is the event store's main loop. It consumes subscriptions to update
|
||||||
|
// the event store with channel and peer events, and serves requests for channel
|
||||||
|
// uptime and lifespan.
|
||||||
|
func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
|
||||||
|
defer c.wg.Done()
|
||||||
|
defer subscriptions.cancel()
|
||||||
|
|
||||||
|
// Consume events until the channel is closed.
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
// Process channel opened and closed events.
|
||||||
|
case e := <-subscriptions.channelUpdates:
|
||||||
|
switch event := e.(type) {
|
||||||
|
// A new channel has been opened, we must add the channel to the
|
||||||
|
// store and record a channel open event.
|
||||||
|
case channelnotifier.OpenChannelEvent:
|
||||||
|
chanID := event.Channel.ShortChanID().ToUint64()
|
||||||
|
|
||||||
|
peerKey, err := route.NewVertexFromBytes(
|
||||||
|
event.Channel.IdentityPub.SerializeCompressed(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Could not get vertex from: %v",
|
||||||
|
event.Channel.IdentityPub.SerializeCompressed())
|
||||||
|
}
|
||||||
|
|
||||||
|
c.addChannel(chanID, peerKey)
|
||||||
|
|
||||||
|
// A channel has been closed, we must remove the channel from the
|
||||||
|
// store and record a channel closed event.
|
||||||
|
case channelnotifier.ClosedChannelEvent:
|
||||||
|
c.closeChannel(event.CloseSummary.ShortChanID.ToUint64())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process peer online and offline events.
|
||||||
|
case e := <-subscriptions.peerUpdates:
|
||||||
|
switch event := e.(type) {
|
||||||
|
// We have reestablished a connection with our peer, and should
|
||||||
|
// record an online event for any channels with that peer.
|
||||||
|
case peernotifier.PeerOnlineEvent:
|
||||||
|
c.peerEvent(event.PubKey, peerOnlineEvent)
|
||||||
|
|
||||||
|
// We have lost a connection with our peer, and should record an
|
||||||
|
// offline event for any channels with that peer.
|
||||||
|
case peernotifier.PeerOfflineEvent:
|
||||||
|
c.peerEvent(event.PubKey, peerOfflineEvent)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serve all requests for channel lifetime.
|
||||||
|
case req := <-c.lifespanRequests:
|
||||||
|
var resp lifespanResponse
|
||||||
|
|
||||||
|
channel, ok := c.channels[req.channelID]
|
||||||
|
if !ok {
|
||||||
|
resp.err = fmt.Errorf("channel %v not found", req.channelID)
|
||||||
|
} else {
|
||||||
|
resp.start = channel.openedAt
|
||||||
|
resp.end = channel.closedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
req.responseChan <- resp
|
||||||
|
|
||||||
|
// Serve requests for channel uptime.
|
||||||
|
case req := <-c.uptimeRequests:
|
||||||
|
var resp uptimeResponse
|
||||||
|
|
||||||
|
channel, ok := c.channels[req.channelID]
|
||||||
|
if !ok {
|
||||||
|
resp.err = fmt.Errorf("channel %v not found", req.channelID)
|
||||||
|
} else {
|
||||||
|
uptime, err := channel.uptime(req.startTime, req.endTime)
|
||||||
|
resp.uptime = uptime
|
||||||
|
resp.err = err
|
||||||
|
}
|
||||||
|
|
||||||
|
req.responseChan <- resp
|
||||||
|
|
||||||
|
// Exit if the store receives the signal to shutdown.
|
||||||
|
case <-c.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLifespan returns the opening and closing time observed for a channel and
|
||||||
|
// a boolean to indicate whether the channel is known the the event store. If
|
||||||
|
// the channel is still open, a zero close time is returned.
|
||||||
|
func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) {
|
||||||
|
request := lifespanRequest{
|
||||||
|
channelID: chanID,
|
||||||
|
responseChan: make(chan lifespanResponse),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a request for the channel's lifespan to the main event loop, or
|
||||||
|
// return early with an error if the store has already received a shutdown
|
||||||
|
// signal.
|
||||||
|
select {
|
||||||
|
case c.lifespanRequests <- request:
|
||||||
|
case <-c.quit:
|
||||||
|
return time.Time{}, time.Time{}, errShuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the response we receive on the response channel or exit early if
|
||||||
|
// the store is instructed to exit.
|
||||||
|
select {
|
||||||
|
case resp := <-request.responseChan:
|
||||||
|
return resp.start, resp.end, resp.err
|
||||||
|
|
||||||
|
case <-c.quit:
|
||||||
|
return time.Time{}, time.Time{}, errShuttingDown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetUptime returns the uptime of a channel over a period and an error if the
|
||||||
|
// channel cannot be found or the uptime calculation fails.
|
||||||
|
func (c *ChannelEventStore) GetUptime(chanID uint64, startTime,
|
||||||
|
endTime time.Time) (time.Duration, error) {
|
||||||
|
|
||||||
|
request := uptimeRequest{
|
||||||
|
channelID: chanID,
|
||||||
|
startTime: startTime,
|
||||||
|
endTime: endTime,
|
||||||
|
responseChan: make(chan uptimeResponse),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a request for the channel's uptime to the main event loop, or
|
||||||
|
// return early with an error if the store has already received a shutdown
|
||||||
|
// signal.
|
||||||
|
select {
|
||||||
|
case c.uptimeRequests <- request:
|
||||||
|
case <-c.quit:
|
||||||
|
return 0, errShuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return the response we receive on the response channel or exit early if
|
||||||
|
// the store is instructed to exit.
|
||||||
|
select {
|
||||||
|
case resp := <-request.responseChan:
|
||||||
|
return resp.uptime, resp.err
|
||||||
|
|
||||||
|
case <-c.quit:
|
||||||
|
return 0, errShuttingDown
|
||||||
|
}
|
||||||
|
}
|
462
chanfitness/chaneventstore_test.go
Normal file
462
chanfitness/chaneventstore_test.go
Normal file
@ -0,0 +1,462 @@
|
|||||||
|
package chanfitness
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/btcec"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/peernotifier"
|
||||||
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
|
"github.com/lightningnetwork/lnd/subscribe"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestStartStoreError tests the starting of the store in cases where the setup
|
||||||
|
// functions fail. It does not test the mechanics of consuming events because
|
||||||
|
// these are covered in a separate set of tests.
|
||||||
|
func TestStartStoreError(t *testing.T) {
|
||||||
|
// Ok and erroring subscribe functions are defined here to de-clutter tests.
|
||||||
|
okSubscribeFunc := func() (*subscribe.Client, error) {
|
||||||
|
return &subscribe.Client{
|
||||||
|
Cancel: func() {},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
errSubscribeFunc := func() (client *subscribe.Client, e error) {
|
||||||
|
return nil, errors.New("intentional test err")
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
ChannelEvents func() (*subscribe.Client, error)
|
||||||
|
PeerEvents func() (*subscribe.Client, error)
|
||||||
|
GetChannels func() ([]*channeldb.OpenChannel, error)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Channel events fail",
|
||||||
|
ChannelEvents: errSubscribeFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Peer events fail",
|
||||||
|
ChannelEvents: okSubscribeFunc,
|
||||||
|
PeerEvents: errSubscribeFunc,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Get open channels fails",
|
||||||
|
ChannelEvents: okSubscribeFunc,
|
||||||
|
PeerEvents: okSubscribeFunc,
|
||||||
|
GetChannels: func() (channels []*channeldb.OpenChannel, e error) {
|
||||||
|
return nil, errors.New("intentional test err")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
store := NewChannelEventStore(&Config{
|
||||||
|
SubscribeChannelEvents: test.ChannelEvents,
|
||||||
|
SubscribePeerEvents: test.PeerEvents,
|
||||||
|
GetOpenChannels: test.GetChannels,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := store.Start()
|
||||||
|
// Check that we receive an error, because the test only checks for
|
||||||
|
// error cases.
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Expected error on startup, got: nil")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMonitorChannelEvents tests the store's handling of channel and peer
|
||||||
|
// events. It tests for the unexpected cases where we receive a channel open for
|
||||||
|
// an already known channel and but does not test for closing an unknown channel
|
||||||
|
// because it would require custom logic in the test to prevent iterating
|
||||||
|
// through an eventLog which does not exist. This test does not test handling
|
||||||
|
// of uptime and lifespan requests, as they are tested in their own tests.
|
||||||
|
func TestMonitorChannelEvents(t *testing.T) {
|
||||||
|
privKey, err := btcec.NewPrivateKey(btcec.S256())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error getting pubkey: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pubKey, err := route.NewVertexFromBytes(
|
||||||
|
privKey.PubKey().SerializeCompressed(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Could not create vertex: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shortID := lnwire.ShortChannelID{
|
||||||
|
BlockHeight: 1234,
|
||||||
|
TxIndex: 2,
|
||||||
|
TxPosition: 2,
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
|
||||||
|
// generateEvents takes channels which represent the updates channels
|
||||||
|
// for subscription clients and passes events in the desired order.
|
||||||
|
// This function is intended to be blocking so that the test does not
|
||||||
|
// have a data race with event consumption, so the channels should not
|
||||||
|
// be buffered.
|
||||||
|
generateEvents func(channelEvents, peerEvents chan<- interface{})
|
||||||
|
|
||||||
|
// expectedEvents is the expected set of event types in the store.
|
||||||
|
expectedEvents []eventType
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Channel opened, peer comes online",
|
||||||
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
|
// Add an open channel event
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a peer online event.
|
||||||
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
||||||
|
},
|
||||||
|
expectedEvents: []eventType{peerOnlineEvent},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Duplicate channel open events",
|
||||||
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
|
// Add an open channel event
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a peer online event.
|
||||||
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
||||||
|
|
||||||
|
// Add a duplicate channel open event.
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
expectedEvents: []eventType{peerOnlineEvent},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Channel opened, peer already online",
|
||||||
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
|
// Add a peer online event.
|
||||||
|
peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
|
||||||
|
|
||||||
|
// Add an open channel event
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
expectedEvents: []eventType{peerOnlineEvent},
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
name: "Channel opened, peer offline, closed",
|
||||||
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
|
// Add an open channel event
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a peer online event.
|
||||||
|
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
|
||||||
|
|
||||||
|
// Add a close channel event.
|
||||||
|
channelEvents <- channelnotifier.ClosedChannelEvent{
|
||||||
|
CloseSummary: &channeldb.ChannelCloseSummary{
|
||||||
|
ShortChanID: shortID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
expectedEvents: []eventType{peerOfflineEvent},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Event after channel close not recorded",
|
||||||
|
generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
|
||||||
|
// Add an open channel event
|
||||||
|
channelEvents <- channelnotifier.OpenChannelEvent{
|
||||||
|
Channel: &channeldb.OpenChannel{
|
||||||
|
ShortChannelID: shortID,
|
||||||
|
IdentityPub: privKey.PubKey(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a close channel event.
|
||||||
|
channelEvents <- channelnotifier.ClosedChannelEvent{
|
||||||
|
CloseSummary: &channeldb.ChannelCloseSummary{
|
||||||
|
ShortChanID: shortID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add a peer online event.
|
||||||
|
peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
// Create a store with the channels and online peers specified
|
||||||
|
// by the test.
|
||||||
|
store := NewChannelEventStore(&Config{})
|
||||||
|
|
||||||
|
// Create channels which represent the subscriptions we have to peer
|
||||||
|
// and client events.
|
||||||
|
channelEvents := make(chan interface{})
|
||||||
|
peerEvents := make(chan interface{})
|
||||||
|
|
||||||
|
store.wg.Add(1)
|
||||||
|
go store.consume(&subscriptions{
|
||||||
|
channelUpdates: channelEvents,
|
||||||
|
peerUpdates: peerEvents,
|
||||||
|
cancel: func() {},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Add events to the store then kill the goroutine using store.Stop.
|
||||||
|
test.generateEvents(channelEvents, peerEvents)
|
||||||
|
store.Stop()
|
||||||
|
|
||||||
|
// Retrieve the eventLog for the channel and check that its
|
||||||
|
// contents are as expected.
|
||||||
|
eventLog, ok := store.channels[shortID.ToUint64()]
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Expected to find event store")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, e := range eventLog.events {
|
||||||
|
if test.expectedEvents[i] != e.eventType {
|
||||||
|
t.Fatalf("Expected type: %v, got: %v",
|
||||||
|
test.expectedEvents[i], e.eventType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetLifetime tests the GetLifetime function for the cases where a channel
|
||||||
|
// is known and unknown to the store.
|
||||||
|
func TestGetLifetime(t *testing.T) {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
channelFound bool
|
||||||
|
chanID uint64
|
||||||
|
opened time.Time
|
||||||
|
closed time.Time
|
||||||
|
expectErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Channel found",
|
||||||
|
channelFound: true,
|
||||||
|
opened: now,
|
||||||
|
closed: now.Add(time.Hour * -1),
|
||||||
|
expectErr: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Channel not found",
|
||||||
|
expectErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
// Create and empty events store for testing.
|
||||||
|
store := NewChannelEventStore(&Config{})
|
||||||
|
|
||||||
|
// Start goroutine which consumes GetLifespan requests.
|
||||||
|
store.wg.Add(1)
|
||||||
|
go store.consume(&subscriptions{
|
||||||
|
channelUpdates: make(chan interface{}),
|
||||||
|
peerUpdates: make(chan interface{}),
|
||||||
|
cancel: func() {},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Stop the store's go routine.
|
||||||
|
defer store.Stop()
|
||||||
|
|
||||||
|
// Add channel to eventStore if the test indicates that it should
|
||||||
|
// be present.
|
||||||
|
if test.channelFound {
|
||||||
|
store.channels[test.chanID] = &chanEventLog{
|
||||||
|
openedAt: test.opened,
|
||||||
|
closedAt: test.closed,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
open, close, err := store.GetLifespan(test.chanID)
|
||||||
|
if test.expectErr && err == nil {
|
||||||
|
t.Fatal("Expected an error, got nil")
|
||||||
|
}
|
||||||
|
if !test.expectErr && err != nil {
|
||||||
|
t.Fatalf("Expected no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if open != test.opened {
|
||||||
|
t.Errorf("Expected: %v, got %v", test.opened, open)
|
||||||
|
}
|
||||||
|
|
||||||
|
if close != test.closed {
|
||||||
|
t.Errorf("Expected: %v, got %v", test.closed, close)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGetUptime tests the getUptime call for channels known to the event store.
|
||||||
|
// It does not test the trivial case where a channel is unknown to the store,
|
||||||
|
// because this is simply a zero return if an item is not found in a map. It
|
||||||
|
// tests the unexpected edge cases where a tracked channel does not have any
|
||||||
|
// events recorded, and when a zero time is specified for the uptime range.
|
||||||
|
func TestGetUptime(t *testing.T) {
|
||||||
|
// Set time for deterministic unit tests.
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
twoHoursAgo := now.Add(time.Hour * -2)
|
||||||
|
fourHoursAgo := now.Add(time.Hour * -4)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
|
||||||
|
chanID uint64
|
||||||
|
|
||||||
|
// events is the set of events we expect to find in the channel store.
|
||||||
|
events []*channelEvent
|
||||||
|
|
||||||
|
// openedAt is the time the channel is recorded as open by the store.
|
||||||
|
openedAt time.Time
|
||||||
|
|
||||||
|
// closedAt is the time the channel is recorded as closed by the store.
|
||||||
|
// If the channel is still open, this value is zero.
|
||||||
|
closedAt time.Time
|
||||||
|
|
||||||
|
// channelFound is true if we expect to find the channel in the store.
|
||||||
|
channelFound bool
|
||||||
|
|
||||||
|
// startTime specifies the beginning of the uptime range we want to
|
||||||
|
// calculate.
|
||||||
|
startTime time.Time
|
||||||
|
|
||||||
|
// endTime specified the end of the uptime range we want to calculate.
|
||||||
|
endTime time.Time
|
||||||
|
|
||||||
|
expectedUptime time.Duration
|
||||||
|
expectErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "No events",
|
||||||
|
startTime: twoHoursAgo,
|
||||||
|
endTime: now,
|
||||||
|
channelFound: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "50% Uptime",
|
||||||
|
events: []*channelEvent{
|
||||||
|
{
|
||||||
|
timestamp: fourHoursAgo,
|
||||||
|
eventType: peerOnlineEvent,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
timestamp: twoHoursAgo,
|
||||||
|
eventType: peerOfflineEvent,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
openedAt: fourHoursAgo,
|
||||||
|
expectedUptime: time.Hour * 2,
|
||||||
|
startTime: fourHoursAgo,
|
||||||
|
endTime: now,
|
||||||
|
channelFound: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Zero start time",
|
||||||
|
events: []*channelEvent{
|
||||||
|
{
|
||||||
|
timestamp: fourHoursAgo,
|
||||||
|
eventType: peerOnlineEvent,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
openedAt: fourHoursAgo,
|
||||||
|
expectedUptime: time.Hour * 4,
|
||||||
|
endTime: now,
|
||||||
|
channelFound: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Channel not found",
|
||||||
|
startTime: twoHoursAgo,
|
||||||
|
endTime: now,
|
||||||
|
channelFound: false,
|
||||||
|
expectErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
test := test
|
||||||
|
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
// Set up event store with the events specified for the test and
|
||||||
|
// mocked time.
|
||||||
|
store := NewChannelEventStore(&Config{})
|
||||||
|
|
||||||
|
// Start goroutine which consumes GetUptime requests.
|
||||||
|
store.wg.Add(1)
|
||||||
|
go store.consume(&subscriptions{
|
||||||
|
channelUpdates: make(chan interface{}),
|
||||||
|
peerUpdates: make(chan interface{}),
|
||||||
|
cancel: func() {},
|
||||||
|
})
|
||||||
|
|
||||||
|
// Stop the store's goroutine.
|
||||||
|
defer store.Stop()
|
||||||
|
|
||||||
|
// Add the channel to the store if it is intended to be found.
|
||||||
|
if test.channelFound {
|
||||||
|
store.channels[test.chanID] = &chanEventLog{
|
||||||
|
events: test.events,
|
||||||
|
now: func() time.Time { return now },
|
||||||
|
openedAt: test.openedAt,
|
||||||
|
closedAt: test.closedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime)
|
||||||
|
if test.expectErr && err == nil {
|
||||||
|
t.Fatal("Expected an error, got nil")
|
||||||
|
}
|
||||||
|
if !test.expectErr && err != nil {
|
||||||
|
t.Fatalf("Expcted no error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if uptime != test.expectedUptime {
|
||||||
|
t.Fatalf("Expected uptime percentage: %v, got %v",
|
||||||
|
test.expectedUptime, uptime)
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
@ -73,7 +73,11 @@ func (c *ChannelNotifier) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
|
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
|
||||||
// any time the Server is made aware of a new event.
|
// any time the Server is made aware of a new event. The subscription provides
|
||||||
|
// channel events from the point of subscription onwards.
|
||||||
|
//
|
||||||
|
// TODO(carlaKC): update to allow subscriptions to specify a block height from
|
||||||
|
// which we would like to subscribe to events.
|
||||||
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
|
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
|
||||||
return c.ntfnServer.Subscribe()
|
return c.ntfnServer.Subscribe()
|
||||||
}
|
}
|
||||||
|
18
server.go
18
server.go
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/brontide"
|
"github.com/lightningnetwork/lnd/brontide"
|
||||||
"github.com/lightningnetwork/lnd/chanacceptor"
|
"github.com/lightningnetwork/lnd/chanacceptor"
|
||||||
"github.com/lightningnetwork/lnd/chanbackup"
|
"github.com/lightningnetwork/lnd/chanbackup"
|
||||||
|
"github.com/lightningnetwork/lnd/chanfitness"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/channelnotifier"
|
"github.com/lightningnetwork/lnd/channelnotifier"
|
||||||
"github.com/lightningnetwork/lnd/contractcourt"
|
"github.com/lightningnetwork/lnd/contractcourt"
|
||||||
@ -243,6 +244,10 @@ type server struct {
|
|||||||
// channelNotifier to be notified of newly opened and closed channels.
|
// channelNotifier to be notified of newly opened and closed channels.
|
||||||
chanSubSwapper *chanbackup.SubSwapper
|
chanSubSwapper *chanbackup.SubSwapper
|
||||||
|
|
||||||
|
// chanEventStore tracks the behaviour of channels and their remote peers to
|
||||||
|
// provide insights into their health and performance.
|
||||||
|
chanEventStore *chanfitness.ChannelEventStore
|
||||||
|
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@ -1113,6 +1118,13 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
|||||||
// to peer online and offline events.
|
// to peer online and offline events.
|
||||||
s.peerNotifier = peernotifier.New()
|
s.peerNotifier = peernotifier.New()
|
||||||
|
|
||||||
|
// Create a channel event store which monitors all open channels.
|
||||||
|
s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{
|
||||||
|
SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents,
|
||||||
|
SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents,
|
||||||
|
GetOpenChannels: s.chanDB.FetchAllOpenChannels,
|
||||||
|
})
|
||||||
|
|
||||||
if cfg.WtClient.Active {
|
if cfg.WtClient.Active {
|
||||||
policy := wtpolicy.DefaultPolicy()
|
policy := wtpolicy.DefaultPolicy()
|
||||||
|
|
||||||
@ -1270,6 +1282,11 @@ func (s *server) Start() error {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := s.chanEventStore.Start(); err != nil {
|
||||||
|
startErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Before we start the connMgr, we'll check to see if we have
|
// Before we start the connMgr, we'll check to see if we have
|
||||||
// any backups to recover. We do this now as we want to ensure
|
// any backups to recover. We do this now as we want to ensure
|
||||||
// that have all the information we need to handle channel
|
// that have all the information we need to handle channel
|
||||||
@ -1385,6 +1402,7 @@ func (s *server) Stop() error {
|
|||||||
s.invoices.Stop()
|
s.invoices.Stop()
|
||||||
s.fundingMgr.Stop()
|
s.fundingMgr.Stop()
|
||||||
s.chanSubSwapper.Stop()
|
s.chanSubSwapper.Stop()
|
||||||
|
s.chanEventStore.Stop()
|
||||||
|
|
||||||
// Disconnect from each active peers to ensure that
|
// Disconnect from each active peers to ensure that
|
||||||
// peerTerminationWatchers signal completion to each peer.
|
// peerTerminationWatchers signal completion to each peer.
|
||||||
|
Loading…
Reference in New Issue
Block a user