Merge pull request #4298 from cfromknecht/sub-inv-fixes

channeldb+invoices: minor invoice subscription fixes
This commit is contained in:
Olaoluwa Osuntokun 2020-05-27 15:56:47 -07:00 committed by GitHub
commit d2d8793afd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 41 additions and 30 deletions

@ -11,6 +11,7 @@ import (
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/record"
"github.com/stretchr/testify/assert"
) )
var ( var (
@ -305,6 +306,9 @@ func TestInvoiceAddTimeSeries(t *testing.T) {
t.Fatalf("unable to make test db: %v", err) t.Fatalf("unable to make test db: %v", err)
} }
_, err = db.InvoicesAddedSince(0)
assert.Nil(t, err)
// We'll start off by creating 20 random invoices, and inserting them // We'll start off by creating 20 random invoices, and inserting them
// into the database. // into the database.
const numInvoices = 20 const numInvoices = 20
@ -372,6 +376,9 @@ func TestInvoiceAddTimeSeries(t *testing.T) {
} }
} }
_, err = db.InvoicesSettledSince(0)
assert.Nil(t, err)
var settledInvoices []Invoice var settledInvoices []Invoice
var settleIndex uint64 = 1 var settleIndex uint64 = 1
// We'll now only settle the latter half of each of those invoices. // We'll now only settle the latter half of each of those invoices.

