lnd: introduce the ChannelNotifier.

This commit introduces the channel notifier which is a central source
of active, inactive, and closed channel events.

This notifier was originally intended to be used by the `SubscribeChannels`
streaming RPC call, but can be used by any subsystem that needs to be
notified on a channel becoming active, inactive or closed.

It may also be extended in the future to support other types of notifications.
This commit is contained in:
Valentine Wallace 2018-10-21 20:36:56 -07:00 committed by Valentine Wallace
parent b0b6151cc1
commit cb26fd8a17
4 changed files with 195 additions and 0 deletions

@ -0,0 +1,137 @@
package channelnotifier
import (
"sync/atomic"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/subscribe"
)
// ChannelNotifier is a subsystem which all active, inactive, and closed channel
// events pipe through. It takes subscriptions for its events, and whenever
// it receives a new event it notifies its subscribers over the proper channel.
type ChannelNotifier struct {
started uint32
stopped uint32
ntfnServer *subscribe.Server
chanDB *channeldb.DB
}
// OpenChannelEvent represents a new event where a channel goes from pending
// open to open.
type OpenChannelEvent struct {
// Channel is the channel that has become open.
Channel *channeldb.OpenChannel
}
// ActiveChannelEvent represents a new event where a channel becomes active.
type ActiveChannelEvent struct {
// ChannelPoint is the channelpoint for the newly active channel.
ChannelPoint *wire.OutPoint
}
// InactiveChannelEvent represents a new event where a channel becomes inactive.
type InactiveChannelEvent struct {
// ChannelPoint is the channelpoint for the newly inactive channel.
ChannelPoint *wire.OutPoint
}
// ClosedChannelEvent represents a new event where a channel becomes closed.
type ClosedChannelEvent struct {
// CloseSummary is the summary of the channel close that has occurred.
CloseSummary *channeldb.ChannelCloseSummary
}
// New creates a new channel notifier. The ChannelNotifier gets channel
// events from peers and from the chain arbitrator, and dispatches them to
// its clients.
func New(chanDB *channeldb.DB) *ChannelNotifier {
return &ChannelNotifier{
ntfnServer: subscribe.NewServer(),
chanDB: chanDB,
}
}
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
func (c *ChannelNotifier) Start() error {
if !atomic.CompareAndSwapUint32(&c.started, 0, 1) {
return nil
}
log.Tracef("ChannelNotifier %v starting", c)
if err := c.ntfnServer.Start(); err != nil {
return err
}
return nil
}
// Stop signals the notifier for a graceful shutdown.
func (c *ChannelNotifier) Stop() {
if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
return
}
c.ntfnServer.Stop()
}
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
// any time the Server is made aware of a new event.
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
return c.ntfnServer.Subscribe()
}
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant channel from the database.
channel, err := c.chanDB.FetchChannel(chanPoint)
if err != nil {
log.Warnf("Unable to fetch open channel from the db: %v", err)
}
// Send the open event to all channel event subscribers.
event := OpenChannelEvent{Channel: channel}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send open channel update: %v", err)
}
}
// NotifyClosedChannelEvent notifies the channelEventNotifier goroutine that a
// channel has closed.
func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant closed channel from the database.
closeSummary, err := c.chanDB.FetchClosedChannel(&chanPoint)
if err != nil {
log.Warnf("Unable to fetch closed channel summary from the db: %v", err)
}
// Send the closed event to all channel event subscribers.
event := ClosedChannelEvent{CloseSummary: closeSummary}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send closed channel update: %v", err)
}
}
// NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a
// channel is active.
func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
event := ActiveChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send active channel update: %v", err)
}
}
// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
// channel is inactive.
func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
event := InactiveChannelEvent{ChannelPoint: &chanPoint}
if err := c.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send inactive channel update: %v", err)
}
}

45
channelnotifier/log.go Normal file

@ -0,0 +1,45 @@
package channelnotifier
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("CHNF", nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
// logClosure is used to provide a closure over expensive logging operations so
// don't have to be performed when the logging level doesn't warrant it.
type logClosure func() string
// String invokes the underlying function and returns the result.
func (c logClosure) String() string {
return c()
}
// newLogClosure returns a new closure over a function that returns a string
// which itself provides a Stringer interface so that it can be used with the
// logging system.
func newLogClosure(c func() string) logClosure {
return logClosure(c)
}

4
log.go

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
@ -80,6 +81,7 @@ var (
wtwrLog = build.NewSubLogger("WTWR", backendLog.Logger)
ntfrLog = build.NewSubLogger("NTFR", backendLog.Logger)
irpcLog = build.NewSubLogger("IRPC", backendLog.Logger)
chnfLog = build.NewSubLogger("CHNF", backendLog.Logger)
)
// Initialize package-global logger variables.
@ -105,6 +107,7 @@ func init() {
watchtower.UseLogger(wtwrLog)
chainrpc.UseLogger(ntfrLog)
invoicesrpc.UseLogger(irpcLog)
channelnotifier.UseLogger(chnfLog)
}
// subsystemLoggers maps each subsystem identifier to its associated logger.
@ -136,6 +139,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"WTWR": wtwrLog,
"NTFR": ntfnLog,
"IRPC": irpcLog,
"CHNF": chnfLog,
}
// initLogRotator initializes the logging rotator to write logs to logFile and

@ -28,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
@ -145,6 +146,8 @@ type server struct {
invoices *invoices.InvoiceRegistry
channelNotifier *channelnotifier.ChannelNotifier
witnessBeacon contractcourt.WitnessBeacon
breachArbiter *breachArbiter
@ -274,6 +277,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),
channelNotifier: channelnotifier.New(chanDB),
identityPriv: privKey,
nodeSigner: netann.NewNodeSigner(privKey),
@ -986,6 +991,9 @@ func (s *server) Start() error {
if err := s.cc.chainNotifier.Start(); err != nil {
return err
}
if err := s.channelNotifier.Start(); err != nil {
return err
}
if err := s.sphinx.Start(); err != nil {
return err
}
@ -1083,6 +1091,7 @@ func (s *server) Stop() error {
s.authGossiper.Stop()
s.chainArb.Stop()
s.sweeper.Stop()
s.channelNotifier.Stop()
s.cc.wallet.Shutdown()
s.cc.chainView.Stop()
s.connMgr.Stop()