diff --git a/invoiceregistry.go b/invoiceregistry.go index 0ca22027..b55bdf6b 100644 --- a/invoiceregistry.go +++ b/invoiceregistry.go @@ -31,6 +31,10 @@ type invoiceRegistry struct { cdb *channeldb.DB + clientMtx sync.Mutex + nextClientID uint32 + notificationClients map[uint32]*invoiceSubscription + // debugInvoices is a mp which stores special "debug" invoices which // should be only created/used when manual tests require an invoice // that *all* nodes are able to fully settle. @@ -43,8 +47,9 @@ type invoiceRegistry struct { // which are volatile yet available system wide within the daemon. func newInvoiceRegistry(cdb *channeldb.DB) *invoiceRegistry { return &invoiceRegistry{ - cdb: cdb, - debugInvoices: make(map[wire.ShaHash]*channeldb.Invoice), + cdb: cdb, + debugInvoices: make(map[wire.ShaHash]*channeldb.Invoice), + notificationClients: make(map[uint32]*invoiceSubscription), } } @@ -83,7 +88,14 @@ func (i *invoiceRegistry) AddInvoice(invoice *channeldb.Invoice) error { })) // TODO(roasbeef): also check in memory for quick lookups/settles? - return i.cdb.AddInvoice(invoice) + if err := i.cdb.AddInvoice(invoice); err != nil { + return err + } + + // TODO(roasbeef): re-enable? + //go i.notifyClients(invoice, false) + + return nil } // lookupInvoice looks up an invoice by it's payment hash (R-Hash), if found @@ -126,5 +138,80 @@ func (i *invoiceRegistry) SettleInvoice(rHash wire.ShaHash) error { // If this isn't a debug invoice, then we'll attempt to settle an // invoice matching this rHash on disk (if one exists). - return i.cdb.SettleInvoice(rHash) + if err := i.cdb.SettleInvoice(rHash); err != nil { + return err + } + + // Launch a new goroutine to notify any/all registered invoice + // notification clients. + go func() { + invoice, err := i.cdb.LookupInvoice(rHash) + if err != nil { + ltndLog.Errorf("unable to find invoice: %v", err) + return + } + + i.notifyClients(invoice, true) + }() + + return nil +} + +// notifyClients notifies all currently registered invoice notificaiton clients +// of a newly added/settled invoice. +func (i *invoiceRegistry) notifyClients(invoice *channeldb.Invoice, settle bool) { + i.clientMtx.Lock() + defer i.clientMtx.Unlock() + + for _, client := range i.notificationClients { + var eventChan chan *channeldb.Invoice + if settle { + eventChan = client.SettledInvoices + } else { + eventChan = client.NewInvoices + } + + go func() { + eventChan <- invoice + }() + } +} + +// invoiceSubscription represents an intent to receive updates for newly added +// or settled invoices. For each newly added invoice, a copy of the invoice +// will be sent over the NewInvoices channel. Similarly, for each newly settled +// invoice, a copy of the invoice will be sent over the SettledInvoices +// channel. +type invoiceSubscription struct { + NewInvoices chan *channeldb.Invoice + SettledInvoices chan *channeldb.Invoice + + inv *invoiceRegistry + id uint32 +} + +// Cancel unregisteres the invoiceSubscription, freeing any previoulsy allocate +// resources. +func (i *invoiceSubscription) Cancel() { + i.inv.clientMtx.Lock() + delete(i.inv.notificationClients, i.id) + i.inv.clientMtx.Unlock() +} + +// SubscribeNotifications returns an invoiceSubscription which allows the +// caller to receive async notifications when any invoices are settled or +// added. +func (i *invoiceRegistry) SubscribeNotifications() *invoiceSubscription { + client := &invoiceSubscription{ + NewInvoices: make(chan *channeldb.Invoice), + SettledInvoices: make(chan *channeldb.Invoice), + } + + i.clientMtx.Lock() + i.notificationClients[i.nextClientID] = client + client.id = i.nextClientID + i.nextClientID++ + i.clientMtx.Unlock() + + return client }