Merge pull request #3389 from joostjager/invreg-sync-fix

invoices: fix synchronization issue with single invoice subscribers
This commit is contained in:
Olaoluwa Osuntokun 2019-08-13 18:16:47 -07:00 committed by GitHub
commit 9f88577627
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 66 deletions

@ -36,6 +36,10 @@ var (
// ErrInvoiceAmountTooLow is returned when an invoice is attempted to be // ErrInvoiceAmountTooLow is returned when an invoice is attempted to be
// accepted or settled with an amount that is too low. // accepted or settled with an amount that is too low.
ErrInvoiceAmountTooLow = errors.New("paid amount less than invoice amount") ErrInvoiceAmountTooLow = errors.New("paid amount less than invoice amount")
// ErrShuttingDown is returned when an operation failed because the
// invoice registry is shutting down.
ErrShuttingDown = errors.New("invoice registry shutting down")
) )
// HodlEvent describes how an htlc should be resolved. If HodlEvent.Preimage is // HodlEvent describes how an htlc should be resolved. If HodlEvent.Preimage is
@ -63,9 +67,11 @@ type InvoiceRegistry struct {
singleNotificationClients map[uint32]*SingleInvoiceSubscription singleNotificationClients map[uint32]*SingleInvoiceSubscription
newSubscriptions chan *InvoiceSubscription newSubscriptions chan *InvoiceSubscription
newSingleSubscriptions chan *SingleInvoiceSubscription
subscriptionCancels chan uint32 subscriptionCancels chan uint32
invoiceEvents chan *invoiceEvent
// invoiceEvents is a single channel over which both invoice updates and
// new single invoice subscriptions are carried.
invoiceEvents chan interface{}
// debugInvoices is a map which stores special "debug" invoices which // debugInvoices is a map which stores special "debug" invoices which
// should be only created/used when manual tests require an invoice // should be only created/used when manual tests require an invoice
@ -108,9 +114,8 @@ func NewRegistry(cdb *channeldb.DB, decodeFinalCltvExpiry func(invoice string) (
notificationClients: make(map[uint32]*InvoiceSubscription), notificationClients: make(map[uint32]*InvoiceSubscription),
singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription), singleNotificationClients: make(map[uint32]*SingleInvoiceSubscription),
newSubscriptions: make(chan *InvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription),
newSingleSubscriptions: make(chan *SingleInvoiceSubscription),
subscriptionCancels: make(chan uint32), subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan *invoiceEvent, 100), invoiceEvents: make(chan interface{}, 100),
hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}), hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}),
hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}), hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}),
decodeFinalCltvExpiry: decodeFinalCltvExpiry, decodeFinalCltvExpiry: decodeFinalCltvExpiry,
@ -139,7 +144,6 @@ func (i *InvoiceRegistry) Stop() {
// Only two event types are currently supported: newly created invoices, and // Only two event types are currently supported: newly created invoices, and
// instance where invoices are settled. // instance where invoices are settled.
type invoiceEvent struct { type invoiceEvent struct {
state channeldb.ContractState
hash lntypes.Hash hash lntypes.Hash
invoice *channeldb.Invoice invoice *channeldb.Invoice
} }
@ -173,23 +177,6 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
// continue. // continue.
i.notificationClients[newClient.id] = newClient i.notificationClients[newClient.id] = newClient
// A new single invoice subscription has arrived. We'll query
// for any backlog notifications, then add it to the set of
// clients.
case newClient := <-i.newSingleSubscriptions:
err := i.deliverSingleBacklogEvents(newClient)
if err != nil {
log.Errorf("Unable to deliver backlog invoice "+
"notifications: %v", err)
}
log.Infof("New single invoice subscription "+
"client: id=%v, hash=%v",
newClient.id, newClient.hash,
)
i.singleNotificationClients[newClient.id] = newClient
// A client no longer wishes to receive invoice notifications. // A client no longer wishes to receive invoice notifications.
// So we'll remove them from the set of active clients. // So we'll remove them from the set of active clients.
case clientID := <-i.subscriptionCancels: case clientID := <-i.subscriptionCancels:
@ -199,17 +186,39 @@ func (i *InvoiceRegistry) invoiceEventNotifier() {
delete(i.notificationClients, clientID) delete(i.notificationClients, clientID)
delete(i.singleNotificationClients, clientID) delete(i.singleNotificationClients, clientID)
// A sub-systems has just modified the invoice state, so we'll // An invoice event has come in. This can either be an update to
// dispatch notifications to all registered clients. // an invoice or a new single invoice subscriber. Both type of
// events are passed in via the same channel, to make sure that
// subscribers get a consistent view of the event sequence.
case event := <-i.invoiceEvents: case event := <-i.invoiceEvents:
// For backwards compatibility, do not notify all switch e := event.(type) {
// invoice subscribers of cancel and accept events.
if event.state != channeldb.ContractCanceled &&
event.state != channeldb.ContractAccepted {
i.dispatchToClients(event) // A sub-systems has just modified the invoice state, so
// we'll dispatch notifications to all registered
// clients.
case *invoiceEvent:
// For backwards compatibility, do not notify
// all invoice subscribers of cancel and accept
// events.
state := e.invoice.Terms.State
if state != channeldb.ContractCanceled &&
state != channeldb.ContractAccepted {
i.dispatchToClients(e)
}
i.dispatchToSingleClients(e)
// A new single invoice subscription has arrived. Add it
// to the set of clients. It is important to do this in
// sequence with any other invoice events, because an
// initial invoice update has already been sent out to
// the subscriber.
case *SingleInvoiceSubscription:
log.Infof("New single invoice subscription "+
"client: id=%v, hash=%v", e.id, e.hash)
i.singleNotificationClients[e.id] = e
} }
i.dispatchToSingleClients(event)
case <-i.quit: case <-i.quit:
return return
@ -226,14 +235,7 @@ func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
continue continue
} }
select { client.notify(event)
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: event.state,
invoice: event.invoice,
}:
case <-i.quit:
return
}
} }
} }
@ -250,23 +252,24 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
// ensure we don't duplicate any events. // ensure we don't duplicate any events.
// TODO(joostjager): Refactor switches. // TODO(joostjager): Refactor switches.
state := event.invoice.Terms.State
switch { switch {
// If we've already sent this settle event to // If we've already sent this settle event to
// the client, then we can skip this. // the client, then we can skip this.
case event.state == channeldb.ContractSettled && case state == channeldb.ContractSettled &&
client.settleIndex >= invoice.SettleIndex: client.settleIndex >= invoice.SettleIndex:
continue continue
// Similarly, if we've already sent this add to // Similarly, if we've already sent this add to
// the client then we can skip this one. // the client then we can skip this one.
case event.state == channeldb.ContractOpen && case state == channeldb.ContractOpen &&
client.addIndex >= invoice.AddIndex: client.addIndex >= invoice.AddIndex:
continue continue
// These two states should never happen, but we // These two states should never happen, but we
// log them just in case so we can detect this // log them just in case so we can detect this
// instance. // instance.
case event.state == channeldb.ContractOpen && case state == channeldb.ContractOpen &&
client.addIndex+1 != invoice.AddIndex: client.addIndex+1 != invoice.AddIndex:
log.Warnf("client=%v for invoice "+ log.Warnf("client=%v for invoice "+
"notifications missed an update, "+ "notifications missed an update, "+
@ -274,7 +277,7 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
clientID, client.addIndex, clientID, client.addIndex,
invoice.AddIndex) invoice.AddIndex)
case event.state == channeldb.ContractSettled && case state == channeldb.ContractSettled &&
client.settleIndex+1 != invoice.SettleIndex: client.settleIndex+1 != invoice.SettleIndex:
log.Warnf("client=%v for invoice "+ log.Warnf("client=%v for invoice "+
"notifications missed an update, "+ "notifications missed an update, "+
@ -285,7 +288,6 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
select { select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{ case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: event.state,
invoice: invoice, invoice: invoice,
}: }:
case <-i.quit: case <-i.quit:
@ -296,13 +298,14 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
// the latest add/settle index it has. We'll use this to ensure // the latest add/settle index it has. We'll use this to ensure
// we don't send a notification twice, which can happen if a new // we don't send a notification twice, which can happen if a new
// event is added while we're catching up a new client. // event is added while we're catching up a new client.
switch event.state { switch event.invoice.Terms.State {
case channeldb.ContractSettled: case channeldb.ContractSettled:
client.settleIndex = invoice.SettleIndex client.settleIndex = invoice.SettleIndex
case channeldb.ContractOpen: case channeldb.ContractOpen:
client.addIndex = invoice.AddIndex client.addIndex = invoice.AddIndex
default: default:
log.Errorf("unexpected invoice state: %v", event.state) log.Errorf("unexpected invoice state: %v",
event.invoice.Terms.State)
} }
} }
} }
@ -333,11 +336,10 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro
select { select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{ case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: channeldb.ContractOpen,
invoice: &addEvent, invoice: &addEvent,
}: }:
case <-i.quit: case <-i.quit:
return fmt.Errorf("registry shutting down") return ErrShuttingDown
} }
} }
@ -348,11 +350,10 @@ func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) erro
select { select {
case client.ntfnQueue.ChanIn() <- &invoiceEvent{ case client.ntfnQueue.ChanIn() <- &invoiceEvent{
state: channeldb.ContractSettled,
invoice: &settleEvent, invoice: &settleEvent,
}: }:
case <-i.quit: case <-i.quit:
return fmt.Errorf("registry shutting down") return ErrShuttingDown
} }
} }
@ -371,7 +372,9 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents(
// It is possible that the invoice does not exist yet, but the client is // It is possible that the invoice does not exist yet, but the client is
// already watching it in anticipation. // already watching it in anticipation.
if err == channeldb.ErrInvoiceNotFound { if err == channeldb.ErrInvoiceNotFound ||
err == channeldb.ErrNoInvoicesCreated {
return nil return nil
} }
if err != nil { if err != nil {
@ -381,7 +384,6 @@ func (i *InvoiceRegistry) deliverSingleBacklogEvents(
err = client.notify(&invoiceEvent{ err = client.notify(&invoiceEvent{
hash: client.hash, hash: client.hash,
invoice: &invoice, invoice: &invoice,
state: invoice.Terms.State,
}) })
if err != nil { if err != nil {
return err return err
@ -722,7 +724,6 @@ func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
state channeldb.ContractState) { state channeldb.ContractState) {
event := &invoiceEvent{ event := &invoiceEvent{
state: state,
invoice: invoice, invoice: invoice,
hash: hash, hash: hash,
} }
@ -811,7 +812,7 @@ func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
select { select {
case i.ntfnQueue.ChanIn() <- event: case i.ntfnQueue.ChanIn() <- event:
case <-i.inv.quit: case <-i.inv.quit:
return fmt.Errorf("registry shutting down") return ErrShuttingDown
} }
return nil return nil
@ -859,14 +860,15 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
invoiceEvent := ntfn.(*invoiceEvent) invoiceEvent := ntfn.(*invoiceEvent)
var targetChan chan *channeldb.Invoice var targetChan chan *channeldb.Invoice
switch invoiceEvent.state { state := invoiceEvent.invoice.Terms.State
switch state {
case channeldb.ContractOpen: case channeldb.ContractOpen:
targetChan = client.NewInvoices targetChan = client.NewInvoices
case channeldb.ContractSettled: case channeldb.ContractSettled:
targetChan = client.SettledInvoices targetChan = client.SettledInvoices
default: default:
log.Errorf("unknown invoice "+ log.Errorf("unknown invoice "+
"state: %v", invoiceEvent.state) "state: %v", state)
continue continue
} }
@ -901,7 +903,7 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the // SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
// caller to receive async notifications for a specific invoice. // caller to receive async notifications for a specific invoice.
func (i *InvoiceRegistry) SubscribeSingleInvoice( func (i *InvoiceRegistry) SubscribeSingleInvoice(
hash lntypes.Hash) *SingleInvoiceSubscription { hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
client := &SingleInvoiceSubscription{ client := &SingleInvoiceSubscription{
Updates: make(chan *channeldb.Invoice), Updates: make(chan *channeldb.Invoice),
@ -954,12 +956,24 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
} }
}() }()
select { // Within the lock, we both query the invoice state and pass the client
case i.newSingleSubscriptions <- client: // subscription to the invoiceEvents channel. This is to make sure that
case <-i.quit: // the client receives a consistent stream of events.
i.Lock()
defer i.Unlock()
err := i.deliverSingleBacklogEvents(client)
if err != nil {
return nil, err
} }
return client select {
case i.invoiceEvents <- client:
case <-i.quit:
return nil, ErrShuttingDown
}
return client, nil
} }
// notifyHodlSubscribers sends out the hodl event to all current subscribers. // notifyHodlSubscribers sends out the hodl event to all current subscribers.

@ -71,7 +71,10 @@ func TestSettleInvoice(t *testing.T) {
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
subscription := registry.SubscribeSingleInvoice(hash) subscription, err := registry.SubscribeSingleInvoice(hash)
if err != nil {
t.Fatal(err)
}
defer subscription.Cancel() defer subscription.Cancel()
if subscription.hash != hash { if subscription.hash != hash {
@ -225,7 +228,10 @@ func TestCancelInvoice(t *testing.T) {
} }
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
subscription := registry.SubscribeSingleInvoice(hash) subscription, err := registry.SubscribeSingleInvoice(hash)
if err != nil {
t.Fatal(err)
}
defer subscription.Cancel() defer subscription.Cancel()
if subscription.hash != hash { if subscription.hash != hash {
@ -329,7 +335,10 @@ func TestHoldInvoice(t *testing.T) {
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
subscription := registry.SubscribeSingleInvoice(hash) subscription, err := registry.SubscribeSingleInvoice(hash)
if err != nil {
t.Fatal(err)
}
defer subscription.Cancel() defer subscription.Cancel()
if subscription.hash != hash { if subscription.hash != hash {

@ -178,7 +178,10 @@ func (s *Server) SubscribeSingleInvoice(req *SubscribeSingleInvoiceRequest,
return err return err
} }
invoiceClient := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash) invoiceClient, err := s.cfg.InvoiceRegistry.SubscribeSingleInvoice(hash)
if err != nil {
return err
}
defer invoiceClient.Cancel() defer invoiceClient.Cancel()
for { for {