Merge pull request #3597 from Roasbeef/revert-migrations
channeldb: revert to prior migration policy
This commit is contained in:
commit
f5113bc85f
@ -37,6 +37,72 @@ var (
|
|||||||
// for retrieving all migration function that are need to apply to the
|
// for retrieving all migration function that are need to apply to the
|
||||||
// current db.
|
// current db.
|
||||||
dbVersions = []version{
|
dbVersions = []version{
|
||||||
|
{
|
||||||
|
// The base DB version requires no migration.
|
||||||
|
number: 0,
|
||||||
|
migration: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The version of the database where two new indexes
|
||||||
|
// for the update time of node and channel updates were
|
||||||
|
// added.
|
||||||
|
number: 1,
|
||||||
|
migration: migrateNodeAndEdgeUpdateIndex,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version that added the invoice event time
|
||||||
|
// series.
|
||||||
|
number: 2,
|
||||||
|
migration: migrateInvoiceTimeSeries,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version that updated the embedded invoice in
|
||||||
|
// outgoing payments to match the new format.
|
||||||
|
number: 3,
|
||||||
|
migration: migrateInvoiceTimeSeriesOutgoingPayments,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The version of the database where every channel
|
||||||
|
// always has two entries in the edges bucket. If
|
||||||
|
// a policy is unknown, this will be represented
|
||||||
|
// by a special byte sequence.
|
||||||
|
number: 4,
|
||||||
|
migration: migrateEdgePolicies,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version where we persist each attempt to send
|
||||||
|
// an HTLC to a payment hash, and track whether the
|
||||||
|
// payment is in-flight, succeeded, or failed.
|
||||||
|
number: 5,
|
||||||
|
migration: paymentStatusesMigration,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version that properly prunes stale entries
|
||||||
|
// from the edge update index.
|
||||||
|
number: 6,
|
||||||
|
migration: migratePruneEdgeUpdateIndex,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version that migrates the ChannelCloseSummary
|
||||||
|
// to a format where optional fields are indicated with
|
||||||
|
// boolean flags.
|
||||||
|
number: 7,
|
||||||
|
migration: migrateOptionalChannelCloseSummaryFields,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version that changes the gossiper's message
|
||||||
|
// store keys to account for the message's type and
|
||||||
|
// ShortChannelID.
|
||||||
|
number: 8,
|
||||||
|
migration: migrateGossipMessageStoreKeys,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// The DB version where the payments and payment
|
||||||
|
// statuses are moved to being stored in a combined
|
||||||
|
// bucket.
|
||||||
|
number: 9,
|
||||||
|
migration: migrateOutgoingPayments,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
// The DB version where we started to store legacy
|
// The DB version where we started to store legacy
|
||||||
// payload information for all routes, as well as the
|
// payload information for all routes, as well as the
|
||||||
@ -200,6 +266,10 @@ func createChannelDB(dbPath string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := tx.CreateBucket(paymentBucket); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if _, err := tx.CreateBucket(nodeInfoBucket); err != nil {
|
if _, err := tx.CreateBucket(nodeInfoBucket); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -1041,10 +1111,8 @@ func (d *DB) syncVersions(versions []version) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
latestVersion := getLatestDBVersion(versions)
|
latestVersion := getLatestDBVersion(versions)
|
||||||
minUpgradeVersion := getMinUpgradeVersion(versions)
|
|
||||||
log.Infof("Checking for schema update: latest_version=%v, "+
|
log.Infof("Checking for schema update: latest_version=%v, "+
|
||||||
"min_upgrade_version=%v, db_version=%v", latestVersion,
|
"db_version=%v", latestVersion, meta.DbVersionNumber)
|
||||||
minUpgradeVersion, meta.DbVersionNumber)
|
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
|
||||||
@ -1057,12 +1125,6 @@ func (d *DB) syncVersions(versions []version) error {
|
|||||||
latestVersion)
|
latestVersion)
|
||||||
return ErrDBReversion
|
return ErrDBReversion
|
||||||
|
|
||||||
case meta.DbVersionNumber < minUpgradeVersion:
|
|
||||||
log.Errorf("Refusing to upgrade from db_version=%d to "+
|
|
||||||
"latest_version=%d. Upgrade via intermediate major "+
|
|
||||||
"release(s).", meta.DbVersionNumber, latestVersion)
|
|
||||||
return ErrDBVersionTooLow
|
|
||||||
|
|
||||||
// If the current database version matches the latest version number,
|
// If the current database version matches the latest version number,
|
||||||
// then we don't need to perform any migrations.
|
// then we don't need to perform any migrations.
|
||||||
case meta.DbVersionNumber == latestVersion:
|
case meta.DbVersionNumber == latestVersion:
|
||||||
@ -1106,21 +1168,6 @@ func getLatestDBVersion(versions []version) uint32 {
|
|||||||
return versions[len(versions)-1].number
|
return versions[len(versions)-1].number
|
||||||
}
|
}
|
||||||
|
|
||||||
// getMinUpgradeVersion returns the minimum version required to upgrade the
|
|
||||||
// database.
|
|
||||||
func getMinUpgradeVersion(versions []version) uint32 {
|
|
||||||
firstMigrationVersion := versions[0].number
|
|
||||||
|
|
||||||
// If we can upgrade from the base version with this version of lnd,
|
|
||||||
// return the base version as the minimum required version.
|
|
||||||
if firstMigrationVersion == 0 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise require the version that the first migration upgrades from.
|
|
||||||
return firstMigrationVersion - 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// getMigrationsToApply retrieves the migration function that should be
|
// getMigrationsToApply retrieves the migration function that should be
|
||||||
// applied to the database.
|
// applied to the database.
|
||||||
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
|
func getMigrationsToApply(versions []version, version uint32) ([]migration, []uint32) {
|
||||||
|
@ -14,10 +14,6 @@ var (
|
|||||||
// prior database version.
|
// prior database version.
|
||||||
ErrDBReversion = fmt.Errorf("channel db cannot revert to prior version")
|
ErrDBReversion = fmt.Errorf("channel db cannot revert to prior version")
|
||||||
|
|
||||||
// ErrDBVersionTooLow is returned when detecting an attempt to upgrade a
|
|
||||||
// version for which migration is no longer supported.
|
|
||||||
ErrDBVersionTooLow = fmt.Errorf("channel db version too old to upgrade")
|
|
||||||
|
|
||||||
// ErrLinkNodesNotFound is returned when node info bucket hasn't been
|
// ErrLinkNodesNotFound is returned when node info bucket hasn't been
|
||||||
// created.
|
// created.
|
||||||
ErrLinkNodesNotFound = fmt.Errorf("no link nodes exist")
|
ErrLinkNodesNotFound = fmt.Errorf("no link nodes exist")
|
||||||
|
55
channeldb/legacy_serialization.go
Normal file
55
channeldb/legacy_serialization.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package channeldb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
)
|
||||||
|
|
||||||
|
// deserializeCloseChannelSummaryV6 reads the v6 database format for
|
||||||
|
// ChannelCloseSummary.
|
||||||
|
//
|
||||||
|
// NOTE: deprecated, only for migration.
|
||||||
|
func deserializeCloseChannelSummaryV6(r io.Reader) (*ChannelCloseSummary, error) {
|
||||||
|
c := &ChannelCloseSummary{}
|
||||||
|
|
||||||
|
err := ReadElements(r,
|
||||||
|
&c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID,
|
||||||
|
&c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance,
|
||||||
|
&c.TimeLockedBalance, &c.CloseType, &c.IsPending,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll now check to see if the channel close summary was encoded with
|
||||||
|
// any of the additional optional fields.
|
||||||
|
err = ReadElements(r, &c.RemoteCurrentRevocation)
|
||||||
|
switch {
|
||||||
|
case err == io.EOF:
|
||||||
|
return c, nil
|
||||||
|
|
||||||
|
// If we got a non-eof error, then we know there's an actually issue.
|
||||||
|
// Otherwise, it may have been the case that this summary didn't have
|
||||||
|
// the set of optional fields.
|
||||||
|
case err != nil:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := readChanConfig(r, &c.LocalChanConfig); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, we'll attempt to read the next unrevoked commitment point
|
||||||
|
// for the remote party. If we closed the channel before receiving a
|
||||||
|
// funding locked message, then this can be nil. As a result, we'll use
|
||||||
|
// the same technique to read the field, only if there's still data
|
||||||
|
// left in the buffer.
|
||||||
|
err = ReadElements(r, &c.RemoteNextRevocation)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
// If we got a non-eof error, then we know there's an actually
|
||||||
|
// issue. Otherwise, it may have been the case that this
|
||||||
|
// summary didn't have the set of optional fields.
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c, nil
|
||||||
|
}
|
497
channeldb/migration_09_legacy_serialization.go
Normal file
497
channeldb/migration_09_legacy_serialization.go
Normal file
@ -0,0 +1,497 @@
|
|||||||
|
package channeldb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/coreos/bbolt"
|
||||||
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// paymentBucket is the name of the bucket within the database that
|
||||||
|
// stores all data related to payments.
|
||||||
|
//
|
||||||
|
// Within the payments bucket, each invoice is keyed by its invoice ID
|
||||||
|
// which is a monotonically increasing uint64. BoltDB's sequence
|
||||||
|
// feature is used for generating monotonically increasing id.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
paymentBucket = []byte("payments")
|
||||||
|
|
||||||
|
// paymentStatusBucket is the name of the bucket within the database
|
||||||
|
// that stores the status of a payment indexed by the payment's
|
||||||
|
// preimage.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
paymentStatusBucket = []byte("payment-status")
|
||||||
|
)
|
||||||
|
|
||||||
|
// outgoingPayment represents a successful payment between the daemon and a
|
||||||
|
// remote node. Details such as the total fee paid, and the time of the payment
|
||||||
|
// are stored.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
type outgoingPayment struct {
|
||||||
|
Invoice
|
||||||
|
|
||||||
|
// Fee is the total fee paid for the payment in milli-satoshis.
|
||||||
|
Fee lnwire.MilliSatoshi
|
||||||
|
|
||||||
|
// TotalTimeLock is the total cumulative time-lock in the HTLC extended
|
||||||
|
// from the second-to-last hop to the destination.
|
||||||
|
TimeLockLength uint32
|
||||||
|
|
||||||
|
// Path encodes the path the payment took through the network. The path
|
||||||
|
// excludes the outgoing node and consists of the hex-encoded
|
||||||
|
// compressed public key of each of the nodes involved in the payment.
|
||||||
|
Path [][33]byte
|
||||||
|
|
||||||
|
// PaymentPreimage is the preImage of a successful payment. This is used
|
||||||
|
// to calculate the PaymentHash as well as serve as a proof of payment.
|
||||||
|
PaymentPreimage [32]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// addPayment saves a successful payment to the database. It is assumed that
|
||||||
|
// all payment are sent using unique payment hashes.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
func (db *DB) addPayment(payment *outgoingPayment) error {
|
||||||
|
// Validate the field of the inner voice within the outgoing payment,
|
||||||
|
// these must also adhere to the same constraints as regular invoices.
|
||||||
|
if err := validateInvoice(&payment.Invoice); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// We first serialize the payment before starting the database
|
||||||
|
// transaction so we can avoid creating a DB payment in the case of a
|
||||||
|
// serialization error.
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := serializeOutgoingPayment(&b, payment); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
paymentBytes := b.Bytes()
|
||||||
|
|
||||||
|
return db.Batch(func(tx *bbolt.Tx) error {
|
||||||
|
payments, err := tx.CreateBucketIfNotExists(paymentBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Obtain the new unique sequence number for this payment.
|
||||||
|
paymentID, err := payments.NextSequence()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// We use BigEndian for keys as it orders keys in
|
||||||
|
// ascending order. This allows bucket scans to order payments
|
||||||
|
// in the order in which they were created.
|
||||||
|
paymentIDBytes := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(paymentIDBytes, paymentID)
|
||||||
|
|
||||||
|
return payments.Put(paymentIDBytes, paymentBytes)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchAllPayments returns all outgoing payments in DB.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
func (db *DB) fetchAllPayments() ([]*outgoingPayment, error) {
|
||||||
|
var payments []*outgoingPayment
|
||||||
|
|
||||||
|
err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
bucket := tx.Bucket(paymentBucket)
|
||||||
|
if bucket == nil {
|
||||||
|
return ErrNoPaymentsCreated
|
||||||
|
}
|
||||||
|
|
||||||
|
return bucket.ForEach(func(k, v []byte) error {
|
||||||
|
// If the value is nil, then we ignore it as it may be
|
||||||
|
// a sub-bucket.
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(v)
|
||||||
|
payment, err := deserializeOutgoingPayment(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payments = append(payments, payment)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return payments, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchPaymentStatus returns the payment status for outgoing payment.
|
||||||
|
// If status of the payment isn't found, it will default to "StatusUnknown".
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
func (db *DB) fetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) {
|
||||||
|
var paymentStatus = StatusUnknown
|
||||||
|
err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
var err error
|
||||||
|
paymentStatus, err = fetchPaymentStatusTx(tx, paymentHash)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return StatusUnknown, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return paymentStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchPaymentStatusTx is a helper method that returns the payment status for
|
||||||
|
// outgoing payment. If status of the payment isn't found, it will default to
|
||||||
|
// "StatusUnknown". It accepts the boltdb transactions such that this method
|
||||||
|
// can be composed into other atomic operations.
|
||||||
|
//
|
||||||
|
// NOTE: Deprecated. Kept around for migration purposes.
|
||||||
|
func fetchPaymentStatusTx(tx *bbolt.Tx, paymentHash [32]byte) (PaymentStatus, error) {
|
||||||
|
// The default status for all payments that aren't recorded in database.
|
||||||
|
var paymentStatus = StatusUnknown
|
||||||
|
|
||||||
|
bucket := tx.Bucket(paymentStatusBucket)
|
||||||
|
if bucket == nil {
|
||||||
|
return paymentStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
paymentStatusBytes := bucket.Get(paymentHash[:])
|
||||||
|
if paymentStatusBytes == nil {
|
||||||
|
return paymentStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
paymentStatus.FromBytes(paymentStatusBytes)
|
||||||
|
|
||||||
|
return paymentStatus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serializeOutgoingPayment(w io.Writer, p *outgoingPayment) error {
|
||||||
|
var scratch [8]byte
|
||||||
|
|
||||||
|
if err := serializeInvoiceLegacy(w, &p.Invoice); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
byteOrder.PutUint64(scratch[:], uint64(p.Fee))
|
||||||
|
if _, err := w.Write(scratch[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// First write out the length of the bytes to prefix the value.
|
||||||
|
pathLen := uint32(len(p.Path))
|
||||||
|
byteOrder.PutUint32(scratch[:4], pathLen)
|
||||||
|
if _, err := w.Write(scratch[:4]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then with the path written, we write out the series of public keys
|
||||||
|
// involved in the path.
|
||||||
|
for _, hop := range p.Path {
|
||||||
|
if _, err := w.Write(hop[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
byteOrder.PutUint32(scratch[:4], p.TimeLockLength)
|
||||||
|
if _, err := w.Write(scratch[:4]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(p.PaymentPreimage[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deserializeOutgoingPayment(r io.Reader) (*outgoingPayment, error) {
|
||||||
|
var scratch [8]byte
|
||||||
|
|
||||||
|
p := &outgoingPayment{}
|
||||||
|
|
||||||
|
inv, err := deserializeInvoiceLegacy(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
p.Invoice = inv
|
||||||
|
|
||||||
|
if _, err := r.Read(scratch[:]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
p.Fee = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:]))
|
||||||
|
|
||||||
|
if _, err = r.Read(scratch[:4]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pathLen := byteOrder.Uint32(scratch[:4])
|
||||||
|
|
||||||
|
path := make([][33]byte, pathLen)
|
||||||
|
for i := uint32(0); i < pathLen; i++ {
|
||||||
|
if _, err := r.Read(path[i][:]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.Path = path
|
||||||
|
|
||||||
|
if _, err = r.Read(scratch[:4]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
p.TimeLockLength = byteOrder.Uint32(scratch[:4])
|
||||||
|
|
||||||
|
if _, err := r.Read(p.PaymentPreimage[:]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// serializePaymentAttemptInfoMigration9 is the serializePaymentAttemptInfo
|
||||||
|
// version as existed when migration #9 was created. We keep this around, along
|
||||||
|
// with the methods below to ensure that clients that upgrade will use the
|
||||||
|
// correct version of this method.
|
||||||
|
func serializePaymentAttemptInfoMigration9(w io.Writer, a *PaymentAttemptInfo) error {
|
||||||
|
if err := WriteElements(w, a.PaymentID, a.SessionKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := serializeRouteMigration9(w, a.Route); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serializeHopMigration9(w io.Writer, h *route.Hop) error {
|
||||||
|
if err := WriteElements(w,
|
||||||
|
h.PubKeyBytes[:], h.ChannelID, h.OutgoingTimeLock,
|
||||||
|
h.AmtToForward,
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func serializeRouteMigration9(w io.Writer, r route.Route) error {
|
||||||
|
if err := WriteElements(w,
|
||||||
|
r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:],
|
||||||
|
); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := WriteElements(w, uint32(len(r.Hops))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, h := range r.Hops {
|
||||||
|
if err := serializeHopMigration9(w, h); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deserializePaymentAttemptInfoMigration9(r io.Reader) (*PaymentAttemptInfo, error) {
|
||||||
|
a := &PaymentAttemptInfo{}
|
||||||
|
err := ReadElements(r, &a.PaymentID, &a.SessionKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
a.Route, err = deserializeRouteMigration9(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return a, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deserializeRouteMigration9(r io.Reader) (route.Route, error) {
|
||||||
|
rt := route.Route{}
|
||||||
|
if err := ReadElements(r,
|
||||||
|
&rt.TotalTimeLock, &rt.TotalAmount,
|
||||||
|
); err != nil {
|
||||||
|
return rt, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var pub []byte
|
||||||
|
if err := ReadElements(r, &pub); err != nil {
|
||||||
|
return rt, err
|
||||||
|
}
|
||||||
|
copy(rt.SourcePubKey[:], pub)
|
||||||
|
|
||||||
|
var numHops uint32
|
||||||
|
if err := ReadElements(r, &numHops); err != nil {
|
||||||
|
return rt, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var hops []*route.Hop
|
||||||
|
for i := uint32(0); i < numHops; i++ {
|
||||||
|
hop, err := deserializeHopMigration9(r)
|
||||||
|
if err != nil {
|
||||||
|
return rt, err
|
||||||
|
}
|
||||||
|
hops = append(hops, hop)
|
||||||
|
}
|
||||||
|
rt.Hops = hops
|
||||||
|
|
||||||
|
return rt, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func deserializeHopMigration9(r io.Reader) (*route.Hop, error) {
|
||||||
|
h := &route.Hop{}
|
||||||
|
|
||||||
|
var pub []byte
|
||||||
|
if err := ReadElements(r, &pub); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
copy(h.PubKeyBytes[:], pub)
|
||||||
|
|
||||||
|
if err := ReadElements(r,
|
||||||
|
&h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchPaymentsMigration9 returns all sent payments found in the DB using the
|
||||||
|
// payment attempt info format that was present as of migration #9. We need
|
||||||
|
// this as otherwise, the current FetchPayments version will use the latest
|
||||||
|
// decoding format. Note that we only need this for the
|
||||||
|
// TestOutgoingPaymentsMigration migration test case.
|
||||||
|
func (db *DB) fetchPaymentsMigration9() ([]*Payment, error) {
|
||||||
|
var payments []*Payment
|
||||||
|
|
||||||
|
err := db.View(func(tx *bbolt.Tx) error {
|
||||||
|
paymentsBucket := tx.Bucket(paymentsRootBucket)
|
||||||
|
if paymentsBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return paymentsBucket.ForEach(func(k, v []byte) error {
|
||||||
|
bucket := paymentsBucket.Bucket(k)
|
||||||
|
if bucket == nil {
|
||||||
|
// We only expect sub-buckets to be found in
|
||||||
|
// this top-level bucket.
|
||||||
|
return fmt.Errorf("non bucket element in " +
|
||||||
|
"payments bucket")
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := fetchPaymentMigration9(bucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payments = append(payments, p)
|
||||||
|
|
||||||
|
// For older versions of lnd, duplicate payments to a
|
||||||
|
// payment has was possible. These will be found in a
|
||||||
|
// sub-bucket indexed by their sequence number if
|
||||||
|
// available.
|
||||||
|
dup := bucket.Bucket(paymentDuplicateBucket)
|
||||||
|
if dup == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return dup.ForEach(func(k, v []byte) error {
|
||||||
|
subBucket := dup.Bucket(k)
|
||||||
|
if subBucket == nil {
|
||||||
|
// We one bucket for each duplicate to
|
||||||
|
// be found.
|
||||||
|
return fmt.Errorf("non bucket element" +
|
||||||
|
"in duplicate bucket")
|
||||||
|
}
|
||||||
|
|
||||||
|
p, err := fetchPaymentMigration9(subBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
payments = append(payments, p)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Before returning, sort the payments by their sequence number.
|
||||||
|
sort.Slice(payments, func(i, j int) bool {
|
||||||
|
return payments[i].sequenceNum < payments[j].sequenceNum
|
||||||
|
})
|
||||||
|
|
||||||
|
return payments, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchPaymentMigration9(bucket *bbolt.Bucket) (*Payment, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
p = &Payment{}
|
||||||
|
)
|
||||||
|
|
||||||
|
seqBytes := bucket.Get(paymentSequenceKey)
|
||||||
|
if seqBytes == nil {
|
||||||
|
return nil, fmt.Errorf("sequence number not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
p.sequenceNum = binary.BigEndian.Uint64(seqBytes)
|
||||||
|
|
||||||
|
// Get the payment status.
|
||||||
|
p.Status = fetchPaymentStatus(bucket)
|
||||||
|
|
||||||
|
// Get the PaymentCreationInfo.
|
||||||
|
b := bucket.Get(paymentCreationInfoKey)
|
||||||
|
if b == nil {
|
||||||
|
return nil, fmt.Errorf("creation info not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(b)
|
||||||
|
p.Info, err = deserializePaymentCreationInfo(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the PaymentAttemptInfo. This can be unset.
|
||||||
|
b = bucket.Get(paymentAttemptInfoKey)
|
||||||
|
if b != nil {
|
||||||
|
r = bytes.NewReader(b)
|
||||||
|
p.Attempt, err = deserializePaymentAttemptInfoMigration9(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the payment preimage. This is only found for
|
||||||
|
// completed payments.
|
||||||
|
b = bucket.Get(paymentSettleInfoKey)
|
||||||
|
if b != nil {
|
||||||
|
var preimg lntypes.Preimage
|
||||||
|
copy(preimg[:], b[:])
|
||||||
|
p.PaymentPreimage = &preimg
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get failure reason if available.
|
||||||
|
b = bucket.Get(paymentFailInfoKey)
|
||||||
|
if b != nil {
|
||||||
|
reason := FailureReason(b[0])
|
||||||
|
p.Failure = &reason
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
939
channeldb/migrations.go
Normal file
939
channeldb/migrations.go
Normal file
@ -0,0 +1,939 @@
|
|||||||
|
package channeldb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcd/btcec"
|
||||||
|
"github.com/coreos/bbolt"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
|
)
|
||||||
|
|
||||||
|
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
|
||||||
|
// database from version 0 to version 1. In version 1, we add two new indexes
|
||||||
|
// (one for nodes and one for edges) to keep track of the last time a node or
|
||||||
|
// edge was updated on the network. These new indexes allow us to implement the
|
||||||
|
// new graph sync protocol added.
|
||||||
|
func migrateNodeAndEdgeUpdateIndex(tx *bbolt.Tx) error {
|
||||||
|
// First, we'll populating the node portion of the new index. Before we
|
||||||
|
// can add new values to the index, we'll first create the new bucket
|
||||||
|
// where these items will be housed.
|
||||||
|
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create node bucket: %v", err)
|
||||||
|
}
|
||||||
|
nodeUpdateIndex, err := nodes.CreateBucketIfNotExists(
|
||||||
|
nodeUpdateIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create node update index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Populating new node update index bucket")
|
||||||
|
|
||||||
|
// Now that we know the bucket has been created, we'll iterate over the
|
||||||
|
// entire node bucket so we can add the (updateTime || nodePub) key
|
||||||
|
// into the node update index.
|
||||||
|
err = nodes.ForEach(func(nodePub, nodeInfo []byte) error {
|
||||||
|
if len(nodePub) != 33 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Tracef("Adding %x to node update index", nodePub)
|
||||||
|
|
||||||
|
// The first 8 bytes of a node's serialize data is the update
|
||||||
|
// time, so we can extract that without decoding the entire
|
||||||
|
// structure.
|
||||||
|
updateTime := nodeInfo[:8]
|
||||||
|
|
||||||
|
// Now that we have the update time, we can construct the key
|
||||||
|
// to insert into the index.
|
||||||
|
var indexKey [8 + 33]byte
|
||||||
|
copy(indexKey[:8], updateTime)
|
||||||
|
copy(indexKey[8:], nodePub)
|
||||||
|
|
||||||
|
return nodeUpdateIndex.Put(indexKey[:], nil)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update node indexes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Populating new edge update index bucket")
|
||||||
|
|
||||||
|
// With the set of nodes updated, we'll now update all edges to have a
|
||||||
|
// corresponding entry in the edge update index.
|
||||||
|
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create edge bucket: %v", err)
|
||||||
|
}
|
||||||
|
edgeUpdateIndex, err := edges.CreateBucketIfNotExists(
|
||||||
|
edgeUpdateIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to create edge update index: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We'll now run through each edge policy in the database, and update
|
||||||
|
// the index to ensure each edge has the proper record.
|
||||||
|
err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
|
||||||
|
if len(edgeKey) != 41 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we know this is the proper record, we'll grab the
|
||||||
|
// channel ID (last 8 bytes of the key), and then decode the
|
||||||
|
// edge policy so we can access the update time.
|
||||||
|
chanID := edgeKey[33:]
|
||||||
|
edgePolicyReader := bytes.NewReader(edgePolicyBytes)
|
||||||
|
|
||||||
|
edgePolicy, err := deserializeChanEdgePolicy(
|
||||||
|
edgePolicyReader, nodes,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Tracef("Adding chan_id=%v to edge update index",
|
||||||
|
edgePolicy.ChannelID)
|
||||||
|
|
||||||
|
// We'll now construct the index key using the channel ID, and
|
||||||
|
// the last time it was updated: (updateTime || chanID).
|
||||||
|
var indexKey [8 + 8]byte
|
||||||
|
byteOrder.PutUint64(
|
||||||
|
indexKey[:], uint64(edgePolicy.LastUpdate.Unix()),
|
||||||
|
)
|
||||||
|
copy(indexKey[8:], chanID)
|
||||||
|
|
||||||
|
return edgeUpdateIndex.Put(indexKey[:], nil)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update edge indexes: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration to node and edge update indexes complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateInvoiceTimeSeries is a database migration that assigns all existing
|
||||||
|
// invoices an index in the add and/or the settle index. Additionally, all
|
||||||
|
// existing invoices will have their bytes padded out in order to encode the
|
||||||
|
// add+settle index as well as the amount paid.
|
||||||
|
func migrateInvoiceTimeSeries(tx *bbolt.Tx) error {
|
||||||
|
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
addIndex, err := invoices.CreateBucketIfNotExists(
|
||||||
|
addIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
settleIndex, err := invoices.CreateBucketIfNotExists(
|
||||||
|
settleIndexBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migrating invoice database to new time series format")
|
||||||
|
|
||||||
|
// Now that we have all the buckets we need, we'll run through each
|
||||||
|
// invoice in the database, and update it to reflect the new format
|
||||||
|
// expected post migration.
|
||||||
|
// NOTE: we store the converted invoices and put them back into the
|
||||||
|
// database after the loop, since modifying the bucket within the
|
||||||
|
// ForEach loop is not safe.
|
||||||
|
var invoicesKeys [][]byte
|
||||||
|
var invoicesValues [][]byte
|
||||||
|
err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
|
||||||
|
// If this is a sub bucket, then we'll skip it.
|
||||||
|
if invoiceBytes == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// First, we'll make a copy of the encoded invoice bytes.
|
||||||
|
invoiceBytesCopy := make([]byte, len(invoiceBytes))
|
||||||
|
copy(invoiceBytesCopy, invoiceBytes)
|
||||||
|
|
||||||
|
// With the bytes copied over, we'll append 24 additional
|
||||||
|
// bytes. We do this so we can decode the invoice under the new
|
||||||
|
// serialization format.
|
||||||
|
padding := bytes.Repeat([]byte{0}, 24)
|
||||||
|
invoiceBytesCopy = append(invoiceBytesCopy, padding...)
|
||||||
|
|
||||||
|
invoiceReader := bytes.NewReader(invoiceBytesCopy)
|
||||||
|
invoice, err := deserializeInvoiceLegacy(invoiceReader)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to decode invoice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we have the fully decoded invoice, we can update
|
||||||
|
// the various indexes that we're added, and finally the
|
||||||
|
// invoice itself before re-inserting it.
|
||||||
|
|
||||||
|
// First, we'll get the new sequence in the addIndex in order
|
||||||
|
// to create the proper mapping.
|
||||||
|
nextAddSeqNo, err := addIndex.NextSequence()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var seqNoBytes [8]byte
|
||||||
|
byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
|
||||||
|
err = addIndex.Put(seqNoBytes[:], invoiceNum[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Tracef("Adding invoice (preimage=%x, add_index=%v) to add "+
|
||||||
|
"time series", invoice.Terms.PaymentPreimage[:],
|
||||||
|
nextAddSeqNo)
|
||||||
|
|
||||||
|
// Next, we'll check if the invoice has been settled or not. If
|
||||||
|
// so, then we'll also add it to the settle index.
|
||||||
|
var nextSettleSeqNo uint64
|
||||||
|
if invoice.Terms.State == ContractSettled {
|
||||||
|
nextSettleSeqNo, err = settleIndex.NextSequence()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var seqNoBytes [8]byte
|
||||||
|
byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
|
||||||
|
err := settleIndex.Put(seqNoBytes[:], invoiceNum)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
invoice.AmtPaid = invoice.Terms.Value
|
||||||
|
|
||||||
|
log.Tracef("Adding invoice (preimage=%x, "+
|
||||||
|
"settle_index=%v) to add time series",
|
||||||
|
invoice.Terms.PaymentPreimage[:],
|
||||||
|
nextSettleSeqNo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, we'll update the invoice itself with the new
|
||||||
|
// indexing information as well as the amount paid if it has
|
||||||
|
// been settled or not.
|
||||||
|
invoice.AddIndex = nextAddSeqNo
|
||||||
|
invoice.SettleIndex = nextSettleSeqNo
|
||||||
|
|
||||||
|
// We've fully migrated an invoice, so we'll now update the
|
||||||
|
// invoice in-place.
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := serializeInvoiceLegacy(&b, &invoice); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the key and value pending update for after the ForEach
|
||||||
|
// is done.
|
||||||
|
invoicesKeys = append(invoicesKeys, invoiceNum)
|
||||||
|
invoicesValues = append(invoicesValues, b.Bytes())
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now put the converted invoices into the DB.
|
||||||
|
for i := range invoicesKeys {
|
||||||
|
key := invoicesKeys[i]
|
||||||
|
value := invoicesValues[i]
|
||||||
|
if err := invoices.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration to invoice time series index complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateInvoiceTimeSeriesOutgoingPayments is a follow up to the
|
||||||
|
// migrateInvoiceTimeSeries migration. As at the time of writing, the
|
||||||
|
// OutgoingPayment struct embeddeds an instance of the Invoice struct. As a
|
||||||
|
// result, we also need to migrate the internal invoice to the new format.
|
||||||
|
func migrateInvoiceTimeSeriesOutgoingPayments(tx *bbolt.Tx) error {
|
||||||
|
payBucket := tx.Bucket(paymentBucket)
|
||||||
|
if payBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migrating invoice database to new outgoing payment format")
|
||||||
|
|
||||||
|
// We store the keys and values we want to modify since it is not safe
|
||||||
|
// to modify them directly within the ForEach loop.
|
||||||
|
var paymentKeys [][]byte
|
||||||
|
var paymentValues [][]byte
|
||||||
|
err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
|
||||||
|
log.Tracef("Migrating payment %x", payID[:])
|
||||||
|
|
||||||
|
// The internal invoices for each payment only contain a
|
||||||
|
// populated contract term, and creation date, as a result,
|
||||||
|
// most of the bytes will be "empty".
|
||||||
|
|
||||||
|
// We'll calculate the end of the invoice index assuming a
|
||||||
|
// "minimal" index that's embedded within the greater
|
||||||
|
// OutgoingPayment. The breakdown is:
|
||||||
|
// 3 bytes empty var bytes, 16 bytes creation date, 16 bytes
|
||||||
|
// settled date, 32 bytes payment pre-image, 8 bytes value, 1
|
||||||
|
// byte settled.
|
||||||
|
endOfInvoiceIndex := 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1
|
||||||
|
|
||||||
|
// We'll now extract the prefix of the pure invoice embedded
|
||||||
|
// within.
|
||||||
|
invoiceBytes := paymentBytes[:endOfInvoiceIndex]
|
||||||
|
|
||||||
|
// With the prefix extracted, we'll copy over the invoice, and
|
||||||
|
// also add padding for the new 24 bytes of fields, and finally
|
||||||
|
// append the remainder of the outgoing payment.
|
||||||
|
paymentCopy := make([]byte, len(invoiceBytes))
|
||||||
|
copy(paymentCopy[:], invoiceBytes)
|
||||||
|
|
||||||
|
padding := bytes.Repeat([]byte{0}, 24)
|
||||||
|
paymentCopy = append(paymentCopy, padding...)
|
||||||
|
paymentCopy = append(
|
||||||
|
paymentCopy, paymentBytes[endOfInvoiceIndex:]...,
|
||||||
|
)
|
||||||
|
|
||||||
|
// At this point, we now have the new format of the outgoing
|
||||||
|
// payments, we'll attempt to deserialize it to ensure the
|
||||||
|
// bytes are properly formatted.
|
||||||
|
paymentReader := bytes.NewReader(paymentCopy)
|
||||||
|
_, err := deserializeOutgoingPayment(paymentReader)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to deserialize payment: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we know the modifications was successful, we'll
|
||||||
|
// store it to our slice of keys and values, and write it back
|
||||||
|
// to disk in the new format after the ForEach loop is over.
|
||||||
|
paymentKeys = append(paymentKeys, payID)
|
||||||
|
paymentValues = append(paymentValues, paymentCopy)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally store the updated payments to the bucket.
|
||||||
|
for i := range paymentKeys {
|
||||||
|
key := paymentKeys[i]
|
||||||
|
value := paymentValues[i]
|
||||||
|
if err := payBucket.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration to outgoing payment invoices complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateEdgePolicies is a migration function that will update the edges
|
||||||
|
// bucket. It ensure that edges with unknown policies will also have an entry
|
||||||
|
// in the bucket. After the migration, there will be two edge entries for
|
||||||
|
// every channel, regardless of whether the policies are known.
|
||||||
|
func migrateEdgePolicies(tx *bbolt.Tx) error {
|
||||||
|
nodes := tx.Bucket(nodeBucket)
|
||||||
|
if nodes == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
edges := tx.Bucket(edgeBucket)
|
||||||
|
if edges == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
edgeIndex := edges.Bucket(edgeIndexBucket)
|
||||||
|
if edgeIndex == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkKey gets the policy from the database with a low-level call
|
||||||
|
// so that it is still possible to distinguish between unknown and
|
||||||
|
// not present.
|
||||||
|
checkKey := func(channelId uint64, keyBytes []byte) error {
|
||||||
|
var channelID [8]byte
|
||||||
|
byteOrder.PutUint64(channelID[:], channelId)
|
||||||
|
|
||||||
|
_, err := fetchChanEdgePolicy(edges,
|
||||||
|
channelID[:], keyBytes, nodes)
|
||||||
|
|
||||||
|
if err == ErrEdgeNotFound {
|
||||||
|
log.Tracef("Adding unknown edge policy present for node %x, channel %v",
|
||||||
|
keyBytes, channelId)
|
||||||
|
|
||||||
|
err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate over all channels and check both edge policies.
|
||||||
|
err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
|
||||||
|
infoReader := bytes.NewReader(edgeInfoBytes)
|
||||||
|
edgeInfo, err := deserializeChanEdgeInfo(infoReader)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
|
||||||
|
edgeInfo.NodeKey2Bytes[:]} {
|
||||||
|
|
||||||
|
if err := checkKey(edgeInfo.ChannelID, key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update edge policies: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration of edge policies complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// paymentStatusesMigration is a database migration intended for adding payment
|
||||||
|
// statuses for each existing payment entity in bucket to be able control
|
||||||
|
// transitions of statuses and prevent cases such as double payment
|
||||||
|
func paymentStatusesMigration(tx *bbolt.Tx) error {
|
||||||
|
// Get the bucket dedicated to storing statuses of payments,
|
||||||
|
// where a key is payment hash, value is payment status.
|
||||||
|
paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migrating database to support payment statuses")
|
||||||
|
|
||||||
|
circuitAddKey := []byte("circuit-adds")
|
||||||
|
circuits := tx.Bucket(circuitAddKey)
|
||||||
|
if circuits != nil {
|
||||||
|
log.Infof("Marking all known circuits with status InFlight")
|
||||||
|
|
||||||
|
err = circuits.ForEach(func(k, v []byte) error {
|
||||||
|
// Parse the first 8 bytes as the short chan ID for the
|
||||||
|
// circuit. We'll skip all short chan IDs are not
|
||||||
|
// locally initiated, which includes all non-zero short
|
||||||
|
// chan ids.
|
||||||
|
chanID := binary.BigEndian.Uint64(k[:8])
|
||||||
|
if chanID != 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The payment hash is the third item in the serialized
|
||||||
|
// payment circuit. The first two items are an AddRef
|
||||||
|
// (10 bytes) and the incoming circuit key (16 bytes).
|
||||||
|
const payHashOffset = 10 + 16
|
||||||
|
|
||||||
|
paymentHash := v[payHashOffset : payHashOffset+32]
|
||||||
|
|
||||||
|
return paymentStatuses.Put(
|
||||||
|
paymentHash[:], StatusInFlight.Bytes(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Marking all existing payments with status Completed")
|
||||||
|
|
||||||
|
// Get the bucket dedicated to storing payments
|
||||||
|
bucket := tx.Bucket(paymentBucket)
|
||||||
|
if bucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// For each payment in the bucket, deserialize the payment and mark it
|
||||||
|
// as completed.
|
||||||
|
err = bucket.ForEach(func(k, v []byte) error {
|
||||||
|
// Ignores if it is sub-bucket.
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
r := bytes.NewReader(v)
|
||||||
|
payment, err := deserializeOutgoingPayment(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate payment hash for current payment.
|
||||||
|
paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
|
||||||
|
|
||||||
|
// Update status for current payment to completed. If it fails,
|
||||||
|
// the migration is aborted and the payment bucket is returned
|
||||||
|
// to its previous state.
|
||||||
|
return paymentStatuses.Put(paymentHash[:], StatusSucceeded.Bytes())
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration of payment statuses complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve
|
||||||
|
// some lingering bugs with regards to edge policies and their update index.
|
||||||
|
// Stale entries within the edge update index were not being properly pruned due
|
||||||
|
// to a miscalculation on the offset of an edge's policy last update. This
|
||||||
|
// migration also fixes the case where the public keys within edge policies were
|
||||||
|
// being serialized with an extra byte, causing an even greater error when
|
||||||
|
// attempting to perform the offset calculation described earlier.
|
||||||
|
func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error {
|
||||||
|
// To begin the migration, we'll retrieve the update index bucket. If it
|
||||||
|
// does not exist, we have nothing left to do so we can simply exit.
|
||||||
|
edges := tx.Bucket(edgeBucket)
|
||||||
|
if edges == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
|
||||||
|
if edgeUpdateIndex == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve some buckets that will be needed later on. These should
|
||||||
|
// already exist given the assumption that the buckets above do as
|
||||||
|
// well.
|
||||||
|
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating edge index bucket: %s", err)
|
||||||
|
}
|
||||||
|
if edgeIndex == nil {
|
||||||
|
return fmt.Errorf("unable to create/fetch edge index " +
|
||||||
|
"bucket")
|
||||||
|
}
|
||||||
|
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to make node bucket")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migrating database to properly prune edge update index")
|
||||||
|
|
||||||
|
// We'll need to properly prune all the outdated entries within the edge
|
||||||
|
// update index. To do so, we'll gather all of the existing policies
|
||||||
|
// within the graph to re-populate them later on.
|
||||||
|
var edgeKeys [][]byte
|
||||||
|
err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
|
||||||
|
// All valid entries are indexed by a public key (33 bytes)
|
||||||
|
// followed by a channel ID (8 bytes), so we'll skip any entries
|
||||||
|
// with keys that do not match this.
|
||||||
|
if len(edgeKey) != 33+8 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
edgeKeys = append(edgeKeys, edgeKey)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to gather existing edge policies: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Constructing set of edge update entries to purge.")
|
||||||
|
|
||||||
|
// Build the set of keys that we will remove from the edge update index.
|
||||||
|
// This will include all keys contained within the bucket.
|
||||||
|
var updateKeysToRemove [][]byte
|
||||||
|
err = edgeUpdateIndex.ForEach(func(updKey, _ []byte) error {
|
||||||
|
updateKeysToRemove = append(updateKeysToRemove, updKey)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to gather existing edge updates: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Removing %d entries from edge update index.",
|
||||||
|
len(updateKeysToRemove))
|
||||||
|
|
||||||
|
// With the set of keys contained in the edge update index constructed,
|
||||||
|
// we'll proceed in purging all of them from the index.
|
||||||
|
for _, updKey := range updateKeysToRemove {
|
||||||
|
if err := edgeUpdateIndex.Delete(updKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Repopulating edge update index with %d valid entries.",
|
||||||
|
len(edgeKeys))
|
||||||
|
|
||||||
|
// For each edge key, we'll retrieve the policy, deserialize it, and
|
||||||
|
// re-add it to the different buckets. By doing so, we'll ensure that
|
||||||
|
// all existing edge policies are serialized correctly within their
|
||||||
|
// respective buckets and that the correct entries are populated within
|
||||||
|
// the edge update index.
|
||||||
|
for _, edgeKey := range edgeKeys {
|
||||||
|
edgePolicyBytes := edges.Get(edgeKey)
|
||||||
|
|
||||||
|
// Skip any entries with unknown policies as there will not be
|
||||||
|
// any entries for them in the edge update index.
|
||||||
|
if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
edgePolicy, err := deserializeChanEdgePolicy(
|
||||||
|
bytes.NewReader(edgePolicyBytes), nodes,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = updateEdgePolicy(tx, edgePolicy)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migration to properly prune edge update index complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateOptionalChannelCloseSummaryFields migrates the serialized format of
|
||||||
|
// ChannelCloseSummary to a format where optional fields' presence is indicated
|
||||||
|
// with boolean markers.
|
||||||
|
func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error {
|
||||||
|
closedChanBucket := tx.Bucket(closedChannelBucket)
|
||||||
|
if closedChanBucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migrating to new closed channel format...")
|
||||||
|
|
||||||
|
// We store the converted keys and values and put them back into the
|
||||||
|
// database after the loop, since modifying the bucket within the
|
||||||
|
// ForEach loop is not safe.
|
||||||
|
var closedChansKeys [][]byte
|
||||||
|
var closedChansValues [][]byte
|
||||||
|
err := closedChanBucket.ForEach(func(chanID, summary []byte) error {
|
||||||
|
r := bytes.NewReader(summary)
|
||||||
|
|
||||||
|
// Read the old (v6) format from the database.
|
||||||
|
c, err := deserializeCloseChannelSummaryV6(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serialize using the new format, and put back into the
|
||||||
|
// bucket.
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := serializeChannelCloseSummary(&b, c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now that we know the modifications was successful, we'll
|
||||||
|
// Store the key and value to our slices, and write it back to
|
||||||
|
// disk in the new format after the ForEach loop is over.
|
||||||
|
closedChansKeys = append(closedChansKeys, chanID)
|
||||||
|
closedChansValues = append(closedChansValues, b.Bytes())
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("unable to update closed channels: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now put the new format back into the DB.
|
||||||
|
for i := range closedChansKeys {
|
||||||
|
key := closedChansKeys[i]
|
||||||
|
value := closedChansValues[i]
|
||||||
|
if err := closedChanBucket.Put(key, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migration to new closed channel format complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var messageStoreBucket = []byte("message-store")
|
||||||
|
|
||||||
|
// migrateGossipMessageStoreKeys migrates the key format for gossip messages
|
||||||
|
// found in the message store to a new one that takes into consideration the of
|
||||||
|
// the message being stored.
|
||||||
|
func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error {
|
||||||
|
// We'll start by retrieving the bucket in which these messages are
|
||||||
|
// stored within. If there isn't one, there's nothing left for us to do
|
||||||
|
// so we can avoid the migration.
|
||||||
|
messageStore := tx.Bucket(messageStoreBucket)
|
||||||
|
if messageStore == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migrating to the gossip message store new key format")
|
||||||
|
|
||||||
|
// Otherwise we'll proceed with the migration. We'll start by coalescing
|
||||||
|
// all the current messages within the store, which are indexed by the
|
||||||
|
// public key of the peer which they should be sent to, followed by the
|
||||||
|
// short channel ID of the channel for which the message belongs to. We
|
||||||
|
// should only expect to find channel announcement signatures as that
|
||||||
|
// was the only support message type previously.
|
||||||
|
msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures)
|
||||||
|
err := messageStore.ForEach(func(k, v []byte) error {
|
||||||
|
var msgKey [33 + 8]byte
|
||||||
|
copy(msgKey[:], k)
|
||||||
|
|
||||||
|
msg := &lnwire.AnnounceSignatures{}
|
||||||
|
if err := msg.Decode(bytes.NewReader(v), 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs[msgKey] = msg
|
||||||
|
|
||||||
|
return nil
|
||||||
|
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then, we'll go over all of our messages, remove their previous entry,
|
||||||
|
// and add another with the new key format. Once we've done this for
|
||||||
|
// every message, we can consider the migration complete.
|
||||||
|
for oldMsgKey, msg := range msgs {
|
||||||
|
if err := messageStore.Delete(oldMsgKey[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the new key for which we'll find this message with
|
||||||
|
// in the store. It'll be the same as the old, but we'll also
|
||||||
|
// include the message type.
|
||||||
|
var msgType [2]byte
|
||||||
|
binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType()))
|
||||||
|
newMsgKey := append(oldMsgKey[:], msgType[:]...)
|
||||||
|
|
||||||
|
// Serialize the message with its wire encoding.
|
||||||
|
var b bytes.Buffer
|
||||||
|
if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Migration to the gossip message store new key format complete!")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// migrateOutgoingPayments moves the OutgoingPayments into a new bucket format
|
||||||
|
// where they all reside in a top-level bucket indexed by the payment hash. In
|
||||||
|
// this sub-bucket we store information relevant to this payment, such as the
|
||||||
|
// payment status.
|
||||||
|
//
|
||||||
|
// Since the router cannot handle resumed payments that have the status
|
||||||
|
// InFlight (we have no PaymentAttemptInfo available for pre-migration
|
||||||
|
// payments) we delete those statuses, so only Completed payments remain in the
|
||||||
|
// new bucket structure.
|
||||||
|
func migrateOutgoingPayments(tx *bbolt.Tx) error {
|
||||||
|
log.Infof("Migrating outgoing payments to new bucket structure")
|
||||||
|
|
||||||
|
oldPayments := tx.Bucket(paymentBucket)
|
||||||
|
|
||||||
|
// Return early if there are no payments to migrate.
|
||||||
|
if oldPayments == nil {
|
||||||
|
log.Infof("No outgoing payments found, nothing to migrate.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
newPayments, err := tx.CreateBucket(paymentsRootBucket)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper method to get the source pubkey. We define it such that we
|
||||||
|
// only attempt to fetch it if needed.
|
||||||
|
sourcePub := func() ([33]byte, error) {
|
||||||
|
var pub [33]byte
|
||||||
|
nodes := tx.Bucket(nodeBucket)
|
||||||
|
if nodes == nil {
|
||||||
|
return pub, ErrGraphNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
selfPub := nodes.Get(sourceKey)
|
||||||
|
if selfPub == nil {
|
||||||
|
return pub, ErrSourceNodeNotSet
|
||||||
|
}
|
||||||
|
copy(pub[:], selfPub[:])
|
||||||
|
return pub, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err = oldPayments.ForEach(func(k, v []byte) error {
|
||||||
|
// Ignores if it is sub-bucket.
|
||||||
|
if v == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the old payment format.
|
||||||
|
r := bytes.NewReader(v)
|
||||||
|
payment, err := deserializeOutgoingPayment(r)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate payment hash from the payment preimage.
|
||||||
|
paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
|
||||||
|
|
||||||
|
// Now create and add a PaymentCreationInfo to the bucket.
|
||||||
|
c := &PaymentCreationInfo{
|
||||||
|
PaymentHash: paymentHash,
|
||||||
|
Value: payment.Terms.Value,
|
||||||
|
CreationDate: payment.CreationDate,
|
||||||
|
PaymentRequest: payment.PaymentRequest,
|
||||||
|
}
|
||||||
|
|
||||||
|
var infoBuf bytes.Buffer
|
||||||
|
if err := serializePaymentCreationInfo(&infoBuf, c); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
sourcePubKey, err := sourcePub()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do the same for the PaymentAttemptInfo.
|
||||||
|
totalAmt := payment.Terms.Value + payment.Fee
|
||||||
|
rt := route.Route{
|
||||||
|
TotalTimeLock: payment.TimeLockLength,
|
||||||
|
TotalAmount: totalAmt,
|
||||||
|
SourcePubKey: sourcePubKey,
|
||||||
|
Hops: []*route.Hop{},
|
||||||
|
}
|
||||||
|
for _, hop := range payment.Path {
|
||||||
|
rt.Hops = append(rt.Hops, &route.Hop{
|
||||||
|
PubKeyBytes: hop,
|
||||||
|
AmtToForward: totalAmt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since the old format didn't store the fee for individual
|
||||||
|
// hops, we let the last hop eat the whole fee for the total to
|
||||||
|
// add up.
|
||||||
|
if len(rt.Hops) > 0 {
|
||||||
|
rt.Hops[len(rt.Hops)-1].AmtToForward = payment.Terms.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we don't have the session key for old payments, we
|
||||||
|
// create a random one to be able to serialize the attempt
|
||||||
|
// info.
|
||||||
|
priv, _ := btcec.NewPrivateKey(btcec.S256())
|
||||||
|
s := &PaymentAttemptInfo{
|
||||||
|
PaymentID: 0, // unknown.
|
||||||
|
SessionKey: priv, // unknown.
|
||||||
|
Route: rt,
|
||||||
|
}
|
||||||
|
|
||||||
|
var attemptBuf bytes.Buffer
|
||||||
|
if err := serializePaymentAttemptInfoMigration9(&attemptBuf, s); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reuse the existing payment sequence number.
|
||||||
|
var seqNum [8]byte
|
||||||
|
copy(seqNum[:], k)
|
||||||
|
|
||||||
|
// Create a bucket indexed by the payment hash.
|
||||||
|
bucket, err := newPayments.CreateBucket(paymentHash[:])
|
||||||
|
|
||||||
|
// If the bucket already exists, it means that we are migrating
|
||||||
|
// from a database containing duplicate payments to a payment
|
||||||
|
// hash. To keep this information, we store such duplicate
|
||||||
|
// payments in a sub-bucket.
|
||||||
|
if err == bbolt.ErrBucketExists {
|
||||||
|
pHashBucket := newPayments.Bucket(paymentHash[:])
|
||||||
|
|
||||||
|
// Create a bucket for duplicate payments within this
|
||||||
|
// payment hash's bucket.
|
||||||
|
dup, err := pHashBucket.CreateBucketIfNotExists(
|
||||||
|
paymentDuplicateBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each duplicate will get its own sub-bucket within
|
||||||
|
// this bucket, so use their sequence number to index
|
||||||
|
// them by.
|
||||||
|
bucket, err = dup.CreateBucket(seqNum[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store the payment's information to the bucket.
|
||||||
|
err = bucket.Put(paymentSequenceKey, seqNum[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = bucket.Put(paymentCreationInfoKey, infoBuf.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = bucket.Put(paymentAttemptInfoKey, attemptBuf.Bytes())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = bucket.Put(paymentSettleInfoKey, payment.PaymentPreimage[:])
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// To continue producing unique sequence numbers, we set the sequence
|
||||||
|
// of the new bucket to that of the old one.
|
||||||
|
seq := oldPayments.Sequence()
|
||||||
|
if err := newPayments.SetSequence(seq); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now we delete the old buckets. Deleting the payment status buckets
|
||||||
|
// deletes all payment statuses other than Complete.
|
||||||
|
err = tx.DeleteBucket(paymentStatusBucket)
|
||||||
|
if err != nil && err != bbolt.ErrBucketNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally delete the old payment bucket.
|
||||||
|
err = tx.DeleteBucket(paymentBucket)
|
||||||
|
if err != nil && err != bbolt.ErrBucketNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Migration of outgoing payment bucket structure completed!")
|
||||||
|
return nil
|
||||||
|
}
|
@ -2,17 +2,732 @@ package channeldb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/btcsuite/btcutil"
|
||||||
"github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/go-errors/errors"
|
||||||
"github.com/lightningnetwork/lnd/lntypes"
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/routing/route"
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TestPaymentStatusesMigration checks that already completed payments will have
|
||||||
|
// their payment statuses set to Completed after the migration.
|
||||||
|
func TestPaymentStatusesMigration(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
fakePayment := makeFakePayment()
|
||||||
|
paymentHash := sha256.Sum256(fakePayment.PaymentPreimage[:])
|
||||||
|
|
||||||
|
// Add fake payment to test database, verifying that it was created,
|
||||||
|
// that we have only one payment, and its status is not "Completed".
|
||||||
|
beforeMigrationFunc := func(d *DB) {
|
||||||
|
if err := d.addPayment(fakePayment); err != nil {
|
||||||
|
t.Fatalf("unable to add payment: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
payments, err := d.fetchAllPayments()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payments: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(payments) != 1 {
|
||||||
|
t.Fatalf("wrong qty of paymets: expected 1, got %v",
|
||||||
|
len(payments))
|
||||||
|
}
|
||||||
|
|
||||||
|
paymentStatus, err := d.fetchPaymentStatus(paymentHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payment status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should receive default status if we have any in database.
|
||||||
|
if paymentStatus != StatusUnknown {
|
||||||
|
t.Fatalf("wrong payment status: expected %v, got %v",
|
||||||
|
StatusUnknown.String(), paymentStatus.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lastly, we'll add a locally-sourced circuit and
|
||||||
|
// non-locally-sourced circuit to the circuit map. The
|
||||||
|
// locally-sourced payment should end up with an InFlight
|
||||||
|
// status, while the other should remain unchanged, which
|
||||||
|
// defaults to Grounded.
|
||||||
|
err = d.Update(func(tx *bbolt.Tx) error {
|
||||||
|
circuits, err := tx.CreateBucketIfNotExists(
|
||||||
|
[]byte("circuit-adds"),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
groundedKey := make([]byte, 16)
|
||||||
|
binary.BigEndian.PutUint64(groundedKey[:8], 1)
|
||||||
|
binary.BigEndian.PutUint64(groundedKey[8:], 1)
|
||||||
|
|
||||||
|
// Generated using TestHalfCircuitSerialization with nil
|
||||||
|
// ErrorEncrypter, which is the case for locally-sourced
|
||||||
|
// payments. No payment status should end up being set
|
||||||
|
// for this circuit, since the short channel id of the
|
||||||
|
// key is non-zero (e.g., a forwarded circuit). This
|
||||||
|
// will default it to Grounded.
|
||||||
|
groundedCircuit := []byte{
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x01,
|
||||||
|
// start payment hash
|
||||||
|
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
// end payment hash
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x0f,
|
||||||
|
0x42, 0x40, 0x00,
|
||||||
|
}
|
||||||
|
|
||||||
|
err = circuits.Put(groundedKey, groundedCircuit)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
inFlightKey := make([]byte, 16)
|
||||||
|
binary.BigEndian.PutUint64(inFlightKey[:8], 0)
|
||||||
|
binary.BigEndian.PutUint64(inFlightKey[8:], 1)
|
||||||
|
|
||||||
|
// Generated using TestHalfCircuitSerialization with nil
|
||||||
|
// ErrorEncrypter, which is not the case for forwarded
|
||||||
|
// payments, but should have no impact on the
|
||||||
|
// correctness of the test. The payment status for this
|
||||||
|
// circuit should be set to InFlight, since the short
|
||||||
|
// channel id in the key is 0 (sourceHop).
|
||||||
|
inFlightCircuit := []byte{
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x01,
|
||||||
|
// start payment hash
|
||||||
|
0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
// end payment hash
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x0f,
|
||||||
|
0x42, 0x40, 0x00,
|
||||||
|
}
|
||||||
|
|
||||||
|
return circuits.Put(inFlightKey, inFlightCircuit)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to add circuit map entry: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the created payment status is "Completed" for our one
|
||||||
|
// fake payment.
|
||||||
|
afterMigrationFunc := func(d *DB) {
|
||||||
|
meta, err := d.FetchMeta(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.DbVersionNumber != 1 {
|
||||||
|
t.Fatal("migration 'paymentStatusesMigration' wasn't applied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that our completed payments were migrated.
|
||||||
|
paymentStatus, err := d.fetchPaymentStatus(paymentHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payment status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if paymentStatus != StatusSucceeded {
|
||||||
|
t.Fatalf("wrong payment status: expected %v, got %v",
|
||||||
|
StatusSucceeded.String(), paymentStatus.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
inFlightHash := [32]byte{
|
||||||
|
0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the locally sourced payment was transitioned to
|
||||||
|
// InFlight.
|
||||||
|
paymentStatus, err = d.fetchPaymentStatus(inFlightHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payment status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if paymentStatus != StatusInFlight {
|
||||||
|
t.Fatalf("wrong payment status: expected %v, got %v",
|
||||||
|
StatusInFlight.String(), paymentStatus.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
groundedHash := [32]byte{
|
||||||
|
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that non-locally sourced payments remain in the default
|
||||||
|
// Grounded state.
|
||||||
|
paymentStatus, err = d.fetchPaymentStatus(groundedHash)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payment status: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if paymentStatus != StatusUnknown {
|
||||||
|
t.Fatalf("wrong payment status: expected %v, got %v",
|
||||||
|
StatusUnknown.String(), paymentStatus.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
applyMigration(t,
|
||||||
|
beforeMigrationFunc,
|
||||||
|
afterMigrationFunc,
|
||||||
|
paymentStatusesMigration,
|
||||||
|
false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMigrateOptionalChannelCloseSummaryFields properly converts a
|
||||||
|
// ChannelCloseSummary to the v7 format, where optional fields have their
|
||||||
|
// presence indicated with boolean markers.
|
||||||
|
func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
chanState, err := createTestChannelState(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create channel state: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var chanPointBuf bytes.Buffer
|
||||||
|
err = writeOutpoint(&chanPointBuf, &chanState.FundingOutpoint)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to write outpoint: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chanID := chanPointBuf.Bytes()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
closeSummary *ChannelCloseSummary
|
||||||
|
oldSerialization func(c *ChannelCloseSummary) []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
// A close summary where none of the new fields are
|
||||||
|
// set.
|
||||||
|
closeSummary: &ChannelCloseSummary{
|
||||||
|
ChanPoint: chanState.FundingOutpoint,
|
||||||
|
ShortChanID: chanState.ShortChanID(),
|
||||||
|
ChainHash: chanState.ChainHash,
|
||||||
|
ClosingTXID: testTx.TxHash(),
|
||||||
|
CloseHeight: 100,
|
||||||
|
RemotePub: chanState.IdentityPub,
|
||||||
|
Capacity: chanState.Capacity,
|
||||||
|
SettledBalance: btcutil.Amount(50000),
|
||||||
|
CloseType: RemoteForceClose,
|
||||||
|
IsPending: true,
|
||||||
|
|
||||||
|
// The last fields will be unset.
|
||||||
|
RemoteCurrentRevocation: nil,
|
||||||
|
LocalChanConfig: ChannelConfig{},
|
||||||
|
RemoteNextRevocation: nil,
|
||||||
|
},
|
||||||
|
|
||||||
|
// In the old format the last field written is the
|
||||||
|
// IsPendingField. It should be converted by adding an
|
||||||
|
// extra boolean marker at the end to indicate that the
|
||||||
|
// remaining fields are not there.
|
||||||
|
oldSerialization: func(cs *ChannelCloseSummary) []byte {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := WriteElements(&buf, cs.ChanPoint,
|
||||||
|
cs.ShortChanID, cs.ChainHash,
|
||||||
|
cs.ClosingTXID, cs.CloseHeight,
|
||||||
|
cs.RemotePub, cs.Capacity,
|
||||||
|
cs.SettledBalance, cs.TimeLockedBalance,
|
||||||
|
cs.CloseType, cs.IsPending,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For the old format, these are all the fields
|
||||||
|
// that are written.
|
||||||
|
return buf.Bytes()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// A close summary where the new fields are present,
|
||||||
|
// but the optional RemoteNextRevocation field is not
|
||||||
|
// set.
|
||||||
|
closeSummary: &ChannelCloseSummary{
|
||||||
|
ChanPoint: chanState.FundingOutpoint,
|
||||||
|
ShortChanID: chanState.ShortChanID(),
|
||||||
|
ChainHash: chanState.ChainHash,
|
||||||
|
ClosingTXID: testTx.TxHash(),
|
||||||
|
CloseHeight: 100,
|
||||||
|
RemotePub: chanState.IdentityPub,
|
||||||
|
Capacity: chanState.Capacity,
|
||||||
|
SettledBalance: btcutil.Amount(50000),
|
||||||
|
CloseType: RemoteForceClose,
|
||||||
|
IsPending: true,
|
||||||
|
RemoteCurrentRevocation: chanState.RemoteCurrentRevocation,
|
||||||
|
LocalChanConfig: chanState.LocalChanCfg,
|
||||||
|
|
||||||
|
// RemoteNextRevocation is optional, and here
|
||||||
|
// it is not set.
|
||||||
|
RemoteNextRevocation: nil,
|
||||||
|
},
|
||||||
|
|
||||||
|
// In the old format the last field written is the
|
||||||
|
// LocalChanConfig. This indicates that the optional
|
||||||
|
// RemoteNextRevocation field is not present. It should
|
||||||
|
// be converted by adding boolean markers for all these
|
||||||
|
// fields.
|
||||||
|
oldSerialization: func(cs *ChannelCloseSummary) []byte {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := WriteElements(&buf, cs.ChanPoint,
|
||||||
|
cs.ShortChanID, cs.ChainHash,
|
||||||
|
cs.ClosingTXID, cs.CloseHeight,
|
||||||
|
cs.RemotePub, cs.Capacity,
|
||||||
|
cs.SettledBalance, cs.TimeLockedBalance,
|
||||||
|
cs.CloseType, cs.IsPending,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = WriteElements(&buf, cs.RemoteCurrentRevocation)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = writeChanConfig(&buf, &cs.LocalChanConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteNextRevocation is not written.
|
||||||
|
return buf.Bytes()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// A close summary where all fields are present.
|
||||||
|
closeSummary: &ChannelCloseSummary{
|
||||||
|
ChanPoint: chanState.FundingOutpoint,
|
||||||
|
ShortChanID: chanState.ShortChanID(),
|
||||||
|
ChainHash: chanState.ChainHash,
|
||||||
|
ClosingTXID: testTx.TxHash(),
|
||||||
|
CloseHeight: 100,
|
||||||
|
RemotePub: chanState.IdentityPub,
|
||||||
|
Capacity: chanState.Capacity,
|
||||||
|
SettledBalance: btcutil.Amount(50000),
|
||||||
|
CloseType: RemoteForceClose,
|
||||||
|
IsPending: true,
|
||||||
|
RemoteCurrentRevocation: chanState.RemoteCurrentRevocation,
|
||||||
|
LocalChanConfig: chanState.LocalChanCfg,
|
||||||
|
|
||||||
|
// RemoteNextRevocation is optional, and in
|
||||||
|
// this case we set it.
|
||||||
|
RemoteNextRevocation: chanState.RemoteNextRevocation,
|
||||||
|
},
|
||||||
|
|
||||||
|
// In the old format all the fields are written. It
|
||||||
|
// should be converted by adding boolean markers for
|
||||||
|
// all these fields.
|
||||||
|
oldSerialization: func(cs *ChannelCloseSummary) []byte {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
err := WriteElements(&buf, cs.ChanPoint,
|
||||||
|
cs.ShortChanID, cs.ChainHash,
|
||||||
|
cs.ClosingTXID, cs.CloseHeight,
|
||||||
|
cs.RemotePub, cs.Capacity,
|
||||||
|
cs.SettledBalance, cs.TimeLockedBalance,
|
||||||
|
cs.CloseType, cs.IsPending,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = WriteElements(&buf, cs.RemoteCurrentRevocation)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = writeChanConfig(&buf, &cs.LocalChanConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = WriteElements(&buf, cs.RemoteNextRevocation)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf.Bytes()
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
|
||||||
|
// Before the migration we must add the old format to the DB.
|
||||||
|
beforeMigrationFunc := func(d *DB) {
|
||||||
|
|
||||||
|
// Get the old serialization format for this test's
|
||||||
|
// close summary, and it to the closed channel bucket.
|
||||||
|
old := test.oldSerialization(test.closeSummary)
|
||||||
|
err = d.Update(func(tx *bbolt.Tx) error {
|
||||||
|
closedChanBucket, err := tx.CreateBucketIfNotExists(
|
||||||
|
closedChannelBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return closedChanBucket.Put(chanID, old)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to add old serialization: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// After the migration it should be found in the new format.
|
||||||
|
afterMigrationFunc := func(d *DB) {
|
||||||
|
meta, err := d.FetchMeta(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.DbVersionNumber != 1 {
|
||||||
|
t.Fatal("migration wasn't applied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// We generate the new serialized version, to check
|
||||||
|
// against what is found in the DB.
|
||||||
|
var b bytes.Buffer
|
||||||
|
err = serializeChannelCloseSummary(&b, test.closeSummary)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to serialize: %v", err)
|
||||||
|
}
|
||||||
|
newSerialization := b.Bytes()
|
||||||
|
|
||||||
|
var dbSummary []byte
|
||||||
|
err = d.View(func(tx *bbolt.Tx) error {
|
||||||
|
closedChanBucket := tx.Bucket(closedChannelBucket)
|
||||||
|
if closedChanBucket == nil {
|
||||||
|
return errors.New("unable to find bucket")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the serialized verision from the DB and
|
||||||
|
// make sure it matches what we expected.
|
||||||
|
dbSummary = closedChanBucket.Get(chanID)
|
||||||
|
if !bytes.Equal(dbSummary, newSerialization) {
|
||||||
|
return fmt.Errorf("unexpected new " +
|
||||||
|
"serialization")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to view DB: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally we fetch the deserialized summary from the
|
||||||
|
// DB and check that it is equal to our original one.
|
||||||
|
dbChannels, err := d.FetchClosedChannels(false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch closed channels: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(dbChannels) != 1 {
|
||||||
|
t.Fatalf("expected 1 closed channels, found %v",
|
||||||
|
len(dbChannels))
|
||||||
|
}
|
||||||
|
|
||||||
|
dbChan := dbChannels[0]
|
||||||
|
if !reflect.DeepEqual(dbChan, test.closeSummary) {
|
||||||
|
dbChan.RemotePub.Curve = nil
|
||||||
|
test.closeSummary.RemotePub.Curve = nil
|
||||||
|
t.Fatalf("not equal: %v vs %v",
|
||||||
|
spew.Sdump(dbChan),
|
||||||
|
spew.Sdump(test.closeSummary))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
applyMigration(t,
|
||||||
|
beforeMigrationFunc,
|
||||||
|
afterMigrationFunc,
|
||||||
|
migrateOptionalChannelCloseSummaryFields,
|
||||||
|
false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMigrateGossipMessageStoreKeys ensures that the migration to the new
|
||||||
|
// gossip message store key format is successful/unsuccessful under various
|
||||||
|
// scenarios.
|
||||||
|
func TestMigrateGossipMessageStoreKeys(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
// Construct the message which we'll use to test the migration, along
|
||||||
|
// with its old and new key formats.
|
||||||
|
shortChanID := lnwire.ShortChannelID{BlockHeight: 10}
|
||||||
|
msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID}
|
||||||
|
|
||||||
|
var oldMsgKey [33 + 8]byte
|
||||||
|
copy(oldMsgKey[:33], pubKey.SerializeCompressed())
|
||||||
|
binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64())
|
||||||
|
|
||||||
|
var newMsgKey [33 + 8 + 2]byte
|
||||||
|
copy(newMsgKey[:41], oldMsgKey[:])
|
||||||
|
binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType()))
|
||||||
|
|
||||||
|
// Before the migration, we'll create the bucket where the messages
|
||||||
|
// should live and insert them.
|
||||||
|
beforeMigration := func(db *DB) {
|
||||||
|
var b bytes.Buffer
|
||||||
|
if err := msg.Encode(&b, 0); err != nil {
|
||||||
|
t.Fatalf("unable to serialize message: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
messageStore, err := tx.CreateBucketIfNotExists(
|
||||||
|
messageStoreBucket,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageStore.Put(oldMsgKey[:], b.Bytes())
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// After the migration, we'll make sure that:
|
||||||
|
// 1. We cannot find the message under its old key.
|
||||||
|
// 2. We can find the message under its new key.
|
||||||
|
// 3. The message matches the original.
|
||||||
|
afterMigration := func(db *DB) {
|
||||||
|
meta, err := db.FetchMeta(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch db version: %v", err)
|
||||||
|
}
|
||||||
|
if meta.DbVersionNumber != 1 {
|
||||||
|
t.Fatalf("migration should have succeeded but didn't")
|
||||||
|
}
|
||||||
|
|
||||||
|
var rawMsg []byte
|
||||||
|
err = db.View(func(tx *bbolt.Tx) error {
|
||||||
|
messageStore := tx.Bucket(messageStoreBucket)
|
||||||
|
if messageStore == nil {
|
||||||
|
return errors.New("message store bucket not " +
|
||||||
|
"found")
|
||||||
|
}
|
||||||
|
rawMsg = messageStore.Get(oldMsgKey[:])
|
||||||
|
if rawMsg != nil {
|
||||||
|
t.Fatal("expected to not find message under " +
|
||||||
|
"old key, but did")
|
||||||
|
}
|
||||||
|
rawMsg = messageStore.Get(newMsgKey[:])
|
||||||
|
if rawMsg == nil {
|
||||||
|
return fmt.Errorf("expected to find message " +
|
||||||
|
"under new key, but didn't")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to deserialize raw message: %v", err)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(msg, gotMsg) {
|
||||||
|
t.Fatalf("expected message: %v\ngot message: %v",
|
||||||
|
spew.Sdump(msg), spew.Sdump(gotMsg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
applyMigration(
|
||||||
|
t, beforeMigration, afterMigration,
|
||||||
|
migrateGossipMessageStoreKeys, false,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestOutgoingPaymentsMigration checks that OutgoingPayments are migrated to a
|
||||||
|
// new bucket structure after the migration.
|
||||||
|
func TestOutgoingPaymentsMigration(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
const numPayments = 4
|
||||||
|
var oldPayments []*outgoingPayment
|
||||||
|
|
||||||
|
// Add fake payments to test database, verifying that it was created.
|
||||||
|
beforeMigrationFunc := func(d *DB) {
|
||||||
|
for i := 0; i < numPayments; i++ {
|
||||||
|
var p *outgoingPayment
|
||||||
|
var err error
|
||||||
|
|
||||||
|
// We fill the database with random payments. For the
|
||||||
|
// very last one we'll use a duplicate of the first, to
|
||||||
|
// ensure we are able to handle migration from a
|
||||||
|
// database that has copies.
|
||||||
|
if i < numPayments-1 {
|
||||||
|
p, err = makeRandomFakePayment()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create payment: %v",
|
||||||
|
err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p = oldPayments[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := d.addPayment(p); err != nil {
|
||||||
|
t.Fatalf("unable to add payment: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
oldPayments = append(oldPayments, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
payments, err := d.fetchAllPayments()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch payments: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(payments) != numPayments {
|
||||||
|
t.Fatalf("wrong qty of paymets: expected %d got %v",
|
||||||
|
numPayments, len(payments))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that all payments were migrated.
|
||||||
|
afterMigrationFunc := func(d *DB) {
|
||||||
|
meta, err := d.FetchMeta(nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if meta.DbVersionNumber != 1 {
|
||||||
|
t.Fatal("migration 'paymentStatusesMigration' wasn't applied")
|
||||||
|
}
|
||||||
|
|
||||||
|
sentPayments, err := d.fetchPaymentsMigration9()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch sent payments: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(sentPayments) != numPayments {
|
||||||
|
t.Fatalf("expected %d payments, got %d", numPayments,
|
||||||
|
len(sentPayments))
|
||||||
|
}
|
||||||
|
|
||||||
|
graph := d.ChannelGraph()
|
||||||
|
sourceNode, err := graph.SourceNode()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to fetch source node: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, p := range sentPayments {
|
||||||
|
// The payment status should be Completed.
|
||||||
|
if p.Status != StatusSucceeded {
|
||||||
|
t.Fatalf("expected Completed, got %v", p.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the sequence number is preserved. They
|
||||||
|
// start counting at 1.
|
||||||
|
if p.sequenceNum != uint64(i+1) {
|
||||||
|
t.Fatalf("expected seqnum %d, got %d", i,
|
||||||
|
p.sequenceNum)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Order of payments should be be preserved.
|
||||||
|
old := oldPayments[i]
|
||||||
|
|
||||||
|
// Check the individial fields.
|
||||||
|
if p.Info.Value != old.Terms.Value {
|
||||||
|
t.Fatalf("value mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Info.CreationDate != old.CreationDate {
|
||||||
|
t.Fatalf("date mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(p.Info.PaymentRequest, old.PaymentRequest) {
|
||||||
|
t.Fatalf("payreq mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if *p.PaymentPreimage != old.PaymentPreimage {
|
||||||
|
t.Fatalf("preimage mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Attempt.Route.TotalFees() != old.Fee {
|
||||||
|
t.Fatalf("Fee mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Attempt.Route.TotalAmount != old.Fee+old.Terms.Value {
|
||||||
|
t.Fatalf("Total amount mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Attempt.Route.TotalTimeLock != old.TimeLockLength {
|
||||||
|
t.Fatalf("timelock mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Attempt.Route.SourcePubKey != sourceNode.PubKeyBytes {
|
||||||
|
t.Fatalf("source mismatch: %x vs %x",
|
||||||
|
p.Attempt.Route.SourcePubKey[:],
|
||||||
|
sourceNode.PubKeyBytes[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, hop := range old.Path {
|
||||||
|
if hop != p.Attempt.Route.Hops[i].PubKeyBytes {
|
||||||
|
t.Fatalf("path mismatch")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finally, check that the payment sequence number is updated
|
||||||
|
// to reflect the migrated payments.
|
||||||
|
err = d.View(func(tx *bbolt.Tx) error {
|
||||||
|
payments := tx.Bucket(paymentsRootBucket)
|
||||||
|
if payments == nil {
|
||||||
|
return fmt.Errorf("payments bucket not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
seq := payments.Sequence()
|
||||||
|
if seq != numPayments {
|
||||||
|
return fmt.Errorf("expected sequence to be "+
|
||||||
|
"%d, got %d", numPayments, seq)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
applyMigration(t,
|
||||||
|
beforeMigrationFunc,
|
||||||
|
afterMigrationFunc,
|
||||||
|
migrateOutgoingPayments,
|
||||||
|
false)
|
||||||
|
}
|
||||||
|
|
||||||
func makeRandPaymentCreationInfo() (*PaymentCreationInfo, error) {
|
func makeRandPaymentCreationInfo() (*PaymentCreationInfo, error) {
|
||||||
var payHash lntypes.Hash
|
var payHash lntypes.Hash
|
||||||
if _, err := rand.Read(payHash[:]); err != nil {
|
if _, err := rand.Read(payHash[:]); err != nil {
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -11,6 +12,7 @@ import (
|
|||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/lightningnetwork/lnd/lntypes"
|
"github.com/lightningnetwork/lnd/lntypes"
|
||||||
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
"github.com/lightningnetwork/lnd/routing/route"
|
"github.com/lightningnetwork/lnd/routing/route"
|
||||||
"github.com/lightningnetwork/lnd/tlv"
|
"github.com/lightningnetwork/lnd/tlv"
|
||||||
)
|
)
|
||||||
@ -51,6 +53,34 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func makeFakePayment() *outgoingPayment {
|
||||||
|
fakeInvoice := &Invoice{
|
||||||
|
// Use single second precision to avoid false positive test
|
||||||
|
// failures due to the monotonic time component.
|
||||||
|
CreationDate: time.Unix(time.Now().Unix(), 0),
|
||||||
|
Memo: []byte("fake memo"),
|
||||||
|
Receipt: []byte("fake receipt"),
|
||||||
|
PaymentRequest: []byte(""),
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(fakeInvoice.Terms.PaymentPreimage[:], rev[:])
|
||||||
|
fakeInvoice.Terms.Value = lnwire.NewMSatFromSatoshis(10000)
|
||||||
|
|
||||||
|
fakePath := make([][33]byte, 3)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
copy(fakePath[i][:], bytes.Repeat([]byte{byte(i)}, 33))
|
||||||
|
}
|
||||||
|
|
||||||
|
fakePayment := &outgoingPayment{
|
||||||
|
Invoice: *fakeInvoice,
|
||||||
|
Fee: 101,
|
||||||
|
Path: fakePath,
|
||||||
|
TimeLockLength: 1000,
|
||||||
|
}
|
||||||
|
copy(fakePayment.PaymentPreimage[:], rev[:])
|
||||||
|
return fakePayment
|
||||||
|
}
|
||||||
|
|
||||||
func makeFakeInfo() (*PaymentCreationInfo, *PaymentAttemptInfo) {
|
func makeFakeInfo() (*PaymentCreationInfo, *PaymentAttemptInfo) {
|
||||||
var preimg lntypes.Preimage
|
var preimg lntypes.Preimage
|
||||||
copy(preimg[:], rev[:])
|
copy(preimg[:], rev[:])
|
||||||
@ -72,6 +102,70 @@ func makeFakeInfo() (*PaymentCreationInfo, *PaymentAttemptInfo) {
|
|||||||
return c, a
|
return c, a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// randomBytes creates random []byte with length in range [minLen, maxLen)
|
||||||
|
func randomBytes(minLen, maxLen int) ([]byte, error) {
|
||||||
|
randBuf := make([]byte, minLen+rand.Intn(maxLen-minLen))
|
||||||
|
|
||||||
|
if _, err := rand.Read(randBuf); err != nil {
|
||||||
|
return nil, fmt.Errorf("Internal error. "+
|
||||||
|
"Cannot generate random string: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return randBuf, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeRandomFakePayment() (*outgoingPayment, error) {
|
||||||
|
var err error
|
||||||
|
fakeInvoice := &Invoice{
|
||||||
|
// Use single second precision to avoid false positive test
|
||||||
|
// failures due to the monotonic time component.
|
||||||
|
CreationDate: time.Unix(time.Now().Unix(), 0),
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeInvoice.Memo, err = randomBytes(1, 50)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeInvoice.Receipt, err = randomBytes(1, 50)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
fakeInvoice.PaymentRequest, err = randomBytes(1, 50)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
preImg, err := randomBytes(32, 33)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
copy(fakeInvoice.Terms.PaymentPreimage[:], preImg)
|
||||||
|
|
||||||
|
fakeInvoice.Terms.Value = lnwire.MilliSatoshi(rand.Intn(10000))
|
||||||
|
|
||||||
|
fakePathLen := 1 + rand.Intn(5)
|
||||||
|
fakePath := make([][33]byte, fakePathLen)
|
||||||
|
for i := 0; i < fakePathLen; i++ {
|
||||||
|
b, err := randomBytes(33, 34)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
copy(fakePath[i][:], b)
|
||||||
|
}
|
||||||
|
|
||||||
|
fakePayment := &outgoingPayment{
|
||||||
|
Invoice: *fakeInvoice,
|
||||||
|
Fee: lnwire.MilliSatoshi(rand.Intn(1001)),
|
||||||
|
Path: fakePath,
|
||||||
|
TimeLockLength: uint32(rand.Intn(10000)),
|
||||||
|
}
|
||||||
|
copy(fakePayment.PaymentPreimage[:], fakeInvoice.Terms.PaymentPreimage[:])
|
||||||
|
|
||||||
|
return fakePayment, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestSentPaymentSerialization(t *testing.T) {
|
func TestSentPaymentSerialization(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user