Merge pull request #3354 from carlaKC/peernotify-addpeernotifierservice
Peernotifier: Add Peer Notifier package
This commit is contained in:
commit
e44445e952
4
log.go
4
log.go
@ -31,6 +31,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/monitoring"
|
||||
"github.com/lightningnetwork/lnd/netann"
|
||||
"github.com/lightningnetwork/lnd/peernotifier"
|
||||
"github.com/lightningnetwork/lnd/routing"
|
||||
"github.com/lightningnetwork/lnd/signal"
|
||||
"github.com/lightningnetwork/lnd/sweep"
|
||||
@ -90,6 +91,7 @@ var (
|
||||
chbuLog = build.NewSubLogger("CHBU", backendLog.Logger)
|
||||
promLog = build.NewSubLogger("PROM", backendLog.Logger)
|
||||
wtclLog = build.NewSubLogger("WTCL", backendLog.Logger)
|
||||
prnfLog = build.NewSubLogger("PRNF", backendLog.Logger)
|
||||
)
|
||||
|
||||
// Initialize package-global logger variables.
|
||||
@ -119,6 +121,7 @@ func init() {
|
||||
chanbackup.UseLogger(chbuLog)
|
||||
monitoring.UseLogger(promLog)
|
||||
wtclient.UseLogger(wtclLog)
|
||||
peernotifier.UseLogger(prnfLog)
|
||||
|
||||
addSubLogger(routerrpc.Subsystem, routerrpc.UseLogger)
|
||||
addSubLogger(wtclientrpc.Subsystem, wtclientrpc.UseLogger)
|
||||
@ -165,6 +168,7 @@ var subsystemLoggers = map[string]btclog.Logger{
|
||||
"CHBU": chbuLog,
|
||||
"PROM": promLog,
|
||||
"WTCL": wtclLog,
|
||||
"PRNF": prnfLog,
|
||||
}
|
||||
|
||||
// initLogRotator initializes the logging rotator to write logs to logFile and
|
||||
|
29
peernotifier/log.go
Normal file
29
peernotifier/log.go
Normal file
@ -0,0 +1,29 @@
|
||||
package peernotifier
|
||||
|
||||
import (
|
||||
"github.com/btcsuite/btclog"
|
||||
"github.com/lightningnetwork/lnd/build"
|
||||
)
|
||||
|
||||
// log is a logger that is initialized with no output filters. This
|
||||
// means the package will not perform any logging by default until the caller
|
||||
// requests it.
|
||||
var log btclog.Logger
|
||||
|
||||
// The default amount of logging is none.
|
||||
func init() {
|
||||
UseLogger(build.NewSubLogger("PRNF", nil))
|
||||
}
|
||||
|
||||
// DisableLog disables all library log output. Logging output is disabled
|
||||
// by default until UseLogger is called.
|
||||
func DisableLog() {
|
||||
UseLogger(btclog.Disabled)
|
||||
}
|
||||
|
||||
// UseLogger uses a specified Logger to output package logging info.
|
||||
// This should be used in preference to SetLogWriter if the caller is also
|
||||
// using btclog.
|
||||
func UseLogger(logger btclog.Logger) {
|
||||
log = logger
|
||||
}
|
87
peernotifier/peernotifier.go
Normal file
87
peernotifier/peernotifier.go
Normal file
@ -0,0 +1,87 @@
|
||||
package peernotifier
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/lightningnetwork/lnd/subscribe"
|
||||
)
|
||||
|
||||
// PeerNotifier is a subsystem which observes peer offline and online events.
|
||||
// It takes subscriptions for its events, and whenever it observes a new event
|
||||
// it notifies its subscribers over the proper channel.
|
||||
type PeerNotifier struct {
|
||||
started sync.Once
|
||||
stopped sync.Once
|
||||
|
||||
ntfnServer *subscribe.Server
|
||||
}
|
||||
|
||||
// PeerOnlineEvent represents a new event where a peer comes online.
|
||||
type PeerOnlineEvent struct {
|
||||
// PubKey is the peer's compressed public key.
|
||||
PubKey [33]byte
|
||||
}
|
||||
|
||||
// PeerOfflineEvent represents a new event where a peer goes offline.
|
||||
type PeerOfflineEvent struct {
|
||||
// PubKey is the peer's compressed public key.
|
||||
PubKey [33]byte
|
||||
}
|
||||
|
||||
// New creates a new peer notifier which notifies clients of peer online
|
||||
// and offline events.
|
||||
func New() *PeerNotifier {
|
||||
return &PeerNotifier{
|
||||
ntfnServer: subscribe.NewServer(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the PeerNotifier's subscription server.
|
||||
func (p *PeerNotifier) Start() error {
|
||||
var err error
|
||||
|
||||
p.started.Do(func() {
|
||||
log.Info("PeerNotifier starting")
|
||||
err = p.ntfnServer.Start()
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop signals the notifier for a graceful shutdown.
|
||||
func (p *PeerNotifier) Stop() {
|
||||
p.stopped.Do(func() {
|
||||
log.Info("Stopping PeerNotifier")
|
||||
p.ntfnServer.Stop()
|
||||
})
|
||||
}
|
||||
|
||||
// SubscribePeerEvents returns a subscribe.Client that will receive updates
|
||||
// any time the Server is informed of a peer event.
|
||||
func (p *PeerNotifier) SubscribePeerEvents() (*subscribe.Client, error) {
|
||||
return p.ntfnServer.Subscribe()
|
||||
}
|
||||
|
||||
// NotifyPeerOnline sends a peer online event to all clients subscribed to the
|
||||
// peer notifier.
|
||||
func (p *PeerNotifier) NotifyPeerOnline(pubKey [33]byte) {
|
||||
event := PeerOnlineEvent{PubKey: pubKey}
|
||||
|
||||
log.Debugf("PeerNotifier notifying peer: %x online", pubKey)
|
||||
|
||||
if err := p.ntfnServer.SendUpdate(event); err != nil {
|
||||
log.Warnf("Unable to send peer online update: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyPeerOffline sends a peer offline event to all the clients subscribed
|
||||
// to the peer notifier.
|
||||
func (p *PeerNotifier) NotifyPeerOffline(pubKey [33]byte) {
|
||||
event := PeerOfflineEvent{PubKey: pubKey}
|
||||
|
||||
log.Debugf("PeerNotifier notifying peer: %x offline", pubKey)
|
||||
|
||||
if err := p.ntfnServer.SendUpdate(event); err != nil {
|
||||
log.Warnf("Unable to send peer offline update: %v", err)
|
||||
}
|
||||
}
|
32
server.go
32
server.go
@ -44,6 +44,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/nat"
|
||||
"github.com/lightningnetwork/lnd/netann"
|
||||
"github.com/lightningnetwork/lnd/peernotifier"
|
||||
"github.com/lightningnetwork/lnd/pool"
|
||||
"github.com/lightningnetwork/lnd/routing"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
@ -188,6 +189,8 @@ type server struct {
|
||||
|
||||
channelNotifier *channelnotifier.ChannelNotifier
|
||||
|
||||
peerNotifier *peernotifier.PeerNotifier
|
||||
|
||||
witnessBeacon contractcourt.WitnessBeacon
|
||||
|
||||
breachArbiter *breachArbiter
|
||||
@ -1089,6 +1092,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Assemble a peer notifier which will provide clients with subscriptions
|
||||
// to peer online and offline events.
|
||||
s.peerNotifier = peernotifier.New()
|
||||
|
||||
if cfg.WtClient.Active {
|
||||
policy := wtpolicy.DefaultPolicy()
|
||||
|
||||
@ -1191,6 +1198,10 @@ func (s *server) Start() error {
|
||||
startErr = err
|
||||
return
|
||||
}
|
||||
if err := s.peerNotifier.Start(); err != nil {
|
||||
startErr = err
|
||||
return
|
||||
}
|
||||
if err := s.sphinx.Start(); err != nil {
|
||||
startErr = err
|
||||
return
|
||||
@ -1349,6 +1360,7 @@ func (s *server) Stop() error {
|
||||
s.chainArb.Stop()
|
||||
s.sweeper.Stop()
|
||||
s.channelNotifier.Stop()
|
||||
s.peerNotifier.Stop()
|
||||
s.cc.wallet.Shutdown()
|
||||
s.cc.chainView.Stop()
|
||||
s.connMgr.Stop()
|
||||
@ -2721,7 +2733,8 @@ func (s *server) addPeer(p *peer) {
|
||||
// TODO(roasbeef): pipe all requests through to the
|
||||
// queryHandler/peerManager
|
||||
|
||||
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
|
||||
pubSer := p.addr.IdentityKey.SerializeCompressed()
|
||||
pubStr := string(pubSer)
|
||||
|
||||
s.peersByPub[pubStr] = p
|
||||
|
||||
@ -2730,6 +2743,13 @@ func (s *server) addPeer(p *peer) {
|
||||
} else {
|
||||
s.outboundPeers[pubStr] = p
|
||||
}
|
||||
|
||||
// Inform the peer notifier of a peer online event so that it can be reported
|
||||
// to clients listening for peer events.
|
||||
var pubKey [33]byte
|
||||
copy(pubKey[:], pubSer)
|
||||
|
||||
s.peerNotifier.NotifyPeerOnline(pubKey)
|
||||
}
|
||||
|
||||
// peerInitializer asynchronously starts a newly connected peer after it has
|
||||
@ -2971,7 +2991,8 @@ func (s *server) removePeer(p *peer) {
|
||||
return
|
||||
}
|
||||
|
||||
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
|
||||
pubSer := p.addr.IdentityKey.SerializeCompressed()
|
||||
pubStr := string(pubSer)
|
||||
|
||||
delete(s.peersByPub, pubStr)
|
||||
|
||||
@ -2980,6 +3001,13 @@ func (s *server) removePeer(p *peer) {
|
||||
} else {
|
||||
delete(s.outboundPeers, pubStr)
|
||||
}
|
||||
|
||||
// Inform the peer notifier of a peer offline event so that it can be
|
||||
// reported to clients listening for peer events.
|
||||
var pubKey [33]byte
|
||||
copy(pubKey[:], pubSer)
|
||||
|
||||
s.peerNotifier.NotifyPeerOffline(pubKey)
|
||||
}
|
||||
|
||||
// openChanReq is a message sent to the server in order to request the
|
||||
|
Loading…
Reference in New Issue
Block a user