htlcswitch+invoices: circuit key based hodl notifications

This commit modifies hodl htlc notification from invoice registry from a
single notification per hash to distinct notifications per htlc. This
prepares for htlc-specific information (accept height) to be added to the
notification.
This commit is contained in:
Joost Jager 2019-08-14 21:11:34 +02:00
parent 49a20a87a2
commit 4e140213f9
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
3 changed files with 102 additions and 99 deletions

@ -18,8 +18,9 @@ const (
) )
var ( var (
testResPreimage = lntypes.Preimage{1, 2, 3} testResPreimage = lntypes.Preimage{1, 2, 3}
testResHash = testResPreimage.Hash() testResHash = testResPreimage.Hash()
testResCircuitKey = channeldb.CircuitKey{}
) )
// TestHtlcIncomingResolverFwdPreimageKnown tests resolution of a forwarded htlc // TestHtlcIncomingResolverFwdPreimageKnown tests resolution of a forwarded htlc
@ -92,8 +93,8 @@ func TestHtlcIncomingResolverExitSettle(t *testing.T) {
ctx := newIncomingResolverTestContext(t) ctx := newIncomingResolverTestContext(t)
ctx.registry.notifyEvent = &invoices.HodlEvent{ ctx.registry.notifyEvent = &invoices.HodlEvent{
Hash: testResHash, CircuitKey: testResCircuitKey,
Preimage: &testResPreimage, Preimage: &testResPreimage,
} }
ctx.resolve() ctx.resolve()
@ -116,7 +117,7 @@ func TestHtlcIncomingResolverExitCancel(t *testing.T) {
ctx := newIncomingResolverTestContext(t) ctx := newIncomingResolverTestContext(t)
ctx.registry.notifyEvent = &invoices.HodlEvent{ ctx.registry.notifyEvent = &invoices.HodlEvent{
Hash: testResHash, CircuitKey: testResCircuitKey,
} }
ctx.resolve() ctx.resolve()
ctx.waitForResult(false) ctx.waitForResult(false)
@ -133,8 +134,8 @@ func TestHtlcIncomingResolverExitSettleHodl(t *testing.T) {
notifyData := <-ctx.registry.notifyChan notifyData := <-ctx.registry.notifyChan
notifyData.hodlChan <- invoices.HodlEvent{ notifyData.hodlChan <- invoices.HodlEvent{
Hash: testResHash, CircuitKey: testResCircuitKey,
Preimage: &testResPreimage, Preimage: &testResPreimage,
} }
ctx.waitForResult(true) ctx.waitForResult(true)
@ -162,7 +163,7 @@ func TestHtlcIncomingResolverExitCancelHodl(t *testing.T) {
ctx.resolve() ctx.resolve()
notifyData := <-ctx.registry.notifyChan notifyData := <-ctx.registry.notifyChan
notifyData.hodlChan <- invoices.HodlEvent{ notifyData.hodlChan <- invoices.HodlEvent{
Hash: testResHash, CircuitKey: testResCircuitKey,
} }
ctx.waitForResult(false) ctx.waitForResult(false)
} }

@ -364,9 +364,9 @@ type channelLink struct {
// registry. // registry.
hodlQueue *queue.ConcurrentQueue hodlQueue *queue.ConcurrentQueue
// hodlMap stores a list of htlc data structs per hash. It allows // hodlMap stores related htlc data for a circuit key. It allows
// resolving those htlcs when we receive a message on hodlQueue. // resolving those htlcs when we receive a message on hodlQueue.
hodlMap map[lntypes.Hash][]hodlHtlc hodlMap map[channeldb.CircuitKey]hodlHtlc
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -391,7 +391,7 @@ func NewChannelLink(cfg ChannelLinkConfig,
logCommitTimer: time.NewTimer(300 * time.Millisecond), logCommitTimer: time.NewTimer(300 * time.Millisecond),
overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2), overflowQueue: newPacketQueue(input.MaxHTLCNumber / 2),
htlcUpdates: make(chan *contractcourt.ContractUpdate), htlcUpdates: make(chan *contractcourt.ContractUpdate),
hodlMap: make(map[lntypes.Hash][]hodlHtlc), hodlMap: make(map[channeldb.CircuitKey]hodlHtlc),
hodlQueue: queue.NewConcurrentQueue(10), hodlQueue: queue.NewConcurrentQueue(10),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -1151,10 +1151,21 @@ func (l *channelLink) processHodlQueue(firstHodlEvent invoices.HodlEvent) error
hodlEvent := firstHodlEvent hodlEvent := firstHodlEvent
loop: loop:
for { for {
if err := l.processHodlMapEvent(hodlEvent); err != nil { // Lookup all hodl htlcs that can be failed or settled with this event.
// The hodl htlc must be present in the map.
circuitKey := hodlEvent.CircuitKey
hodlHtlc, ok := l.hodlMap[circuitKey]
if !ok {
return fmt.Errorf("hodl htlc not found: %v", circuitKey)
}
if err := l.processHodlEvent(hodlEvent, hodlHtlc); err != nil {
return err return err
} }
// Clean up hodl map.
delete(l.hodlMap, circuitKey)
select { select {
case item := <-l.hodlQueue.ChanOut(): case item := <-l.hodlQueue.ChanOut():
hodlEvent = item.(invoices.HodlEvent) hodlEvent = item.(invoices.HodlEvent)
@ -1171,73 +1182,37 @@ loop:
return nil return nil
} }
// processHodlMapEvent resolves stored hodl htlcs based using the information in
// hodlEvent.
func (l *channelLink) processHodlMapEvent(hodlEvent invoices.HodlEvent) error {
// Lookup all hodl htlcs that can be failed or settled with this event.
// The hodl htlc must be present in the map.
hash := hodlEvent.Hash
hodlHtlcs, ok := l.hodlMap[hash]
if !ok {
return fmt.Errorf("hodl htlc not found: %v", hash)
}
if err := l.processHodlEvent(hodlEvent, hodlHtlcs...); err != nil {
return err
}
// Clean up hodl map.
delete(l.hodlMap, hash)
return nil
}
// processHodlEvent applies a received hodl event to the provided htlc. When // processHodlEvent applies a received hodl event to the provided htlc. When
// this function returns without an error, the commit tx should be updated. // this function returns without an error, the commit tx should be updated.
func (l *channelLink) processHodlEvent(hodlEvent invoices.HodlEvent, func (l *channelLink) processHodlEvent(hodlEvent invoices.HodlEvent,
htlcs ...hodlHtlc) error { htlc hodlHtlc) error {
hash := hodlEvent.Hash l.batchCounter++
circuitKey := hodlEvent.CircuitKey
// Determine required action for the resolution. // Determine required action for the resolution.
var hodlAction func(htlc hodlHtlc) error
if hodlEvent.Preimage != nil { if hodlEvent.Preimage != nil {
l.debugf("Received hodl settle event for %v", hash) l.debugf("Received hodl settle event for %v", circuitKey)
hodlAction = func(htlc hodlHtlc) error { return l.settleHTLC(
return l.settleHTLC( *hodlEvent.Preimage, htlc.pd.HtlcIndex,
*hodlEvent.Preimage, htlc.pd.HtlcIndex, htlc.pd.SourceRef,
htlc.pd.SourceRef, )
)
}
} else {
l.debugf("Received hodl cancel event for %v", hash)
hodlAction = func(htlc hodlHtlc) error {
// In case of a cancel, always return
// incorrect_or_unknown_payment_details in order to
// avoid leaking info.
failure := lnwire.NewFailIncorrectDetails(
htlc.pd.Amount,
)
l.sendHTLCError(
htlc.pd.HtlcIndex, failure, htlc.obfuscator,
htlc.pd.SourceRef,
)
return nil
}
} }
// Apply action for all htlcs matching this hash. l.debugf("Received hodl cancel event for %v", circuitKey)
for _, htlc := range htlcs {
if err := hodlAction(htlc); err != nil {
return err
}
l.batchCounter++ // In case of a cancel, always return
} // incorrect_or_unknown_payment_details in order to avoid leaking info.
failure := lnwire.NewFailIncorrectDetails(
htlc.pd.Amount,
)
l.sendHTLCError(
htlc.pd.HtlcIndex, failure, htlc.obfuscator,
htlc.pd.SourceRef,
)
return nil return nil
} }
@ -2913,8 +2888,7 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor,
if event == nil { if event == nil {
// Save payment descriptor for future reference. // Save payment descriptor for future reference.
hodlHtlcs := l.hodlMap[invoiceHash] l.hodlMap[circuitKey] = htlc
l.hodlMap[invoiceHash] = append(hodlHtlcs, htlc)
return false, nil return false, nil
} }

@ -36,8 +36,9 @@ type HodlEvent struct {
// Preimage is the htlc preimage. Its value is nil in case of a cancel. // Preimage is the htlc preimage. Its value is nil in case of a cancel.
Preimage *lntypes.Preimage Preimage *lntypes.Preimage
// Hash is the htlc hash. // CircuitKey is the key of the htlc for which we have a resolution
Hash lntypes.Hash // decision.
CircuitKey channeldb.CircuitKey
} }
// InvoiceRegistry is a central registry of all the outstanding invoices // InvoiceRegistry is a central registry of all the outstanding invoices
@ -60,13 +61,13 @@ type InvoiceRegistry struct {
// new single invoice subscriptions are carried. // new single invoice subscriptions are carried.
invoiceEvents chan interface{} invoiceEvents chan interface{}
// subscriptions is a map from a payment hash to a list of subscribers. // subscriptions is a map from a circuit key to a list of subscribers.
// It is used for efficient notification of links. // It is used for efficient notification of links.
hodlSubscriptions map[lntypes.Hash]map[chan<- interface{}]struct{} hodlSubscriptions map[channeldb.CircuitKey]map[chan<- interface{}]struct{}
// reverseSubscriptions tracks hashes subscribed to per subscriber. This // reverseSubscriptions tracks circuit keys subscribed to per
// is used to unsubscribe from all hashes efficiently. // subscriber. This is used to unsubscribe from all hashes efficiently.
hodlReverseSubscriptions map[chan<- interface{}]map[lntypes.Hash]struct{} hodlReverseSubscriptions map[chan<- interface{}]map[channeldb.CircuitKey]struct{}
// finalCltvRejectDelta defines the number of blocks before the expiry // finalCltvRejectDelta defines the number of blocks before the expiry
// of the htlc where we no longer settle it as an exit hop and instead // of the htlc where we no longer settle it as an exit hop and instead
@ -92,8 +93,8 @@ func NewRegistry(cdb *channeldb.DB, finalCltvRejectDelta int32) *InvoiceRegistry
newSubscriptions: make(chan *InvoiceSubscription), newSubscriptions: make(chan *InvoiceSubscription),
subscriptionCancels: make(chan uint32), subscriptionCancels: make(chan uint32),
invoiceEvents: make(chan interface{}, 100), invoiceEvents: make(chan interface{}, 100),
hodlSubscriptions: make(map[lntypes.Hash]map[chan<- interface{}]struct{}), hodlSubscriptions: make(map[channeldb.CircuitKey]map[chan<- interface{}]struct{}),
hodlReverseSubscriptions: make(map[chan<- interface{}]map[lntypes.Hash]struct{}), hodlReverseSubscriptions: make(map[chan<- interface{}]map[channeldb.CircuitKey]struct{}),
finalCltvRejectDelta: finalCltvRejectDelta, finalCltvRejectDelta: finalCltvRejectDelta,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -551,24 +552,24 @@ func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
// If it isn't recorded, cancel htlc. // If it isn't recorded, cancel htlc.
if !ok { if !ok {
return &HodlEvent{ return &HodlEvent{
Hash: rHash, CircuitKey: circuitKey,
}, nil }, nil
} }
switch invoiceHtlc.State { switch invoiceHtlc.State {
case channeldb.HtlcStateCancelled: case channeldb.HtlcStateCancelled:
return &HodlEvent{ return &HodlEvent{
Hash: rHash, CircuitKey: circuitKey,
}, nil }, nil
case channeldb.HtlcStateSettled: case channeldb.HtlcStateSettled:
return &HodlEvent{ return &HodlEvent{
Hash: rHash, CircuitKey: circuitKey,
Preimage: &invoice.Terms.PaymentPreimage, Preimage: &invoice.Terms.PaymentPreimage,
}, nil }, nil
case channeldb.HtlcStateAccepted: case channeldb.HtlcStateAccepted:
i.hodlSubscribe(hodlChan, rHash) i.hodlSubscribe(hodlChan, circuitKey)
return nil, nil return nil, nil
default: default:
@ -609,10 +610,22 @@ func (i *InvoiceRegistry) SettleHodlInvoice(preimage lntypes.Preimage) error {
log.Debugf("Invoice(%v): settled with preimage %v", hash, log.Debugf("Invoice(%v): settled with preimage %v", hash,
invoice.Terms.PaymentPreimage) invoice.Terms.PaymentPreimage)
i.notifyHodlSubscribers(HodlEvent{ // In the callback, we marked the invoice as settled. UpdateInvoice will
Hash: hash, // have seen this and should have moved all htlcs that were accepted to
Preimage: &preimage, // the settled state. In the loop below, we go through all of these and
}) // notify links and resolvers that are waiting for resolution. Any htlcs
// that were already settled before, will be notified again. This isn't
// necessary but doesn't hurt either.
for key, htlc := range invoice.Htlcs {
if htlc.State != channeldb.HtlcStateSettled {
continue
}
i.notifyHodlSubscribers(HodlEvent{
CircuitKey: key,
Preimage: &preimage,
})
}
i.notifyClients(hash, invoice, invoice.Terms.State) i.notifyClients(hash, invoice, invoice.Terms.State)
return nil return nil
@ -678,9 +691,21 @@ func (i *InvoiceRegistry) CancelInvoice(payHash lntypes.Hash) error {
} }
log.Debugf("Invoice(%v): canceled", payHash) log.Debugf("Invoice(%v): canceled", payHash)
i.notifyHodlSubscribers(HodlEvent{
Hash: payHash, // In the callback, some htlcs may have been moved to the canceled
}) // state. We now go through all of these and notify links and resolvers
// that are waiting for resolution. Any htlcs that were already canceled
// before, will be notified again. This isn't necessary but doesn't hurt
// either.
for key, htlc := range invoice.Htlcs {
if htlc.State != channeldb.HtlcStateCancelled {
continue
}
i.notifyHodlSubscribers(HodlEvent{
CircuitKey: key,
})
}
i.notifyClients(payHash, invoice, channeldb.ContractCanceled) i.notifyClients(payHash, invoice, channeldb.ContractCanceled)
return nil return nil
@ -947,7 +972,7 @@ func (i *InvoiceRegistry) SubscribeSingleInvoice(
// notifyHodlSubscribers sends out the hodl event to all current subscribers. // notifyHodlSubscribers sends out the hodl event to all current subscribers.
func (i *InvoiceRegistry) notifyHodlSubscribers(hodlEvent HodlEvent) { func (i *InvoiceRegistry) notifyHodlSubscribers(hodlEvent HodlEvent) {
subscribers, ok := i.hodlSubscriptions[hodlEvent.Hash] subscribers, ok := i.hodlSubscriptions[hodlEvent.CircuitKey]
if !ok { if !ok {
return return
} }
@ -962,31 +987,34 @@ func (i *InvoiceRegistry) notifyHodlSubscribers(hodlEvent HodlEvent) {
return return
} }
delete(i.hodlReverseSubscriptions[subscriber], hodlEvent.Hash) delete(
i.hodlReverseSubscriptions[subscriber],
hodlEvent.CircuitKey,
)
} }
delete(i.hodlSubscriptions, hodlEvent.Hash) delete(i.hodlSubscriptions, hodlEvent.CircuitKey)
} }
// hodlSubscribe adds a new invoice subscription. // hodlSubscribe adds a new invoice subscription.
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{}, func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
hash lntypes.Hash) { circuitKey channeldb.CircuitKey) {
log.Debugf("Hodl subscribe for %v", hash) log.Debugf("Hodl subscribe for %v", circuitKey)
subscriptions, ok := i.hodlSubscriptions[hash] subscriptions, ok := i.hodlSubscriptions[circuitKey]
if !ok { if !ok {
subscriptions = make(map[chan<- interface{}]struct{}) subscriptions = make(map[chan<- interface{}]struct{})
i.hodlSubscriptions[hash] = subscriptions i.hodlSubscriptions[circuitKey] = subscriptions
} }
subscriptions[subscriber] = struct{}{} subscriptions[subscriber] = struct{}{}
reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber] reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
if !ok { if !ok {
reverseSubscriptions = make(map[lntypes.Hash]struct{}) reverseSubscriptions = make(map[channeldb.CircuitKey]struct{})
i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
} }
reverseSubscriptions[hash] = struct{}{} reverseSubscriptions[circuitKey] = struct{}{}
} }
// HodlUnsubscribeAll cancels the subscription. // HodlUnsubscribeAll cancels the subscription.