From bed2acea33121d92b2eebba377537712d50b0697 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 9 Aug 2019 11:41:34 +0200 Subject: [PATCH 1/4] invoices: remove redundant state field from invoice event --- invoices/invoiceregistry.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 9d498581..06f592ba 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -139,7 +139,6 @@ func (i *InvoiceRegistry) Stop() { // Only two event types are currently supported: newly created invoices, and // instance where invoices are settled. type invoiceEvent struct { - state channeldb.ContractState hash lntypes.Hash invoice *channeldb.Invoice } @@ -204,8 +203,9 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { case event := <-i.invoiceEvents: // For backwards compatibility, do not notify all // invoice subscribers of cancel and accept events. - if event.state != channeldb.ContractCanceled && - event.state != channeldb.ContractAccepted { + state := event.invoice.Terms.State + if state != channeldb.ContractCanceled && + state != channeldb.ContractAccepted { i.dispatchToClients(event) } @@ -228,7 +228,6 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { select { case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: event.state, invoice: event.invoice, }: case <-i.quit: @@ -250,23 +249,24 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { // ensure we don't duplicate any events. // TODO(joostjager): Refactor switches. + state := event.invoice.Terms.State switch { // If we've already sent this settle event to // the client, then we can skip this. - case event.state == channeldb.ContractSettled && + case state == channeldb.ContractSettled && client.settleIndex >= invoice.SettleIndex: continue // Similarly, if we've already sent this add to // the client then we can skip this one. - case event.state == channeldb.ContractOpen && + case state == channeldb.ContractOpen && client.addIndex >= invoice.AddIndex: continue // These two states should never happen, but we // log them just in case so we can detect this // instance. - case event.state == channeldb.ContractOpen && + case state == channeldb.ContractOpen && client.addIndex+1 != invoice.AddIndex: log.Warnf("client=%v for invoice "+ "notifications missed an update, "+ @@ -274,7 +274,7 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { clientID, client.addIndex, invoice.AddIndex) - case event.state == channeldb.ContractSettled && + case state == channeldb.ContractSettled && client.settleIndex+1 != invoice.SettleIndex: log.Warnf("client=%v for invoice "+ "notifications missed an update, "+ @@ -285,7 +285,6 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { select { case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: event.state, invoice: invoice, }: case <-i.quit: @@ -296,13 +295,14 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) { // the latest add/settle index it has. We'll use this to ensure // we don't send a notification twice, which can happen if a new // event is added while we're catching up a new client. - switch event.state { + switch event.invoice.Terms.State { case channeldb.ContractSettled: client.settleIndex = invoice.SettleIndex case channeldb.ContractOpen: client.addIndex = invoice.AddIndex default: - log.Errorf("unexpected invoice state: %v", event.state) + log.Errorf("unexpected invoice state: %v", + event.invoice.Terms.State) } } } @@ -333,7 +333,6 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro select { case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: channeldb.ContractOpen, invoice: &addEvent, }: case <-i.quit: @@ -348,7 +347,6 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro select { case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - state: channeldb.ContractSettled, invoice: &settleEvent, }: case <-i.quit: @@ -381,7 +379,6 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents( err = client.notify(&invoiceEvent{ hash: client.hash, invoice: &invoice, - state: invoice.Terms.State, }) if err != nil { return err @@ -722,7 +719,6 @@ func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash, state channeldb.ContractState) { event := &invoiceEvent{ - state: state, invoice: invoice, hash: hash, } @@ -859,14 +855,15 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * invoiceEvent := ntfn.(*invoiceEvent) var targetChan chan *channeldb.Invoice - switch invoiceEvent.state { + state := invoiceEvent.invoice.Terms.State + switch state { case channeldb.ContractOpen: targetChan = client.NewInvoices case channeldb.ContractSettled: targetChan = client.SettledInvoices default: log.Errorf("unknown invoice "+ - "state: %v", invoiceEvent.state) + "state: %v", state) continue } From 46e2a9e9b8a4eaa7e31f7dcb27da48e0d2304f2c Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 9 Aug 2019 11:44:27 +0200 Subject: [PATCH 2/4] invoices: reuse client.notify function --- invoices/invoiceregistry.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 06f592ba..80f4474f 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -226,13 +226,7 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) { continue } - select { - case client.ntfnQueue.ChanIn() <- &invoiceEvent{ - invoice: event.invoice, - }: - case <-i.quit: - return - } + client.notify(event) } } From 9ab23de197ad9175d32c0bee9f88d533b7f0b11f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Tue, 13 Aug 2019 18:57:27 +0200 Subject: [PATCH 3/4] invoices: create error for invoice registry shutting down --- invoices/invoiceregistry.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index 80f4474f..f2e70145 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -36,6 +36,10 @@ var ( // ErrInvoiceAmountTooLow is returned when an invoice is attempted to be // accepted or settled with an amount that is too low. ErrInvoiceAmountTooLow = errors.New("paid amount less than invoice amount") + + // ErrShuttingDown is returned when an operation failed because the + // invoice registry is shutting down. + ErrShuttingDown = errors.New("invoice registry shutting down") ) // HodlEvent describes how an htlc should be resolved. If HodlEvent.Preimage is @@ -330,7 +334,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro invoice: &addEvent, }: case <-i.quit: - return fmt.Errorf("registry shutting down") + return ErrShuttingDown } } @@ -344,7 +348,7 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro invoice: &settleEvent, }: case <-i.quit: - return fmt.Errorf("registry shutting down") + return ErrShuttingDown } } @@ -801,7 +805,7 @@ func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error { select { case i.ntfnQueue.ChanIn() <- event: case <-i.inv.quit: - return fmt.Errorf("registry shutting down") + return ErrShuttingDown } return nil From 395e0596c2174df53a8095d92b87643449e0730f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 9 Aug 2019 12:22:53 +0200 Subject: [PATCH 4/4] invoices: fix synchronization issue with single invoice subscribers This commit fixes a synchronization issue where a single invoice subscriber could receive duplicate and/or out of order invoice updates. --- invoices/invoiceregistry.go | 95 +++++++++++++++++----------- invoices/invoiceregistry_test.go | 15 ++++- lnrpc/invoicesrpc/invoices_server.go | 5 +- 3 files changed, 73 insertions(+), 42 deletions(-) diff --git a/invoices/invoiceregistry.go b/invoices/invoiceregistry.go index f2e70145..d596cd9a 100644 --- a/invoices/invoiceregistry.go +++ b/invoices/invoiceregistry.go @@ -66,10 +66,12 @@ type InvoiceRegistry struct { notificationClients map[uint32]*InvoiceSubscription singleNotificationClients map[uint32]*SingleInvoiceSubscription - newSubscriptions chan *InvoiceSubscription - newSingleSubscriptions chan *SingleInvoiceSubscription - subscriptionCancels chan uint32 - invoiceEvents chan *invoiceEvent + newSubscriptions chan *InvoiceSubscription + subscriptionCancels chan uint32 + + // invoiceEvents is a single channel over which both invoice updates and + // new single invoice subscriptions are carried. + invoiceEvents chan interface{} // debugInvoices is a map which stores special "debug" invoices which // should be only created/used when manual tests require an invoice @@ -112,9 +114,8 @@ func NewRegistry(cdb *channeldb.DB, decodeFinalCltvExpiry func(invoice string) ( notificationClients: make(map[uint32]*InvoiceSubscription), singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription), - newSingleSubscriptions: make(chan *SingleInvoiceSubscription), subscriptionCancels: make(chan uint32), - invoiceEvents: make(chan *invoiceEvent, 100), + invoiceEvents: make(chan interface{}, 100), hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}), hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}), decodeFinalCltvExpiry: decodeFinalCltvExpiry, @@ -176,23 +177,6 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { // continue. i.notificationClients[newClient.id] = newClient - // A new single invoice subscription has arrived. We'll query - // for any backlog notifications, then add it to the set of - // clients. - case newClient := <-i.newSingleSubscriptions: - err := i.deliverSingleBacklogEvents(newClient) - if err != nil { - log.Errorf("Unable to deliver backlog invoice "+ - "notifications: %v", err) - } - - log.Infof("New single invoice subscription "+ - "client: id=%v, hash=%v", - newClient.id, newClient.hash, - ) - - i.singleNotificationClients[newClient.id] = newClient - // A client no longer wishes to receive invoice notifications. // So we'll remove them from the set of active clients. case clientID := <-i.subscriptionCancels: @@ -202,18 +186,39 @@ func (i *InvoiceRegistry) invoiceEventNotifier() { delete(i.notificationClients, clientID) delete(i.singleNotificationClients, clientID) - // A sub-systems has just modified the invoice state, so we'll - // dispatch notifications to all registered clients. + // An invoice event has come in. This can either be an update to + // an invoice or a new single invoice subscriber. Both type of + // events are passed in via the same channel, to make sure that + // subscribers get a consistent view of the event sequence. case event := <-i.invoiceEvents: - // For backwards compatibility, do not notify all - // invoice subscribers of cancel and accept events. - state := event.invoice.Terms.State - if state != channeldb.ContractCanceled && - state != channeldb.ContractAccepted { + switch e := event.(type) { - i.dispatchToClients(event) + // A sub-systems has just modified the invoice state, so + // we'll dispatch notifications to all registered + // clients. + case *invoiceEvent: + // For backwards compatibility, do not notify + // all invoice subscribers of cancel and accept + // events. + state := e.invoice.Terms.State + if state != channeldb.ContractCanceled && + state != channeldb.ContractAccepted { + + i.dispatchToClients(e) + } + i.dispatchToSingleClients(e) + + // A new single invoice subscription has arrived. Add it + // to the set of clients. It is important to do this in + // sequence with any other invoice events, because an + // initial invoice update has already been sent out to + // the subscriber. + case *SingleInvoiceSubscription: + log.Infof("New single invoice subscription "+ + "client: id=%v, hash=%v", e.id, e.hash) + + i.singleNotificationClients[e.id] = e } - i.dispatchToSingleClients(event) case <-i.quit: return @@ -367,7 +372,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents( // It is possible that the invoice does not exist yet, but the client is // already watching it in anticipation. - if err == channeldb.ErrInvoiceNotFound { + if err == channeldb.ErrInvoiceNotFound || + err == channeldb.ErrNoInvoicesCreated { + return nil } if err != nil { @@ -896,7 +903,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) * // SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the // caller to receive async notifications for a specific invoice. func (i *InvoiceRegistry) SubscribeSingleInvoice( - hash lntypes.Hash) *SingleInvoiceSubscription { + hash lntypes.Hash) (*SingleInvoiceSubscription, error) { client := &SingleInvoiceSubscription{ Updates: make(chan *channeldb.Invoice), @@ -949,12 +956,24 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice( } }() - select { - case i.newSingleSubscriptions <- client: - case <-i.quit: + // Within the lock, we both query the invoice state and pass the client + // subscription to the invoiceEvents channel. This is to make sure that + // the client receives a consistent stream of events. + i.Lock() + defer i.Unlock() + + err := i.deliverSingleBacklogEvents(client) + if err != nil { + return nil, err } - return client + select { + case i.invoiceEvents <- client: + case <-i.quit: + return nil, ErrShuttingDown + } + + return client, nil } // notifyHodlSubscribers sends out the hodl event to all current subscribers. diff --git a/invoices/invoiceregistry_test.go b/invoices/invoiceregistry_test.go index 71f6bfee..ff3c04be 100644 --- a/invoices/invoiceregistry_test.go +++ b/invoices/invoiceregistry_test.go @@ -71,7 +71,10 @@ func TestSettleInvoice(t *testing.T) { defer allSubscriptions.Cancel() // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { @@ -225,7 +228,10 @@ func TestCancelInvoice(t *testing.T) { } // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { @@ -329,7 +335,10 @@ func TestHoldInvoice(t *testing.T) { defer allSubscriptions.Cancel() // Subscribe to the not yet existing invoice. - subscription := registry.SubscribeSingleInvoice(hash) + subscription, err := registry.SubscribeSingleInvoice(hash) + if err != nil { + t.Fatal(err) + } defer subscription.Cancel() if subscription.hash != hash { diff --git a/lnrpc/invoicesrpc/invoices_server.go b/lnrpc/invoicesrpc/invoices_server.go index c88de633..df038b18 100644 --- a/lnrpc/invoicesrpc/invoices_server.go +++ b/lnrpc/invoicesrpc/invoices_server.go @@ -178,7 +178,10 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest, return err } - invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash) + invoiceClient, err := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash) + if err != nil { + return err + } defer invoiceClient.Cancel() for {