diff --git a/log.go b/log.go index 6cca176e..a3324ab4 100644 --- a/log.go +++ b/log.go @@ -31,6 +31,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/monitoring" "github.com/lightningnetwork/lnd/netann" + "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/sweep" @@ -90,6 +91,7 @@ var ( chbuLog = build.NewSubLogger("CHBU", backendLog.Logger) promLog = build.NewSubLogger("PROM", backendLog.Logger) wtclLog = build.NewSubLogger("WTCL", backendLog.Logger) + prnfLog = build.NewSubLogger("PRNF", backendLog.Logger) ) // Initialize package-global logger variables. @@ -119,6 +121,7 @@ func init() { chanbackup.UseLogger(chbuLog) monitoring.UseLogger(promLog) wtclient.UseLogger(wtclLog) + peernotifier.UseLogger(prnfLog) addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger) addSubLogger(wtclientrpc.Subsystem, wtclientrpc.UseLogger) @@ -165,6 +168,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "CHBU": chbuLog, "PROM": promLog, "WTCL": wtclLog, + "PRNF": prnfLog, } // initLogRotator initializes the logging rotator to write logs to logFile and diff --git a/peernotifier/log.go b/peernotifier/log.go new file mode 100644 index 00000000..1178c0f3 --- /dev/null +++ b/peernotifier/log.go @@ -0,0 +1,29 @@ +package peernotifier + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger("PRNF", nil)) +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/peernotifier/peernotifier.go b/peernotifier/peernotifier.go new file mode 100644 index 00000000..0943c82a --- /dev/null +++ b/peernotifier/peernotifier.go @@ -0,0 +1,87 @@ +package peernotifier + +import ( + "sync" + + "github.com/lightningnetwork/lnd/subscribe" +) + +// PeerNotifier is a subsystem which observes peer offline and online events. +// It takes subscriptions for its events, and whenever it observes a new event +// it notifies its subscribers over the proper channel. +type PeerNotifier struct { + started sync.Once + stopped sync.Once + + ntfnServer *subscribe.Server +} + +// PeerOnlineEvent represents a new event where a peer comes online. +type PeerOnlineEvent struct { + // PubKey is the peer's compressed public key. + PubKey [33]byte +} + +// PeerOfflineEvent represents a new event where a peer goes offline. +type PeerOfflineEvent struct { + // PubKey is the peer's compressed public key. + PubKey [33]byte +} + +// New creates a new peer notifier which notifies clients of peer online +// and offline events. +func New() *PeerNotifier { + return &PeerNotifier{ + ntfnServer: subscribe.NewServer(), + } +} + +// Start starts the PeerNotifier's subscription server. +func (p *PeerNotifier) Start() error { + var err error + + p.started.Do(func() { + log.Info("PeerNotifier starting") + err = p.ntfnServer.Start() + }) + + return err +} + +// Stop signals the notifier for a graceful shutdown. +func (p *PeerNotifier) Stop() { + p.stopped.Do(func() { + log.Info("Stopping PeerNotifier") + p.ntfnServer.Stop() + }) +} + +// SubscribePeerEvents returns a subscribe.Client that will receive updates +// any time the Server is informed of a peer event. +func (p *PeerNotifier) SubscribePeerEvents() (*subscribe.Client, error) { + return p.ntfnServer.Subscribe() +} + +// NotifyPeerOnline sends a peer online event to all clients subscribed to the +// peer notifier. +func (p *PeerNotifier) NotifyPeerOnline(pubKey [33]byte) { + event := PeerOnlineEvent{PubKey: pubKey} + + log.Debugf("PeerNotifier notifying peer: %x online", pubKey) + + if err := p.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send peer online update: %v", err) + } +} + +// NotifyPeerOffline sends a peer offline event to all the clients subscribed +// to the peer notifier. +func (p *PeerNotifier) NotifyPeerOffline(pubKey [33]byte) { + event := PeerOfflineEvent{PubKey: pubKey} + + log.Debugf("PeerNotifier notifying peer: %x offline", pubKey) + + if err := p.ntfnServer.SendUpdate(event); err != nil { + log.Warnf("Unable to send peer offline update: %v", err) + } +} diff --git a/server.go b/server.go index ba9bf50d..6288d95e 100644 --- a/server.go +++ b/server.go @@ -44,6 +44,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/netann" + "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" @@ -188,6 +189,8 @@ type server struct { channelNotifier *channelnotifier.ChannelNotifier + peerNotifier *peernotifier.PeerNotifier + witnessBeacon contractcourt.WitnessBeacon breachArbiter *breachArbiter @@ -1081,6 +1084,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, return nil, err } + // Assemble a peer notifier which will provide clients with subscriptions + // to peer online and offline events. + s.peerNotifier = peernotifier.New() + if cfg.WtClient.Active { policy := wtpolicy.DefaultPolicy() @@ -1183,6 +1190,10 @@ func (s *server) Start() error { startErr = err return } + if err := s.peerNotifier.Start(); err != nil { + startErr = err + return + } if err := s.sphinx.Start(); err != nil { startErr = err return @@ -1341,6 +1352,7 @@ func (s *server) Stop() error { s.chainArb.Stop() s.sweeper.Stop() s.channelNotifier.Stop() + s.peerNotifier.Stop() s.cc.wallet.Shutdown() s.cc.chainView.Stop() s.connMgr.Stop() @@ -2713,7 +2725,8 @@ func (s *server) addPeer(p *peer) { // TODO(roasbeef): pipe all requests through to the // queryHandler/peerManager - pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + pubSer := p.addr.IdentityKey.SerializeCompressed() + pubStr := string(pubSer) s.peersByPub[pubStr] = p @@ -2722,6 +2735,13 @@ func (s *server) addPeer(p *peer) { } else { s.outboundPeers[pubStr] = p } + + // Inform the peer notifier of a peer online event so that it can be reported + // to clients listening for peer events. + var pubKey [33]byte + copy(pubKey[:], pubSer) + + s.peerNotifier.NotifyPeerOnline(pubKey) } // peerInitializer asynchronously starts a newly connected peer after it has @@ -2963,7 +2983,8 @@ func (s *server) removePeer(p *peer) { return } - pubStr := string(p.addr.IdentityKey.SerializeCompressed()) + pubSer := p.addr.IdentityKey.SerializeCompressed() + pubStr := string(pubSer) delete(s.peersByPub, pubStr) @@ -2972,6 +2993,13 @@ func (s *server) removePeer(p *peer) { } else { delete(s.outboundPeers, pubStr) } + + // Inform the peer notifier of a peer offline event so that it can be + // reported to clients listening for peer events. + var pubKey [33]byte + copy(pubKey[:], pubSer) + + s.peerNotifier.NotifyPeerOffline(pubKey) } // openChanReq is a message sent to the server in order to request the