cb26fd8a17
This commit introduces the channel notifier which is a central source of active, inactive, and closed channel events. This notifier was originally intended to be used by the `SubscribeChannels` streaming RPC call, but can be used by any subsystem that needs to be notified on a channel becoming active, inactive or closed. It may also be extended in the future to support other types of notifications.
138 lines
4.3 KiB
Go
138 lines
4.3 KiB
Go
package channelnotifier
|
|
|
|
import (
|
|
"sync/atomic"
|
|
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/lightningnetwork/lnd/channeldb"
|
|
"github.com/lightningnetwork/lnd/subscribe"
|
|
)
|
|
|
|
// ChannelNotifier is a subsystem which all active, inactive, and closed channel
|
|
// events pipe through. It takes subscriptions for its events, and whenever
|
|
// it receives a new event it notifies its subscribers over the proper channel.
|
|
type ChannelNotifier struct {
|
|
started uint32
|
|
stopped uint32
|
|
|
|
ntfnServer *subscribe.Server
|
|
|
|
chanDB *channeldb.DB
|
|
}
|
|
|
|
// OpenChannelEvent represents a new event where a channel goes from pending
|
|
// open to open.
|
|
type OpenChannelEvent struct {
|
|
// Channel is the channel that has become open.
|
|
Channel *channeldb.OpenChannel
|
|
}
|
|
|
|
// ActiveChannelEvent represents a new event where a channel becomes active.
|
|
type ActiveChannelEvent struct {
|
|
// ChannelPoint is the channelpoint for the newly active channel.
|
|
ChannelPoint *wire.OutPoint
|
|
}
|
|
|
|
// InactiveChannelEvent represents a new event where a channel becomes inactive.
|
|
type InactiveChannelEvent struct {
|
|
// ChannelPoint is the channelpoint for the newly inactive channel.
|
|
ChannelPoint *wire.OutPoint
|
|
}
|
|
|
|
// ClosedChannelEvent represents a new event where a channel becomes closed.
|
|
type ClosedChannelEvent struct {
|
|
// CloseSummary is the summary of the channel close that has occurred.
|
|
CloseSummary *channeldb.ChannelCloseSummary
|
|
}
|
|
|
|
// New creates a new channel notifier. The ChannelNotifier gets channel
|
|
// events from peers and from the chain arbitrator, and dispatches them to
|
|
// its clients.
|
|
func New(chanDB *channeldb.DB) *ChannelNotifier {
|
|
return &ChannelNotifier{
|
|
ntfnServer: subscribe.NewServer(),
|
|
chanDB: chanDB,
|
|
}
|
|
}
|
|
|
|
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
|
|
func (c *ChannelNotifier) Start() error {
|
|
if !atomic.CompareAndSwapUint32(&c.started, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
log.Tracef("ChannelNotifier %v starting", c)
|
|
|
|
if err := c.ntfnServer.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop signals the notifier for a graceful shutdown.
|
|
func (c *ChannelNotifier) Stop() {
|
|
if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
|
|
return
|
|
}
|
|
|
|
c.ntfnServer.Stop()
|
|
}
|
|
|
|
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
|
|
// any time the Server is made aware of a new event.
|
|
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
|
|
return c.ntfnServer.Subscribe()
|
|
}
|
|
|
|
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
|
|
// channel has gone from pending open to open.
|
|
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
|
|
|
|
// Fetch the relevant channel from the database.
|
|
channel, err := c.chanDB.FetchChannel(chanPoint)
|
|
if err != nil {
|
|
log.Warnf("Unable to fetch open channel from the db: %v", err)
|
|
}
|
|
|
|
// Send the open event to all channel event subscribers.
|
|
event := OpenChannelEvent{Channel: channel}
|
|
if err := c.ntfnServer.SendUpdate(event); err != nil {
|
|
log.Warnf("Unable to send open channel update: %v", err)
|
|
}
|
|
}
|
|
|
|
// NotifyClosedChannelEvent notifies the channelEventNotifier goroutine that a
|
|
// channel has closed.
|
|
func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
|
|
// Fetch the relevant closed channel from the database.
|
|
closeSummary, err := c.chanDB.FetchClosedChannel(&chanPoint)
|
|
if err != nil {
|
|
log.Warnf("Unable to fetch closed channel summary from the db: %v", err)
|
|
}
|
|
|
|
// Send the closed event to all channel event subscribers.
|
|
event := ClosedChannelEvent{CloseSummary: closeSummary}
|
|
if err := c.ntfnServer.SendUpdate(event); err != nil {
|
|
log.Warnf("Unable to send closed channel update: %v", err)
|
|
}
|
|
}
|
|
|
|
// NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a
|
|
// channel is active.
|
|
func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
|
|
event := ActiveChannelEvent{ChannelPoint: &chanPoint}
|
|
if err := c.ntfnServer.SendUpdate(event); err != nil {
|
|
log.Warnf("Unable to send active channel update: %v", err)
|
|
}
|
|
}
|
|
|
|
// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
|
|
// channel is inactive.
|
|
func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
|
|
event := InactiveChannelEvent{ChannelPoint: &chanPoint}
|
|
if err := c.ntfnServer.SendUpdate(event); err != nil {
|
|
log.Warnf("Unable to send inactive channel update: %v", err)
|
|
}
|
|
}
|