From d1a118388d163297b20540fde784de7b02f9c158 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 24 Sep 2020 09:48:27 +0200 Subject: [PATCH 1/3] payment_control: remove unused Attempts field --- channeldb/payment_control.go | 36 ++---------------------------------- routing/mock_test.go | 7 +------ 2 files changed, 3 insertions(+), 40 deletions(-) diff --git a/channeldb/payment_control.go b/channeldb/payment_control.go index 5a538134..d67b0a8e 100644 --- a/channeldb/payment_control.go +++ b/channeldb/payment_control.go @@ -673,16 +673,11 @@ func ensureInFlight(payment *MPPayment) error { } } -// InFlightPayment is a wrapper around a payment that has status InFlight. +// InFlightPayment is a wrapper around the info for a payment that has status +// InFlight. type InFlightPayment struct { // Info is the PaymentCreationInfo of the in-flight payment. Info *PaymentCreationInfo - - // Attempts is the set of payment attempts that was made to this - // payment hash. - // - // NOTE: Might be empty. - Attempts []HTLCAttemptInfo } // FetchInFlightPayments returns all payments with status InFlight. @@ -718,33 +713,6 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*InFlightPayment, error) { return err } - htlcsBucket := bucket.NestedReadBucket( - paymentHtlcsBucket, - ) - if htlcsBucket == nil { - return nil - } - - // Fetch all HTLCs attempted for this payment. - htlcs, err := fetchHtlcAttempts(htlcsBucket) - if err != nil { - return err - } - - // We only care about the static info for the HTLCs - // still in flight, so convert the result to a slice of - // HTLCAttemptInfos. - for _, h := range htlcs { - // Skip HTLCs not in flight. - if h.Settle != nil || h.Failure != nil { - continue - } - - inFlight.Attempts = append( - inFlight.Attempts, h.HTLCAttemptInfo, - ) - } - inFlights = append(inFlights, inFlight) return nil }) diff --git a/routing/mock_test.go b/routing/mock_test.go index e2f46687..f5a6d382 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -446,13 +446,8 @@ func (m *mockControlTower) FetchInFlightPayments() ( continue } - var attempts []channeldb.HTLCAttemptInfo - for _, a := range p.attempts { - attempts = append(attempts, a.HTLCAttemptInfo) - } ifl := channeldb.InFlightPayment{ - Info: &p.info, - Attempts: attempts, + Info: &p.info, } fl = append(fl, &ifl) From b071278f07ab7a9fc94dbe293854773bd109c0bb Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 24 Sep 2020 09:49:17 +0200 Subject: [PATCH 2/3] htlcswitch: add cleanStore method That let us clean up handed off payment results, as they will never be queried again. --- htlcswitch/payment_result.go | 45 +++++++++++++++++++++++++++++++ htlcswitch/payment_result_test.go | 23 +++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index 54130755..e6a1e59f 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -258,3 +258,48 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { return deserializeNetworkResult(r) } + +// cleanStore removes all entries from the store, except the payment IDs given. +// NOTE: Since every result not listed in the keep map will be deleted, care +// should be taken to ensure no new payment attempts are being made +// concurrently while this process is ongoing, as its result might end up being +// deleted. +func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error { + return kvdb.Update(store.db.Backend, func(tx kvdb.RwTx) error { + networkResults, err := tx.CreateTopLevelBucket( + networkResultStoreBucketKey, + ) + if err != nil { + return err + } + + // Iterate through the bucket, deleting all items not in the + // keep map. + var toClean [][]byte + if err := networkResults.ForEach(func(k, _ []byte) error { + pid := binary.BigEndian.Uint64(k) + if _, ok := keep[pid]; ok { + return nil + } + + toClean = append(toClean, k) + return nil + }); err != nil { + return err + } + + for _, k := range toClean { + err := networkResults.Delete(k) + if err != nil { + return err + } + } + + if len(toClean) > 0 { + log.Infof("Removed %d stale entries from network "+ + "result store", len(toClean)) + } + + return nil + }) +} diff --git a/htlcswitch/payment_result_test.go b/htlcswitch/payment_result_test.go index d0c48c4d..04ff57d8 100644 --- a/htlcswitch/payment_result_test.go +++ b/htlcswitch/payment_result_test.go @@ -12,6 +12,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" ) // TestNetworkResultSerialization checks that NetworkResults are properly @@ -180,7 +181,6 @@ func TestNetworkResultStore(t *testing.T) { // Since we don't delete results from the store (yet), make sure we // will get subscriptions for all of them. - // TODO(halseth): check deletion when we have reliable handoff. for i := uint64(0); i < numResults; i++ { sub, err := store.subscribeResult(i) if err != nil { @@ -193,4 +193,25 @@ func TestNetworkResultStore(t *testing.T) { t.Fatalf("no result received") } } + + // Clean the store keeping the first two results. + toKeep := map[uint64]struct{}{ + 0: {}, + 1: {}, + } + // Finally, delete the result. + err = store.cleanStore(toKeep) + require.NoError(t, err) + + // Payment IDs 0 and 1 should be found, 2 and 3 should be deleted. + for i := uint64(0); i < numResults; i++ { + _, err = store.getResult(i) + if i <= 1 { + require.NoError(t, err, "unable to get result") + } + if i >= 2 && err != ErrPaymentIDNotFound { + t.Fatalf("expected ErrPaymentIDNotFound, got %v", err) + } + + } } From 90a59fe70f00a50033d859c1f5edbe9e46a87fa8 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Thu, 24 Sep 2020 09:53:07 +0200 Subject: [PATCH 3/3] router: call CleanStore on startup --- htlcswitch/switch.go | 8 ++++++++ routing/mock_test.go | 7 +++++++ routing/router.go | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+) diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 1e7db9fe..72eebf59 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -438,6 +438,14 @@ func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, return resultChan, nil } +// CleanStore calls the underlying result store, telling it is safe to delete +// all entries except the ones in the keepPids map. This should be called +// preiodically to let the switch clean up payment results that we have +// handled. +func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error { + return s.networkResults.cleanStore(keepPids) +} + // SendHTLC is used by other subsystems which aren't belong to htlc switch // package in order to send the htlc update. The paymentID used MUST be unique // for this HTLC, and MUST be used only once, otherwise the switch might reject diff --git a/routing/mock_test.go b/routing/mock_test.go index f5a6d382..a284cf57 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -72,6 +72,9 @@ func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64, return c, nil } +func (m *mockPaymentAttemptDispatcher) CleanStore(map[uint64]struct{}) error { + return nil +} func (m *mockPaymentAttemptDispatcher) setPaymentResult( f func(firstHop lnwire.ShortChannelID) ([32]byte, error)) { @@ -187,6 +190,10 @@ func (m *mockPayer) GetPaymentResult(paymentID uint64, _ lntypes.Hash, } } +func (m *mockPayer) CleanStore(pids map[uint64]struct{}) error { + return nil +} + type initArgs struct { c *channeldb.PaymentCreationInfo } diff --git a/routing/router.go b/routing/router.go index 84fb9d0b..af1e8d75 100644 --- a/routing/router.go +++ b/routing/router.go @@ -171,6 +171,14 @@ type PaymentAttemptDispatcher interface { GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash, deobfuscator htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) + + // CleanStore calls the underlying result store, telling it is safe to + // delete all entries except the ones in the keepPids map. This should + // be called preiodically to let the switch clean up payment results + // that we have handled. + // NOTE: New payment attempts MUST NOT be made after the keepPids map + // has been created and this method has returned. + CleanStore(keepPids map[uint64]struct{}) error } // PaymentSessionSource is an interface that defines a source for the router to @@ -538,6 +546,30 @@ func (r *ChannelRouter) Start() error { return err } + // Before we restart existing payments and start accepting more + // payments to be made, we clean the network result store of the + // Switch. We do this here at startup to ensure no more payments can be + // made concurrently, so we know the toKeep map will be up-to-date + // until the cleaning has finished. + toKeep := make(map[uint64]struct{}) + for _, p := range payments { + payment, err := r.cfg.Control.FetchPayment( + p.Info.PaymentHash, + ) + if err != nil { + return err + } + + for _, a := range payment.HTLCs { + toKeep[a.AttemptID] = struct{}{} + } + } + + log.Debugf("Cleaning network result store.") + if err := r.cfg.Payer.CleanStore(toKeep); err != nil { + return err + } + for _, payment := range payments { log.Infof("Resuming payment with hash %v", payment.Info.PaymentHash) r.wg.Add(1)