// Package chanfitness monitors the behaviour of channels to provide insight // into the health and performance of a channel. This is achieved by maintaining // an event store which tracks events for each channel. // // Lifespan: the period that the channel has been known to the scoring system. // Note that lifespan may not equal the channel's full lifetime because data is // not currently persisted. // // Uptime: the total time within a given period that the channel's remote peer // has been online. package chanfitness import ( "errors" "sync" "time" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/ticker" ) const ( // FlapCountFlushRate determines how often we write peer total flap // count to disk. FlapCountFlushRate = time.Hour ) var ( // errShuttingDown is returned when the store cannot respond to a query // because it has received the shutdown signal. errShuttingDown = errors.New("channel event store shutting down") // ErrChannelNotFound is returned when a query is made for a channel // that the event store does not have knowledge of. ErrChannelNotFound = errors.New("channel not found in event store") // ErrPeerNotFound is returned when a query is made for a channel // that has a peer that the event store is not currently tracking. ErrPeerNotFound = errors.New("peer not found in event store") ) // ChannelEventStore maintains a set of event logs for the node's channels to // provide insight into the performance and health of channels. type ChannelEventStore struct { cfg *Config // peers tracks all of our currently monitored peers and their channels. peers map[route.Vertex]peerMonitor // chanInfoRequests serves requests for information about our channel. chanInfoRequests chan channelInfoRequest // peerRequests serves requests for information about a peer. peerRequests chan peerRequest quit chan struct{} wg sync.WaitGroup } // Config provides the event store with functions required to monitor channel // activity. All elements of the config must be non-nil for the event store to // operate. type Config struct { // SubscribeChannelEvents provides a subscription client which provides // a stream of channel events. SubscribeChannelEvents func() (subscribe.Subscription, error) // SubscribePeerEvents provides a subscription client which provides a // stream of peer online/offline events. SubscribePeerEvents func() (subscribe.Subscription, error) // GetOpenChannels provides a list of existing open channels which is // used to populate the ChannelEventStore with a set of channels on // startup. GetOpenChannels func() ([]*channeldb.OpenChannel, error) // Clock is the time source that the subsystem uses, provided here // for ease of testing. Clock clock.Clock // WriteFlapCounts records the flap count for a set of peers on disk. WriteFlapCount func(map[route.Vertex]*channeldb.FlapCount) error // ReadFlapCount gets the flap count for a peer on disk. ReadFlapCount func(route.Vertex) (*channeldb.FlapCount, error) // FlapCountTicker is a ticker which controls how often we flush our // peer's flap count to disk. FlapCountTicker ticker.Ticker } // peerFlapCountMap is the map used to map peers to flap counts, declared here // to allow shorter function signatures. type peerFlapCountMap map[route.Vertex]*channeldb.FlapCount type channelInfoRequest struct { peer route.Vertex channelPoint wire.OutPoint responseChan chan channelInfoResponse } type channelInfoResponse struct { info *ChannelInfo err error } type peerRequest struct { peer route.Vertex responseChan chan peerResponse } type peerResponse struct { flapCount int ts *time.Time err error } // NewChannelEventStore initializes an event store with the config provided. // Note that this function does not start the main event loop, Start() must be // called. func NewChannelEventStore(config *Config) *ChannelEventStore { store := &ChannelEventStore{ cfg: config, peers: make(map[route.Vertex]peerMonitor), chanInfoRequests: make(chan channelInfoRequest), peerRequests: make(chan peerRequest), quit: make(chan struct{}), } return store } // Start adds all existing open channels to the event store and starts the main // loop which records channel and peer events, and serves requests for // information from the store. If this function fails, it cancels its existing // subscriptions and returns an error. func (c *ChannelEventStore) Start() error { // Create a subscription to channel events. channelClient, err := c.cfg.SubscribeChannelEvents() if err != nil { return err } // Create a subscription to peer events. If an error occurs, cancel the // existing subscription to channel events and return. peerClient, err := c.cfg.SubscribePeerEvents() if err != nil { channelClient.Cancel() return err } // cancel should be called to cancel all subscriptions if an error // occurs. cancel := func() { channelClient.Cancel() peerClient.Cancel() } // Add the existing set of channels to the event store. This is required // because channel events will not be triggered for channels that exist // at startup time. channels, err := c.cfg.GetOpenChannels() if err != nil { cancel() return err } log.Infof("Adding %v channels to event store", len(channels)) for _, ch := range channels { peerKey, err := route.NewVertexFromBytes( ch.IdentityPub.SerializeCompressed(), ) if err != nil { cancel() return err } // Add existing channels to the channel store with an initial // peer online or offline event. c.addChannel(ch.FundingOutpoint, peerKey) } // Start a goroutine that consumes events from all subscriptions. c.wg.Add(1) go c.consume(&subscriptions{ channelUpdates: channelClient.Updates(), peerUpdates: peerClient.Updates(), cancel: cancel, }) return nil } // Stop terminates all goroutines started by the event store. func (c *ChannelEventStore) Stop() { log.Info("Stopping event store") // Stop the consume goroutine. close(c.quit) c.wg.Wait() // Stop the ticker after the goroutine reading from it has exited, to // avoid a race. c.cfg.FlapCountTicker.Stop() } // addChannel checks whether we are already tracking a channel's peer, creates a // new peer log to track it if we are not yet monitoring it, and adds the // channel. func (c *ChannelEventStore) addChannel(channelPoint wire.OutPoint, peer route.Vertex) { peerMonitor, err := c.getPeerMonitor(peer) if err != nil { log.Error("could not create monitor: %v", err) return } if err := peerMonitor.addChannel(channelPoint); err != nil { log.Errorf("could not add channel: %v", err) } } // getPeerMonitor tries to get an existing peer monitor from our in memory list, // and falls back to creating a new monitor if it is not currently known. func (c *ChannelEventStore) getPeerMonitor(peer route.Vertex) (peerMonitor, error) { peerMonitor, ok := c.peers[peer] if ok { return peerMonitor, nil } var ( flapCount int lastFlap *time.Time ) historicalFlap, err := c.cfg.ReadFlapCount(peer) switch err { // If we do not have any records for this peer we set a 0 flap count // and timestamp. case channeldb.ErrNoPeerBucket: case nil: flapCount = int(historicalFlap.Count) lastFlap = &historicalFlap.LastFlap // Return if we get an unexpected error. default: return nil, err } peerMonitor = newPeerLog(c.cfg.Clock, flapCount, lastFlap) c.peers[peer] = peerMonitor return peerMonitor, nil } // closeChannel records a closed time for a channel, and returns early is the // channel is not known to the event store. We log warnings (rather than errors) // when we cannot find a peer/channel because channels that we restore from a // static channel backup do not have their open notified, so the event store // never learns about them, but they are closed using the regular flow so we // will try to remove them on close. At present, we cannot easily distinguish // between these closes and others. func (c *ChannelEventStore) closeChannel(channelPoint wire.OutPoint, peer route.Vertex) { peerMonitor, ok := c.peers[peer] if !ok { log.Warnf("peer not known to store: %v", peer) return } if err := peerMonitor.removeChannel(channelPoint); err != nil { log.Warnf("could not remove channel: %v", err) } } // peerEvent creates a peer monitor for a peer if we do not currently have // one, and adds an online event to it. func (c *ChannelEventStore) peerEvent(peer route.Vertex, online bool) { peerMonitor, err := c.getPeerMonitor(peer) if err != nil { log.Error("could not create monitor: %v", err) return } peerMonitor.onlineEvent(online) } // subscriptions abstracts away from subscription clients to allow for mocking. type subscriptions struct { channelUpdates <-chan interface{} peerUpdates <-chan interface{} cancel func() } // consume is the event store's main loop. It consumes subscriptions to update // the event store with channel and peer events, and serves requests for channel // uptime and lifespan. func (c *ChannelEventStore) consume(subscriptions *subscriptions) { // Start our flap count ticker. c.cfg.FlapCountTicker.Resume() // On exit, we will cancel our subscriptions and write our most recent // flap counts to disk. This ensures that we have consistent data in // the case of a graceful shutdown. If we do not shutdown gracefully, // our worst case is data from our last flap count tick (1H). defer func() { subscriptions.cancel() if err := c.recordFlapCount(); err != nil { log.Errorf("error recording flap on shutdown: %v", err) } c.wg.Done() }() // Consume events until the channel is closed. for { select { // Process channel opened and closed events. case e := <-subscriptions.channelUpdates: switch event := e.(type) { // A new channel has been opened, we must add the // channel to the store and record a channel open event. case channelnotifier.OpenChannelEvent: compressed := event.Channel.IdentityPub.SerializeCompressed() peerKey, err := route.NewVertexFromBytes( compressed, ) if err != nil { log.Errorf("Could not get vertex "+ "from: %v", compressed) } c.addChannel( event.Channel.FundingOutpoint, peerKey, ) // A channel has been closed, we must remove the channel // from the store and record a channel closed event. case channelnotifier.ClosedChannelEvent: compressed := event.CloseSummary.RemotePub.SerializeCompressed() peerKey, err := route.NewVertexFromBytes( compressed, ) if err != nil { log.Errorf("Could not get vertex "+ "from: %v", compressed) continue } c.closeChannel( event.CloseSummary.ChanPoint, peerKey, ) } // Process peer online and offline events. case e := <-subscriptions.peerUpdates: switch event := e.(type) { // We have reestablished a connection with our peer, // and should record an online event for any channels // with that peer. case peernotifier.PeerOnlineEvent: c.peerEvent(event.PubKey, true) // We have lost a connection with our peer, and should // record an offline event for any channels with that // peer. case peernotifier.PeerOfflineEvent: c.peerEvent(event.PubKey, false) } // Serve all requests for channel lifetime. case req := <-c.chanInfoRequests: var resp channelInfoResponse resp.info, resp.err = c.getChanInfo(req) req.responseChan <- resp // Serve all requests for information about our peer. case req := <-c.peerRequests: var resp peerResponse resp.flapCount, resp.ts, resp.err = c.flapCount( req.peer, ) req.responseChan <- resp case <-c.cfg.FlapCountTicker.Ticks(): if err := c.recordFlapCount(); err != nil { log.Errorf("could not record flap "+ "count: %v", err) } // Exit if the store receives the signal to shutdown. case <-c.quit: return } } } // ChannelInfo provides the set of information that the event store has recorded // for a channel. type ChannelInfo struct { // Lifetime is the total amount of time we have monitored the channel // for. Lifetime time.Duration // Uptime is the total amount of time that the channel peer has been // observed as online during the monitored lifespan. Uptime time.Duration } // GetChanInfo gets all the information we have on a channel in the event store. func (c *ChannelEventStore) GetChanInfo(channelPoint wire.OutPoint, peer route.Vertex) (*ChannelInfo, error) { request := channelInfoRequest{ peer: peer, channelPoint: channelPoint, responseChan: make(chan channelInfoResponse), } // Send a request for the channel's information to the main event loop, // or return early with an error if the store has already received a // shutdown signal. select { case c.chanInfoRequests <- request: case <-c.quit: return nil, errShuttingDown } // Return the response we receive on the response channel or exit early // if the store is instructed to exit. select { case resp := <-request.responseChan: return resp.info, resp.err case <-c.quit: return nil, errShuttingDown } } // getChanInfo collects channel information for a channel. It gets uptime over // the full lifetime of the channel. func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo, error) { peerMonitor, ok := c.peers[req.peer] if !ok { return nil, ErrPeerNotFound } lifetime, uptime, err := peerMonitor.channelUptime(req.channelPoint) if err != nil { return nil, err } return &ChannelInfo{ Lifetime: lifetime, Uptime: uptime, }, nil } // FlapCount returns the flap count we have for a peer and the timestamp of its // last flap. If we do not have any flaps recorded for the peer, the last flap // timestamp will be nil. func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time, error) { request := peerRequest{ peer: peer, responseChan: make(chan peerResponse), } // Send a request for the peer's information to the main event loop, // or return early with an error if the store has already received a // shutdown signal. select { case c.peerRequests <- request: case <-c.quit: return 0, nil, errShuttingDown } // Return the response we receive on the response channel or exit early // if the store is instructed to exit. select { case resp := <-request.responseChan: return resp.flapCount, resp.ts, resp.err case <-c.quit: return 0, nil, errShuttingDown } } // flapCount gets our peer flap count and last flap timestamp from our in memory // record of a peer, falling back to on disk if we are not currently tracking // the peer. If we have no flap count recorded for the peer, a nil last flap // time will be returned. func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time, error) { // First check whether we are tracking this peer in memory, because this // record will have the most accurate flap count. We do not fail if we // can't find the peer in memory, because we may have previously // recorded its flap count on disk. peerMonitor, ok := c.peers[peer] if ok { count, ts := peerMonitor.getFlapCount() return count, ts, nil } // Try to get our flap count from the database. If this value is not // recorded, we return a nil last flap time to indicate that we have no // record of the peer's flap count. flapCount, err := c.cfg.ReadFlapCount(peer) switch err { case channeldb.ErrNoPeerBucket: return 0, nil, nil case nil: return int(flapCount.Count), &flapCount.LastFlap, nil default: return 0, nil, err } } // recordFlapCount will record our flap count for each peer that we are // currently tracking, skipping peers that have a 0 flap count. func (c *ChannelEventStore) recordFlapCount() error { updates := make(peerFlapCountMap) for peer, monitor := range c.peers { flapCount, lastFlap := monitor.getFlapCount() if lastFlap == nil { continue } updates[peer] = &channeldb.FlapCount{ Count: uint32(flapCount), LastFlap: *lastFlap, } } log.Debugf("recording flap count for: %v peers", len(updates)) return c.cfg.WriteFlapCount(updates) }