Merge pull request #3950 from bhandras/invoice-expiry-startup-fix

invoices: fix slow startup with many expired invoices
This commit is contained in:
Olaoluwa Osuntokun 2020-01-24 16:49:43 -08:00 committed by GitHub
commit e79897e651
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 34 deletions

@ -49,7 +49,7 @@ type InvoiceExpiryWatcher struct {
// newInvoices channel is used to wake up the main loop when a new invoices // newInvoices channel is used to wake up the main loop when a new invoices
// is added. // is added.
newInvoices chan *invoiceExpiry newInvoices chan []*invoiceExpiry
wg sync.WaitGroup wg sync.WaitGroup
@ -61,7 +61,7 @@ type InvoiceExpiryWatcher struct {
func NewInvoiceExpiryWatcher(clock clock.Clock) *InvoiceExpiryWatcher { func NewInvoiceExpiryWatcher(clock clock.Clock) *InvoiceExpiryWatcher {
return &InvoiceExpiryWatcher{ return &InvoiceExpiryWatcher{
clock: clock, clock: clock,
newInvoices: make(chan *invoiceExpiry), newInvoices: make(chan []*invoiceExpiry),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
} }
@ -102,15 +102,14 @@ func (ew *InvoiceExpiryWatcher) Stop() {
} }
} }
// AddInvoice adds a new invoice to the InvoiceExpiryWatcher. This won't check // prepareInvoice checks if the passed invoice may be canceled and calculates
// if the invoice is already added and will only add invoices with ContractOpen // the expiry time.
// state. func (ew *InvoiceExpiryWatcher) prepareInvoice(
func (ew *InvoiceExpiryWatcher) AddInvoice( paymentHash lntypes.Hash, invoice *channeldb.Invoice) *invoiceExpiry {
paymentHash lntypes.Hash, invoice *channeldb.Invoice) {
if invoice.State != channeldb.ContractOpen { if invoice.State != channeldb.ContractOpen {
log.Debugf("Invoice not added to expiry watcher: %v", invoice) log.Debugf("Invoice not added to expiry watcher: %v", invoice)
return return nil
} }
realExpiry := invoice.Terms.Expiry realExpiry := invoice.Terms.Expiry
@ -119,20 +118,55 @@ func (ew *InvoiceExpiryWatcher) AddInvoice(
} }
expiry := invoice.CreationDate.Add(realExpiry) expiry := invoice.CreationDate.Add(realExpiry)
return &invoiceExpiry{
log.Debugf("Adding invoice '%v' to expiry watcher, expiration: %v",
paymentHash, expiry)
newInvoiceExpiry := &invoiceExpiry{
PaymentHash: paymentHash, PaymentHash: paymentHash,
Expiry: expiry, Expiry: expiry,
} }
}
select { // AddInvoices adds multiple invoices to the InvoiceExpiryWatcher.
case ew.newInvoices <- newInvoiceExpiry: func (ew *InvoiceExpiryWatcher) AddInvoices(
case <-ew.quit: invoices []channeldb.InvoiceWithPaymentHash) {
invoicesWithExpiry := make([]*invoiceExpiry, 0, len(invoices))
for _, invoiceWithPaymentHash := range invoices {
newInvoiceExpiry := ew.prepareInvoice(
invoiceWithPaymentHash.PaymentHash, &invoiceWithPaymentHash.Invoice,
)
if newInvoiceExpiry != nil {
invoicesWithExpiry = append(invoicesWithExpiry, newInvoiceExpiry)
}
}
if len(invoicesWithExpiry) > 0 {
log.Debugf("Added %v invoices to the expiry watcher: %v",
len(invoicesWithExpiry))
select {
case ew.newInvoices <- invoicesWithExpiry:
// Select on quit too so that callers won't get blocked in case // Select on quit too so that callers won't get blocked in case
// of concurrent shutdown. // of concurrent shutdown.
case <-ew.quit:
}
}
}
// AddInvoice adds a new invoice to the InvoiceExpiryWatcher. This won't check
// if the invoice is already added and will only add invoices with ContractOpen
// state.
func (ew *InvoiceExpiryWatcher) AddInvoice(
paymentHash lntypes.Hash, invoice *channeldb.Invoice) {
newInvoiceExpiry := ew.prepareInvoice(paymentHash, invoice)
if newInvoiceExpiry != nil {
log.Debugf("Adding invoice '%v' to expiry watcher, expiration: %v",
paymentHash, newInvoiceExpiry.Expiry)
select {
case ew.newInvoices <- []*invoiceExpiry{newInvoiceExpiry}:
// Select on quit too so that callers won't get blocked in case
// of concurrent shutdown.
case <-ew.quit:
}
} }
} }
@ -147,13 +181,13 @@ func (ew *InvoiceExpiryWatcher) nextExpiry() <-chan time.Time {
return nil return nil
} }
// cancelExpiredInvoices will cancel all expired invoices and removes them from // cancelNextExpiredInvoice will cancel the next expired invoice and removes
// the expiry queue. // it from the expiry queue.
func (ew *InvoiceExpiryWatcher) cancelExpiredInvoices() { func (ew *InvoiceExpiryWatcher) cancelNextExpiredInvoice() {
for !ew.expiryQueue.Empty() { if !ew.expiryQueue.Empty() {
top := ew.expiryQueue.Top().(*invoiceExpiry) top := ew.expiryQueue.Top().(*invoiceExpiry)
if !top.Expiry.Before(ew.clock.Now()) { if !top.Expiry.Before(ew.clock.Now()) {
break return
} }
err := ew.cancelInvoice(top.PaymentHash) err := ew.cancelInvoice(top.PaymentHash)
@ -174,18 +208,33 @@ func (ew *InvoiceExpiryWatcher) mainLoop() {
for { for {
// Cancel any invoices that may have expired. // Cancel any invoices that may have expired.
ew.cancelExpiredInvoices() ew.cancelNextExpiredInvoice()
select { select {
case <-ew.nextExpiry():
// Wait until the next invoice expires, then cancel expired invoices. case invoicesWithExpiry := <-ew.newInvoices:
// Take newly forwarded invoices with higher priority
// in order to not block the newInvoices channel.
for _, invoiceWithExpiry := range invoicesWithExpiry {
ew.expiryQueue.Push(invoiceWithExpiry)
}
continue continue
case newInvoiceExpiry := <-ew.newInvoices: default:
ew.expiryQueue.Push(newInvoiceExpiry) select {
case <-ew.quit: case <-ew.nextExpiry():
return // Wait until the next invoice expires.
continue
case invoicesWithExpiry := <-ew.newInvoices:
for _, invoiceWithExpiry := range invoicesWithExpiry {
ew.expiryQueue.Push(invoiceWithExpiry)
}
case <-ew.quit:
return
}
} }
} }
} }

@ -4,6 +4,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
) )
@ -123,3 +124,31 @@ func TestInvoiceExpiryWithPendingAndExpiredInvoices(t *testing.T) {
test.watcher.Stop() test.watcher.Stop()
test.checkExpectations() test.checkExpectations()
} }
// Tests adding multiple invoices at once.
func TestInvoiceExpiryWhenAddingMultipleInvoices(t *testing.T) {
t.Parallel()
test := newInvoiceExpiryWatcherTest(t, testTime, 5, 5)
var invoices []channeldb.InvoiceWithPaymentHash
for hash, invoice := range test.testData.expiredInvoices {
invoices = append(invoices,
channeldb.InvoiceWithPaymentHash{
Invoice: *invoice,
PaymentHash: hash,
},
)
}
for hash, invoice := range test.testData.pendingInvoices {
invoices = append(invoices,
channeldb.InvoiceWithPaymentHash{
Invoice: *invoice,
PaymentHash: hash,
},
)
}
test.watcher.AddInvoices(invoices)
time.Sleep(testTimeout)
test.watcher.Stop()
test.checkExpectations()
}

@ -198,12 +198,8 @@ func (i *InvoiceRegistry) populateExpiryWatcher() error {
return err return err
} }
for idx := range pendingInvoices { log.Debugf("Adding %v pending invoices to the expiry watcher")
i.expiryWatcher.AddInvoice( i.expiryWatcher.AddInvoices(pendingInvoices)
pendingInvoices[idx].PaymentHash, &pendingInvoices[idx].Invoice,
)
}
return nil return nil
} }