invoices: return error from SubscribeNotifications on shutdown

This commit is contained in:
Conner Fromknecht 2020-05-19 20:31:38 -07:00
parent 1332575482
commit 5b747715fc
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
3 changed files with 19 additions and 8 deletions

@ -1182,7 +1182,9 @@ func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start // added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
// by first sending out all new events with an invoice index _greater_ than // by first sending out all new events with an invoice index _greater_ than
// this value. Afterwards, we'll send out real-time notifications. // this value. Afterwards, we'll send out real-time notifications.
func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *InvoiceSubscription { func (i *InvoiceRegistry) SubscribeNotifications(
addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
client := &InvoiceSubscription{ client := &InvoiceSubscription{
NewInvoices: make(chan *channeldb.Invoice), NewInvoices: make(chan *channeldb.Invoice),
SettledInvoices: make(chan *channeldb.Invoice), SettledInvoices: make(chan *channeldb.Invoice),
@ -1254,9 +1256,10 @@ func (i *InvoiceRegistry) SubscribeNotifications(addIndex, settleIndex uint64) *
select { select {
case i.newSubscriptions <- client: case i.newSubscriptions <- client:
case <-i.quit: case <-i.quit:
return nil, ErrShuttingDown
} }
return client return client, nil
} }
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the // SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/record"
"github.com/stretchr/testify/assert"
) )
// TestSettleInvoice tests settling of an invoice and related notifications. // TestSettleInvoice tests settling of an invoice and related notifications.
@ -16,7 +17,8 @@ func TestSettleInvoice(t *testing.T) {
ctx := newTestContext(t) ctx := newTestContext(t)
defer ctx.cleanup() defer ctx.cleanup()
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
@ -221,11 +223,12 @@ func TestCancelInvoice(t *testing.T) {
ctx := newTestContext(t) ctx := newTestContext(t)
defer ctx.cleanup() defer ctx.cleanup()
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Try to cancel the not yet existing invoice. This should fail. // Try to cancel the not yet existing invoice. This should fail.
err := ctx.registry.CancelInvoice(testInvoicePaymentHash) err = ctx.registry.CancelInvoice(testInvoicePaymentHash)
if err != channeldb.ErrInvoiceNotFound { if err != channeldb.ErrInvoiceNotFound {
t.Fatalf("expected ErrInvoiceNotFound, but got %v", err) t.Fatalf("expected ErrInvoiceNotFound, but got %v", err)
} }
@ -352,7 +355,8 @@ func TestSettleHoldInvoice(t *testing.T) {
} }
defer registry.Stop() defer registry.Stop()
allSubscriptions := registry.SubscribeNotifications(0, 0) allSubscriptions, err := registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
// Subscribe to the not yet existing invoice. // Subscribe to the not yet existing invoice.
@ -651,7 +655,8 @@ func testKeySend(t *testing.T, keySendEnabled bool) {
ctx.registry.cfg.AcceptKeySend = keySendEnabled ctx.registry.cfg.AcceptKeySend = keySendEnabled
allSubscriptions := ctx.registry.SubscribeNotifications(0, 0) allSubscriptions, err := ctx.registry.SubscribeNotifications(0, 0)
assert.Nil(t, err)
defer allSubscriptions.Cancel() defer allSubscriptions.Cancel()
hodlChan := make(chan interface{}, 1) hodlChan := make(chan interface{}, 1)

@ -4535,9 +4535,12 @@ func (r *rpcServer) ListInvoices(ctx context.Context,
func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription, func (r *rpcServer) SubscribeInvoices(req *lnrpc.InvoiceSubscription,
updateStream lnrpc.Lightning_SubscribeInvoicesServer) error { updateStream lnrpc.Lightning_SubscribeInvoicesServer) error {
invoiceClient := r.server.invoices.SubscribeNotifications( invoiceClient, err := r.server.invoices.SubscribeNotifications(
req.AddIndex, req.SettleIndex, req.AddIndex, req.SettleIndex,
) )
if err != nil {
return err
}
defer invoiceClient.Cancel() defer invoiceClient.Cancel()
for { for {