diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 1fa846ae..3bf7c324 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -62,11 +62,11 @@ type ChannelEventStore struct { type Config struct { // SubscribeChannelEvents provides a subscription client which provides // a stream of channel events. - SubscribeChannelEvents func() (*subscribe.Client, error) + SubscribeChannelEvents func() (subscribe.Subscription, error) // SubscribePeerEvents provides a subscription client which provides a // stream of peer online/offline events. - SubscribePeerEvents func() (*subscribe.Client, error) + SubscribePeerEvents func() (subscribe.Subscription, error) // GetOpenChannels provides a list of existing open channels which is // used to populate the ChannelEventStore with a set of channels on diff --git a/server.go b/server.go index cba47e39..91bbf211 100644 --- a/server.go +++ b/server.go @@ -60,6 +60,7 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/localchans" "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/tor" @@ -1203,9 +1204,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // Create a channel event store which monitors all open channels. s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{ - SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents, - SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents, - GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, + SubscribeChannelEvents: func() (subscribe.Subscription, error) { + return s.channelNotifier.SubscribeChannelEvents() + }, + SubscribePeerEvents: func() (subscribe.Subscription, error) { + return s.peerNotifier.SubscribePeerEvents() + }, + GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, }) if cfg.WtClient.Active { diff --git a/subscribe/interface.go b/subscribe/interface.go new file mode 100644 index 00000000..f28429df --- /dev/null +++ b/subscribe/interface.go @@ -0,0 +1,17 @@ +package subscribe + +// Subscription is an interface implemented by subscriptions to a server +// providing updates. +type Subscription interface { + // Updates returns a read-only channel where the updates the client has + // subscribed to will be delivered. + Updates() <-chan interface{} + + // Quit is a channel that will be closed in case the server decides to + // no longer deliver updates to this client. + Quit() <-chan struct{} + + // Cancel should be called in case the client no longer wants to + // subscribe for updates from the server. + Cancel() +} diff --git a/subscribe/subscribe.go b/subscribe/subscribe.go index 1d413031..f95283cc 100644 --- a/subscribe/subscribe.go +++ b/subscribe/subscribe.go @@ -14,9 +14,9 @@ var ErrServerShuttingDown = errors.New("subscription server shutting down") // Client is used to get notified about updates the caller has subscribed to, type Client struct { - // Cancel should be called in case the client no longer wants to + // cancel should be called in case the client no longer wants to // subscribe for updates from the server. - Cancel func() + cancel func() updates *queue.ConcurrentQueue quit chan struct{} @@ -34,6 +34,12 @@ func (c *Client) Quit() <-chan struct{} { return c.quit } +// Cancel should be called in case the client no longer wants to +// subscribe for updates from the server. +func (c *Client) Cancel() { + c.cancel() +} + // Server is a struct that manages a set of subscriptions and their // corresponding clients. Any update will be delivered to all active clients. type Server struct { @@ -118,7 +124,7 @@ func (s *Server) Subscribe() (*Client, error) { client := &Client{ updates: queue.NewConcurrentQueue(20), quit: make(chan struct{}), - Cancel: func() { + cancel: func() { select { case s.clientUpdates <- &clientUpdate{ cancel: true,