channeldb/migration: add migration for new payment bucket structure

migrateOutgoingPayments moves the OutgoingPayments into a new bucket format
where they all reside in a top-level bucket indexed by the payment hash. In
this sub-bucket we store information relevant to this payment, such as the
payment status.

To avoid that the router resend payments that have the status InFlight (we
cannot resume these payments for pre-migration payments) we delete those
statuses, so only Completed payments remain in the new bucket structure.
This commit is contained in:
Johan T. Halseth 2019-05-23 20:05:27 +02:00
parent 178996f0d3
commit 801521ed2d
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 203 additions and 0 deletions

@ -96,6 +96,13 @@ var (
number: 8,
migration: migrateGossipMessageStoreKeys,
},
{
// The DB version where the payments and payment
// statuses are moved to being stored in a combined
// bucket.
number: 9,
migration: migrateOutgoingPayments,
},
}
// Big endian is the preferred byte order, due to cursor scans over

@ -6,8 +6,10 @@ import (
"encoding/binary"
"fmt"
"github.com/btcsuite/btcd/btcec"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
@ -683,3 +685,197 @@ func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
return nil
}
// migrateOutgoingPayments moves the OutgoingPayments into a new bucket format
// where they all reside in a top-level bucket indexed by the payment hash. In
// this sub-bucket we store information relevant to this payment, such as the
// payment status.
//
// Since the router cannot handle resumed payments that have the status
// InFlight (we have no PaymentAttemptInfo available for pre-migration
// payments) we delete those statuses, so only Completed payments remain in the
// new bucket structure.
func migrateOutgoingPayments(tx *bbolt.Tx) error {
oldPayments, err := tx.CreateBucketIfNotExists(paymentBucket)
if err != nil {
return err
}
newPayments, err := tx.CreateBucket(paymentsRootBucket)
if err != nil {
return err
}
// Get the source pubkey.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
selfPub := nodes.Get(sourceKey)
if selfPub == nil {
return ErrSourceNodeNotSet
}
var sourcePubKey [33]byte
copy(sourcePubKey[:], selfPub[:])
log.Infof("Migrating outgoing payments to new bucket structure")
err = oldPayments.ForEach(func(k, v []byte) error {
// Ignores if it is sub-bucket.
if v == nil {
return nil
}
// Read the old payment format.
r := bytes.NewReader(v)
payment, err := deserializeOutgoingPayment(r)
if err != nil {
return err
}
// Calculate payment hash from the payment preimage.
paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
// Now create and add a PaymentCreationInfo to the bucket.
c := &PaymentCreationInfo{
PaymentHash: paymentHash,
Value: payment.Terms.Value,
CreationDate: payment.CreationDate,
PaymentRequest: payment.PaymentRequest,
}
var infoBuf bytes.Buffer
if err := serializePaymentCreationInfo(&infoBuf, c); err != nil {
return err
}
// Do the same for the PaymentAttemptInfo.
totalAmt := payment.Terms.Value + payment.Fee
rt := route.Route{
TotalTimeLock: payment.TimeLockLength,
TotalAmount: totalAmt,
SourcePubKey: sourcePubKey,
Hops: []*route.Hop{},
}
for _, hop := range payment.Path {
rt.Hops = append(rt.Hops, &route.Hop{
PubKeyBytes: hop,
AmtToForward: totalAmt,
})
}
// Since the old format didn't store the fee for individual
// hops, we let the last hop eat the whole fee for the total to
// add up.
if len(rt.Hops) > 0 {
rt.Hops[len(rt.Hops)-1].AmtToForward = payment.Terms.Value
}
// Since we don't have the session key for old payments, we
// create a random one to be able to serialize the attempt
// info.
priv, _ := btcec.NewPrivateKey(btcec.S256())
s := &PaymentAttemptInfo{
PaymentID: 0, // unknown.
SessionKey: priv, // unknown.
Route: rt,
}
var attemptBuf bytes.Buffer
if err := serializePaymentAttemptInfo(&attemptBuf, s); err != nil {
return err
}
// Reuse the existing payment sequence number.
var seqNum [8]byte
copy(seqNum[:], k)
// Create a bucket indexed by the payment hash.
bucket, err := newPayments.CreateBucket(paymentHash[:])
// If the bucket already exists, it means that we are migrating
// from a database containing duplicate payments to a payment
// hash. To keep this information, we store such duplicate
// payments in a sub-bucket.
if err == bbolt.ErrBucketExists {
pHashBucket := newPayments.Bucket(paymentHash[:])
// Create a bucket for duplicate payments within this
// payment hash's bucket.
dup, err := pHashBucket.CreateBucketIfNotExists(
paymentDuplicateBucket,
)
if err != nil {
return err
}
// Each duplicate will get its own sub-bucket within
// this bucket, so use their sequence number to index
// them by.
bucket, err = dup.CreateBucket(seqNum[:])
if err != nil {
return err
}
} else if err != nil {
return err
}
// Store the payment's information to the bucket.
err = bucket.Put(paymentSequenceKey, seqNum[:])
if err != nil {
return err
}
err = bucket.Put(paymentCreationInfoKey, infoBuf.Bytes())
if err != nil {
return err
}
err = bucket.Put(paymentAttemptInfoKey, attemptBuf.Bytes())
if err != nil {
return err
}
err = bucket.Put(paymentSettleInfoKey, payment.PaymentPreimage[:])
if err != nil {
return err
}
// Since only completed payments were previously stored as
// OutgoingPayments, set the status as Completed.
err = bucket.Put(paymentStatusKey, StatusCompleted.Bytes())
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
// To continue producing unique sequence numbers, we set the sequence
// of the new bucket to that of the old one.
seq := oldPayments.Sequence()
if err := newPayments.SetSequence(seq); err != nil {
return err
}
// Now we delete the old buckets. Deleting the payment status buckets
// deletes all payment statuses other than Complete.
err = tx.DeleteBucket(paymentStatusBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
// Finally delete the old payment bucket.
err = tx.DeleteBucket(paymentBucket)
if err != nil && err != bbolt.ErrBucketNotFound {
return err
}
log.Infof("Migration of outgoing payment bucket structure completed!")
return nil
}