From 78cd07570bafaf719b7084287094cab966467a35 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 19 Dec 2018 16:15:09 +0100 Subject: [PATCH] invoiceregistry: extract dispatch to method --- invoices/invoiceregistry.go | 134 +++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 63 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index a9bf44b9..c4e45fe2 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -139,69 +139,7 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { // A sub-systems has just modified the invoice state, so we'll // dispatch notifications to all registered clients. case event := <-i.invoiceEvents: - for clientID, client := range i.notificationClients { - // Before we dispatch this event, we'll check - // to ensure that this client hasn't already - // received this notification in order to - // ensure we don't duplicate any events. - invoice := event.invoice - switch { - // If we've already sent this settle event to - // the client, then we can skip this. - case event.state == channeldb.ContractSettled && - client.settleIndex >= invoice.SettleIndex: - continue - - // Similarly, if we've already sent this add to - // the client then we can skip this one. - case event.state == channeldb.ContractOpen && - client.addIndex >= invoice.AddIndex: - continue - - // These two states should never happen, but we - // log them just in case so we can detect this - // instance. - case event.state == channeldb.ContractOpen && - client.addIndex+1 != invoice.AddIndex: - log.Warnf("client=%v for invoice "+ - "notifications missed an update, "+ - "add_index=%v, new add event index=%v", - clientID, client.addIndex, - invoice.AddIndex) - case event.state == channeldb.ContractSettled && - client.settleIndex+1 != invoice.SettleIndex: - log.Warnf("client=%v for invoice "+ - "notifications missed an update, "+ - "settle_index=%v, new settle event index=%v", - clientID, client.settleIndex, - invoice.SettleIndex) - } - - select { - case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: event.state, - invoice: invoice, - }: - case <-i.quit: - return - } - - // Each time we send a notification to a - // client, we'll record the latest add/settle - // index it has. We'll use this to ensure we - // don't send a notification twice, which can - // happen if a new event is added while we're - // catching up a new client. - switch event.state { - case channeldb.ContractSettled: - client.settleIndex = invoice.SettleIndex - case channeldb.ContractOpen: - client.addIndex = invoice.AddIndex - default: - log.Errorf("unknown invoice "+ - "state: %v", event.state) - } - } + i.dispatchToClients(event) case <-i.quit: return @@ -209,6 +147,76 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { } } +// dispatchToClients passes the supplied event to all notification clients that +// subscribed to all invoices. Add and settle indices are used to make sure that +// clients don't receive duplicate or unwanted events. +func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { + invoice := event.invoice + + for clientID, client := range i.notificationClients { + // Before we dispatch this event, we'll check + // to ensure that this client hasn't already + // received this notification in order to + // ensure we don't duplicate any events. + + // TODO(joostjager): Refactor switches. + switch { + // If we've already sent this settle event to + // the client, then we can skip this. + case event.state == channeldb.ContractSettled && + client.settleIndex >= invoice.SettleIndex: + continue + + // Similarly, if we've already sent this add to + // the client then we can skip this one. + case event.state == channeldb.ContractOpen && + client.addIndex >= invoice.AddIndex: + continue + + // These two states should never happen, but we + // log them just in case so we can detect this + // instance. + case event.state == channeldb.ContractOpen && + client.addIndex+1 != invoice.AddIndex: + log.Warnf("client=%v for invoice "+ + "notifications missed an update, "+ + "add_index=%v, new add event index=%v", + clientID, client.addIndex, + invoice.AddIndex) + + case event.state == channeldb.ContractSettled && + client.settleIndex+1 != invoice.SettleIndex: + log.Warnf("client=%v for invoice "+ + "notifications missed an update, "+ + "settle_index=%v, new settle event index=%v", + clientID, client.settleIndex, + invoice.SettleIndex) + } + + select { + case client.ntfnQueue.ChanIn() <- &invoiceEvent{ + state: event.state, + invoice: invoice, + }: + case <-i.quit: + return + } + + // Each time we send a notification to a client, we'll record + // the latest add/settle index it has. We'll use this to ensure + // we don't send a notification twice, which can happen if a new + // event is added while we're catching up a new client. + switch event.state { + case channeldb.ContractSettled: + client.settleIndex = invoice.SettleIndex + case channeldb.ContractOpen: + client.addIndex = invoice.AddIndex + default: + log.Errorf("unknown invoice state: %v", event.state) + } + } +} + // deliverBacklogEvents will attempts to query the invoice database for any // notifications that the client has missed since it reconnected last. func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {