From cb26fd8a173054180b6a59577279fd0a30bbc5c8 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Sun, 21 Oct 2018 20:36:56 -0700 Subject: [PATCH] lnd: introduce the ChannelNotifier. This commit introduces the channel notifier which is a central source of active, inactive, and closed channel events. This notifier was originally intended to be used by the `SubscribeChannels` streaming RPC call, but can be used by any subsystem that needs to be notified on a channel becoming active, inactive or closed. It may also be extended in the future to support other types of notifications. --- channelnotifier/channelnotifier.go | 137 +++++++++++++++++++++++++++++ channelnotifier/log.go | 45 ++++++++++ log.go | 4 + server.go | 9 ++ 4 files changed, 195 insertions(+) create mode 100644 channelnotifier/channelnotifier.go create mode 100644 channelnotifier/log.go diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go new file mode 100644 index 00000000..58a1bf1c --- /dev/null +++ b/channelnotifier/channelnotifier.go @@ -0,0 +1,137 @@ +package channelnotifier + +import ( + "sync/atomic" + + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/subscribe" +) + +// ChannelNotifier is a subsystem which all active, inactive, and closed channel +// events pipe through. It takes subscriptions for its events, and whenever +// it receives a new event it notifies its subscribers over the proper channel. +type ChannelNotifier struct { + started uint32 + stopped uint32 + + ntfnServer *subscribe.Server + + chanDB *channeldb.DB +} + +// OpenChannelEvent represents a new event where a channel goes from pending +// open to open. +type OpenChannelEvent struct { + // Channel is the channel that has become open. + Channel *channeldb.OpenChannel +} + +// ActiveChannelEvent represents a new event where a channel becomes active. +type ActiveChannelEvent struct { + // ChannelPoint is the channelpoint for the newly active channel. + ChannelPoint *wire.OutPoint +} + +// InactiveChannelEvent represents a new event where a channel becomes inactive. +type InactiveChannelEvent struct { + // ChannelPoint is the channelpoint for the newly inactive channel. + ChannelPoint *wire.OutPoint +} + +// ClosedChannelEvent represents a new event where a channel becomes closed. +type ClosedChannelEvent struct { + // CloseSummary is the summary of the channel close that has occurred. + CloseSummary *channeldb.ChannelCloseSummary +} + +// New creates a new channel notifier. The ChannelNotifier gets channel +// events from peers and from the chain arbitrator, and dispatches them to +// its clients. +func New(chanDB *channeldb.DB) *ChannelNotifier { + return &ChannelNotifier{ + ntfnServer: subscribe.NewServer(), + chanDB: chanDB, + } +} + +// Start starts the ChannelNotifier and all goroutines it needs to carry out its task. +func (c *ChannelNotifier) Start() error { + if !atomic.CompareAndSwapUint32(&c.started, 0, 1) { + return nil + } + + log.Tracef("ChannelNotifier %v starting", c) + + if err := c.ntfnServer.Start(); err != nil { + return err + } + + return nil +} + +// Stop signals the notifier for a graceful shutdown. +func (c *ChannelNotifier) Stop() { + if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { + return + } + + c.ntfnServer.Stop() +} + +// SubscribeChannelEvents returns a subscribe.Client that will receive updates +// any time the Server is made aware of a new event. +func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) { + return c.ntfnServer.Subscribe() +} + +// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a +// channel has gone from pending open to open. +func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) { + + // Fetch the relevant channel from the database. + channel, err := c.chanDB.FetchChannel(chanPoint) + if err != nil { + log.Warnf("Unable to fetch open channel from the db: %v", err) + } + + // Send the open event to all channel event subscribers. + event := OpenChannelEvent{Channel: channel} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send open channel update: %v", err) + } +} + +// NotifyClosedChannelEvent notifies the channelEventNotifier goroutine that a +// channel has closed. +func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) { + // Fetch the relevant closed channel from the database. + closeSummary, err := c.chanDB.FetchClosedChannel(&chanPoint) + if err != nil { + log.Warnf("Unable to fetch closed channel summary from the db: %v", err) + } + + // Send the closed event to all channel event subscribers. + event := ClosedChannelEvent{CloseSummary: closeSummary} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send closed channel update: %v", err) + } +} + +// NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a +// channel is active. +func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) { + event := ActiveChannelEvent{ChannelPoint: &chanPoint} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send active channel update: %v", err) + } +} + +// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a +// channel is inactive. +func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) { + event := InactiveChannelEvent{ChannelPoint: &chanPoint} + if err := c.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send inactive channel update: %v", err) + } +} diff --git a/channelnotifier/log.go b/channelnotifier/log.go new file mode 100644 index 00000000..f44bb9b2 --- /dev/null +++ b/channelnotifier/log.go @@ -0,0 +1,45 @@ +package channelnotifier + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests +// it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("CHNF", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled by +// default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. This +// should be used in preference to SetLogWriter if the caller is also using +// btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} + +// logClosure is used to provide a closure over expensive logging operations so +// don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/log.go b/log.go index b066485a..4232dbde 100644 --- a/log.go +++ b/log.go @@ -15,6 +15,7 @@ import ( "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" @@ -80,6 +81,7 @@ var ( wtwrLog = build.NewSubLogger("WTWR", backendLog.Logger) ntfrLog = build.NewSubLogger("NTFR", backendLog.Logger) irpcLog = build.NewSubLogger("IRPC", backendLog.Logger) + chnfLog = build.NewSubLogger("CHNF", backendLog.Logger) ) // Initialize package-global logger variables. @@ -105,6 +107,7 @@ func init() { watchtower.UseLogger(wtwrLog) chainrpc.UseLogger(ntfrLog) invoicesrpc.UseLogger(irpcLog) + channelnotifier.UseLogger(chnfLog) } // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -136,6 +139,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "WTWR": wtwrLog, "NTFR": ntfnLog, "IRPC": irpcLog, + "CHNF": chnfLog, } // initLogRotator initializes the logging rotator to write logs to logFile and diff --git a/server.go b/server.go index ed560807..fa563762 100644 --- a/server.go +++ b/server.go @@ -28,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/autopilot" "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/channelnotifier" "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/htlcswitch" @@ -145,6 +146,8 @@ type server struct { invoices *invoices.InvoiceRegistry + channelNotifier *channelnotifier.ChannelNotifier + witnessBeacon contractcourt.WitnessBeacon breachArbiter *breachArbiter @@ -274,6 +277,8 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, invoices: invoices.NewRegistry(chanDB, activeNetParams.Params), + channelNotifier: channelnotifier.New(chanDB), + identityPriv: privKey, nodeSigner: netann.NewNodeSigner(privKey), @@ -986,6 +991,9 @@ func (s *server) Start() error { if err := s.cc.chainNotifier.Start(); err != nil { return err } + if err := s.channelNotifier.Start(); err != nil { + return err + } if err := s.sphinx.Start(); err != nil { return err } @@ -1083,6 +1091,7 @@ func (s *server) Stop() error { s.authGossiper.Stop() s.chainArb.Stop() s.sweeper.Stop() + s.channelNotifier.Stop() s.cc.wallet.Shutdown() s.cc.chainView.Stop() s.connMgr.Stop()