peernotifier: Add peer notifier package for peer online/offline events

This commit adds a peer notifier package which provides clients with
a subscription to peer online and offline events.
This commit is contained in:
carla 2019-07-29 10:59:48 -04:00
parent 8c9c4b52e8
commit 4ceceda757
4 changed files with 150 additions and 2 deletions

4
log.go

@ -31,6 +31,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/monitoring" "github.com/lightningnetwork/lnd/monitoring"
"github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/sweep"
@ -90,6 +91,7 @@ var (
chbuLog = build.NewSubLogger("CHBU", backendLog.Logger) chbuLog = build.NewSubLogger("CHBU", backendLog.Logger)
promLog = build.NewSubLogger("PROM", backendLog.Logger) promLog = build.NewSubLogger("PROM", backendLog.Logger)
wtclLog = build.NewSubLogger("WTCL", backendLog.Logger) wtclLog = build.NewSubLogger("WTCL", backendLog.Logger)
prnfLog = build.NewSubLogger("PRNF", backendLog.Logger)
) )
// Initialize package-global logger variables. // Initialize package-global logger variables.
@ -119,6 +121,7 @@ func init() {
chanbackup.UseLogger(chbuLog) chanbackup.UseLogger(chbuLog)
monitoring.UseLogger(promLog) monitoring.UseLogger(promLog)
wtclient.UseLogger(wtclLog) wtclient.UseLogger(wtclLog)
peernotifier.UseLogger(prnfLog)
addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger) addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger)
addSubLogger(wtclientrpc.Subsystem, wtclientrpc.UseLogger) addSubLogger(wtclientrpc.Subsystem, wtclientrpc.UseLogger)
@ -165,6 +168,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"CHBU": chbuLog, "CHBU": chbuLog,
"PROM": promLog, "PROM": promLog,
"WTCL": wtclLog, "WTCL": wtclLog,
"PRNF": prnfLog,
} }
// initLogRotator initializes the logging rotator to write logs to logFile and // initLogRotator initializes the logging rotator to write logs to logFile and

29
peernotifier/log.go Normal file

@ -0,0 +1,29 @@
package peernotifier
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger("PRNF", nil))
}
// DisableLog disables all library log output. Logging output is disabled
// by default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

@ -0,0 +1,87 @@
package peernotifier
import (
"sync"
"github.com/lightningnetwork/lnd/subscribe"
)
// PeerNotifier is a subsystem which observes peer offline and online events.
// It takes subscriptions for its events, and whenever it observes a new event
// it notifies its subscribers over the proper channel.
type PeerNotifier struct {
started sync.Once
stopped sync.Once
ntfnServer *subscribe.Server
}
// PeerOnlineEvent represents a new event where a peer comes online.
type PeerOnlineEvent struct {
// PubKey is the peer's compressed public key.
PubKey [33]byte
}
// PeerOfflineEvent represents a new event where a peer goes offline.
type PeerOfflineEvent struct {
// PubKey is the peer's compressed public key.
PubKey [33]byte
}
// New creates a new peer notifier which notifies clients of peer online
// and offline events.
func New() *PeerNotifier {
return &PeerNotifier{
ntfnServer: subscribe.NewServer(),
}
}
// Start starts the PeerNotifier's subscription server.
func (p *PeerNotifier) Start() error {
var err error
p.started.Do(func() {
log.Info("PeerNotifier starting")
err = p.ntfnServer.Start()
})
return err
}
// Stop signals the notifier for a graceful shutdown.
func (p *PeerNotifier) Stop() {
p.stopped.Do(func() {
log.Info("Stopping PeerNotifier")
p.ntfnServer.Stop()
})
}
// SubscribePeerEvents returns a subscribe.Client that will receive updates
// any time the Server is informed of a peer event.
func (p *PeerNotifier) SubscribePeerEvents() (*subscribe.Client, error) {
return p.ntfnServer.Subscribe()
}
// NotifyPeerOnline sends a peer online event to all clients subscribed to the
// peer notifier.
func (p *PeerNotifier) NotifyPeerOnline(pubKey [33]byte) {
event := PeerOnlineEvent{PubKey: pubKey}
log.Debugf("PeerNotifier notifying peer: %x online", pubKey)
if err := p.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send peer online update: %v", err)
}
}
// NotifyPeerOffline sends a peer offline event to all the clients subscribed
// to the peer notifier.
func (p *PeerNotifier) NotifyPeerOffline(pubKey [33]byte) {
event := PeerOfflineEvent{PubKey: pubKey}
log.Debugf("PeerNotifier notifying peer: %x offline", pubKey)
if err := p.ntfnServer.SendUpdate(event); err != nil {
log.Warnf("Unable to send peer offline update: %v", err)
}
}

