invoices: reduce heap usage when starting the expiry watcher

This commit is contained in:
Andras Banki-Horvath 2020-10-21 13:21:35 +02:00
parent d12f76fd6d
commit 478cf704be
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
3 changed files with 40 additions and 60 deletions

@ -103,10 +103,11 @@ func (ew *InvoiceExpiryWatcher) Stop() {
} }
} }
// prepareInvoice checks if the passed invoice may be canceled and calculates // makeInvoiceExpiry checks if the passed invoice may be canceled and calculates
// the expiry time. // the expiry time and creates a slimmer invoiceExpiry object with the hash and
func (ew *InvoiceExpiryWatcher) prepareInvoice( // expiry time.
paymentHash lntypes.Hash, invoice *channeldb.Invoice) *invoiceExpiry { func makeInvoiceExpiry(paymentHash lntypes.Hash,
invoice *channeldb.Invoice) *invoiceExpiry {
if invoice.State != channeldb.ContractOpen { if invoice.State != channeldb.ContractOpen {
log.Debugf("Invoice not added to expiry watcher: %v", log.Debugf("Invoice not added to expiry watcher: %v",
@ -127,45 +128,14 @@ func (ew *InvoiceExpiryWatcher) prepareInvoice(
} }
} }
// AddInvoices adds multiple invoices to the InvoiceExpiryWatcher. // AddInvoices adds invoices to the InvoiceExpiryWatcher.
func (ew *InvoiceExpiryWatcher) AddInvoices( func (ew *InvoiceExpiryWatcher) AddInvoices(invoices ...*invoiceExpiry) {
invoices map[lntypes.Hash]*channeldb.Invoice) { if len(invoices) > 0 {
invoicesWithExpiry := make([]*invoiceExpiry, 0, len(invoices))
for paymentHash, invoice := range invoices {
newInvoiceExpiry := ew.prepareInvoice(paymentHash, invoice)
if newInvoiceExpiry != nil {
invoicesWithExpiry = append(
invoicesWithExpiry, newInvoiceExpiry,
)
}
}
if len(invoicesWithExpiry) > 0 {
log.Debugf("Added %d invoices to the expiry watcher",
len(invoicesWithExpiry))
select { select {
case ew.newInvoices <- invoicesWithExpiry: case ew.newInvoices <- invoices:
// Select on quit too so that callers won't get blocked in case log.Debugf("Added %d invoices to the expiry watcher",
// of concurrent shutdown. len(invoices))
case <-ew.quit:
}
}
}
// AddInvoice adds a new invoice to the InvoiceExpiryWatcher. This won't check
// if the invoice is already added and will only add invoices with ContractOpen
// state.
func (ew *InvoiceExpiryWatcher) AddInvoice(
paymentHash lntypes.Hash, invoice *channeldb.Invoice) {
newInvoiceExpiry := ew.prepareInvoice(paymentHash, invoice)
if newInvoiceExpiry != nil {
log.Debugf("Adding invoice '%v' to expiry watcher,"+
"expiration: %v", paymentHash, newInvoiceExpiry.Expiry)
select {
case ew.newInvoices <- []*invoiceExpiry{newInvoiceExpiry}:
// Select on quit too so that callers won't get blocked in case // Select on quit too so that callers won't get blocked in case
// of concurrent shutdown. // of concurrent shutdown.
case <-ew.quit: case <-ew.quit:
@ -220,14 +190,21 @@ func (ew *InvoiceExpiryWatcher) mainLoop() {
// Cancel any invoices that may have expired. // Cancel any invoices that may have expired.
ew.cancelNextExpiredInvoice() ew.cancelNextExpiredInvoice()
pushInvoices := func(invoicesWithExpiry []*invoiceExpiry) {
for _, invoiceWithExpiry := range invoicesWithExpiry {
// Avoid pushing nil object to the heap.
if invoiceWithExpiry != nil {
ew.expiryQueue.Push(invoiceWithExpiry)
}
}
}
select { select {
case invoicesWithExpiry := <-ew.newInvoices: case invoicesWithExpiry := <-ew.newInvoices:
// Take newly forwarded invoices with higher priority // Take newly forwarded invoices with higher priority
// in order to not block the newInvoices channel. // in order to not block the newInvoices channel.
for _, invoiceWithExpiry := range invoicesWithExpiry { pushInvoices(invoicesWithExpiry)
ew.expiryQueue.Push(invoiceWithExpiry)
}
continue continue
default: default:
@ -238,9 +215,7 @@ func (ew *InvoiceExpiryWatcher) mainLoop() {
continue continue
case invoicesWithExpiry := <-ew.newInvoices: case invoicesWithExpiry := <-ew.newInvoices:
for _, invoice := range invoicesWithExpiry { pushInvoices(invoicesWithExpiry)
ew.expiryQueue.Push(invoice)
}
case <-ew.quit: case <-ew.quit:
return return

@ -5,7 +5,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
) )
@ -125,7 +124,7 @@ func TestInvoiceExpiryWithOnlyExpiredInvoices(t *testing.T) {
test := newInvoiceExpiryWatcherTest(t, testTime, 0, 5) test := newInvoiceExpiryWatcherTest(t, testTime, 0, 5)
for paymentHash, invoice := range test.testData.pendingInvoices { for paymentHash, invoice := range test.testData.pendingInvoices {
test.watcher.AddInvoice(paymentHash, invoice) test.watcher.AddInvoices(makeInvoiceExpiry(paymentHash, invoice))
} }
test.waitForFinish(testTimeout) test.waitForFinish(testTimeout)
@ -141,11 +140,11 @@ func TestInvoiceExpiryWithPendingAndExpiredInvoices(t *testing.T) {
test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5) test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5)
for paymentHash, invoice := range test.testData.expiredInvoices { for paymentHash, invoice := range test.testData.expiredInvoices {
test.watcher.AddInvoice(paymentHash, invoice) test.watcher.AddInvoices(makeInvoiceExpiry(paymentHash, invoice))
} }
for paymentHash, invoice := range test.testData.pendingInvoices { for paymentHash, invoice := range test.testData.pendingInvoices {
test.watcher.AddInvoice(paymentHash, invoice) test.watcher.AddInvoices(makeInvoiceExpiry(paymentHash, invoice))
} }
test.waitForFinish(testTimeout) test.waitForFinish(testTimeout)
@ -158,17 +157,17 @@ func TestInvoiceExpiryWhenAddingMultipleInvoices(t *testing.T) {
t.Parallel() t.Parallel()
test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5) test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5)
invoices := make(map[lntypes.Hash]*channeldb.Invoice) var invoices []*invoiceExpiry
for hash, invoice := range test.testData.expiredInvoices { for hash, invoice := range test.testData.expiredInvoices {
invoices[hash] = invoice invoices = append(invoices, makeInvoiceExpiry(hash, invoice))
} }
for hash, invoice := range test.testData.pendingInvoices { for hash, invoice := range test.testData.pendingInvoices {
invoices[hash] = invoice invoices = append(invoices, makeInvoiceExpiry(hash, invoice))
} }
test.watcher.AddInvoices(invoices) test.watcher.AddInvoices(invoices...)
test.waitForFinish(testTimeout) test.waitForFinish(testTimeout)
test.watcher.Stop() test.watcher.Stop()
test.checkExpectations() test.checkExpectations()

@ -160,7 +160,7 @@ func NewRegistry(cdb *channeldb.DB, expiryWatcher *InvoiceExpiryWatcher,
// invoices. // invoices.
func (i *InvoiceRegistry) scanInvoicesOnStart() error { func (i *InvoiceRegistry) scanInvoicesOnStart() error {
var ( var (
pending map[lntypes.Hash]*channeldb.Invoice pending []*invoiceExpiry
removable []channeldb.InvoiceDeleteRef removable []channeldb.InvoiceDeleteRef
) )
@ -170,7 +170,7 @@ func (i *InvoiceRegistry) scanInvoicesOnStart() error {
// layer needs to retry the View transaction underneath (eg. // layer needs to retry the View transaction underneath (eg.
// using the etcd driver, where all transactions are allowed // using the etcd driver, where all transactions are allowed
// to retry for serializability). // to retry for serializability).
pending = make(map[lntypes.Hash]*channeldb.Invoice) pending = nil
removable = make([]channeldb.InvoiceDeleteRef, 0) removable = make([]channeldb.InvoiceDeleteRef, 0)
} }
@ -178,7 +178,10 @@ func (i *InvoiceRegistry) scanInvoicesOnStart() error {
paymentHash lntypes.Hash, invoice *channeldb.Invoice) error { paymentHash lntypes.Hash, invoice *channeldb.Invoice) error {
if invoice.IsPending() { if invoice.IsPending() {
pending[paymentHash] = invoice expiryRef := makeInvoiceExpiry(paymentHash, invoice)
if expiryRef != nil {
pending = append(pending, expiryRef)
}
} else if i.cfg.GcCanceledInvoicesOnStartup && } else if i.cfg.GcCanceledInvoicesOnStartup &&
invoice.State == channeldb.ContractCanceled { invoice.State == channeldb.ContractCanceled {
@ -208,7 +211,7 @@ func (i *InvoiceRegistry) scanInvoicesOnStart() error {
log.Debugf("Adding %d pending invoices to the expiry watcher", log.Debugf("Adding %d pending invoices to the expiry watcher",
len(pending)) len(pending))
i.expiryWatcher.AddInvoices(pending) i.expiryWatcher.AddInvoices(pending...)
if err := i.cdb.DeleteInvoice(removable); err != nil { if err := i.cdb.DeleteInvoice(removable); err != nil {
log.Warnf("Deleting old invoices failed: %v", err) log.Warnf("Deleting old invoices failed: %v", err)
@ -562,7 +565,10 @@ func (i *InvoiceRegistry) AddInvoice(invoice *channeldb.Invoice,
// InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry // InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
// to avoid deadlock when a new invoice is added while an other is being // to avoid deadlock when a new invoice is added while an other is being
// canceled. // canceled.
i.expiryWatcher.AddInvoice(paymentHash, invoice) invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
if invoiceExpiryRef != nil {
i.expiryWatcher.AddInvoices(invoiceExpiryRef)
}
return addIndex, nil return addIndex, nil
} }