diff --git a/.golangci.yml b/.golangci.yml index ca3e6881..6ac09542 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -55,6 +55,9 @@ linters: # the linter. - prealloc + # Init functions are used by loggers throughout the codebase. + - gochecknoinits + issues: # Only show newly introduced problems. new-from-rev: 01f696afce2f9c0d4ed854edefa3846891d01d8a diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go new file mode 100644 index 00000000..a4339765 --- /dev/null +++ b/chanfitness/chaneventstore.go @@ -0,0 +1,390 @@ +// Package chanfitness monitors the behaviour of channels to provide insight +// into the health and performance of a channel. This is achieved by maintaining +// an event store which tracks events for each channel. +// +// Lifespan: the period that the channel has been known to the scoring system. +// Note that lifespan may not equal the channel's full lifetime because data is +// not currently persisted. +// +// Uptime: the total time within a given period that the channel's remote peer +// has been online. +package chanfitness + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" + "github.com/lightningnetwork/lnd/peernotifier" + "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/subscribe" +) + +// errShuttingDown is returned when the store cannot respond to a query because +// it has received the shutdown signal. +var errShuttingDown = errors.New("channel event store shutting down") + +// ChannelEventStore maintains a set of event logs for the node's channels to +// provide insight into the performance and health of channels. +type ChannelEventStore struct { + cfg *Config + + // channels maps short channel IDs to event logs. + channels map[uint64]*chanEventLog + + // peers tracks the current online status of peers based on online/offline + // events. + peers map[route.Vertex]bool + + // lifespanRequests serves requests for the lifespan of channels. + lifespanRequests chan lifespanRequest + + // uptimeRequests serves requests for the uptime of channels. + uptimeRequests chan uptimeRequest + + quit chan struct{} + + wg sync.WaitGroup +} + +// Config provides the event store with functions required to monitor channel +// activity. All elements of the config must be non-nil for the event store to +// operate. +type Config struct { + // SubscribeChannelEvents provides a subscription client which provides a + // stream of channel events. + SubscribeChannelEvents func() (*subscribe.Client, error) + + // SubscribePeerEvents provides a subscription client which provides a + // stream of peer online/offline events. + SubscribePeerEvents func() (*subscribe.Client, error) + + // GetOpenChannels provides a list of existing open channels which is used + // to populate the ChannelEventStore with a set of channels on startup. + GetOpenChannels func() ([]*channeldb.OpenChannel, error) +} + +// lifespanRequest contains the channel ID required to query the store for a +// channel's lifespan and a blocking response channel on which the result is +// sent. +type lifespanRequest struct { + channelID uint64 + responseChan chan lifespanResponse +} + +// lifespanResponse contains the response to a lifespanRequest and an error if +// one occurred. 
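A note on how the Config above is meant to be satisfied: each hook maps directly onto an existing lnd API, and the server.go hunk at the end of this diff wires them up inline. The sketch below pulls that wiring into a standalone helper purely for illustration; the helper name, its package and its error handling are hypothetical and not part of this change.

```go
package chanfitnessexample

import (
	"github.com/lightningnetwork/lnd/chanfitness"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/channelnotifier"
	"github.com/lightningnetwork/lnd/peernotifier"
)

// startEventStore is a hypothetical helper mirroring the server.go wiring at
// the bottom of this diff: build the event store from existing components and
// start it.
func startEventStore(chanNotifier *channelnotifier.ChannelNotifier,
	peerNotifier *peernotifier.PeerNotifier,
	db *channeldb.DB) (*chanfitness.ChannelEventStore, error) {

	store := chanfitness.NewChannelEventStore(&chanfitness.Config{
		// Method values satisfy the func-typed config fields directly.
		SubscribeChannelEvents: chanNotifier.SubscribeChannelEvents,
		SubscribePeerEvents:    peerNotifier.SubscribePeerEvents,
		GetOpenChannels:        db.FetchAllOpenChannels,
	})

	// Start subscribes to both notifiers, backfills the currently open
	// channels and launches the consume goroutine; it cancels its own
	// subscriptions if any step fails.
	if err := store.Start(); err != nil {
		return nil, err
	}

	return store, nil
}
```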
+type lifespanResponse struct { + start time.Time + end time.Time + err error +} + +// uptimeRequest contains the parameters required to query the store for a +// channel's uptime and a blocking response channel on which the result is sent. +type uptimeRequest struct { + channelID uint64 + startTime time.Time + endTime time.Time + responseChan chan uptimeResponse +} + +// uptimeResponse contains the response to an uptimeRequest and an error if one +// occurred. +type uptimeResponse struct { + uptime time.Duration + err error +} + +// NewChannelEventStore initializes an event store with the config provided. +// Note that this function does not start the main event loop, Start() must be +// called. +func NewChannelEventStore(config *Config) *ChannelEventStore { + store := &ChannelEventStore{ + cfg: config, + channels: make(map[uint64]*chanEventLog), + peers: make(map[route.Vertex]bool), + lifespanRequests: make(chan lifespanRequest), + uptimeRequests: make(chan uptimeRequest), + quit: make(chan struct{}), + } + + return store +} + +// Start adds all existing open channels to the event store and starts the main +// loop which records channel and peer events, and serves requests for +// information from the store. If this function fails, it cancels its existing +// subscriptions and returns an error. +func (c *ChannelEventStore) Start() error { + // Create a subscription to channel events. + channelClient, err := c.cfg.SubscribeChannelEvents() + if err != nil { + return err + } + + // Create a subscription to peer events. If an error occurs, cancel the + // existing subscription to channel events and return. + peerClient, err := c.cfg.SubscribePeerEvents() + if err != nil { + channelClient.Cancel() + return err + } + + // cancel should be called to cancel all subscriptions if an error occurs. + cancel := func() { + channelClient.Cancel() + peerClient.Cancel() + } + + // Add the existing set of channels to the event store. This is required + // because channel events will not be triggered for channels that exist + // at startup time. + channels, err := c.cfg.GetOpenChannels() + if err != nil { + cancel() + return err + } + + log.Infof("Adding %v channels to event store", len(channels)) + + for _, ch := range channels { + peerKey, err := route.NewVertexFromBytes( + ch.IdentityPub.SerializeCompressed(), + ) + if err != nil { + cancel() + return err + } + + // Add existing channels to the channel store with an initial peer + // online or offline event. + c.addChannel(ch.ShortChanID().ToUint64(), peerKey) + } + + // Start a goroutine that consumes events from all subscriptions. + c.wg.Add(1) + go c.consume(&subscriptions{ + channelUpdates: channelClient.Updates(), + peerUpdates: peerClient.Updates(), + cancel: cancel, + }) + + return nil +} + +// Stop terminates all goroutines started by the event store. +func (c *ChannelEventStore) Stop() { + log.Info("Stopping event store") + + // Stop the consume goroutine. + close(c.quit) + + c.wg.Wait() +} + +// addChannel adds a new channel to the ChannelEventStore's map of channels with +// an initial peer online state (if the peer is online). If the channel is +// already present in the map, the function returns early. This function should +// be called to add existing channels on startup and when open channel events +// are observed. +func (c *ChannelEventStore) addChannel(channelID uint64, peer route.Vertex) { + // Check for the unexpected case where the channel is already in the store. 
+	_, ok := c.channels[channelID]
+	if ok {
+		log.Errorf("Channel %v duplicated in channel store", channelID)
+		return
+	}
+
+	eventLog := newEventLog(channelID, peer, time.Now)
+
+	// If the peer is online, add a peer online event to indicate its starting
+	// state.
+	online := c.peers[peer]
+	if online {
+		eventLog.add(peerOnlineEvent)
+	}
+
+	c.channels[channelID] = eventLog
+}
+
+// closeChannel records a closed time for a channel, and returns early if the
+// channel is not known to the event store.
+func (c *ChannelEventStore) closeChannel(channelID uint64) {
+	// Check for the unexpected case where the channel is unknown to the store.
+	eventLog, ok := c.channels[channelID]
+	if !ok {
+		log.Errorf("Close channel %v unknown to store", channelID)
+		return
+	}
+
+	eventLog.close()
+}
+
+// peerEvent adds a peer online or offline event to all channels we currently
+// have open with a peer.
+func (c *ChannelEventStore) peerEvent(peer route.Vertex, event eventType) {
+	// Track the current online status of peers in the channelEventStore.
+	c.peers[peer] = event == peerOnlineEvent
+
+	for _, eventLog := range c.channels {
+		if eventLog.peer == peer {
+			eventLog.add(event)
+		}
+	}
+}
+
+// subscriptions abstracts away from subscription clients to allow for mocking.
+type subscriptions struct {
+	channelUpdates <-chan interface{}
+	peerUpdates    <-chan interface{}
+	cancel         func()
+}
+
+// consume is the event store's main loop. It consumes subscriptions to update
+// the event store with channel and peer events, and serves requests for
+// channel uptime and lifespan.
+func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
+	defer c.wg.Done()
+	defer subscriptions.cancel()
+
+	// Consume events until the store receives the signal to shut down.
+	for {
+		select {
+		// Process channel opened and closed events.
+		case e := <-subscriptions.channelUpdates:
+			switch event := e.(type) {
+			// A new channel has been opened; we must add the channel to the
+			// store and record a channel open event.
+			case channelnotifier.OpenChannelEvent:
+				chanID := event.Channel.ShortChanID().ToUint64()
+
+				peerKey, err := route.NewVertexFromBytes(
+					event.Channel.IdentityPub.SerializeCompressed(),
+				)
+				if err != nil {
+					log.Errorf("Could not get vertex from: %v",
+						event.Channel.IdentityPub.SerializeCompressed())
+				}
+
+				c.addChannel(chanID, peerKey)
+
+			// A channel has been closed; we must remove the channel from the
+			// store and record a channel closed event.
+			case channelnotifier.ClosedChannelEvent:
+				c.closeChannel(event.CloseSummary.ShortChanID.ToUint64())
+			}
+
+		// Process peer online and offline events.
+		case e := <-subscriptions.peerUpdates:
+			switch event := e.(type) {
+			// We have reestablished a connection with our peer, and should
+			// record an online event for any channels with that peer.
+			case peernotifier.PeerOnlineEvent:
+				c.peerEvent(event.PubKey, peerOnlineEvent)
+
+			// We have lost a connection with our peer, and should record an
+			// offline event for any channels with that peer.
+			case peernotifier.PeerOfflineEvent:
+				c.peerEvent(event.PubKey, peerOfflineEvent)
+			}
+
+		// Serve all requests for channel lifespan.
+		case req := <-c.lifespanRequests:
+			var resp lifespanResponse
+
+			channel, ok := c.channels[req.channelID]
+			if !ok {
+				resp.err = fmt.Errorf("channel %v not found", req.channelID)
+			} else {
+				resp.start = channel.openedAt
+				resp.end = channel.closedAt
+			}
+
+			req.responseChan <- resp
+
+		// Serve requests for channel uptime.
+		case req := <-c.uptimeRequests:
+			var resp uptimeResponse
+
+			channel, ok := c.channels[req.channelID]
+			if !ok {
+				resp.err = fmt.Errorf("channel %v not found", req.channelID)
+			} else {
+				uptime, err := channel.uptime(req.startTime, req.endTime)
+				resp.uptime = uptime
+				resp.err = err
+			}
+
+			req.responseChan <- resp
+
+		// Exit if the store receives the signal to shut down.
+		case <-c.quit:
+			return
+		}
+	}
+}
+
+// GetLifespan returns the opening and closing time observed for a channel, and
+// an error if the channel is not known to the event store. If the channel is
+// still open, a zero close time is returned.
+func (c *ChannelEventStore) GetLifespan(chanID uint64) (time.Time, time.Time, error) {
+	request := lifespanRequest{
+		channelID:    chanID,
+		responseChan: make(chan lifespanResponse),
+	}
+
+	// Send a request for the channel's lifespan to the main event loop, or
+	// return early with an error if the store has already received a shutdown
+	// signal.
+	select {
+	case c.lifespanRequests <- request:
+	case <-c.quit:
+		return time.Time{}, time.Time{}, errShuttingDown
+	}
+
+	// Return the response we receive on the response channel or exit early if
+	// the store is instructed to exit.
+	select {
+	case resp := <-request.responseChan:
+		return resp.start, resp.end, resp.err
+
+	case <-c.quit:
+		return time.Time{}, time.Time{}, errShuttingDown
+	}
+}
+
+// GetUptime returns the uptime of a channel over a period and an error if the
+// channel cannot be found or the uptime calculation fails.
+func (c *ChannelEventStore) GetUptime(chanID uint64, startTime,
+	endTime time.Time) (time.Duration, error) {
+
+	request := uptimeRequest{
+		channelID:    chanID,
+		startTime:    startTime,
+		endTime:      endTime,
+		responseChan: make(chan uptimeResponse),
+	}
+
+	// Send a request for the channel's uptime to the main event loop, or
+	// return early with an error if the store has already received a shutdown
+	// signal.
+	select {
+	case c.uptimeRequests <- request:
+	case <-c.quit:
+		return 0, errShuttingDown
+	}
+
+	// Return the response we receive on the response channel or exit early if
+	// the store is instructed to exit.
+	select {
+	case resp := <-request.responseChan:
+		return resp.uptime, resp.err
+
+	case <-c.quit:
+		return 0, errShuttingDown
+	}
+}
diff --git a/chanfitness/chaneventstore_test.go b/chanfitness/chaneventstore_test.go
new file mode 100644
index 00000000..a30f7d15
--- /dev/null
+++ b/chanfitness/chaneventstore_test.go
@@ -0,0 +1,462 @@
+package chanfitness
+
+import (
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/btcsuite/btcd/btcec"
+	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/channelnotifier"
+	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/lightningnetwork/lnd/peernotifier"
+	"github.com/lightningnetwork/lnd/routing/route"
+	"github.com/lightningnetwork/lnd/subscribe"
+)
+
+// TestStartStoreError tests the starting of the store in cases where the setup
+// functions fail. It does not test the mechanics of consuming events because
+// these are covered in a separate set of tests.
+func TestStartStoreError(t *testing.T) {
+	// Ok and erroring subscribe functions are defined here to de-clutter tests.
+ okSubscribeFunc := func() (*subscribe.Client, error) { + return &subscribe.Client{ + Cancel: func() {}, + }, nil + } + + errSubscribeFunc := func() (client *subscribe.Client, e error) { + return nil, errors.New("intentional test err") + } + + tests := []struct { + name string + ChannelEvents func() (*subscribe.Client, error) + PeerEvents func() (*subscribe.Client, error) + GetChannels func() ([]*channeldb.OpenChannel, error) + }{ + { + name: "Channel events fail", + ChannelEvents: errSubscribeFunc, + }, + { + name: "Peer events fail", + ChannelEvents: okSubscribeFunc, + PeerEvents: errSubscribeFunc, + }, + { + name: "Get open channels fails", + ChannelEvents: okSubscribeFunc, + PeerEvents: okSubscribeFunc, + GetChannels: func() (channels []*channeldb.OpenChannel, e error) { + return nil, errors.New("intentional test err") + }, + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + store := NewChannelEventStore(&Config{ + SubscribeChannelEvents: test.ChannelEvents, + SubscribePeerEvents: test.PeerEvents, + GetOpenChannels: test.GetChannels, + }) + + err := store.Start() + // Check that we receive an error, because the test only checks for + // error cases. + if err == nil { + t.Fatalf("Expected error on startup, got: nil") + } + }) + } +} + +// TestMonitorChannelEvents tests the store's handling of channel and peer +// events. It tests for the unexpected cases where we receive a channel open for +// an already known channel and but does not test for closing an unknown channel +// because it would require custom logic in the test to prevent iterating +// through an eventLog which does not exist. This test does not test handling +// of uptime and lifespan requests, as they are tested in their own tests. +func TestMonitorChannelEvents(t *testing.T) { + privKey, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("Error getting pubkey: %v", err) + } + + pubKey, err := route.NewVertexFromBytes( + privKey.PubKey().SerializeCompressed(), + ) + if err != nil { + t.Fatalf("Could not create vertex: %v", err) + } + + shortID := lnwire.ShortChannelID{ + BlockHeight: 1234, + TxIndex: 2, + TxPosition: 2, + } + + tests := []struct { + name string + + // generateEvents takes channels which represent the updates channels + // for subscription clients and passes events in the desired order. + // This function is intended to be blocking so that the test does not + // have a data race with event consumption, so the channels should not + // be buffered. + generateEvents func(channelEvents, peerEvents chan<- interface{}) + + // expectedEvents is the expected set of event types in the store. + expectedEvents []eventType + }{ + { + name: "Channel opened, peer comes online", + generateEvents: func(channelEvents, peerEvents chan<- interface{}) { + // Add an open channel event + channelEvents <- channelnotifier.OpenChannelEvent{ + Channel: &channeldb.OpenChannel{ + ShortChannelID: shortID, + IdentityPub: privKey.PubKey(), + }, + } + + // Add a peer online event. + peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey} + }, + expectedEvents: []eventType{peerOnlineEvent}, + }, + { + name: "Duplicate channel open events", + generateEvents: func(channelEvents, peerEvents chan<- interface{}) { + // Add an open channel event + channelEvents <- channelnotifier.OpenChannelEvent{ + Channel: &channeldb.OpenChannel{ + ShortChannelID: shortID, + IdentityPub: privKey.PubKey(), + }, + } + + // Add a peer online event. 
+			peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
+
+			// Add a duplicate channel open event.
+			channelEvents <- channelnotifier.OpenChannelEvent{
+				Channel: &channeldb.OpenChannel{
+					ShortChannelID: shortID,
+					IdentityPub:    privKey.PubKey(),
+				},
+			}
+		},
+		expectedEvents: []eventType{peerOnlineEvent},
+	},
+	{
+		name: "Channel opened, peer already online",
+		generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
+			// Add a peer online event.
+			peerEvents <- peernotifier.PeerOnlineEvent{PubKey: pubKey}
+
+			// Add an open channel event.
+			channelEvents <- channelnotifier.OpenChannelEvent{
+				Channel: &channeldb.OpenChannel{
+					ShortChannelID: shortID,
+					IdentityPub:    privKey.PubKey(),
+				},
+			}
+		},
+		expectedEvents: []eventType{peerOnlineEvent},
+	},
+
+	{
+		name: "Channel opened, peer offline, closed",
+		generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
+			// Add an open channel event.
+			channelEvents <- channelnotifier.OpenChannelEvent{
+				Channel: &channeldb.OpenChannel{
+					ShortChannelID: shortID,
+					IdentityPub:    privKey.PubKey(),
+				},
+			}
+
+			// Add a peer offline event.
+			peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
+
+			// Add a close channel event.
+			channelEvents <- channelnotifier.ClosedChannelEvent{
+				CloseSummary: &channeldb.ChannelCloseSummary{
+					ShortChanID: shortID,
+				},
+			}
+		},
+		expectedEvents: []eventType{peerOfflineEvent},
+	},
+	{
+		name: "Event after channel close not recorded",
+		generateEvents: func(channelEvents, peerEvents chan<- interface{}) {
+			// Add an open channel event.
+			channelEvents <- channelnotifier.OpenChannelEvent{
+				Channel: &channeldb.OpenChannel{
+					ShortChannelID: shortID,
+					IdentityPub:    privKey.PubKey(),
+				},
+			}
+
+			// Add a close channel event.
+			channelEvents <- channelnotifier.ClosedChannelEvent{
+				CloseSummary: &channeldb.ChannelCloseSummary{
+					ShortChanID: shortID,
+				},
+			}
+
+			// Add a peer offline event; it arrives after the close, so it
+			// should not be recorded.
+			peerEvents <- peernotifier.PeerOfflineEvent{PubKey: pubKey}
+		},
+	},
+}
+
+	for _, test := range tests {
+		test := test
+
+		t.Run(test.name, func(t *testing.T) {
+			// Create a store with the channels and online peers specified
+			// by the test.
+			store := NewChannelEventStore(&Config{})
+
+			// Create channels which represent the subscriptions we have to peer
+			// and client events.
+			channelEvents := make(chan interface{})
+			peerEvents := make(chan interface{})
+
+			store.wg.Add(1)
+			go store.consume(&subscriptions{
+				channelUpdates: channelEvents,
+				peerUpdates:    peerEvents,
+				cancel:         func() {},
+			})
+
+			// Add events to the store, then kill the goroutine using store.Stop.
+			test.generateEvents(channelEvents, peerEvents)
+			store.Stop()
+
+			// Retrieve the eventLog for the channel and check that its
+			// contents are as expected.
+			eventLog, ok := store.channels[shortID.ToUint64()]
+			if !ok {
+				t.Fatalf("Expected to find event store")
+			}
+
+			for i, e := range eventLog.events {
+				if test.expectedEvents[i] != e.eventType {
+					t.Fatalf("Expected type: %v, got: %v",
+						test.expectedEvents[i], e.eventType)
+				}
+			}
+		})
+	}
+}
+
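Before the query tests that follow, here is a caller-side sketch of how GetLifespan and GetUptime (defined in chaneventstore.go above) combine into the uptime figure described in the package documentation. The helper, its package name and the ratio calculation are illustrative assumptions, not part of this change; the sketch only relies on the documented behaviour that an open channel reports a zero close time.

```go
package chanfitnessexample

import (
	"time"

	"github.com/lightningnetwork/lnd/chanfitness"
)

// uptimeRatio is a hypothetical helper expressing a channel's uptime as a
// fraction of the period the event store has known about it.
func uptimeRatio(store *chanfitness.ChannelEventStore,
	chanID uint64) (float64, error) {

	openedAt, closedAt, err := store.GetLifespan(chanID)
	if err != nil {
		return 0, err
	}

	// Per the GetLifespan contract, a zero close time means the channel is
	// still open, so measure its lifespan up to the present.
	if closedAt.IsZero() {
		closedAt = time.Now()
	}

	uptime, err := store.GetUptime(chanID, openedAt, closedAt)
	if err != nil {
		return 0, err
	}

	lifespan := closedAt.Sub(openedAt)
	if lifespan <= 0 {
		return 0, nil
	}

	return float64(uptime) / float64(lifespan), nil
}
```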
+// TestGetLifetime tests the GetLifespan query for the cases where a channel
+// is known and unknown to the store.
+func TestGetLifetime(t *testing.T) {
+	now := time.Now()
+
+	tests := []struct {
+		name         string
+		channelFound bool
+		chanID       uint64
+		opened       time.Time
+		closed       time.Time
+		expectErr    bool
+	}{
+		{
+			name:         "Channel found",
+			channelFound: true,
+			opened:       now,
+			closed:       now.Add(time.Hour * -1),
+			expectErr:    false,
+		},
+		{
+			name:      "Channel not found",
+			expectErr: true,
+		},
+	}
+
+	for _, test := range tests {
+		test := test
+
+		t.Run(test.name, func(t *testing.T) {
+			// Create an empty event store for testing.
+			store := NewChannelEventStore(&Config{})
+
+			// Start a goroutine which consumes GetLifespan requests.
+			store.wg.Add(1)
+			go store.consume(&subscriptions{
+				channelUpdates: make(chan interface{}),
+				peerUpdates:    make(chan interface{}),
+				cancel:         func() {},
+			})
+
+			// Stop the store's goroutine.
+			defer store.Stop()
+
+			// Add the channel to the event store if the test indicates that
+			// it should be present.
+			if test.channelFound {
+				store.channels[test.chanID] = &chanEventLog{
+					openedAt: test.opened,
+					closedAt: test.closed,
+				}
+			}
+
+			open, close, err := store.GetLifespan(test.chanID)
+			if test.expectErr && err == nil {
+				t.Fatal("Expected an error, got nil")
+			}
+			if !test.expectErr && err != nil {
+				t.Fatalf("Expected no error, got: %v", err)
+			}
+
+			if open != test.opened {
+				t.Errorf("Expected: %v, got %v", test.opened, open)
+			}
+
+			if close != test.closed {
+				t.Errorf("Expected: %v, got %v", test.closed, close)
+			}
+		})
+	}
+}
+
+// TestGetUptime tests the GetUptime call for channels known and unknown to the
+// event store. It tests the unexpected edge cases where a tracked channel does
+// not have any events recorded, and where a zero start time is specified for
+// the uptime range.
+func TestGetUptime(t *testing.T) {
+	// Set time for deterministic unit tests.
+	now := time.Now()
+
+	twoHoursAgo := now.Add(time.Hour * -2)
+	fourHoursAgo := now.Add(time.Hour * -4)
+
+	tests := []struct {
+		name string
+
+		chanID uint64
+
+		// events is the set of events we expect to find in the channel store.
+		events []*channelEvent
+
+		// openedAt is the time the channel is recorded as open by the store.
+		openedAt time.Time
+
+		// closedAt is the time the channel is recorded as closed by the store.
+		// If the channel is still open, this value is zero.
+		closedAt time.Time
+
+		// channelFound is true if we expect to find the channel in the store.
+		channelFound bool
+
+		// startTime specifies the beginning of the uptime range we want to
+		// calculate.
+		startTime time.Time
+
+		// endTime specifies the end of the uptime range we want to calculate.
+		endTime time.Time
+
+		expectedUptime time.Duration
+		expectErr      bool
+	}{
+		{
+			name:         "No events",
+			startTime:    twoHoursAgo,
+			endTime:      now,
+			channelFound: true,
+		},
+		{
+			name: "50% Uptime",
+			events: []*channelEvent{
+				{
+					timestamp: fourHoursAgo,
+					eventType: peerOnlineEvent,
+				},
+				{
+					timestamp: twoHoursAgo,
+					eventType: peerOfflineEvent,
+				},
+			},
+			openedAt:       fourHoursAgo,
+			expectedUptime: time.Hour * 2,
+			startTime:      fourHoursAgo,
+			endTime:        now,
+			channelFound:   true,
+		},
+		{
+			name: "Zero start time",
+			events: []*channelEvent{
+				{
+					timestamp: fourHoursAgo,
+					eventType: peerOnlineEvent,
+				},
+			},
+			openedAt:       fourHoursAgo,
+			expectedUptime: time.Hour * 4,
+			endTime:        now,
+			channelFound:   true,
+		},
+		{
+			name:         "Channel not found",
+			startTime:    twoHoursAgo,
+			endTime:      now,
+			channelFound: false,
+			expectErr:    true,
+		},
+	}
+
+	for _, test := range tests {
+		test := test
+
+		t.Run(test.name, func(t *testing.T) {
+			// Set up an event store with the events specified for the test
+			// and mocked time.
+			store := NewChannelEventStore(&Config{})
+
+			// Start a goroutine which consumes GetUptime requests.
+			store.wg.Add(1)
+			go store.consume(&subscriptions{
+				channelUpdates: make(chan interface{}),
+				peerUpdates:    make(chan interface{}),
+				cancel:         func() {},
+			})
+
+			// Stop the store's goroutine.
+			defer store.Stop()
+
+			// Add the channel to the store if it is intended to be found.
+			if test.channelFound {
+				store.channels[test.chanID] = &chanEventLog{
+					events:   test.events,
+					now:      func() time.Time { return now },
+					openedAt: test.openedAt,
+					closedAt: test.closedAt,
+				}
+			}
+
+			uptime, err := store.GetUptime(test.chanID, test.startTime, test.endTime)
+			if test.expectErr && err == nil {
+				t.Fatal("Expected an error, got nil")
+			}
+			if !test.expectErr && err != nil {
+				t.Fatalf("Expected no error, got: %v", err)
+			}
+
+			if uptime != test.expectedUptime {
+				t.Fatalf("Expected uptime: %v, got %v",
+					test.expectedUptime, uptime)
+			}
+		})
+	}
+}
diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go
index fa6cbcdd..b2e5c674 100644
--- a/channelnotifier/channelnotifier.go
+++ b/channelnotifier/channelnotifier.go
@@ -73,7 +73,11 @@ func (c *ChannelNotifier) Stop() {
 }
 
 // SubscribeChannelEvents returns a subscribe.Client that will receive updates
-// any time the Server is made aware of a new event.
+// any time the Server is made aware of a new event. The subscription provides
+// channel events from the point of subscription onwards.
+//
+// TODO(carlaKC): update to allow subscriptions to specify a block height from
+// which we would like to subscribe to events.
 func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
 	return c.ntfnServer.Subscribe()
 }
diff --git a/server.go b/server.go
index b52a8f22..15130aff 100644
--- a/server.go
+++ b/server.go
@@ -30,6 +30,7 @@ import (
 	"github.com/lightningnetwork/lnd/brontide"
 	"github.com/lightningnetwork/lnd/chanacceptor"
 	"github.com/lightningnetwork/lnd/chanbackup"
+	"github.com/lightningnetwork/lnd/chanfitness"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/channelnotifier"
 	"github.com/lightningnetwork/lnd/contractcourt"
@@ -243,6 +244,10 @@ type server struct {
 	// channelNotifier to be notified of newly opened and closed channels.
 	chanSubSwapper *chanbackup.SubSwapper
 
+	// chanEventStore tracks the behaviour of channels and their remote peers to
+	// provide insights into their health and performance.
+ chanEventStore *chanfitness.ChannelEventStore + quit chan struct{} wg sync.WaitGroup @@ -1113,6 +1118,13 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, // to peer online and offline events. s.peerNotifier = peernotifier.New() + // Create a channel event store which monitors all open channels. + s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{ + SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents, + SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents, + GetOpenChannels: s.chanDB.FetchAllOpenChannels, + }) + if cfg.WtClient.Active { policy := wtpolicy.DefaultPolicy() @@ -1270,6 +1282,11 @@ func (s *server) Start() error { return } + if err := s.chanEventStore.Start(); err != nil { + startErr = err + return + } + // Before we start the connMgr, we'll check to see if we have // any backups to recover. We do this now as we want to ensure // that have all the information we need to handle channel @@ -1385,6 +1402,7 @@ func (s *server) Stop() error { s.invoices.Stop() s.fundingMgr.Stop() s.chanSubSwapper.Stop() + s.chanEventStore.Stop() // Disconnect from each active peers to ensure that // peerTerminationWatchers signal completion to each peer.
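Nothing in this diff consumes the new queries yet; the store is only wired into the server and started. As a sketch of the kind of consumer this enables, such as a future scoring or health-reporting job, the snippet below walks all open channels and reports how long each peer has been online since the channel became known to the store. The helper, its use of the standard library logger and its placement are assumptions for illustration only.

```go
package chanfitnessexample

import (
	"log"
	"time"

	"github.com/lightningnetwork/lnd/chanfitness"
	"github.com/lightningnetwork/lnd/channeldb"
)

// reportChannelUptimes is a hypothetical consumer of the event store: for
// every open channel it queries the observed lifespan, then the uptime within
// that lifespan, and logs the result.
func reportChannelUptimes(store *chanfitness.ChannelEventStore,
	db *channeldb.DB) error {

	channels, err := db.FetchAllOpenChannels()
	if err != nil {
		return err
	}

	for _, channel := range channels {
		chanID := channel.ShortChanID().ToUint64()

		// These channels are open, so GetLifespan reports a zero close
		// time; measure up to the present instead.
		openedAt, _, err := store.GetLifespan(chanID)
		if err != nil {
			return err
		}

		uptime, err := store.GetUptime(chanID, openedAt, time.Now())
		if err != nil {
			return err
		}

		log.Printf("channel %v: online for %v of the %v it has been "+
			"monitored", chanID, uptime, time.Since(openedAt))
	}

	return nil
}
```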