invoices/invoiceregistry: properly synchronize backlog
This commit moves the db calls for retrieving add and settle backlogs outide of the main event loop. All other db operations are performed outside of the event loop and synchronized via the invoice registry's mutex, which also synchronizes the order in which events submitted to be processed. This resolves various concurrency issues where notifications can be missed of inconsistent reads against the databse. This is especially important in this case because we are actually making two separate database calls.
This commit is contained in:
parent
5b747715fc
commit
6a02fa1107
@ -234,15 +234,6 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
|
||||
// We'll query for any backlog notifications, then add it to the
|
||||
// set of clients.
|
||||
case newClient := <-i.newSubscriptions:
|
||||
// Before we add the client to our set of active
|
||||
// clients, we'll first attempt to deliver any backlog
|
||||
// invoice events.
|
||||
err := i.deliverBacklogEvents(newClient)
|
||||
if err != nil {
|
||||
log.Errorf("unable to deliver backlog invoice "+
|
||||
"notifications: %v", err)
|
||||
}
|
||||
|
||||
log.Infof("New invoice subscription "+
|
||||
"client: id=%v", newClient.id)
|
||||
|
||||
@ -410,9 +401,6 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
|
||||
// deliverBacklogEvents will attempts to query the invoice database for any
|
||||
// notifications that the client has missed since it reconnected last.
|
||||
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
|
||||
// First, we'll query the database to see if based on the provided
|
||||
// addIndex and settledIndex we need to deliver any backlog
|
||||
// notifications.
|
||||
addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1253,6 +1241,16 @@ func (i *InvoiceRegistry) SubscribeNotifications(
|
||||
}
|
||||
}()
|
||||
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
// Query the database to see if based on the provided addIndex and
|
||||
// settledIndex we need to deliver any backlog notifications.
|
||||
err := i.deliverBacklogEvents(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case i.newSubscriptions <- client:
|
||||
case <-i.quit:
|
||||
|
Loading…
Reference in New Issue
Block a user