Merge pull request #3131 from halseth/reliable-payments-switch-gc
[reliable payments] switch result store clean-up
This commit is contained in:
commit
22d7bb138c
@ -673,16 +673,11 @@ func ensureInFlight(payment *MPPayment) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// InFlightPayment is a wrapper around a payment that has status InFlight.
|
// InFlightPayment is a wrapper around the info for a payment that has status
|
||||||
|
// InFlight.
|
||||||
type InFlightPayment struct {
|
type InFlightPayment struct {
|
||||||
// Info is the PaymentCreationInfo of the in-flight payment.
|
// Info is the PaymentCreationInfo of the in-flight payment.
|
||||||
Info *PaymentCreationInfo
|
Info *PaymentCreationInfo
|
||||||
|
|
||||||
// Attempts is the set of payment attempts that was made to this
|
|
||||||
// payment hash.
|
|
||||||
//
|
|
||||||
// NOTE: Might be empty.
|
|
||||||
Attempts []HTLCAttemptInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchInFlightPayments returns all payments with status InFlight.
|
// FetchInFlightPayments returns all payments with status InFlight.
|
||||||
@ -718,33 +713,6 @@ func (p *PaymentControl) FetchInFlightPayments() ([]*InFlightPayment, error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
htlcsBucket := bucket.NestedReadBucket(
|
|
||||||
paymentHtlcsBucket,
|
|
||||||
)
|
|
||||||
if htlcsBucket == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch all HTLCs attempted for this payment.
|
|
||||||
htlcs, err := fetchHtlcAttempts(htlcsBucket)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// We only care about the static info for the HTLCs
|
|
||||||
// still in flight, so convert the result to a slice of
|
|
||||||
// HTLCAttemptInfos.
|
|
||||||
for _, h := range htlcs {
|
|
||||||
// Skip HTLCs not in flight.
|
|
||||||
if h.Settle != nil || h.Failure != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
inFlight.Attempts = append(
|
|
||||||
inFlight.Attempts, h.HTLCAttemptInfo,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
inFlights = append(inFlights, inFlight)
|
inFlights = append(inFlights, inFlight)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -258,3 +258,48 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
|
|||||||
|
|
||||||
return deserializeNetworkResult(r)
|
return deserializeNetworkResult(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cleanStore removes all entries from the store, except the payment IDs given.
|
||||||
|
// NOTE: Since every result not listed in the keep map will be deleted, care
|
||||||
|
// should be taken to ensure no new payment attempts are being made
|
||||||
|
// concurrently while this process is ongoing, as its result might end up being
|
||||||
|
// deleted.
|
||||||
|
func (store *networkResultStore) cleanStore(keep map[uint64]struct{}) error {
|
||||||
|
return kvdb.Update(store.db.Backend, func(tx kvdb.RwTx) error {
|
||||||
|
networkResults, err := tx.CreateTopLevelBucket(
|
||||||
|
networkResultStoreBucketKey,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate through the bucket, deleting all items not in the
|
||||||
|
// keep map.
|
||||||
|
var toClean [][]byte
|
||||||
|
if err := networkResults.ForEach(func(k, _ []byte) error {
|
||||||
|
pid := binary.BigEndian.Uint64(k)
|
||||||
|
if _, ok := keep[pid]; ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
toClean = append(toClean, k)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range toClean {
|
||||||
|
err := networkResults.Delete(k)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(toClean) > 0 {
|
||||||
|
log.Infof("Removed %d stale entries from network "+
|
||||||
|
"result store", len(toClean))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/lntypes"
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestNetworkResultSerialization checks that NetworkResults are properly
|
// TestNetworkResultSerialization checks that NetworkResults are properly
|
||||||
@ -180,7 +181,6 @@ func TestNetworkResultStore(t *testing.T) {
|
|||||||
|
|
||||||
// Since we don't delete results from the store (yet), make sure we
|
// Since we don't delete results from the store (yet), make sure we
|
||||||
// will get subscriptions for all of them.
|
// will get subscriptions for all of them.
|
||||||
// TODO(halseth): check deletion when we have reliable handoff.
|
|
||||||
for i := uint64(0); i < numResults; i++ {
|
for i := uint64(0); i < numResults; i++ {
|
||||||
sub, err := store.subscribeResult(i)
|
sub, err := store.subscribeResult(i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -193,4 +193,25 @@ func TestNetworkResultStore(t *testing.T) {
|
|||||||
t.Fatalf("no result received")
|
t.Fatalf("no result received")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean the store keeping the first two results.
|
||||||
|
toKeep := map[uint64]struct{}{
|
||||||
|
0: {},
|
||||||
|
1: {},
|
||||||
|
}
|
||||||
|
// Finally, delete the result.
|
||||||
|
err = store.cleanStore(toKeep)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Payment IDs 0 and 1 should be found, 2 and 3 should be deleted.
|
||||||
|
for i := uint64(0); i < numResults; i++ {
|
||||||
|
_, err = store.getResult(i)
|
||||||
|
if i <= 1 {
|
||||||
|
require.NoError(t, err, "unable to get result")
|
||||||
|
}
|
||||||
|
if i >= 2 && err != ErrPaymentIDNotFound {
|
||||||
|
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,6 +438,14 @@ func (s *Switch) GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash,
|
|||||||
return resultChan, nil
|
return resultChan, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CleanStore calls the underlying result store, telling it is safe to delete
|
||||||
|
// all entries except the ones in the keepPids map. This should be called
|
||||||
|
// preiodically to let the switch clean up payment results that we have
|
||||||
|
// handled.
|
||||||
|
func (s *Switch) CleanStore(keepPids map[uint64]struct{}) error {
|
||||||
|
return s.networkResults.cleanStore(keepPids)
|
||||||
|
}
|
||||||
|
|
||||||
// SendHTLC is used by other subsystems which aren't belong to htlc switch
|
// SendHTLC is used by other subsystems which aren't belong to htlc switch
|
||||||
// package in order to send the htlc update. The paymentID used MUST be unique
|
// package in order to send the htlc update. The paymentID used MUST be unique
|
||||||
// for this HTLC, and MUST be used only once, otherwise the switch might reject
|
// for this HTLC, and MUST be used only once, otherwise the switch might reject
|
||||||
|
@ -72,6 +72,9 @@ func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64,
|
|||||||
return c, nil
|
return c, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
func (m *mockPaymentAttemptDispatcher) CleanStore(map[uint64]struct{}) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *mockPaymentAttemptDispatcher) setPaymentResult(
|
func (m *mockPaymentAttemptDispatcher) setPaymentResult(
|
||||||
f func(firstHop lnwire.ShortChannelID) ([32]byte, error)) {
|
f func(firstHop lnwire.ShortChannelID) ([32]byte, error)) {
|
||||||
@ -187,6 +190,10 @@ func (m *mockPayer) GetPaymentResult(paymentID uint64, _ lntypes.Hash,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *mockPayer) CleanStore(pids map[uint64]struct{}) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type initArgs struct {
|
type initArgs struct {
|
||||||
c *channeldb.PaymentCreationInfo
|
c *channeldb.PaymentCreationInfo
|
||||||
}
|
}
|
||||||
@ -446,13 +453,8 @@ func (m *mockControlTower) FetchInFlightPayments() (
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var attempts []channeldb.HTLCAttemptInfo
|
|
||||||
for _, a := range p.attempts {
|
|
||||||
attempts = append(attempts, a.HTLCAttemptInfo)
|
|
||||||
}
|
|
||||||
ifl := channeldb.InFlightPayment{
|
ifl := channeldb.InFlightPayment{
|
||||||
Info: &p.info,
|
Info: &p.info,
|
||||||
Attempts: attempts,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fl = append(fl, &ifl)
|
fl = append(fl, &ifl)
|
||||||
|
@ -171,6 +171,14 @@ type PaymentAttemptDispatcher interface {
|
|||||||
GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash,
|
GetPaymentResult(paymentID uint64, paymentHash lntypes.Hash,
|
||||||
deobfuscator htlcswitch.ErrorDecrypter) (
|
deobfuscator htlcswitch.ErrorDecrypter) (
|
||||||
<-chan *htlcswitch.PaymentResult, error)
|
<-chan *htlcswitch.PaymentResult, error)
|
||||||
|
|
||||||
|
// CleanStore calls the underlying result store, telling it is safe to
|
||||||
|
// delete all entries except the ones in the keepPids map. This should
|
||||||
|
// be called preiodically to let the switch clean up payment results
|
||||||
|
// that we have handled.
|
||||||
|
// NOTE: New payment attempts MUST NOT be made after the keepPids map
|
||||||
|
// has been created and this method has returned.
|
||||||
|
CleanStore(keepPids map[uint64]struct{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// PaymentSessionSource is an interface that defines a source for the router to
|
// PaymentSessionSource is an interface that defines a source for the router to
|
||||||
@ -538,6 +546,30 @@ func (r *ChannelRouter) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Before we restart existing payments and start accepting more
|
||||||
|
// payments to be made, we clean the network result store of the
|
||||||
|
// Switch. We do this here at startup to ensure no more payments can be
|
||||||
|
// made concurrently, so we know the toKeep map will be up-to-date
|
||||||
|
// until the cleaning has finished.
|
||||||
|
toKeep := make(map[uint64]struct{})
|
||||||
|
for _, p := range payments {
|
||||||
|
payment, err := r.cfg.Control.FetchPayment(
|
||||||
|
p.Info.PaymentHash,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, a := range payment.HTLCs {
|
||||||
|
toKeep[a.AttemptID] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Cleaning network result store.")
|
||||||
|
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
for _, payment := range payments {
|
for _, payment := range payments {
|
||||||
log.Infof("Resuming payment with hash %v", payment.Info.PaymentHash)
|
log.Infof("Resuming payment with hash %v", payment.Info.PaymentHash)
|
||||||
r.wg.Add(1)
|
r.wg.Add(1)
|
||||||
|
Loading…
Reference in New Issue
Block a user