subscribe: add new subscribe package

This commit creates a new package 'subscribe', that exposes a common
Client-Server subscription system, that can be shared among packages.
This commit is contained in:
Johan T. Halseth 2018-10-12 17:08:14 +02:00 committed by Valentine Wallace
parent 8ac5f2bb17
commit 5181f100ed
2 changed files with 326 additions and 0 deletions

216
subscribe/subscribe.go Normal file
View File

@ -0,0 +1,216 @@
package subscribe
import (
"errors"
"sync"
"sync/atomic"
"github.com/lightningnetwork/lnd/queue"
)
// ErrServerShuttingDown is an error returned in case the server is in the
// process of shutting down.
var ErrServerShuttingDown = errors.New("subscription server shutting down")
// Client is used to get notified about updates the caller has subscribed to,
type Client struct {
// Cancel should be called in case the client no longer wants to
// subscribe for updates from the server.
Cancel func()
updates *queue.ConcurrentQueue
quit chan struct{}
}
// Updates returns a read-only channel where the updates the client has
// subscribed to will be delivered.
func (c *Client) Updates() <-chan interface{} {
return c.updates.ChanOut()
}
// Quit is a channel that will be closed in case the server decides to no
// longer deliver updates to this client.
func (c *Client) Quit() <-chan struct{} {
return c.quit
}
// Server is a struct that manages a set of subscriptions and their
// corresponding clients. Any update will be delivered to all active clients.
type Server struct {
clientCounter uint64 // To be used atomically.
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
clients map[uint64]*Client
clientUpdates chan *clientUpdate
updates chan interface{}
quit chan struct{}
wg sync.WaitGroup
}
// clientUpdate is an internal message sent to the subscriptionHandler to
// either register a new client for subscription or cancel an existing
// subscription.
type clientUpdate struct {
// cancel indicates if the update to the client is cancelling an
// existing client's subscription. If not then this update will be to
// subscribe a new client.
cancel bool
// clientID is the unique identifier for this client. Any further
// updates (deleting or adding) to this notification client will be
// dispatched according to the target clientID.
clientID uint64
// client is the new client that will receive updates. Will be nil in
// case this is a cancallation update.
client *Client
}
// NewServer returns a new Server.
func NewServer() *Server {
return &Server{
clients: make(map[uint64]*Client),
clientUpdates: make(chan *clientUpdate),
updates: make(chan interface{}),
quit: make(chan struct{}),
}
}
// Start starts the Server, making it ready to accept subscriptions and
// updates.
func (s *Server) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return nil
}
s.wg.Add(1)
go s.subscriptionHandler()
return nil
}
// Stop stops the server.
func (s *Server) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
close(s.quit)
s.wg.Wait()
return nil
}
// Subscribe returns a Client that will receive updates any time the Server is
// made aware of a new event.
func (s *Server) Subscribe() (*Client, error) {
// We'll first atomically obtain the next ID for this client from the
// incrementing client ID counter.
clientID := atomic.AddUint64(&s.clientCounter, 1)
// Create the client that will be returned. The Cancel method is
// populated to send the cancellation intent to the
// subscriptionHandler.
client := &Client{
updates: queue.NewConcurrentQueue(20),
quit: make(chan struct{}),
Cancel: func() {
select {
case s.clientUpdates <- &clientUpdate{
cancel: true,
clientID: clientID,
}:
case <-s.quit:
return
}
},
}
select {
case s.clientUpdates <- &clientUpdate{
cancel: false,
clientID: clientID,
client: client,
}:
case <-s.quit:
return nil, ErrServerShuttingDown
}
return client, nil
}
// SendUpdate is called to send the passed update to all currently active
// subscription clients.
func (s *Server) SendUpdate(update interface{}) error {
select {
case s.updates <- update:
return nil
case <-s.quit:
return ErrServerShuttingDown
}
}
// subscriptionHandler is the main handler for the Server. It will handle
// incoming updates and subscriptions, and forward the incoming updates to the
// registered clients.
//
// NOTE: MUST be run as a goroutine.
func (s *Server) subscriptionHandler() {
defer s.wg.Done()
for {
select {
// If a client update is received, the either a new
// subscription becomes active, or we cancel and existing one.
case update := <-s.clientUpdates:
clientID := update.clientID
// In case this is a cancellation, stop the client's
// underlying queue, and remove the client from the set
// of active subscription clients.
if update.cancel {
client, ok := s.clients[update.clientID]
if ok {
client.updates.Stop()
close(client.quit)
delete(s.clients, clientID)
}
continue
}
// If this was not a cancellation, start the underlying
// queue and add the client to our set of subscription
// clients. It will be notified about any new updates
// the server receives.
update.client.updates.Start()
s.clients[update.clientID] = update.client
// A new update was received, forward it to all active clients.
case upd := <-s.updates:
for _, client := range s.clients {
select {
case client.updates.ChanIn() <- upd:
case <-client.quit:
case <-s.quit:
return
}
}
// In case the server is shutting down, stop the clients and
// close the quit channels to notify them.
case <-s.quit:
for _, client := range s.clients {
client.updates.Stop()
close(client.quit)
}
return
}
}
}

110
subscribe/subscribe_test.go Normal file
View File

@ -0,0 +1,110 @@
package subscribe_test
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/subscribe"
)
// TestSubscribe tests that the subscription clients receive the updates sent
// to them after they subscribe, and that cancelled clients don't get more
// updates.
func TestSubscribe(t *testing.T) {
t.Parallel()
server := subscribe.NewServer()
if err := server.Start(); err != nil {
t.Fatalf("unable to start server")
}
const numClients = 300
const numUpdates = 1000
var clients [numClients]*subscribe.Client
// Start by registering two thirds the clients.
for i := 0; i < numClients*2/3; i++ {
c, err := server.Subscribe()
if err != nil {
t.Fatalf("unable to subscribe: %v", err)
}
clients[i] = c
}
// Send half the updates.
for i := 0; i < numUpdates/2; i++ {
if err := server.SendUpdate(i); err != nil {
t.Fatalf("unable to send update")
}
}
// Register the rest of the clients.
for i := numClients * 2 / 3; i < numClients; i++ {
c, err := server.Subscribe()
if err != nil {
t.Fatalf("unable to subscribe: %v", err)
}
clients[i] = c
}
// Cancel one third of the clients.
for i := 0; i < numClients/3; i++ {
clients[i].Cancel()
}
// Send the rest of the updates.
for i := numUpdates / 2; i < numUpdates; i++ {
if err := server.SendUpdate(i); err != nil {
t.Fatalf("unable to send update")
}
}
// Now ensure the clients got the updates we expect.
for i, c := range clients {
var from, to int
switch {
// We expect the first third of the clients to quit, since they
// were cancelled.
case i < numClients/3:
select {
case <-c.Quit():
continue
case <-time.After(1 * time.Second):
t.Fatalf("cancelled client %v did not quit", i)
}
// The next third should receive all updates.
case i < numClients*2/3:
from = 0
to = numUpdates
// And finally the last third should receive the last half of
// the updates.
default:
from = numUpdates / 2
to = numUpdates
}
for cnt := from; cnt < to; cnt++ {
select {
case upd := <-c.Updates():
j := upd.(int)
if j != cnt {
t.Fatalf("expected %v, got %v, for "+
"client %v", cnt, j, i)
}
case <-time.After(1 * time.Second):
t.Fatalf("did not receive expected update %v "+
"for client %v", cnt, i)
}
}
}
}