@ -44,6 +44,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
@ -188,6 +189,8 @@ type server struct {
channelNotifier *channelnotifier.ChannelNotifier channelNotifier *channelnotifier.ChannelNotifier
peerNotifier *peernotifier.PeerNotifier
witnessBeacon contractcourt.WitnessBeacon witnessBeacon contractcourt.WitnessBeacon
breachArbiter *breachArbiter breachArbiter *breachArbiter
@ -1081,6 +1084,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
return nil, err return nil, err
} }
// Assemble a peer notifier which will provide clients with subscriptions
// to peer online and offline events.
s.peerNotifier = peernotifier.New()
if cfg.WtClient.Active { if cfg.WtClient.Active {
policy := wtpolicy.DefaultPolicy() policy := wtpolicy.DefaultPolicy()
@ -1183,6 +1190,10 @@ func (s *server) Start() error {
startErr = err startErr = err
return return
} }
if err := s.peerNotifier.Start(); err != nil {
startErr = err
return
}
if err := s.sphinx.Start(); err != nil { if err := s.sphinx.Start(); err != nil {
startErr = err startErr = err
return return
@ -1341,6 +1352,7 @@ func (s *server) Stop() error {
s.chainArb.Stop() s.chainArb.Stop()
s.sweeper.Stop() s.sweeper.Stop()
s.channelNotifier.Stop() s.channelNotifier.Stop()
s.peerNotifier.Stop()
s.cc.wallet.Shutdown() s.cc.wallet.Shutdown()
s.cc.chainView.Stop() s.cc.chainView.Stop()
s.connMgr.Stop() s.connMgr.Stop()
@ -2713,7 +2725,8 @@ func (s *server) addPeer(p *peer) {
// TODO(roasbeef): pipe all requests through to the // TODO(roasbeef): pipe all requests through to the
// queryHandler/peerManager // queryHandler/peerManager
pubStr := string(p.addr.IdentityKey.SerializeCompressed()) pubSer := p.addr.IdentityKey.SerializeCompressed()
pubStr := string(pubSer)
s.peersByPub[pubStr] = p s.peersByPub[pubStr] = p
@ -2722,6 +2735,13 @@ func (s *server) addPeer(p *peer) {
} else { } else {
s.outboundPeers[pubStr] = p s.outboundPeers[pubStr] = p
} }
// Inform the peer notifier of a peer online event so that it can be reported
// to clients listening for peer events.
var pubKey [33]byte
copy(pubKey[:], pubSer)
s.peerNotifier.NotifyPeerOnline(pubKey)
} }
// peerInitializer asynchronously starts a newly connected peer after it has // peerInitializer asynchronously starts a newly connected peer after it has
@ -2963,7 +2983,8 @@ func (s *server) removePeer(p *peer) {
return return
} }
pubStr := string(p.addr.IdentityKey.SerializeCompressed()) pubSer := p.addr.IdentityKey.SerializeCompressed()
pubStr := string(pubSer)
delete(s.peersByPub, pubStr) delete(s.peersByPub, pubStr)
@ -2972,6 +2993,13 @@ func (s *server) removePeer(p *peer) {
} else { } else {
delete(s.outboundPeers, pubStr) delete(s.outboundPeers, pubStr)
} }
// Inform the peer notifier of a peer offline event so that it can be
// reported to clients listening for peer events.
var pubKey [33]byte
copy(pubKey[:], pubSer)
s.peerNotifier.NotifyPeerOffline(pubKey)
} }
// openChanReq is a message sent to the server in order to request the // openChanReq is a message sent to the server in order to request the