From 395e0596c2174df53a8095d92b87643449e0730f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 9 Aug 2019 12:22:53 +0200 Subject: [PATCH] invoices: fix synchronization issue with single invoice subscribers This commit fixes a synchronization issue where a single invoice subscriber could receive duplicate and/or out of order invoice updates. --- invoices/invoiceregistry.go | 95 +++++++++++++++++----------- invoices/invoiceregistry_test.go | 15 ++++- lnrpc/invoicesrpc/invoices_server.go | 5 +- 3 files changed, 73 insertions(+), 42 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index f2e70145..d596cd9a 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -66,10 +66,12 @@ type InvoiceRegistry struct { notificationClients map[uint32]*InvoiceSubscription singleNotificationClients map[uint32]*SingleInvoiceSubscription - newSubscriptions chan *InvoiceSubscription - newSingleSubscriptions chan *SingleInvoiceSubscription - subscriptionCancels chan uint32 - invoiceEvents chan *invoiceEvent + newSubscriptions chan *InvoiceSubscription + subscriptionCancels chan uint32 + + // invoiceEvents is a single channel over which both invoice updates and + // new single invoice subscriptions are carried. + invoiceEvents chan interface{} // debugInvoices is a map which stores special "debug" invoices which // should be only created/used when manual tests require an invoice @@ -112,9 +114,8 @@ func NewRegistry(cdb *channeldb.DB, decodeFinalCltvExpiry func(invoice string) ( notificationClients: make(map[uint32]*InvoiceSubscription), singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription), - newSingleSubscriptions: make(chan *SingleInvoiceSubscription), subscriptionCancels: make(chan uint32), - invoiceEvents: make(chan *invoiceEvent, 100), + invoiceEvents: make(chan interface{}, 100), hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}), hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}), decodeFinalCltvExpiry: decodeFinalCltvExpiry, @@ -176,23 +177,6 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { // continue. i.notificationClients[newClient.id] = newClient - // A new single invoice subscription has arrived. We'll query - // for any backlog notifications, then add it to the set of - // clients. - case newClient := <-i.newSingleSubscriptions: - err := i.deliverSingleBacklogEvents(newClient) - if err != nil { - log.Errorf("Unable to deliver backlog invoice "+ - "notifications: %v", err) - } - - log.Infof("New single invoice subscription "+ - "client: id=%v, hash=%v", - newClient.id, newClient.hash, - ) - - i.singleNotificationClients[newClient.id] = newClient - // A client no longer wishes to receive invoice notifications. // So we'll remove them from the set of active clients. case clientID := <-i.subscriptionCancels: @@ -202,18 +186,39 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { delete(i.notificationClients, clientID) delete(i.singleNotificationClients, clientID) - // A sub-systems has just modified the invoice state, so we'll - // dispatch notifications to all registered clients. + // An invoice event has come in. This can either be an update to + // an invoice or a new single invoice subscriber. Both type of + // events are passed in via the same channel, to make sure that + // subscribers get a consistent view of the event sequence. case event := <-i.invoiceEvents: - // For backwards compatibility, do not notify all - // invoice subscribers of cancel and accept events. - state := event.invoice.Terms.State - if state != channeldb.ContractCanceled && - state != channeldb.ContractAccepted { + switch e := event.(type) { - i.dispatchToClients(event) + // A sub-systems has just modified the invoice state, so + // we'll dispatch notifications to all registered + // clients. + case *invoiceEvent: + // For backwards compatibility, do not notify + // all invoice subscribers of cancel and accept + // events. + state := e.invoice.Terms.State + if state != channeldb.ContractCanceled && + state != channeldb.ContractAccepted { + + i.dispatchToClients(e) + } + i.dispatchToSingleClients(e) + + // A new single invoice subscription has arrived. Add it + // to the set of clients. It is important to do this in + // sequence with any other invoice events, because an + // initial invoice update has already been sent out to + // the subscriber. + case *SingleInvoiceSubscription: + log.Infof("New single invoice subscription "+ + "client: id=%v, hash=%v", e.id, e.hash) + + i.singleNotificationClients[e.id] = e } - i.dispatchToSingleClients(event) case <-i.quit: return @@ -367,7 +372,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents( // It is possible that the invoice does not exist yet, but the client is // already watching it in anticipation. - if err == channeldb.ErrInvoiceNotFound { + if err == channeldb.ErrInvoiceNotFound || + err == channeldb.ErrNoInvoicesCreated { + return nil } if err != nil { @@ -896,7 +903,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * // SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the // caller to receive async notifications for a specific invoice. func (i *InvoiceRegistry) SubscribeSingleInvoice( - hash lntypes.Hash) *SingleInvoiceSubscription { + hash lntypes.Hash) (*SingleInvoiceSubscription, error) { client := &SingleInvoiceSubscription{ Updates: make(chan *channeldb.Invoice), @@ -949,12 +956,24 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( } }() - select { - case i.newSingleSubscriptions <- client: - case <-i.quit: + // Within the lock, we both query the invoice state and pass the client + // subscription to the invoiceEvents channel. This is to make sure that + // the client receives a consistent stream of events. + i.Lock() + defer i.Unlock() + + err := i.deliverSingleBacklogEvents(client) + if err != nil { + return nil, err } - return client + select { + case i.invoiceEvents <- client: + case <-i.quit: + return nil, ErrShuttingDown + } + + return client, nil } // notifyHodlSubscribers sends out the hodl event to all current subscribers. diff --git a/invoices/invoiceregistry_test.go b/invoices/invoiceregistry_test.go index 71f6bfee..ff3c04be 100644 --- a/invoices/invoiceregistry_test.go +++ b/invoices/invoiceregistry_test.go @@ -71,7 +71,10 @@ func TestSettleInvoice(t *testing.T) { defer allSubscriptions.Cancel() // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { @@ -225,7 +228,10 @@ func TestCancelInvoice(t *testing.T) { } // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { @@ -329,7 +335,10 @@ func TestHoldInvoice(t *testing.T) { defer allSubscriptions.Cancel() // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { diff --git a/lnrpc/invoicesrpc/invoices_server.go b/lnrpc/invoicesrpc/invoices_server.go index c88de633..df038b18 100644 --- a/lnrpc/invoicesrpc/invoices_server.go +++ b/lnrpc/invoicesrpc/invoices_server.go @@ -178,7 +178,10 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest, return err } - invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash) + invoiceClient, err := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash) + if err != nil { + return err + } defer invoiceClient.Cancel() for {