From 3aa008ab043856e39e66b2248084847c5d678132 Mon Sep 17 00:00:00 2001 From: carla Date: Tue, 8 Sep 2020 13:47:13 +0200 Subject: [PATCH] multi: add interface for subscribe client so it can be mocked The current implementation of subscribe is difficult to mock because the queue that we send updates on in unexported, so you cannot create a subscribe.Client object and then add your own updates. While it is possible to run a subscribe server in tests, subscribe servers will shutdown before dispatching their udpates to all clients, which can be flakey (and is difficult to workaround). In this commit, we add a subscription interface so that these testing struggles can be addressed with a mock. --- chanfitness/chaneventstore.go | 4 ++-- server.go | 11 ++++++++--- subscribe/interface.go | 17 +++++++++++++++++ subscribe/subscribe.go | 12 +++++++++--- 4 files changed, 36 insertions(+), 8 deletions(-) create mode 100644 subscribe/interface.go diff --git a/chanfitness/chaneventstore.go b/chanfitness/chaneventstore.go index 1fa846ae..3bf7c324 100644 --- a/chanfitness/chaneventstore.go +++ b/chanfitness/chaneventstore.go @@ -62,11 +62,11 @@ type ChannelEventStore struct { type Config struct { // SubscribeChannelEvents provides a subscription client which provides // a stream of channel events. - SubscribeChannelEvents func() (*subscribe.Client, error) + SubscribeChannelEvents func() (subscribe.Subscription, error) // SubscribePeerEvents provides a subscription client which provides a // stream of peer online/offline events. - SubscribePeerEvents func() (*subscribe.Client, error) + SubscribePeerEvents func() (subscribe.Subscription, error) // GetOpenChannels provides a list of existing open channels which is // used to populate the ChannelEventStore with a set of channels on diff --git a/server.go b/server.go index cba47e39..91bbf211 100644 --- a/server.go +++ b/server.go @@ -60,6 +60,7 @@ import ( "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/localchans" "github.com/lightningnetwork/lnd/routing/route" + "github.com/lightningnetwork/lnd/subscribe" "github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/tor" @@ -1203,9 +1204,13 @@ func newServer(cfg *Config, listenAddrs []net.Addr, // Create a channel event store which monitors all open channels. s.chanEventStore = chanfitness.NewChannelEventStore(&chanfitness.Config{ - SubscribeChannelEvents: s.channelNotifier.SubscribeChannelEvents, - SubscribePeerEvents: s.peerNotifier.SubscribePeerEvents, - GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, + SubscribeChannelEvents: func() (subscribe.Subscription, error) { + return s.channelNotifier.SubscribeChannelEvents() + }, + SubscribePeerEvents: func() (subscribe.Subscription, error) { + return s.peerNotifier.SubscribePeerEvents() + }, + GetOpenChannels: s.remoteChanDB.FetchAllOpenChannels, }) if cfg.WtClient.Active { diff --git a/subscribe/interface.go b/subscribe/interface.go new file mode 100644 index 00000000..f28429df --- /dev/null +++ b/subscribe/interface.go @@ -0,0 +1,17 @@ +package subscribe + +// Subscription is an interface implemented by subscriptions to a server +// providing updates. +type Subscription interface { + // Updates returns a read-only channel where the updates the client has + // subscribed to will be delivered. + Updates() <-chan interface{} + + // Quit is a channel that will be closed in case the server decides to + // no longer deliver updates to this client. + Quit() <-chan struct{} + + // Cancel should be called in case the client no longer wants to + // subscribe for updates from the server. + Cancel() +} diff --git a/subscribe/subscribe.go b/subscribe/subscribe.go index 1d413031..f95283cc 100644 --- a/subscribe/subscribe.go +++ b/subscribe/subscribe.go @@ -14,9 +14,9 @@ var ErrServerShuttingDown = errors.New("subscription server shutting down") // Client is used to get notified about updates the caller has subscribed to, type Client struct { - // Cancel should be called in case the client no longer wants to + // cancel should be called in case the client no longer wants to // subscribe for updates from the server. - Cancel func() + cancel func() updates *queue.ConcurrentQueue quit chan struct{} @@ -34,6 +34,12 @@ func (c *Client) Quit() <-chan struct{} { return c.quit } +// Cancel should be called in case the client no longer wants to +// subscribe for updates from the server. +func (c *Client) Cancel() { + c.cancel() +} + // Server is a struct that manages a set of subscriptions and their // corresponding clients. Any update will be delivered to all active clients. type Server struct { @@ -118,7 +124,7 @@ func (s *Server) Subscribe() (*Client, error) { client := &Client{ updates: queue.NewConcurrentQueue(20), quit: make(chan struct{}), - Cancel: func() { + cancel: func() { select { case s.clientUpdates <- &clientUpdate{ cancel: true,