@ -488,12 +488,12 @@ func (d *DB) InvoicesAddedSince(sinceAddIndex uint64) ([]Invoice, error) {
err := kvdb.View(d, func(tx kvdb.RTx) error { err := kvdb.View(d, func(tx kvdb.RTx) error {
invoices := tx.ReadBucket(invoiceBucket) invoices := tx.ReadBucket(invoiceBucket)
if invoices == nil { if invoices == nil {
return ErrNoInvoicesCreated return nil
} }
addIndex := invoices.NestedReadBucket(addIndexBucket) addIndex := invoices.NestedReadBucket(addIndexBucket)
if addIndex == nil { if addIndex == nil {
return ErrNoInvoicesCreated return nil
} }
// We'll now run through each entry in the add index starting // We'll now run through each entry in the add index starting
@ -520,12 +520,7 @@ func (d *DB) InvoicesAddedSince(sinceAddIndex uint64) ([]Invoice, error) {
return nil return nil
}) })
switch { if err != nil {
// If no invoices have been created, then we'll return the empty set of
// invoices.
case err == ErrNoInvoicesCreated:
case err != nil:
return nil, err return nil, err
} }
@ -886,12 +881,12 @@ func (d *DB) InvoicesSettledSince(sinceSettleIndex uint64) ([]Invoice, error) {
err := kvdb.View(d, func(tx kvdb.RTx) error { err := kvdb.View(d, func(tx kvdb.RTx) error {
invoices := tx.ReadBucket(invoiceBucket) invoices := tx.ReadBucket(invoiceBucket)
if invoices == nil { if invoices == nil {
return ErrNoInvoicesCreated return nil
} }
settleIndex := invoices.NestedReadBucket(settleIndexBucket) settleIndex := invoices.NestedReadBucket(settleIndexBucket)
if settleIndex == nil { if settleIndex == nil {
return ErrNoInvoicesCreated return nil
} }
// We'll now run through each entry in the add index starting // We'll now run through each entry in the add index starting

@ -234,15 +234,6 @@ func (i *InvoiceRegistry) invoiceEventLoop() {
// We'll query for any backlog notifications, then add it to the // We'll query for any backlog notifications, then add it to the
// set of clients. // set of clients.
case newClient := <-i.newSubscriptions: case newClient := <-i.newSubscriptions:
// Before we add the client to our set of active
// clients, we'll first attempt to deliver any backlog
// invoice events.
err := i.deliverBacklogEvents(newClient)
if err != nil {
log.Errorf("unable to deliver backlog invoice "+
"notifications: %v", err)
}
log.Infof("New invoice subscription "+ log.Infof("New invoice subscription "+
"client: id=%v", newClient.id) "client: id=%v", newClient.id)
@ -410,9 +401,6 @@ func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
// deliverBacklogEvents will attempts to query the invoice database for any // deliverBacklogEvents will attempts to query the invoice database for any
// notifications that the client has missed since it reconnected last. // notifications that the client has missed since it reconnected last.
func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error { func (i *InvoiceRegistry) deliverBacklogEvents(client *InvoiceSubscription) error {
// First, we'll query the database to see if based on the provided
// addIndex and settledIndex we need to deliver any backlog
// notifications.
addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex) addEvents, err := i.cdb.InvoicesAddedSince(client.addIndex)
if err != nil { if err != nil {
return err return err
@ -1182,7 +1170,9 @@ func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start // added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
// by first sending out all new events with an invoice index _greater_ than // by first sending out all new events with an invoice index _greater_ than
// this value. Afterwards, we'll send out real-time notifications. // this value. Afterwards, we'll send out real-time notifications.
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription { func (i *InvoiceRegistry) SubscribeNotifications(
addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
client := &InvoiceSubscription{ client := &InvoiceSubscription{
NewInvoices: make(chan *channeldb.Invoice), NewInvoices: make(chan *channeldb.Invoice),
SettledInvoices: make(chan *channeldb.Invoice), SettledInvoices: make(chan *channeldb.Invoice),
@ -1251,12 +1241,23 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
} }
}() }()
i.Lock()
defer i.Unlock()
// Query the database to see if based on the provided addIndex and
// settledIndex we need to deliver any backlog notifications.
err := i.deliverBacklogEvents(client)
if err != nil {
return nil, err
}
select { select {
case i.newSubscriptions <- client: case i.newSubscriptions <- client:
case <-i.quit: case <-i.quit:
return nil, ErrShuttingDown
} }
return client return client, nil
} }
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the // SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/record"
"github.com/stretchr/testify/assert"
) )
// TestSettleInvoice tests settling of an invoice and related notifications. // TestSettleInvoice tests settling of an invoice and related notifications.
@ -16,7 +17,8 @@ func TestSettleInvoice(t *testing.T) {
ctx := newTestContext(t) ctx := newTestContext(t)
defer ctx.cleanup() defer ctx.cleanup()
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
@ -221,11 +223,12 @@ func TestCancelInvoice(t *testing.T) {
ctx := newTestContext(t) ctx := newTestContext(t)
defer ctx.cleanup() defer ctx.cleanup()
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Try to cancel the not yet existing invoice. This should fail. // Try to cancel the not yet existing invoice. This should fail.
err := ctx.registry.CancelInvoice(testInvoicePaymentHash) err = ctx.registry.CancelInvoice(testInvoicePaymentHash)
if err != channeldb.ErrInvoiceNotFound { if err != channeldb.ErrInvoiceNotFound {
t.Fatalf("expected ErrInvoiceNotFound, but got %v", err) t.Fatalf("expected ErrInvoiceNotFound, but got %v", err)
} }
@ -352,7 +355,8 @@ func TestSettleHoldInvoice(t *testing.T) {
} }
defer registry.Stop() defer registry.Stop()
allSubscriptions := registry.SubscribeNotifications(0, 0) allSubscriptions, err := registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
@ -651,7 +655,8 @@ func testKeySend(t *testing.T, keySendEnabled bool) {
ctx.registry.cfg.AcceptKeySend = keySendEnabled ctx.registry.cfg.AcceptKeySend = keySendEnabled
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
hodlChan := make(chan interface{}, 1) hodlChan := make(chan interface{}, 1)

@ -4535,9 +4535,12 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription, func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
updateStream lnrpc.Lightning_SubscribeInvoicesServer) error { updateStream lnrpc.Lightning_SubscribeInvoicesServer) error {
invoiceClient := r.server.invoices.SubscribeNotifications( invoiceClient, err := r.server.invoices.SubscribeNotifications(
req.AddIndex, req.SettleIndex, req.AddIndex, req.SettleIndex,
) )
if err != nil {
return err
}
defer invoiceClient.Cancel() defer invoiceClient.Cancel()
for { for {