Browse Source

multi: add height-based invoice expiry

This commit adds height-based invoice expiry for hodl invoices
that have active htlcs. This allows us to cancel our intentionally
held htlcs before channels are force closed. We only add this for
hodl invoices because we expect regular invoices to automatically
be resolved.

We still keep hodl invoices in the time-based expiry queue,
because we want to expire open invoices that reach their timeout
before any htlcs are added. Since htlcs are added after the
invoice is created, we add new htlcs as they arrive in the
invoice registry. In this commit, we allow adding of duplicate
entries for an invoice to be added to the expiry queue as each
htlc arrives to keep implementation simple. Our cancellation
logic can already handle the case where an entry is already
canceled, so this is ok.
master
carla 3 years ago
parent
commit
34de5922ed
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
  1. 17
      config.go
  2. 19
      htlcswitch/mock.go
  3. 209
      invoices/invoice_expiry_watcher.go
  4. 35
      invoices/invoice_expiry_watcher_test.go
  5. 10
      invoices/invoiceregistry.go
  6. 19
      invoices/invoiceregistry_test.go
  7. 4
      invoices/test_utils_test.go
  8. 12
      lncfg/invoices.go
  9. 20
      sample-lnd.conf
  10. 15
      server.go

17
config.go

@ -344,6 +344,8 @@ type Config struct {
GcCanceledInvoicesOnTheFly bool `long:"gc-canceled-invoices-on-the-fly" description:"If true, we'll delete newly canceled invoices on the fly."`
Invoices *lncfg.Invoices `group:"invoices" namespace:"invoices"`
Routing *lncfg.Routing `group:"routing" namespace:"routing"`
Gossip *lncfg.Gossip `group:"gossip" namespace:"gossip"`
@ -529,6 +531,9 @@ func DefaultConfig() Config {
MaxChannelUpdateBurst: discovery.DefaultMaxChannelUpdateBurst,
ChannelUpdateInterval: discovery.DefaultChannelUpdateInterval,
},
Invoices: &lncfg.Invoices{
HoldExpiryDelta: lncfg.DefaultHoldInvoiceExpiryDelta,
},
MaxOutgoingCltvExpiry: htlcswitch.DefaultMaxOutgoingCltvExpiry,
MaxChannelFeeAllocation: htlcswitch.DefaultMaxLinkFeeAllocation,
MaxCommitFeeRateAnchors: lnwallet.DefaultAnchorsCommitMaxFeeRateSatPerVByte,
@ -1369,6 +1374,18 @@ func ValidateConfig(cfg Config, usageMessage string,
return nil, err
}
// Log a warning if our expiry delta is not greater than our incoming
// broadcast delta. We do not fail here because this value may be set
// to zero to intentionally keep lnd's behavior unchanged from when we
// didn't auto-cancel these invoices.
if cfg.Invoices.HoldExpiryDelta <= lncfg.DefaultIncomingBroadcastDelta {
ltndLog.Warnf("Invoice hold expiry delta: %v <= incoming "+
"delta: %v, accepted hold invoices will force close "+
"channels if they are not canceled manually",
cfg.Invoices.HoldExpiryDelta,
lncfg.DefaultIncomingBroadcastDelta)
}
// Validate the subconfigs for workers, caches, and the tower client.
err = lncfg.Validate(
cfg.Workers,

19
htlcswitch/mock.go

@ -797,6 +797,20 @@ type mockInvoiceRegistry struct {
cleanup func()
}
type mockChainNotifier struct {
chainntnfs.ChainNotifier
}
// RegisterBlockEpochNtfn mocks a successful call to register block
// notifications.
func (m *mockChainNotifier) RegisterBlockEpochNtfn(*chainntnfs.BlockEpoch) (
*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{
Cancel: func() {},
}, nil
}
func newMockRegistry(minDelta uint32) *mockInvoiceRegistry {
cdb, cleanup, err := newDB()
if err != nil {
@ -805,7 +819,10 @@ func newMockRegistry(minDelta uint32) *mockInvoiceRegistry {
registry := invoices.NewRegistry(
cdb,
invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
invoices.NewInvoiceExpiryWatcher(
clock.NewDefaultClock(), 0, 0, nil,
&mockChainNotifier{},
),
&invoices.RegistryConfig{
FinalCltvRejectDelta: 5,
},

209
invoices/invoice_expiry_watcher.go

@ -5,6 +5,8 @@ import (
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
@ -34,6 +36,28 @@ func (e invoiceExpiryTs) Less(other queue.PriorityQueueItem) bool {
return e.Expiry.Before(other.(*invoiceExpiryTs).Expiry)
}
// Compile time assertion that invoiceExpiryHeight implements invoiceExpiry.
var _ invoiceExpiry = (*invoiceExpiryHeight)(nil)
// invoiceExpiryHeight holds information about an invoice which can be used to
// cancel it based on its expiry height.
type invoiceExpiryHeight struct {
paymentHash lntypes.Hash
expiryHeight uint32
}
// Less implements PriorityQueueItem.Less such that the top item in the
// priority queue is the lowest block height.
func (b invoiceExpiryHeight) Less(other queue.PriorityQueueItem) bool {
return b.expiryHeight < other.(*invoiceExpiryHeight).expiryHeight
}
// expired returns a boolean that indicates whether this entry has expired,
// taking our expiry delta into account.
func (b invoiceExpiryHeight) expired(currentHeight, delta uint32) bool {
return currentHeight+delta >= b.expiryHeight
}
// InvoiceExpiryWatcher handles automatic invoice cancellation of expried
// invoices. Upon start InvoiceExpiryWatcher will retrieve all pending (not yet
// settled or canceled) invoices invoices to its watcing queue. When a new
@ -49,6 +73,21 @@ type InvoiceExpiryWatcher struct {
// It is useful for testing.
clock clock.Clock
// notifier provides us with block height updates.
notifier chainntnfs.ChainNotifier
// blockExpiryDelta is the number of blocks before a htlc's expiry that
// we expire the invoice based on expiry height. We use a delta because
// we will go to some delta before our expiry, so we want to cancel
// before this to prevent force closes.
blockExpiryDelta uint32
// currentHeight is the current block height.
currentHeight uint32
// currentHash is the block hash for our current height.
currentHash *chainhash.Hash
// cancelInvoice is a template method that cancels an expired invoice.
cancelInvoice func(lntypes.Hash, bool) error
@ -56,6 +95,15 @@ type InvoiceExpiryWatcher struct {
// the next invoice to expire.
timestampExpiryQueue queue.PriorityQueue
// blockExpiryQueue holds blockExpiry items and is used to find the
// next invoice to expire based on block height. Only hold invoices
// with active htlcs are added to this queue, because they require
// manual cancellation when the hltc is going to time out. Items in
// this queue may already be in the timestampExpiryQueue, this is ok
// because they will not be expired based on timestamp if they have
// active htlcs.
blockExpiryQueue queue.PriorityQueue
// newInvoices channel is used to wake up the main loop when a new
// invoices is added.
newInvoices chan []invoiceExpiry
@ -67,11 +115,18 @@ type InvoiceExpiryWatcher struct {
}
// NewInvoiceExpiryWatcher creates a new InvoiceExpiryWatcher instance.
func NewInvoiceExpiryWatcher(clock clock.Clock) *InvoiceExpiryWatcher {
func NewInvoiceExpiryWatcher(clock clock.Clock,
expiryDelta, startHeight uint32, startHash *chainhash.Hash,
notifier chainntnfs.ChainNotifier) *InvoiceExpiryWatcher {
return &InvoiceExpiryWatcher{
clock: clock,
newInvoices: make(chan []invoiceExpiry),
quit: make(chan struct{}),
clock: clock,
notifier: notifier,
blockExpiryDelta: expiryDelta,
currentHeight: startHeight,
currentHash: startHash,
newInvoices: make(chan []invoiceExpiry),
quit: make(chan struct{}),
}
}
@ -91,8 +146,17 @@ func (ew *InvoiceExpiryWatcher) Start(
ew.started = true
ew.cancelInvoice = cancelInvoice
ntfn, err := ew.notifier.RegisterBlockEpochNtfn(&chainntnfs.BlockEpoch{
Height: int32(ew.currentHeight),
Hash: ew.currentHash,
})
if err != nil {
return err
}
ew.wg.Add(1)
go ew.mainLoop()
go ew.mainLoop(ntfn)
return nil
}
@ -122,6 +186,32 @@ func makeInvoiceExpiry(paymentHash lntypes.Hash,
case channeldb.ContractOpen:
return makeTimestampExpiry(paymentHash, invoice)
// If an invoice has active htlcs, we want to expire it based on block
// height. We only do this for hodl invoices, since regular invoices
// should resolve themselves automatically.
case channeldb.ContractAccepted:
if !invoice.HodlInvoice {
log.Debugf("Invoice in accepted state not added to "+
"expiry watcher: %v", paymentHash)
return nil
}
var minHeight uint32
for _, htlc := range invoice.Htlcs {
// We only care about accepted htlcs, since they will
// trigger force-closes.
if htlc.State != channeldb.HtlcStateAccepted {
continue
}
if minHeight == 0 || htlc.Expiry < minHeight {
minHeight = htlc.Expiry
}
}
return makeHeightExpiry(paymentHash, minHeight)
default:
log.Debugf("Invoice not added to expiry watcher: %v",
paymentHash)
@ -151,18 +241,36 @@ func makeTimestampExpiry(paymentHash lntypes.Hash,
}
}
// makeHeightExpiry creates height-based expiry for an invoice based on its
// lowest htlc expiry height.
func makeHeightExpiry(paymentHash lntypes.Hash,
minHeight uint32) *invoiceExpiryHeight {
if minHeight == 0 {
log.Warnf("make height expiry called with 0 height")
return nil
}
return &invoiceExpiryHeight{
paymentHash: paymentHash,
expiryHeight: minHeight,
}
}
// AddInvoices adds invoices to the InvoiceExpiryWatcher.
func (ew *InvoiceExpiryWatcher) AddInvoices(invoices ...invoiceExpiry) {
if len(invoices) > 0 {
select {
case ew.newInvoices <- invoices:
log.Debugf("Added %d invoices to the expiry watcher",
len(invoices))
if len(invoices) == 0 {
return
}
// Select on quit too so that callers won't get blocked in case
// of concurrent shutdown.
case <-ew.quit:
}
select {
case ew.newInvoices <- invoices:
log.Debugf("Added %d invoices to the expiry watcher",
len(invoices))
// Select on quit too so that callers won't get blocked in case
// of concurrent shutdown.
case <-ew.quit:
}
}
@ -178,6 +286,23 @@ func (ew *InvoiceExpiryWatcher) nextTimestampExpiry() <-chan time.Time {
return nil
}
// nextHeightExpiry returns a channel that will immediately be read from if
// the top item on our queue has expired.
func (ew *InvoiceExpiryWatcher) nextHeightExpiry() <-chan uint32 {
if ew.blockExpiryQueue.Empty() {
return nil
}
top := ew.blockExpiryQueue.Top().(*invoiceExpiryHeight)
if !top.expired(ew.currentHeight, ew.blockExpiryDelta) {
return nil
}
blockChan := make(chan uint32, 1)
blockChan <- top.expiryHeight
return blockChan
}
// cancelNextExpiredInvoice will cancel the next expired invoice and removes
// it from the expiry queue.
func (ew *InvoiceExpiryWatcher) cancelNextExpiredInvoice() {
@ -198,6 +323,25 @@ func (ew *InvoiceExpiryWatcher) cancelNextExpiredInvoice() {
}
}
// cancelNextHeightExpiredInvoice looks at our height based queue and expires
// the next invoice if we have reached its expiry block.
func (ew *InvoiceExpiryWatcher) cancelNextHeightExpiredInvoice() {
if ew.blockExpiryQueue.Empty() {
return
}
top := ew.blockExpiryQueue.Top().(*invoiceExpiryHeight)
if !top.expired(ew.currentHeight, ew.blockExpiryDelta) {
return
}
// We always force-cancel block-based expiry so that we can
// cancel invoices that have been accepted but not yet resolved.
// This helps us avoid force closes.
ew.expireInvoice(top.paymentHash, true)
ew.blockExpiryQueue.Pop()
}
// expireInvoice attempts to expire an invoice and logs an error if we get an
// unexpected error.
func (ew *InvoiceExpiryWatcher) expireInvoice(hash lntypes.Hash, force bool) {
@ -226,6 +370,11 @@ func (ew *InvoiceExpiryWatcher) pushInvoices(invoices []invoiceExpiry) {
ew.timestampExpiryQueue.Push(expiry)
}
case *invoiceExpiryHeight:
if expiry != nil {
ew.blockExpiryQueue.Push(expiry)
}
default:
log.Errorf("unexpected queue item: %T", inv)
}
@ -234,12 +383,20 @@ func (ew *InvoiceExpiryWatcher) pushInvoices(invoices []invoiceExpiry) {
// mainLoop is a goroutine that receives new invoices and handles cancellation
// of expired invoices.
func (ew *InvoiceExpiryWatcher) mainLoop() {
defer ew.wg.Done()
func (ew *InvoiceExpiryWatcher) mainLoop(blockNtfns *chainntnfs.BlockEpochEvent) {
defer func() {
blockNtfns.Cancel()
ew.wg.Done()
}()
// We have two different queues, so we use a different cancel method
// depending on which expiry condition we have hit. Starting with time
// based expiry is an arbitrary choice to start off.
cancelNext := ew.cancelNextExpiredInvoice
for {
// Cancel any invoices that may have expired.
ew.cancelNextExpiredInvoice()
cancelNext()
select {
@ -252,13 +409,29 @@ func (ew *InvoiceExpiryWatcher) mainLoop() {
default:
select {
// Wait until the next invoice expires.
case <-ew.nextTimestampExpiry():
// Wait until the next invoice expires.
cancelNext = ew.cancelNextExpiredInvoice
continue
case <-ew.nextHeightExpiry():
cancelNext = ew.cancelNextHeightExpiredInvoice
continue
case newInvoices := <-ew.newInvoices:
ew.pushInvoices(newInvoices)
// Consume new blocks.
case block, ok := <-blockNtfns.Epochs:
if !ok {
log.Debugf("block notifications " +
"canceled")
return
}
ew.currentHeight = uint32(block.Height)
ew.currentHash = block.Hash
case <-ew.quit:
return
}

35
invoices/invoice_expiry_watcher_test.go

@ -5,6 +5,7 @@ import (
"testing"
"time"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
)
@ -19,13 +20,40 @@ type invoiceExpiryWatcherTest struct {
canceledInvoices []lntypes.Hash
}
type mockChainNotifier struct {
chainntnfs.ChainNotifier
blockChan chan *chainntnfs.BlockEpoch
}
func newMockNotifier() *mockChainNotifier {
return &mockChainNotifier{
blockChan: make(chan *chainntnfs.BlockEpoch),
}
}
// RegisterBlockEpochNtfn mocks a block epoch notification, using the mock's
// block channel to deliver blocks to the client.
func (m *mockChainNotifier) RegisterBlockEpochNtfn(*chainntnfs.BlockEpoch) (
*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{
Epochs: m.blockChan,
Cancel: func() {},
}, nil
}
// newInvoiceExpiryWatcherTest creates a new InvoiceExpiryWatcher test fixture
// and sets up the test environment.
func newInvoiceExpiryWatcherTest(t *testing.T, now time.Time,
numExpiredInvoices, numPendingInvoices int) *invoiceExpiryWatcherTest {
mockNotifier := newMockNotifier()
test := &invoiceExpiryWatcherTest{
watcher: NewInvoiceExpiryWatcher(clock.NewTestClock(testTime)),
watcher: NewInvoiceExpiryWatcher(
clock.NewTestClock(testTime), 0,
uint32(testCurrentHeight), nil, mockNotifier,
),
testData: generateInvoiceExpiryTestData(
t, now, 0, numExpiredInvoices, numPendingInvoices,
),
@ -84,7 +112,10 @@ func (t *invoiceExpiryWatcherTest) checkExpectations() {
// Tests that InvoiceExpiryWatcher can be started and stopped.
func TestInvoiceExpiryWatcherStartStop(t *testing.T) {
watcher := NewInvoiceExpiryWatcher(clock.NewTestClock(testTime))
watcher := NewInvoiceExpiryWatcher(
clock.NewTestClock(testTime), 0, uint32(testCurrentHeight), nil,
newMockNotifier(),
)
cancel := func(lntypes.Hash, bool) error {
t.Fatalf("unexpected call")
return nil

10
invoices/invoiceregistry.go

@ -1101,6 +1101,16 @@ func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
}
// If we have fully accepted the set of htlcs for this invoice,
// we can now add it to our invoice expiry watcher. We do not
// add invoices before they are fully accepted, because it is
// possible that we MppTimeout the htlcs, and then our relevant
// expiry height could change.
if res.outcome == resultAccepted {
expiry := makeInvoiceExpiry(ctx.hash, invoice)
i.expiryWatcher.AddInvoices(expiry)
}
i.hodlSubscribe(hodlChan, ctx.circuitKey)
default:

19
invoices/invoiceregistry_test.go

@ -352,7 +352,11 @@ func TestSettleHoldInvoice(t *testing.T) {
FinalCltvRejectDelta: testFinalCltvRejectDelta,
Clock: clock.NewTestClock(testTime),
}
registry := NewRegistry(cdb, NewInvoiceExpiryWatcher(cfg.Clock), &cfg)
expiryWatcher := NewInvoiceExpiryWatcher(
cfg.Clock, 0, uint32(testCurrentHeight), nil, newMockNotifier(),
)
registry := NewRegistry(cdb, expiryWatcher, &cfg)
err = registry.Start()
if err != nil {
@ -521,7 +525,10 @@ func TestCancelHoldInvoice(t *testing.T) {
FinalCltvRejectDelta: testFinalCltvRejectDelta,
Clock: clock.NewTestClock(testTime),
}
registry := NewRegistry(cdb, NewInvoiceExpiryWatcher(cfg.Clock), &cfg)
expiryWatcher := NewInvoiceExpiryWatcher(
cfg.Clock, 0, uint32(testCurrentHeight), nil, newMockNotifier(),
)
registry := NewRegistry(cdb, expiryWatcher, &cfg)
err = registry.Start()
if err != nil {
@ -946,7 +953,9 @@ func TestInvoiceExpiryWithRegistry(t *testing.T) {
Clock: testClock,
}
expiryWatcher := NewInvoiceExpiryWatcher(cfg.Clock)
expiryWatcher := NewInvoiceExpiryWatcher(
cfg.Clock, 0, uint32(testCurrentHeight), nil, newMockNotifier(),
)
registry := NewRegistry(cdb, expiryWatcher, &cfg)
// First prefill the Channel DB with some pre-existing invoices,
@ -1049,7 +1058,9 @@ func TestOldInvoiceRemovalOnStart(t *testing.T) {
GcCanceledInvoicesOnStartup: true,
}
expiryWatcher := NewInvoiceExpiryWatcher(cfg.Clock)
expiryWatcher := NewInvoiceExpiryWatcher(
cfg.Clock, 0, uint32(testCurrentHeight), nil, newMockNotifier(),
)
registry := NewRegistry(cdb, expiryWatcher, &cfg)
// First prefill the Channel DB with some pre-existing expired invoices.

4
invoices/test_utils_test.go

@ -193,7 +193,9 @@ func newTestContext(t *testing.T) *testContext {
t.Fatal(err)
}
expiryWatcher := NewInvoiceExpiryWatcher(clock)
expiryWatcher := NewInvoiceExpiryWatcher(
clock, 0, uint32(testCurrentHeight), nil, newMockNotifier(),
)
// Instantiate and start the invoice ctx.registry.
cfg := RegistryConfig{

12
lncfg/invoices.go

@ -0,0 +1,12 @@
package lncfg
// DefaultHoldInvoiceExpiryDelta defines the number of blocks before the expiry
// height of a hold invoice's htlc that lnd will automatically cancel the
// invoice to prevent the channel from force closing. This value *must* be
// greater than DefaultIncomingBroadcastDelta to prevent force closes.
const DefaultHoldInvoiceExpiryDelta = DefaultIncomingBroadcastDelta + 2
// Invoices holds the configuration options for invoices.
type Invoices struct {
HoldExpiryDelta uint32 `long:"holdexpirydelta" description:"The number of blocks before a hold invoice's htlc expires that the invoice should be canceled to prevent a force close. Force closes will not be prevented if this value is not greater than DefaultIncomingBroadcastDelta."`
}

20
sample-lnd.conf

@ -1120,3 +1120,23 @@ litecoin.node=ltcd
; will accept over the channel update interval.
; gossip.max-channel-update-burst=10
; gossip.channel-update-interval=1m
[invoices]
; If a hold invoice has accepted htlcs that reach their expiry height and are
; not timed out, the channel holding the htlc is force closed to resolve the
; invoice's htlcs. To prevent force closes, lnd automatically cancels these
; invoices before they reach their expiry height.
;
; Hold expiry delta describes the number of blocks before expiry that these
; invoices should be canceled. Setting this value to 0 will ensure that hold
; invoices can be settled right up until their expiry height, but will result
; in the channel they are on being force closed if they are not resolved before
; expiry.
;
; Lnd goes to chain before the expiry for a htlc is reached so that there is
; time to resolve it on chain. This value needs to be greater than the
; DefaultIncomingBroadcastDelta set by lnd, otherwise the channel will be force
; closed anyway. A warning will be logged on startup if this value is not large
; enough to prevent force closes.
;
; invoices.holdexpirydelta=15

15
server.go

@ -442,11 +442,6 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
readPool: readPool,
chansToRestore: chansToRestore,
invoices: invoices.NewRegistry(
remoteChanDB, invoices.NewInvoiceExpiryWatcher(clock.NewDefaultClock()),
&registryConfig,
),
channelNotifier: channelnotifier.New(remoteChanDB),
identityECDH: nodeKeyECDH,
@ -483,11 +478,19 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
subscribers: make(map[uint64]*preimageSubscriber),
}
_, currentHeight, err := s.cc.ChainIO.GetBestBlock()
currentHash, currentHeight, err := s.cc.ChainIO.GetBestBlock()
if err != nil {
return nil, err
}
expiryWatcher := invoices.NewInvoiceExpiryWatcher(
clock.NewDefaultClock(), cfg.Invoices.HoldExpiryDelta,
uint32(currentHeight), currentHash, cc.ChainNotifier,
)
s.invoices = invoices.NewRegistry(
remoteChanDB, expiryWatcher, &registryConfig,
)
s.htlcNotifier = htlcswitch.NewHtlcNotifier(time.Now)
s.htlcSwitch, err = htlcswitch.New(htlcswitch.Config{

Loading…
Cancel
Save