invoices: fix slow startup with many expired invoices

This commit intends to fix slow first startup time when there are many
invoices that need to be canceled. The slowdown is caused by a combination
of adding invoices to the expiry watcher one-by-one and slow
cancellation. Due to slow cancellation and the unbuffered channel which
we use to pass invoices to the expiry watcher blocks the registry.
With this fix we'll instead batch add invoices to the expiry watcher and
thereby won't block the registry startup.
This commit is contained in:
Andras Banki-Horvath 2020-01-23 00:21:12 +01:00
parent a8ec8f2db6
commit fabcdf754a
3 changed files with 108 additions and 34 deletions

@ -49,7 +49,7 @@ type InvoiceExpiryWatcher struct {
// newInvoices channel is used to wake up the main loop when a new invoices // newInvoices channel is used to wake up the main loop when a new invoices
// is added. // is added.
newInvoices chan *invoiceExpiry newInvoices chan []*invoiceExpiry
wg sync.WaitGroup wg sync.WaitGroup
@ -61,7 +61,7 @@ type InvoiceExpiryWatcher struct {
func NewInvoiceExpiryWatcher(clock clock.Clock) *InvoiceExpiryWatcher { func NewInvoiceExpiryWatcher(clock clock.Clock) *InvoiceExpiryWatcher {
return &InvoiceExpiryWatcher{ return &InvoiceExpiryWatcher{
clock: clock, clock: clock,
newInvoices: make(chan *invoiceExpiry), newInvoices: make(chan []*invoiceExpiry),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
@ -102,15 +102,14 @@ func (ew *InvoiceExpiryWatcher) Stop() {
} }
} }
// AddInvoice adds a new invoice to the InvoiceExpiryWatcher. This won't check // prepareInvoice checks if the passed invoice may be canceled and calculates
// if the invoice is already added and will only add invoices with ContractOpen // the expiry time.
// state. func (ew *InvoiceExpiryWatcher) prepareInvoice(
func (ew *InvoiceExpiryWatcher) AddInvoice( paymentHash lntypes.Hash, invoice *channeldb.Invoice) *invoiceExpiry {
paymentHash lntypes.Hash, invoice *channeldb.Invoice) {
if invoice.State != channeldb.ContractOpen { if invoice.State != channeldb.ContractOpen {
log.Debugf("Invoice not added to expiry watcher: %v", invoice) log.Debugf("Invoice not added to expiry watcher: %v", invoice)
return return nil
} }
realExpiry := invoice.Terms.Expiry realExpiry := invoice.Terms.Expiry
@ -119,20 +118,55 @@ func (ew *InvoiceExpiryWatcher) AddInvoice(
} }
expiry := invoice.CreationDate.Add(realExpiry) expiry := invoice.CreationDate.Add(realExpiry)
return &invoiceExpiry{
log.Debugf("Adding invoice '%v' to expiry watcher, expiration: %v",
paymentHash, expiry)
newInvoiceExpiry := &invoiceExpiry{
PaymentHash: paymentHash, PaymentHash: paymentHash,
Expiry: expiry, Expiry: expiry,
} }
}
// AddInvoices adds multiple invoices to the InvoiceExpiryWatcher.
func (ew *InvoiceExpiryWatcher) AddInvoices(
invoices []channeldb.InvoiceWithPaymentHash) {
invoicesWithExpiry := make([]*invoiceExpiry, 0, len(invoices))
for _, invoiceWithPaymentHash := range invoices {
newInvoiceExpiry := ew.prepareInvoice(
invoiceWithPaymentHash.PaymentHash, &invoiceWithPaymentHash.Invoice,
)
if newInvoiceExpiry != nil {
invoicesWithExpiry = append(invoicesWithExpiry, newInvoiceExpiry)
}
}
if len(invoicesWithExpiry) > 0 {
log.Debugf("Added %v invoices to the expiry watcher: %v",
len(invoicesWithExpiry))
select { select {
case ew.newInvoices <- newInvoiceExpiry: case ew.newInvoices <- invoicesWithExpiry:
case <-ew.quit:
// Select on quit too so that callers won't get blocked in case // Select on quit too so that callers won't get blocked in case
// of concurrent shutdown. // of concurrent shutdown.
case <-ew.quit:
}
}
}
// AddInvoice adds a new invoice to the InvoiceExpiryWatcher. This won't check
// if the invoice is already added and will only add invoices with ContractOpen
// state.
func (ew *InvoiceExpiryWatcher) AddInvoice(
paymentHash lntypes.Hash, invoice *channeldb.Invoice) {
newInvoiceExpiry := ew.prepareInvoice(paymentHash, invoice)
if newInvoiceExpiry != nil {
log.Debugf("Adding invoice '%v' to expiry watcher, expiration: %v",
paymentHash, newInvoiceExpiry.Expiry)
select {
case ew.newInvoices <- []*invoiceExpiry{newInvoiceExpiry}:
// Select on quit too so that callers won't get blocked in case
// of concurrent shutdown.
case <-ew.quit:
}
} }
} }
@ -147,13 +181,13 @@ func (ew *InvoiceExpiryWatcher) nextExpiry() <-chan time.Time {
return nil return nil
} }
// cancelExpiredInvoices will cancel all expired invoices and removes them from // cancelNextExpiredInvoice will cancel the next expired invoice and removes
// the expiry queue. // it from the expiry queue.
func (ew *InvoiceExpiryWatcher) cancelExpiredInvoices() { func (ew *InvoiceExpiryWatcher) cancelNextExpiredInvoice() {
for !ew.expiryQueue.Empty() { if !ew.expiryQueue.Empty() {
top := ew.expiryQueue.Top().(*invoiceExpiry) top := ew.expiryQueue.Top().(*invoiceExpiry)
if !top.Expiry.Before(ew.clock.Now()) { if !top.Expiry.Before(ew.clock.Now()) {
break return
} }
err := ew.cancelInvoice(top.PaymentHash) err := ew.cancelInvoice(top.PaymentHash)
@ -174,18 +208,33 @@ func (ew *InvoiceExpiryWatcher) mainLoop() {
for { for {
// Cancel any invoices that may have expired. // Cancel any invoices that may have expired.
ew.cancelExpiredInvoices() ew.cancelNextExpiredInvoice()
select { select {
case <-ew.nextExpiry():
// Wait until the next invoice expires, then cancel expired invoices. case invoicesWithExpiry := <-ew.newInvoices:
// Take newly forwarded invoices with higher priority
// in order to not block the newInvoices channel.
for _, invoiceWithExpiry := range invoicesWithExpiry {
ew.expiryQueue.Push(invoiceWithExpiry)
}
continue continue
case newInvoiceExpiry := <-ew.newInvoices: default:
ew.expiryQueue.Push(newInvoiceExpiry) select {
case <-ew.nextExpiry():
// Wait until the next invoice expires.
continue
case invoicesWithExpiry := <-ew.newInvoices:
for _, invoiceWithExpiry := range invoicesWithExpiry {
ew.expiryQueue.Push(invoiceWithExpiry)
}
case <-ew.quit: case <-ew.quit:
return return
} }
} }
}
} }

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
) )
@ -123,3 +124,31 @@ func TestInvoiceExpiryWithPendingAndExpiredInvoices(t *testing.T) {
test.watcher.Stop() test.watcher.Stop()
test.checkExpectations() test.checkExpectations()
} }
// Tests adding multiple invoices at once.
func TestInvoiceExpiryWhenAddingMultipleInvoices(t *testing.T) {
t.Parallel()
test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5)
var invoices []channeldb.InvoiceWithPaymentHash
for hash, invoice := range test.testData.expiredInvoices {
invoices = append(invoices,
channeldb.InvoiceWithPaymentHash{
Invoice: *invoice,
PaymentHash: hash,
},
)
}
for hash, invoice := range test.testData.pendingInvoices {
invoices = append(invoices,
channeldb.InvoiceWithPaymentHash{
Invoice: *invoice,
PaymentHash: hash,
},
)
}
test.watcher.AddInvoices(invoices)
time.Sleep(testTimeout)
test.watcher.Stop()
test.checkExpectations()
}

@ -198,12 +198,8 @@ func (i *InvoiceRegistry) populateExpiryWatcher() error {
return err return err
} }
for idx := range pendingInvoices { log.Debugf("Adding %v pending invoices to the expiry watcher")
i.expiryWatcher.AddInvoice( i.expiryWatcher.AddInvoices(pendingInvoices)
pendingInvoices[idx].PaymentHash, &pendingInvoices[idx].Invoice,
)
}
return nil return nil
} }