invoiceregistry: extract dispatch to method

This commit is contained in:
Joost Jager 2018-12-19 16:15:09 +01:00
parent 3545685177
commit 78cd07570b
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7

@ -139,69 +139,7 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
// A sub-systems has just modified the invoice state, so we'll // A sub-systems has just modified the invoice state, so we'll
// dispatch notifications to all registered clients. // dispatch notifications to all registered clients.
case event := <-i.invoiceEvents: case event := <-i.invoiceEvents:
for clientID, client := range i.notificationClients { i.dispatchToClients(event)
// Before we dispatch this event, we'll check
// to ensure that this client hasn't already
// received this notification in order to
// ensure we don't duplicate any events.
invoice := event.invoice
switch {
// If we've already sent this settle event to
// the client, then we can skip this.
case event.state == channeldb.ContractSettled &&
client.settleIndex >= invoice.SettleIndex:
continue
// Similarly, if we've already sent this add to
// the client then we can skip this one.
case event.state == channeldb.ContractOpen &&
client.addIndex >= invoice.AddIndex:
continue
// These two states should never happen, but we
// log them just in case so we can detect this
// instance.
case event.state == channeldb.ContractOpen &&
client.addIndex+1 != invoice.AddIndex:
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"add_index=%v, new add event index=%v",
clientID, client.addIndex,
invoice.AddIndex)
case event.state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex:
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"settle_index=%v, new settle event index=%v",
clientID, client.settleIndex,
invoice.SettleIndex)
}
select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: event.state,
invoice: invoice,
}:
case <-i.quit:
return
}
// Each time we send a notification to a
// client, we'll record the latest add/settle
// index it has. We'll use this to ensure we
// don't send a notification twice, which can
// happen if a new event is added while we're
// catching up a new client.
switch event.state {
case channeldb.ContractSettled:
client.settleIndex = invoice.SettleIndex
case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex
default:
log.Errorf("unknown invoice "+
"state: %v", event.state)
}
}
case <-i.quit: case <-i.quit:
return return
@ -209,6 +147,76 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
} }
} }
// dispatchToClients passes the supplied event to all notification clients that
// subscribed to all invoices. Add and settle indices are used to make sure that
// clients don't receive duplicate or unwanted events.
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
invoice := event.invoice
for clientID, client := range i.notificationClients {
// Before we dispatch this event, we'll check
// to ensure that this client hasn't already
// received this notification in order to
// ensure we don't duplicate any events.
// TODO(joostjager): Refactor switches.
switch {
// If we've already sent this settle event to
// the client, then we can skip this.
case event.state == channeldb.ContractSettled &&
client.settleIndex >= invoice.SettleIndex:
continue
// Similarly, if we've already sent this add to
// the client then we can skip this one.
case event.state == channeldb.ContractOpen &&
client.addIndex >= invoice.AddIndex:
continue
// These two states should never happen, but we
// log them just in case so we can detect this
// instance.
case event.state == channeldb.ContractOpen &&
client.addIndex+1 != invoice.AddIndex:
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"add_index=%v, new add event index=%v",
clientID, client.addIndex,
invoice.AddIndex)
case event.state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex:
log.Warnf("client=%v for invoice "+
"notifications missed an update, "+
"settle_index=%v, new settle event index=%v",
clientID, client.settleIndex,
invoice.SettleIndex)
}
select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: event.state,
invoice: invoice,
}:
case <-i.quit:
return
}
// Each time we send a notification to a client, we'll record
// the latest add/settle index it has. We'll use this to ensure
// we don't send a notification twice, which can happen if a new
// event is added while we're catching up a new client.
switch event.state {
case channeldb.ContractSettled:
client.settleIndex = invoice.SettleIndex
case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex
default:
log.Errorf("unknown invoice state: %v", event.state)
}
}
}
// deliverBacklogEvents will attempts to query the invoice database for any // deliverBacklogEvents will attempts to query the invoice database for any
// notifications that the client has missed since it reconnected last. // notifications that the client has missed since it reconnected last.
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error { func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {