invoices: fix synchronization issue with single invoice subscribers
This commit fixes a synchronization issue where a single invoice subscriber could receive duplicate and/or out of order invoice updates.
This commit is contained in:
parent
9ab23de197
commit
395e0596c2
@ -67,9 +67,11 @@ type InvoiceRegistry struct {
|
||||
singleNotificationClients map[uint32]*SingleInvoiceSubscription
|
||||
|
||||
newSubscriptions chan *InvoiceSubscription
|
||||
newSingleSubscriptions chan *SingleInvoiceSubscription
|
||||
subscriptionCancels chan uint32
|
||||
invoiceEvents chan *invoiceEvent
|
||||
|
||||
// invoiceEvents is a single channel over which both invoice updates and
|
||||
// new single invoice subscriptions are carried.
|
||||
invoiceEvents chan interface{}
|
||||
|
||||
// debugInvoices is a map which stores special "debug" invoices which
|
||||
// should be only created/used when manual tests require an invoice
|
||||
@ -112,9 +114,8 @@ func NewRegistry(cdb *channeldb.DB, decodeFinalCltvExpiry func(invoice string) (
|
||||
notificationClients: make(map[uint32]*InvoiceSubscription),
|
||||
singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
|
||||
newSubscriptions: make(chan *InvoiceSubscription),
|
||||
newSingleSubscriptions: make(chan *SingleInvoiceSubscription),
|
||||
subscriptionCancels: make(chan uint32),
|
||||
invoiceEvents: make(chan *invoiceEvent, 100),
|
||||
invoiceEvents: make(chan interface{}, 100),
|
||||
hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}),
|
||||
hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}),
|
||||
decodeFinalCltvExpiry: decodeFinalCltvExpiry,
|
||||
@ -176,23 +177,6 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
|
||||
// continue.
|
||||
i.notificationClients[newClient.id] = newClient
|
||||
|
||||
// A new single invoice subscription has arrived. We'll query
|
||||
// for any backlog notifications, then add it to the set of
|
||||
// clients.
|
||||
case newClient := <-i.newSingleSubscriptions:
|
||||
err := i.deliverSingleBacklogEvents(newClient)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to deliver backlog invoice "+
|
||||
"notifications: %v", err)
|
||||
}
|
||||
|
||||
log.Infof("New single invoice subscription "+
|
||||
"client: id=%v, hash=%v",
|
||||
newClient.id, newClient.hash,
|
||||
)
|
||||
|
||||
i.singleNotificationClients[newClient.id] = newClient
|
||||
|
||||
// A client no longer wishes to receive invoice notifications.
|
||||
// So we'll remove them from the set of active clients.
|
||||
case clientID := <-i.subscriptionCancels:
|
||||
@ -202,18 +186,39 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
|
||||
delete(i.notificationClients, clientID)
|
||||
delete(i.singleNotificationClients, clientID)
|
||||
|
||||
// A sub-systems has just modified the invoice state, so we'll
|
||||
// dispatch notifications to all registered clients.
|
||||
// An invoice event has come in. This can either be an update to
|
||||
// an invoice or a new single invoice subscriber. Both type of
|
||||
// events are passed in via the same channel, to make sure that
|
||||
// subscribers get a consistent view of the event sequence.
|
||||
case event := <-i.invoiceEvents:
|
||||
// For backwards compatibility, do not notify all
|
||||
// invoice subscribers of cancel and accept events.
|
||||
state := event.invoice.Terms.State
|
||||
switch e := event.(type) {
|
||||
|
||||
// A sub-systems has just modified the invoice state, so
|
||||
// we'll dispatch notifications to all registered
|
||||
// clients.
|
||||
case *invoiceEvent:
|
||||
// For backwards compatibility, do not notify
|
||||
// all invoice subscribers of cancel and accept
|
||||
// events.
|
||||
state := e.invoice.Terms.State
|
||||
if state != channeldb.ContractCanceled &&
|
||||
state != channeldb.ContractAccepted {
|
||||
|
||||
i.dispatchToClients(event)
|
||||
i.dispatchToClients(e)
|
||||
}
|
||||
i.dispatchToSingleClients(e)
|
||||
|
||||
// A new single invoice subscription has arrived. Add it
|
||||
// to the set of clients. It is important to do this in
|
||||
// sequence with any other invoice events, because an
|
||||
// initial invoice update has already been sent out to
|
||||
// the subscriber.
|
||||
case *SingleInvoiceSubscription:
|
||||
log.Infof("New single invoice subscription "+
|
||||
"client: id=%v, hash=%v", e.id, e.hash)
|
||||
|
||||
i.singleNotificationClients[e.id] = e
|
||||
}
|
||||
i.dispatchToSingleClients(event)
|
||||
|
||||
case <-i.quit:
|
||||
return
|
||||
@ -367,7 +372,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents(
|
||||
|
||||
// It is possible that the invoice does not exist yet, but the client is
|
||||
// already watching it in anticipation.
|
||||
if err == channeldb.ErrInvoiceNotFound {
|
||||
if err == channeldb.ErrInvoiceNotFound ||
|
||||
err == channeldb.ErrNoInvoicesCreated {
|
||||
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
@ -896,7 +903,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
|
||||
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
|
||||
// caller to receive async notifications for a specific invoice.
|
||||
func (i *InvoiceRegistry) SubscribeSingleInvoice(
|
||||
hash lntypes.Hash) *SingleInvoiceSubscription {
|
||||
hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
|
||||
|
||||
client := &SingleInvoiceSubscription{
|
||||
Updates: make(chan *channeldb.Invoice),
|
||||
@ -949,12 +956,24 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case i.newSingleSubscriptions <- client:
|
||||
case <-i.quit:
|
||||
// Within the lock, we both query the invoice state and pass the client
|
||||
// subscription to the invoiceEvents channel. This is to make sure that
|
||||
// the client receives a consistent stream of events.
|
||||
i.Lock()
|
||||
defer i.Unlock()
|
||||
|
||||
err := i.deliverSingleBacklogEvents(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return client
|
||||
select {
|
||||
case i.invoiceEvents <- client:
|
||||
case <-i.quit:
|
||||
return nil, ErrShuttingDown
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// notifyHodlSubscribers sends out the hodl event to all current subscribers.
|
||||
|
@ -71,7 +71,10 @@ func TestSettleInvoice(t *testing.T) {
|
||||
defer allSubscriptions.Cancel()
|
||||
|
||||
// Subscribe to the not yet existing invoice.
|
||||
subscription := registry.SubscribeSingleInvoice(hash)
|
||||
subscription, err := registry.SubscribeSingleInvoice(hash)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer subscription.Cancel()
|
||||
|
||||
if subscription.hash != hash {
|
||||
@ -225,7 +228,10 @@ func TestCancelInvoice(t *testing.T) {
|
||||
}
|
||||
|
||||
// Subscribe to the not yet existing invoice.
|
||||
subscription := registry.SubscribeSingleInvoice(hash)
|
||||
subscription, err := registry.SubscribeSingleInvoice(hash)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer subscription.Cancel()
|
||||
|
||||
if subscription.hash != hash {
|
||||
@ -329,7 +335,10 @@ func TestHoldInvoice(t *testing.T) {
|
||||
defer allSubscriptions.Cancel()
|
||||
|
||||
// Subscribe to the not yet existing invoice.
|
||||
subscription := registry.SubscribeSingleInvoice(hash)
|
||||
subscription, err := registry.SubscribeSingleInvoice(hash)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer subscription.Cancel()
|
||||
|
||||
if subscription.hash != hash {
|
||||
|
@ -178,7 +178,10 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
|
||||
return err
|
||||
}
|
||||
|
||||
invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash)
|
||||
invoiceClient, err := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer invoiceClient.Cancel()
|
||||
|
||||
for {
|
||||
|
Loading…
Reference in New Issue
Block a user