From 0386e0c297d98bc2ab08e00fb627c770b17ea0d3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 12 Oct 2019 00:47:45 -0700 Subject: [PATCH] Revert "channeldb: remove unsupported migrations" This reverts commit 3ab4c749c6da4c2693b3978c0a0863787c9c3495. --- channeldb/db.go | 70 ++ channeldb/legacy_serialization.go | 55 + .../migration_09_legacy_serialization.go | 497 +++++++++ channeldb/migrations.go | 939 ++++++++++++++++++ channeldb/migrations_test.go | 715 +++++++++++++ channeldb/payments_test.go | 94 ++ 6 files changed, 2370 insertions(+) create mode 100644 channeldb/legacy_serialization.go create mode 100644 channeldb/migration_09_legacy_serialization.go create mode 100644 channeldb/migrations.go diff --git a/channeldb/db.go b/channeldb/db.go index 667ea418..59706e00 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -37,6 +37,72 @@ var ( // for retrieving all migration function that are need to apply to the // current db. dbVersions = []version{ + { + // The base DB version requires no migration. + number: 0, + migration: nil, + }, + { + // The version of the database where two new indexes + // for the update time of node and channel updates were + // added. + number: 1, + migration: migrateNodeAndEdgeUpdateIndex, + }, + { + // The DB version that added the invoice event time + // series. + number: 2, + migration: migrateInvoiceTimeSeries, + }, + { + // The DB version that updated the embedded invoice in + // outgoing payments to match the new format. + number: 3, + migration: migrateInvoiceTimeSeriesOutgoingPayments, + }, + { + // The version of the database where every channel + // always has two entries in the edges bucket. If + // a policy is unknown, this will be represented + // by a special byte sequence. + number: 4, + migration: migrateEdgePolicies, + }, + { + // The DB version where we persist each attempt to send + // an HTLC to a payment hash, and track whether the + // payment is in-flight, succeeded, or failed. + number: 5, + migration: paymentStatusesMigration, + }, + { + // The DB version that properly prunes stale entries + // from the edge update index. + number: 6, + migration: migratePruneEdgeUpdateIndex, + }, + { + // The DB version that migrates the ChannelCloseSummary + // to a format where optional fields are indicated with + // boolean flags. + number: 7, + migration: migrateOptionalChannelCloseSummaryFields, + }, + { + // The DB version that changes the gossiper's message + // store keys to account for the message's type and + // ShortChannelID. + number: 8, + migration: migrateGossipMessageStoreKeys, + }, + { + // The DB version where the payments and payment + // statuses are moved to being stored in a combined + // bucket. + number: 9, + migration: migrateOutgoingPayments, + }, { // The DB version where we started to store legacy // payload information for all routes, as well as the @@ -200,6 +266,10 @@ func createChannelDB(dbPath string) error { return err } + if _, err := tx.CreateBucket(paymentBucket); err != nil { + return err + } + if _, err := tx.CreateBucket(nodeInfoBucket); err != nil { return err } diff --git a/channeldb/legacy_serialization.go b/channeldb/legacy_serialization.go new file mode 100644 index 00000000..3ca9b5ec --- /dev/null +++ b/channeldb/legacy_serialization.go @@ -0,0 +1,55 @@ +package channeldb + +import ( + "io" +) + +// deserializeCloseChannelSummaryV6 reads the v6 database format for +// ChannelCloseSummary. +// +// NOTE: deprecated, only for migration. +func deserializeCloseChannelSummaryV6(r io.Reader) (*ChannelCloseSummary, error) { + c := &ChannelCloseSummary{} + + err := ReadElements(r, + &c.ChanPoint, &c.ShortChanID, &c.ChainHash, &c.ClosingTXID, + &c.CloseHeight, &c.RemotePub, &c.Capacity, &c.SettledBalance, + &c.TimeLockedBalance, &c.CloseType, &c.IsPending, + ) + if err != nil { + return nil, err + } + + // We'll now check to see if the channel close summary was encoded with + // any of the additional optional fields. + err = ReadElements(r, &c.RemoteCurrentRevocation) + switch { + case err == io.EOF: + return c, nil + + // If we got a non-eof error, then we know there's an actually issue. + // Otherwise, it may have been the case that this summary didn't have + // the set of optional fields. + case err != nil: + return nil, err + } + + if err := readChanConfig(r, &c.LocalChanConfig); err != nil { + return nil, err + } + + // Finally, we'll attempt to read the next unrevoked commitment point + // for the remote party. If we closed the channel before receiving a + // funding locked message, then this can be nil. As a result, we'll use + // the same technique to read the field, only if there's still data + // left in the buffer. + err = ReadElements(r, &c.RemoteNextRevocation) + if err != nil && err != io.EOF { + // If we got a non-eof error, then we know there's an actually + // issue. Otherwise, it may have been the case that this + // summary didn't have the set of optional fields. + return nil, err + } + + return c, nil +} diff --git a/channeldb/migration_09_legacy_serialization.go b/channeldb/migration_09_legacy_serialization.go new file mode 100644 index 00000000..1205cf9b --- /dev/null +++ b/channeldb/migration_09_legacy_serialization.go @@ -0,0 +1,497 @@ +package channeldb + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "sort" + + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +var ( + // paymentBucket is the name of the bucket within the database that + // stores all data related to payments. + // + // Within the payments bucket, each invoice is keyed by its invoice ID + // which is a monotonically increasing uint64. BoltDB's sequence + // feature is used for generating monotonically increasing id. + // + // NOTE: Deprecated. Kept around for migration purposes. + paymentBucket = []byte("payments") + + // paymentStatusBucket is the name of the bucket within the database + // that stores the status of a payment indexed by the payment's + // preimage. + // + // NOTE: Deprecated. Kept around for migration purposes. + paymentStatusBucket = []byte("payment-status") +) + +// outgoingPayment represents a successful payment between the daemon and a +// remote node. Details such as the total fee paid, and the time of the payment +// are stored. +// +// NOTE: Deprecated. Kept around for migration purposes. +type outgoingPayment struct { + Invoice + + // Fee is the total fee paid for the payment in milli-satoshis. + Fee lnwire.MilliSatoshi + + // TotalTimeLock is the total cumulative time-lock in the HTLC extended + // from the second-to-last hop to the destination. + TimeLockLength uint32 + + // Path encodes the path the payment took through the network. The path + // excludes the outgoing node and consists of the hex-encoded + // compressed public key of each of the nodes involved in the payment. + Path [][33]byte + + // PaymentPreimage is the preImage of a successful payment. This is used + // to calculate the PaymentHash as well as serve as a proof of payment. + PaymentPreimage [32]byte +} + +// addPayment saves a successful payment to the database. It is assumed that +// all payment are sent using unique payment hashes. +// +// NOTE: Deprecated. Kept around for migration purposes. +func (db *DB) addPayment(payment *outgoingPayment) error { + // Validate the field of the inner voice within the outgoing payment, + // these must also adhere to the same constraints as regular invoices. + if err := validateInvoice(&payment.Invoice); err != nil { + return err + } + + // We first serialize the payment before starting the database + // transaction so we can avoid creating a DB payment in the case of a + // serialization error. + var b bytes.Buffer + if err := serializeOutgoingPayment(&b, payment); err != nil { + return err + } + paymentBytes := b.Bytes() + + return db.Batch(func(tx *bbolt.Tx) error { + payments, err := tx.CreateBucketIfNotExists(paymentBucket) + if err != nil { + return err + } + + // Obtain the new unique sequence number for this payment. + paymentID, err := payments.NextSequence() + if err != nil { + return err + } + + // We use BigEndian for keys as it orders keys in + // ascending order. This allows bucket scans to order payments + // in the order in which they were created. + paymentIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(paymentIDBytes, paymentID) + + return payments.Put(paymentIDBytes, paymentBytes) + }) +} + +// fetchAllPayments returns all outgoing payments in DB. +// +// NOTE: Deprecated. Kept around for migration purposes. +func (db *DB) fetchAllPayments() ([]*outgoingPayment, error) { + var payments []*outgoingPayment + + err := db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(paymentBucket) + if bucket == nil { + return ErrNoPaymentsCreated + } + + return bucket.ForEach(func(k, v []byte) error { + // If the value is nil, then we ignore it as it may be + // a sub-bucket. + if v == nil { + return nil + } + + r := bytes.NewReader(v) + payment, err := deserializeOutgoingPayment(r) + if err != nil { + return err + } + + payments = append(payments, payment) + return nil + }) + }) + if err != nil { + return nil, err + } + + return payments, nil +} + +// fetchPaymentStatus returns the payment status for outgoing payment. +// If status of the payment isn't found, it will default to "StatusUnknown". +// +// NOTE: Deprecated. Kept around for migration purposes. +func (db *DB) fetchPaymentStatus(paymentHash [32]byte) (PaymentStatus, error) { + var paymentStatus = StatusUnknown + err := db.View(func(tx *bbolt.Tx) error { + var err error + paymentStatus, err = fetchPaymentStatusTx(tx, paymentHash) + return err + }) + if err != nil { + return StatusUnknown, err + } + + return paymentStatus, nil +} + +// fetchPaymentStatusTx is a helper method that returns the payment status for +// outgoing payment. If status of the payment isn't found, it will default to +// "StatusUnknown". It accepts the boltdb transactions such that this method +// can be composed into other atomic operations. +// +// NOTE: Deprecated. Kept around for migration purposes. +func fetchPaymentStatusTx(tx *bbolt.Tx, paymentHash [32]byte) (PaymentStatus, error) { + // The default status for all payments that aren't recorded in database. + var paymentStatus = StatusUnknown + + bucket := tx.Bucket(paymentStatusBucket) + if bucket == nil { + return paymentStatus, nil + } + + paymentStatusBytes := bucket.Get(paymentHash[:]) + if paymentStatusBytes == nil { + return paymentStatus, nil + } + + paymentStatus.FromBytes(paymentStatusBytes) + + return paymentStatus, nil +} + +func serializeOutgoingPayment(w io.Writer, p *outgoingPayment) error { + var scratch [8]byte + + if err := serializeInvoiceLegacy(w, &p.Invoice); err != nil { + return err + } + + byteOrder.PutUint64(scratch[:], uint64(p.Fee)) + if _, err := w.Write(scratch[:]); err != nil { + return err + } + + // First write out the length of the bytes to prefix the value. + pathLen := uint32(len(p.Path)) + byteOrder.PutUint32(scratch[:4], pathLen) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + // Then with the path written, we write out the series of public keys + // involved in the path. + for _, hop := range p.Path { + if _, err := w.Write(hop[:]); err != nil { + return err + } + } + + byteOrder.PutUint32(scratch[:4], p.TimeLockLength) + if _, err := w.Write(scratch[:4]); err != nil { + return err + } + + if _, err := w.Write(p.PaymentPreimage[:]); err != nil { + return err + } + + return nil +} + +func deserializeOutgoingPayment(r io.Reader) (*outgoingPayment, error) { + var scratch [8]byte + + p := &outgoingPayment{} + + inv, err := deserializeInvoiceLegacy(r) + if err != nil { + return nil, err + } + p.Invoice = inv + + if _, err := r.Read(scratch[:]); err != nil { + return nil, err + } + p.Fee = lnwire.MilliSatoshi(byteOrder.Uint64(scratch[:])) + + if _, err = r.Read(scratch[:4]); err != nil { + return nil, err + } + pathLen := byteOrder.Uint32(scratch[:4]) + + path := make([][33]byte, pathLen) + for i := uint32(0); i < pathLen; i++ { + if _, err := r.Read(path[i][:]); err != nil { + return nil, err + } + } + p.Path = path + + if _, err = r.Read(scratch[:4]); err != nil { + return nil, err + } + p.TimeLockLength = byteOrder.Uint32(scratch[:4]) + + if _, err := r.Read(p.PaymentPreimage[:]); err != nil { + return nil, err + } + + return p, nil +} + +// serializePaymentAttemptInfoMigration9 is the serializePaymentAttemptInfo +// version as existed when migration #9 was created. We keep this around, along +// with the methods below to ensure that clients that upgrade will use the +// correct version of this method. +func serializePaymentAttemptInfoMigration9(w io.Writer, a *PaymentAttemptInfo) error { + if err := WriteElements(w, a.PaymentID, a.SessionKey); err != nil { + return err + } + + if err := serializeRouteMigration9(w, a.Route); err != nil { + return err + } + + return nil +} + +func serializeHopMigration9(w io.Writer, h *route.Hop) error { + if err := WriteElements(w, + h.PubKeyBytes[:], h.ChannelID, h.OutgoingTimeLock, + h.AmtToForward, + ); err != nil { + return err + } + + return nil +} + +func serializeRouteMigration9(w io.Writer, r route.Route) error { + if err := WriteElements(w, + r.TotalTimeLock, r.TotalAmount, r.SourcePubKey[:], + ); err != nil { + return err + } + + if err := WriteElements(w, uint32(len(r.Hops))); err != nil { + return err + } + + for _, h := range r.Hops { + if err := serializeHopMigration9(w, h); err != nil { + return err + } + } + + return nil +} + +func deserializePaymentAttemptInfoMigration9(r io.Reader) (*PaymentAttemptInfo, error) { + a := &PaymentAttemptInfo{} + err := ReadElements(r, &a.PaymentID, &a.SessionKey) + if err != nil { + return nil, err + } + a.Route, err = deserializeRouteMigration9(r) + if err != nil { + return nil, err + } + return a, nil +} + +func deserializeRouteMigration9(r io.Reader) (route.Route, error) { + rt := route.Route{} + if err := ReadElements(r, + &rt.TotalTimeLock, &rt.TotalAmount, + ); err != nil { + return rt, err + } + + var pub []byte + if err := ReadElements(r, &pub); err != nil { + return rt, err + } + copy(rt.SourcePubKey[:], pub) + + var numHops uint32 + if err := ReadElements(r, &numHops); err != nil { + return rt, err + } + + var hops []*route.Hop + for i := uint32(0); i < numHops; i++ { + hop, err := deserializeHopMigration9(r) + if err != nil { + return rt, err + } + hops = append(hops, hop) + } + rt.Hops = hops + + return rt, nil +} + +func deserializeHopMigration9(r io.Reader) (*route.Hop, error) { + h := &route.Hop{} + + var pub []byte + if err := ReadElements(r, &pub); err != nil { + return nil, err + } + copy(h.PubKeyBytes[:], pub) + + if err := ReadElements(r, + &h.ChannelID, &h.OutgoingTimeLock, &h.AmtToForward, + ); err != nil { + return nil, err + } + + return h, nil +} + +// fetchPaymentsMigration9 returns all sent payments found in the DB using the +// payment attempt info format that was present as of migration #9. We need +// this as otherwise, the current FetchPayments version will use the latest +// decoding format. Note that we only need this for the +// TestOutgoingPaymentsMigration migration test case. +func (db *DB) fetchPaymentsMigration9() ([]*Payment, error) { + var payments []*Payment + + err := db.View(func(tx *bbolt.Tx) error { + paymentsBucket := tx.Bucket(paymentsRootBucket) + if paymentsBucket == nil { + return nil + } + + return paymentsBucket.ForEach(func(k, v []byte) error { + bucket := paymentsBucket.Bucket(k) + if bucket == nil { + // We only expect sub-buckets to be found in + // this top-level bucket. + return fmt.Errorf("non bucket element in " + + "payments bucket") + } + + p, err := fetchPaymentMigration9(bucket) + if err != nil { + return err + } + + payments = append(payments, p) + + // For older versions of lnd, duplicate payments to a + // payment has was possible. These will be found in a + // sub-bucket indexed by their sequence number if + // available. + dup := bucket.Bucket(paymentDuplicateBucket) + if dup == nil { + return nil + } + + return dup.ForEach(func(k, v []byte) error { + subBucket := dup.Bucket(k) + if subBucket == nil { + // We one bucket for each duplicate to + // be found. + return fmt.Errorf("non bucket element" + + "in duplicate bucket") + } + + p, err := fetchPaymentMigration9(subBucket) + if err != nil { + return err + } + + payments = append(payments, p) + return nil + }) + }) + }) + if err != nil { + return nil, err + } + + // Before returning, sort the payments by their sequence number. + sort.Slice(payments, func(i, j int) bool { + return payments[i].sequenceNum < payments[j].sequenceNum + }) + + return payments, nil +} + +func fetchPaymentMigration9(bucket *bbolt.Bucket) (*Payment, error) { + var ( + err error + p = &Payment{} + ) + + seqBytes := bucket.Get(paymentSequenceKey) + if seqBytes == nil { + return nil, fmt.Errorf("sequence number not found") + } + + p.sequenceNum = binary.BigEndian.Uint64(seqBytes) + + // Get the payment status. + p.Status = fetchPaymentStatus(bucket) + + // Get the PaymentCreationInfo. + b := bucket.Get(paymentCreationInfoKey) + if b == nil { + return nil, fmt.Errorf("creation info not found") + } + + r := bytes.NewReader(b) + p.Info, err = deserializePaymentCreationInfo(r) + if err != nil { + return nil, err + + } + + // Get the PaymentAttemptInfo. This can be unset. + b = bucket.Get(paymentAttemptInfoKey) + if b != nil { + r = bytes.NewReader(b) + p.Attempt, err = deserializePaymentAttemptInfoMigration9(r) + if err != nil { + return nil, err + } + } + + // Get the payment preimage. This is only found for + // completed payments. + b = bucket.Get(paymentSettleInfoKey) + if b != nil { + var preimg lntypes.Preimage + copy(preimg[:], b[:]) + p.PaymentPreimage = &preimg + } + + // Get failure reason if available. + b = bucket.Get(paymentFailInfoKey) + if b != nil { + reason := FailureReason(b[0]) + p.Failure = &reason + } + + return p, nil +} diff --git a/channeldb/migrations.go b/channeldb/migrations.go new file mode 100644 index 00000000..a78d1314 --- /dev/null +++ b/channeldb/migrations.go @@ -0,0 +1,939 @@ +package channeldb + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "fmt" + + "github.com/btcsuite/btcd/btcec" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +// migrateNodeAndEdgeUpdateIndex is a migration function that will update the +// database from version 0 to version 1. In version 1, we add two new indexes +// (one for nodes and one for edges) to keep track of the last time a node or +// edge was updated on the network. These new indexes allow us to implement the +// new graph sync protocol added. +func migrateNodeAndEdgeUpdateIndex(tx *bbolt.Tx) error { + // First, we'll populating the node portion of the new index. Before we + // can add new values to the index, we'll first create the new bucket + // where these items will be housed. + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return fmt.Errorf("unable to create node bucket: %v", err) + } + nodeUpdateIndex, err := nodes.CreateBucketIfNotExists( + nodeUpdateIndexBucket, + ) + if err != nil { + return fmt.Errorf("unable to create node update index: %v", err) + } + + log.Infof("Populating new node update index bucket") + + // Now that we know the bucket has been created, we'll iterate over the + // entire node bucket so we can add the (updateTime || nodePub) key + // into the node update index. + err = nodes.ForEach(func(nodePub, nodeInfo []byte) error { + if len(nodePub) != 33 { + return nil + } + + log.Tracef("Adding %x to node update index", nodePub) + + // The first 8 bytes of a node's serialize data is the update + // time, so we can extract that without decoding the entire + // structure. + updateTime := nodeInfo[:8] + + // Now that we have the update time, we can construct the key + // to insert into the index. + var indexKey [8 + 33]byte + copy(indexKey[:8], updateTime) + copy(indexKey[8:], nodePub) + + return nodeUpdateIndex.Put(indexKey[:], nil) + }) + if err != nil { + return fmt.Errorf("unable to update node indexes: %v", err) + } + + log.Infof("Populating new edge update index bucket") + + // With the set of nodes updated, we'll now update all edges to have a + // corresponding entry in the edge update index. + edges, err := tx.CreateBucketIfNotExists(edgeBucket) + if err != nil { + return fmt.Errorf("unable to create edge bucket: %v", err) + } + edgeUpdateIndex, err := edges.CreateBucketIfNotExists( + edgeUpdateIndexBucket, + ) + if err != nil { + return fmt.Errorf("unable to create edge update index: %v", err) + } + + // We'll now run through each edge policy in the database, and update + // the index to ensure each edge has the proper record. + err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error { + if len(edgeKey) != 41 { + return nil + } + + // Now that we know this is the proper record, we'll grab the + // channel ID (last 8 bytes of the key), and then decode the + // edge policy so we can access the update time. + chanID := edgeKey[33:] + edgePolicyReader := bytes.NewReader(edgePolicyBytes) + + edgePolicy, err := deserializeChanEdgePolicy( + edgePolicyReader, nodes, + ) + if err != nil { + return err + } + + log.Tracef("Adding chan_id=%v to edge update index", + edgePolicy.ChannelID) + + // We'll now construct the index key using the channel ID, and + // the last time it was updated: (updateTime || chanID). + var indexKey [8 + 8]byte + byteOrder.PutUint64( + indexKey[:], uint64(edgePolicy.LastUpdate.Unix()), + ) + copy(indexKey[8:], chanID) + + return edgeUpdateIndex.Put(indexKey[:], nil) + }) + if err != nil { + return fmt.Errorf("unable to update edge indexes: %v", err) + } + + log.Infof("Migration to node and edge update indexes complete!") + + return nil +} + +// migrateInvoiceTimeSeries is a database migration that assigns all existing +// invoices an index in the add and/or the settle index. Additionally, all +// existing invoices will have their bytes padded out in order to encode the +// add+settle index as well as the amount paid. +func migrateInvoiceTimeSeries(tx *bbolt.Tx) error { + invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) + if err != nil { + return err + } + + addIndex, err := invoices.CreateBucketIfNotExists( + addIndexBucket, + ) + if err != nil { + return err + } + settleIndex, err := invoices.CreateBucketIfNotExists( + settleIndexBucket, + ) + if err != nil { + return err + } + + log.Infof("Migrating invoice database to new time series format") + + // Now that we have all the buckets we need, we'll run through each + // invoice in the database, and update it to reflect the new format + // expected post migration. + // NOTE: we store the converted invoices and put them back into the + // database after the loop, since modifying the bucket within the + // ForEach loop is not safe. + var invoicesKeys [][]byte + var invoicesValues [][]byte + err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error { + // If this is a sub bucket, then we'll skip it. + if invoiceBytes == nil { + return nil + } + + // First, we'll make a copy of the encoded invoice bytes. + invoiceBytesCopy := make([]byte, len(invoiceBytes)) + copy(invoiceBytesCopy, invoiceBytes) + + // With the bytes copied over, we'll append 24 additional + // bytes. We do this so we can decode the invoice under the new + // serialization format. + padding := bytes.Repeat([]byte{0}, 24) + invoiceBytesCopy = append(invoiceBytesCopy, padding...) + + invoiceReader := bytes.NewReader(invoiceBytesCopy) + invoice, err := deserializeInvoiceLegacy(invoiceReader) + if err != nil { + return fmt.Errorf("unable to decode invoice: %v", err) + } + + // Now that we have the fully decoded invoice, we can update + // the various indexes that we're added, and finally the + // invoice itself before re-inserting it. + + // First, we'll get the new sequence in the addIndex in order + // to create the proper mapping. + nextAddSeqNo, err := addIndex.NextSequence() + if err != nil { + return err + } + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo) + err = addIndex.Put(seqNoBytes[:], invoiceNum[:]) + if err != nil { + return err + } + + log.Tracef("Adding invoice (preimage=%x, add_index=%v) to add "+ + "time series", invoice.Terms.PaymentPreimage[:], + nextAddSeqNo) + + // Next, we'll check if the invoice has been settled or not. If + // so, then we'll also add it to the settle index. + var nextSettleSeqNo uint64 + if invoice.Terms.State == ContractSettled { + nextSettleSeqNo, err = settleIndex.NextSequence() + if err != nil { + return err + } + + var seqNoBytes [8]byte + byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo) + err := settleIndex.Put(seqNoBytes[:], invoiceNum) + if err != nil { + return err + } + + invoice.AmtPaid = invoice.Terms.Value + + log.Tracef("Adding invoice (preimage=%x, "+ + "settle_index=%v) to add time series", + invoice.Terms.PaymentPreimage[:], + nextSettleSeqNo) + } + + // Finally, we'll update the invoice itself with the new + // indexing information as well as the amount paid if it has + // been settled or not. + invoice.AddIndex = nextAddSeqNo + invoice.SettleIndex = nextSettleSeqNo + + // We've fully migrated an invoice, so we'll now update the + // invoice in-place. + var b bytes.Buffer + if err := serializeInvoiceLegacy(&b, &invoice); err != nil { + return err + } + + // Save the key and value pending update for after the ForEach + // is done. + invoicesKeys = append(invoicesKeys, invoiceNum) + invoicesValues = append(invoicesValues, b.Bytes()) + return nil + }) + if err != nil { + return err + } + + // Now put the converted invoices into the DB. + for i := range invoicesKeys { + key := invoicesKeys[i] + value := invoicesValues[i] + if err := invoices.Put(key, value); err != nil { + return err + } + } + + log.Infof("Migration to invoice time series index complete!") + + return nil +} + +// migrateInvoiceTimeSeriesOutgoingPayments is a follow up to the +// migrateInvoiceTimeSeries migration. As at the time of writing, the +// OutgoingPayment struct embeddeds an instance of the Invoice struct. As a +// result, we also need to migrate the internal invoice to the new format. +func migrateInvoiceTimeSeriesOutgoingPayments(tx *bbolt.Tx) error { + payBucket := tx.Bucket(paymentBucket) + if payBucket == nil { + return nil + } + + log.Infof("Migrating invoice database to new outgoing payment format") + + // We store the keys and values we want to modify since it is not safe + // to modify them directly within the ForEach loop. + var paymentKeys [][]byte + var paymentValues [][]byte + err := payBucket.ForEach(func(payID, paymentBytes []byte) error { + log.Tracef("Migrating payment %x", payID[:]) + + // The internal invoices for each payment only contain a + // populated contract term, and creation date, as a result, + // most of the bytes will be "empty". + + // We'll calculate the end of the invoice index assuming a + // "minimal" index that's embedded within the greater + // OutgoingPayment. The breakdown is: + // 3 bytes empty var bytes, 16 bytes creation date, 16 bytes + // settled date, 32 bytes payment pre-image, 8 bytes value, 1 + // byte settled. + endOfInvoiceIndex := 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1 + + // We'll now extract the prefix of the pure invoice embedded + // within. + invoiceBytes := paymentBytes[:endOfInvoiceIndex] + + // With the prefix extracted, we'll copy over the invoice, and + // also add padding for the new 24 bytes of fields, and finally + // append the remainder of the outgoing payment. + paymentCopy := make([]byte, len(invoiceBytes)) + copy(paymentCopy[:], invoiceBytes) + + padding := bytes.Repeat([]byte{0}, 24) + paymentCopy = append(paymentCopy, padding...) + paymentCopy = append( + paymentCopy, paymentBytes[endOfInvoiceIndex:]..., + ) + + // At this point, we now have the new format of the outgoing + // payments, we'll attempt to deserialize it to ensure the + // bytes are properly formatted. + paymentReader := bytes.NewReader(paymentCopy) + _, err := deserializeOutgoingPayment(paymentReader) + if err != nil { + return fmt.Errorf("unable to deserialize payment: %v", err) + } + + // Now that we know the modifications was successful, we'll + // store it to our slice of keys and values, and write it back + // to disk in the new format after the ForEach loop is over. + paymentKeys = append(paymentKeys, payID) + paymentValues = append(paymentValues, paymentCopy) + return nil + }) + if err != nil { + return err + } + + // Finally store the updated payments to the bucket. + for i := range paymentKeys { + key := paymentKeys[i] + value := paymentValues[i] + if err := payBucket.Put(key, value); err != nil { + return err + } + } + + log.Infof("Migration to outgoing payment invoices complete!") + + return nil +} + +// migrateEdgePolicies is a migration function that will update the edges +// bucket. It ensure that edges with unknown policies will also have an entry +// in the bucket. After the migration, there will be two edge entries for +// every channel, regardless of whether the policies are known. +func migrateEdgePolicies(tx *bbolt.Tx) error { + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return nil + } + + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return nil + } + + // checkKey gets the policy from the database with a low-level call + // so that it is still possible to distinguish between unknown and + // not present. + checkKey := func(channelId uint64, keyBytes []byte) error { + var channelID [8]byte + byteOrder.PutUint64(channelID[:], channelId) + + _, err := fetchChanEdgePolicy(edges, + channelID[:], keyBytes, nodes) + + if err == ErrEdgeNotFound { + log.Tracef("Adding unknown edge policy present for node %x, channel %v", + keyBytes, channelId) + + err := putChanEdgePolicyUnknown(edges, channelId, keyBytes) + if err != nil { + return err + } + + return nil + } + + return err + } + + // Iterate over all channels and check both edge policies. + err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error { + infoReader := bytes.NewReader(edgeInfoBytes) + edgeInfo, err := deserializeChanEdgeInfo(infoReader) + if err != nil { + return err + } + + for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:], + edgeInfo.NodeKey2Bytes[:]} { + + if err := checkKey(edgeInfo.ChannelID, key); err != nil { + return err + } + } + + return nil + }) + + if err != nil { + return fmt.Errorf("unable to update edge policies: %v", err) + } + + log.Infof("Migration of edge policies complete!") + + return nil +} + +// paymentStatusesMigration is a database migration intended for adding payment +// statuses for each existing payment entity in bucket to be able control +// transitions of statuses and prevent cases such as double payment +func paymentStatusesMigration(tx *bbolt.Tx) error { + // Get the bucket dedicated to storing statuses of payments, + // where a key is payment hash, value is payment status. + paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket) + if err != nil { + return err + } + + log.Infof("Migrating database to support payment statuses") + + circuitAddKey := []byte("circuit-adds") + circuits := tx.Bucket(circuitAddKey) + if circuits != nil { + log.Infof("Marking all known circuits with status InFlight") + + err = circuits.ForEach(func(k, v []byte) error { + // Parse the first 8 bytes as the short chan ID for the + // circuit. We'll skip all short chan IDs are not + // locally initiated, which includes all non-zero short + // chan ids. + chanID := binary.BigEndian.Uint64(k[:8]) + if chanID != 0 { + return nil + } + + // The payment hash is the third item in the serialized + // payment circuit. The first two items are an AddRef + // (10 bytes) and the incoming circuit key (16 bytes). + const payHashOffset = 10 + 16 + + paymentHash := v[payHashOffset : payHashOffset+32] + + return paymentStatuses.Put( + paymentHash[:], StatusInFlight.Bytes(), + ) + }) + if err != nil { + return err + } + } + + log.Infof("Marking all existing payments with status Completed") + + // Get the bucket dedicated to storing payments + bucket := tx.Bucket(paymentBucket) + if bucket == nil { + return nil + } + + // For each payment in the bucket, deserialize the payment and mark it + // as completed. + err = bucket.ForEach(func(k, v []byte) error { + // Ignores if it is sub-bucket. + if v == nil { + return nil + } + + r := bytes.NewReader(v) + payment, err := deserializeOutgoingPayment(r) + if err != nil { + return err + } + + // Calculate payment hash for current payment. + paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) + + // Update status for current payment to completed. If it fails, + // the migration is aborted and the payment bucket is returned + // to its previous state. + return paymentStatuses.Put(paymentHash[:], StatusSucceeded.Bytes()) + }) + if err != nil { + return err + } + + log.Infof("Migration of payment statuses complete!") + + return nil +} + +// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve +// some lingering bugs with regards to edge policies and their update index. +// Stale entries within the edge update index were not being properly pruned due +// to a miscalculation on the offset of an edge's policy last update. This +// migration also fixes the case where the public keys within edge policies were +// being serialized with an extra byte, causing an even greater error when +// attempting to perform the offset calculation described earlier. +func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error { + // To begin the migration, we'll retrieve the update index bucket. If it + // does not exist, we have nothing left to do so we can simply exit. + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return nil + } + + // Retrieve some buckets that will be needed later on. These should + // already exist given the assumption that the buckets above do as + // well. + edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket) + if err != nil { + return fmt.Errorf("error creating edge index bucket: %s", err) + } + if edgeIndex == nil { + return fmt.Errorf("unable to create/fetch edge index " + + "bucket") + } + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return fmt.Errorf("unable to make node bucket") + } + + log.Info("Migrating database to properly prune edge update index") + + // We'll need to properly prune all the outdated entries within the edge + // update index. To do so, we'll gather all of the existing policies + // within the graph to re-populate them later on. + var edgeKeys [][]byte + err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error { + // All valid entries are indexed by a public key (33 bytes) + // followed by a channel ID (8 bytes), so we'll skip any entries + // with keys that do not match this. + if len(edgeKey) != 33+8 { + return nil + } + + edgeKeys = append(edgeKeys, edgeKey) + + return nil + }) + if err != nil { + return fmt.Errorf("unable to gather existing edge policies: %v", + err) + } + + log.Info("Constructing set of edge update entries to purge.") + + // Build the set of keys that we will remove from the edge update index. + // This will include all keys contained within the bucket. + var updateKeysToRemove [][]byte + err = edgeUpdateIndex.ForEach(func(updKey, _ []byte) error { + updateKeysToRemove = append(updateKeysToRemove, updKey) + return nil + }) + if err != nil { + return fmt.Errorf("unable to gather existing edge updates: %v", + err) + } + + log.Infof("Removing %d entries from edge update index.", + len(updateKeysToRemove)) + + // With the set of keys contained in the edge update index constructed, + // we'll proceed in purging all of them from the index. + for _, updKey := range updateKeysToRemove { + if err := edgeUpdateIndex.Delete(updKey); err != nil { + return err + } + } + + log.Infof("Repopulating edge update index with %d valid entries.", + len(edgeKeys)) + + // For each edge key, we'll retrieve the policy, deserialize it, and + // re-add it to the different buckets. By doing so, we'll ensure that + // all existing edge policies are serialized correctly within their + // respective buckets and that the correct entries are populated within + // the edge update index. + for _, edgeKey := range edgeKeys { + edgePolicyBytes := edges.Get(edgeKey) + + // Skip any entries with unknown policies as there will not be + // any entries for them in the edge update index. + if bytes.Equal(edgePolicyBytes[:], unknownPolicy) { + continue + } + + edgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgePolicyBytes), nodes, + ) + if err != nil { + return err + } + + _, err = updateEdgePolicy(tx, edgePolicy) + if err != nil { + return err + } + } + + log.Info("Migration to properly prune edge update index complete!") + + return nil +} + +// migrateOptionalChannelCloseSummaryFields migrates the serialized format of +// ChannelCloseSummary to a format where optional fields' presence is indicated +// with boolean markers. +func migrateOptionalChannelCloseSummaryFields(tx *bbolt.Tx) error { + closedChanBucket := tx.Bucket(closedChannelBucket) + if closedChanBucket == nil { + return nil + } + + log.Info("Migrating to new closed channel format...") + + // We store the converted keys and values and put them back into the + // database after the loop, since modifying the bucket within the + // ForEach loop is not safe. + var closedChansKeys [][]byte + var closedChansValues [][]byte + err := closedChanBucket.ForEach(func(chanID, summary []byte) error { + r := bytes.NewReader(summary) + + // Read the old (v6) format from the database. + c, err := deserializeCloseChannelSummaryV6(r) + if err != nil { + return err + } + + // Serialize using the new format, and put back into the + // bucket. + var b bytes.Buffer + if err := serializeChannelCloseSummary(&b, c); err != nil { + return err + } + + // Now that we know the modifications was successful, we'll + // Store the key and value to our slices, and write it back to + // disk in the new format after the ForEach loop is over. + closedChansKeys = append(closedChansKeys, chanID) + closedChansValues = append(closedChansValues, b.Bytes()) + return nil + }) + if err != nil { + return fmt.Errorf("unable to update closed channels: %v", err) + } + + // Now put the new format back into the DB. + for i := range closedChansKeys { + key := closedChansKeys[i] + value := closedChansValues[i] + if err := closedChanBucket.Put(key, value); err != nil { + return err + } + } + + log.Info("Migration to new closed channel format complete!") + + return nil +} + +var messageStoreBucket = []byte("message-store") + +// migrateGossipMessageStoreKeys migrates the key format for gossip messages +// found in the message store to a new one that takes into consideration the of +// the message being stored. +func migrateGossipMessageStoreKeys(tx *bbolt.Tx) error { + // We'll start by retrieving the bucket in which these messages are + // stored within. If there isn't one, there's nothing left for us to do + // so we can avoid the migration. + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return nil + } + + log.Info("Migrating to the gossip message store new key format") + + // Otherwise we'll proceed with the migration. We'll start by coalescing + // all the current messages within the store, which are indexed by the + // public key of the peer which they should be sent to, followed by the + // short channel ID of the channel for which the message belongs to. We + // should only expect to find channel announcement signatures as that + // was the only support message type previously. + msgs := make(map[[33 + 8]byte]*lnwire.AnnounceSignatures) + err := messageStore.ForEach(func(k, v []byte) error { + var msgKey [33 + 8]byte + copy(msgKey[:], k) + + msg := &lnwire.AnnounceSignatures{} + if err := msg.Decode(bytes.NewReader(v), 0); err != nil { + return err + } + + msgs[msgKey] = msg + + return nil + + }) + if err != nil { + return err + } + + // Then, we'll go over all of our messages, remove their previous entry, + // and add another with the new key format. Once we've done this for + // every message, we can consider the migration complete. + for oldMsgKey, msg := range msgs { + if err := messageStore.Delete(oldMsgKey[:]); err != nil { + return err + } + + // Construct the new key for which we'll find this message with + // in the store. It'll be the same as the old, but we'll also + // include the message type. + var msgType [2]byte + binary.BigEndian.PutUint16(msgType[:], uint16(msg.MsgType())) + newMsgKey := append(oldMsgKey[:], msgType[:]...) + + // Serialize the message with its wire encoding. + var b bytes.Buffer + if _, err := lnwire.WriteMessage(&b, msg, 0); err != nil { + return err + } + + if err := messageStore.Put(newMsgKey, b.Bytes()); err != nil { + return err + } + } + + log.Info("Migration to the gossip message store new key format complete!") + + return nil +} + +// migrateOutgoingPayments moves the OutgoingPayments into a new bucket format +// where they all reside in a top-level bucket indexed by the payment hash. In +// this sub-bucket we store information relevant to this payment, such as the +// payment status. +// +// Since the router cannot handle resumed payments that have the status +// InFlight (we have no PaymentAttemptInfo available for pre-migration +// payments) we delete those statuses, so only Completed payments remain in the +// new bucket structure. +func migrateOutgoingPayments(tx *bbolt.Tx) error { + log.Infof("Migrating outgoing payments to new bucket structure") + + oldPayments := tx.Bucket(paymentBucket) + + // Return early if there are no payments to migrate. + if oldPayments == nil { + log.Infof("No outgoing payments found, nothing to migrate.") + return nil + } + + newPayments, err := tx.CreateBucket(paymentsRootBucket) + if err != nil { + return err + } + + // Helper method to get the source pubkey. We define it such that we + // only attempt to fetch it if needed. + sourcePub := func() ([33]byte, error) { + var pub [33]byte + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return pub, ErrGraphNotFound + } + + selfPub := nodes.Get(sourceKey) + if selfPub == nil { + return pub, ErrSourceNodeNotSet + } + copy(pub[:], selfPub[:]) + return pub, nil + } + + err = oldPayments.ForEach(func(k, v []byte) error { + // Ignores if it is sub-bucket. + if v == nil { + return nil + } + + // Read the old payment format. + r := bytes.NewReader(v) + payment, err := deserializeOutgoingPayment(r) + if err != nil { + return err + } + + // Calculate payment hash from the payment preimage. + paymentHash := sha256.Sum256(payment.PaymentPreimage[:]) + + // Now create and add a PaymentCreationInfo to the bucket. + c := &PaymentCreationInfo{ + PaymentHash: paymentHash, + Value: payment.Terms.Value, + CreationDate: payment.CreationDate, + PaymentRequest: payment.PaymentRequest, + } + + var infoBuf bytes.Buffer + if err := serializePaymentCreationInfo(&infoBuf, c); err != nil { + return err + } + + sourcePubKey, err := sourcePub() + if err != nil { + return err + } + + // Do the same for the PaymentAttemptInfo. + totalAmt := payment.Terms.Value + payment.Fee + rt := route.Route{ + TotalTimeLock: payment.TimeLockLength, + TotalAmount: totalAmt, + SourcePubKey: sourcePubKey, + Hops: []*route.Hop{}, + } + for _, hop := range payment.Path { + rt.Hops = append(rt.Hops, &route.Hop{ + PubKeyBytes: hop, + AmtToForward: totalAmt, + }) + } + + // Since the old format didn't store the fee for individual + // hops, we let the last hop eat the whole fee for the total to + // add up. + if len(rt.Hops) > 0 { + rt.Hops[len(rt.Hops)-1].AmtToForward = payment.Terms.Value + } + + // Since we don't have the session key for old payments, we + // create a random one to be able to serialize the attempt + // info. + priv, _ := btcec.NewPrivateKey(btcec.S256()) + s := &PaymentAttemptInfo{ + PaymentID: 0, // unknown. + SessionKey: priv, // unknown. + Route: rt, + } + + var attemptBuf bytes.Buffer + if err := serializePaymentAttemptInfoMigration9(&attemptBuf, s); err != nil { + return err + } + + // Reuse the existing payment sequence number. + var seqNum [8]byte + copy(seqNum[:], k) + + // Create a bucket indexed by the payment hash. + bucket, err := newPayments.CreateBucket(paymentHash[:]) + + // If the bucket already exists, it means that we are migrating + // from a database containing duplicate payments to a payment + // hash. To keep this information, we store such duplicate + // payments in a sub-bucket. + if err == bbolt.ErrBucketExists { + pHashBucket := newPayments.Bucket(paymentHash[:]) + + // Create a bucket for duplicate payments within this + // payment hash's bucket. + dup, err := pHashBucket.CreateBucketIfNotExists( + paymentDuplicateBucket, + ) + if err != nil { + return err + } + + // Each duplicate will get its own sub-bucket within + // this bucket, so use their sequence number to index + // them by. + bucket, err = dup.CreateBucket(seqNum[:]) + if err != nil { + return err + } + + } else if err != nil { + return err + } + + // Store the payment's information to the bucket. + err = bucket.Put(paymentSequenceKey, seqNum[:]) + if err != nil { + return err + } + + err = bucket.Put(paymentCreationInfoKey, infoBuf.Bytes()) + if err != nil { + return err + } + + err = bucket.Put(paymentAttemptInfoKey, attemptBuf.Bytes()) + if err != nil { + return err + } + + err = bucket.Put(paymentSettleInfoKey, payment.PaymentPreimage[:]) + if err != nil { + return err + } + + return nil + }) + if err != nil { + return err + } + + // To continue producing unique sequence numbers, we set the sequence + // of the new bucket to that of the old one. + seq := oldPayments.Sequence() + if err := newPayments.SetSequence(seq); err != nil { + return err + } + + // Now we delete the old buckets. Deleting the payment status buckets + // deletes all payment statuses other than Complete. + err = tx.DeleteBucket(paymentStatusBucket) + if err != nil && err != bbolt.ErrBucketNotFound { + return err + } + + // Finally delete the old payment bucket. + err = tx.DeleteBucket(paymentBucket) + if err != nil && err != bbolt.ErrBucketNotFound { + return err + } + + log.Infof("Migration of outgoing payment bucket structure completed!") + return nil +} diff --git a/channeldb/migrations_test.go b/channeldb/migrations_test.go index 7731e2fc..93bf602f 100644 --- a/channeldb/migrations_test.go +++ b/channeldb/migrations_test.go @@ -2,17 +2,732 @@ package channeldb import ( "bytes" + "crypto/sha256" + "encoding/binary" + "fmt" "math/rand" "reflect" "testing" "time" + "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) +// TestPaymentStatusesMigration checks that already completed payments will have +// their payment statuses set to Completed after the migration. +func TestPaymentStatusesMigration(t *testing.T) { + t.Parallel() + + fakePayment := makeFakePayment() + paymentHash := sha256.Sum256(fakePayment.PaymentPreimage[:]) + + // Add fake payment to test database, verifying that it was created, + // that we have only one payment, and its status is not "Completed". + beforeMigrationFunc := func(d *DB) { + if err := d.addPayment(fakePayment); err != nil { + t.Fatalf("unable to add payment: %v", err) + } + + payments, err := d.fetchAllPayments() + if err != nil { + t.Fatalf("unable to fetch payments: %v", err) + } + + if len(payments) != 1 { + t.Fatalf("wrong qty of paymets: expected 1, got %v", + len(payments)) + } + + paymentStatus, err := d.fetchPaymentStatus(paymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + // We should receive default status if we have any in database. + if paymentStatus != StatusUnknown { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusUnknown.String(), paymentStatus.String()) + } + + // Lastly, we'll add a locally-sourced circuit and + // non-locally-sourced circuit to the circuit map. The + // locally-sourced payment should end up with an InFlight + // status, while the other should remain unchanged, which + // defaults to Grounded. + err = d.Update(func(tx *bbolt.Tx) error { + circuits, err := tx.CreateBucketIfNotExists( + []byte("circuit-adds"), + ) + if err != nil { + return err + } + + groundedKey := make([]byte, 16) + binary.BigEndian.PutUint64(groundedKey[:8], 1) + binary.BigEndian.PutUint64(groundedKey[8:], 1) + + // Generated using TestHalfCircuitSerialization with nil + // ErrorEncrypter, which is the case for locally-sourced + // payments. No payment status should end up being set + // for this circuit, since the short channel id of the + // key is non-zero (e.g., a forwarded circuit). This + // will default it to Grounded. + groundedCircuit := []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, + // start payment hash + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // end payment hash + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, + 0x42, 0x40, 0x00, + } + + err = circuits.Put(groundedKey, groundedCircuit) + if err != nil { + return err + } + + inFlightKey := make([]byte, 16) + binary.BigEndian.PutUint64(inFlightKey[:8], 0) + binary.BigEndian.PutUint64(inFlightKey[8:], 1) + + // Generated using TestHalfCircuitSerialization with nil + // ErrorEncrypter, which is not the case for forwarded + // payments, but should have no impact on the + // correctness of the test. The payment status for this + // circuit should be set to InFlight, since the short + // channel id in the key is 0 (sourceHop). + inFlightCircuit := []byte{ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x01, + // start payment hash + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + // end payment hash + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, + 0x42, 0x40, 0x00, + } + + return circuits.Put(inFlightKey, inFlightCircuit) + }) + if err != nil { + t.Fatalf("unable to add circuit map entry: %v", err) + } + } + + // Verify that the created payment status is "Completed" for our one + // fake payment. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration 'paymentStatusesMigration' wasn't applied") + } + + // Check that our completed payments were migrated. + paymentStatus, err := d.fetchPaymentStatus(paymentHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusSucceeded { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusSucceeded.String(), paymentStatus.String()) + } + + inFlightHash := [32]byte{ + 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + + // Check that the locally sourced payment was transitioned to + // InFlight. + paymentStatus, err = d.fetchPaymentStatus(inFlightHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusInFlight { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusInFlight.String(), paymentStatus.String()) + } + + groundedHash := [32]byte{ + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + + // Check that non-locally sourced payments remain in the default + // Grounded state. + paymentStatus, err = d.fetchPaymentStatus(groundedHash) + if err != nil { + t.Fatalf("unable to fetch payment status: %v", err) + } + + if paymentStatus != StatusUnknown { + t.Fatalf("wrong payment status: expected %v, got %v", + StatusUnknown.String(), paymentStatus.String()) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + paymentStatusesMigration, + false) +} + +// TestMigrateOptionalChannelCloseSummaryFields properly converts a +// ChannelCloseSummary to the v7 format, where optional fields have their +// presence indicated with boolean markers. +func TestMigrateOptionalChannelCloseSummaryFields(t *testing.T) { + t.Parallel() + + chanState, err := createTestChannelState(nil) + if err != nil { + t.Fatalf("unable to create channel state: %v", err) + } + + var chanPointBuf bytes.Buffer + err = writeOutpoint(&chanPointBuf, &chanState.FundingOutpoint) + if err != nil { + t.Fatalf("unable to write outpoint: %v", err) + } + + chanID := chanPointBuf.Bytes() + + testCases := []struct { + closeSummary *ChannelCloseSummary + oldSerialization func(c *ChannelCloseSummary) []byte + }{ + { + // A close summary where none of the new fields are + // set. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + + // The last fields will be unset. + RemoteCurrentRevocation: nil, + LocalChanConfig: ChannelConfig{}, + RemoteNextRevocation: nil, + }, + + // In the old format the last field written is the + // IsPendingField. It should be converted by adding an + // extra boolean marker at the end to indicate that the + // remaining fields are not there. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + // For the old format, these are all the fields + // that are written. + return buf.Bytes() + }, + }, + { + // A close summary where the new fields are present, + // but the optional RemoteNextRevocation field is not + // set. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + RemoteCurrentRevocation: chanState.RemoteCurrentRevocation, + LocalChanConfig: chanState.LocalChanCfg, + + // RemoteNextRevocation is optional, and here + // it is not set. + RemoteNextRevocation: nil, + }, + + // In the old format the last field written is the + // LocalChanConfig. This indicates that the optional + // RemoteNextRevocation field is not present. It should + // be converted by adding boolean markers for all these + // fields. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteCurrentRevocation) + if err != nil { + t.Fatal(err) + } + + err = writeChanConfig(&buf, &cs.LocalChanConfig) + if err != nil { + t.Fatal(err) + } + + // RemoteNextRevocation is not written. + return buf.Bytes() + }, + }, + { + // A close summary where all fields are present. + closeSummary: &ChannelCloseSummary{ + ChanPoint: chanState.FundingOutpoint, + ShortChanID: chanState.ShortChanID(), + ChainHash: chanState.ChainHash, + ClosingTXID: testTx.TxHash(), + CloseHeight: 100, + RemotePub: chanState.IdentityPub, + Capacity: chanState.Capacity, + SettledBalance: btcutil.Amount(50000), + CloseType: RemoteForceClose, + IsPending: true, + RemoteCurrentRevocation: chanState.RemoteCurrentRevocation, + LocalChanConfig: chanState.LocalChanCfg, + + // RemoteNextRevocation is optional, and in + // this case we set it. + RemoteNextRevocation: chanState.RemoteNextRevocation, + }, + + // In the old format all the fields are written. It + // should be converted by adding boolean markers for + // all these fields. + oldSerialization: func(cs *ChannelCloseSummary) []byte { + var buf bytes.Buffer + err := WriteElements(&buf, cs.ChanPoint, + cs.ShortChanID, cs.ChainHash, + cs.ClosingTXID, cs.CloseHeight, + cs.RemotePub, cs.Capacity, + cs.SettledBalance, cs.TimeLockedBalance, + cs.CloseType, cs.IsPending, + ) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteCurrentRevocation) + if err != nil { + t.Fatal(err) + } + + err = writeChanConfig(&buf, &cs.LocalChanConfig) + if err != nil { + t.Fatal(err) + } + + err = WriteElements(&buf, cs.RemoteNextRevocation) + if err != nil { + t.Fatal(err) + } + + return buf.Bytes() + }, + }, + } + + for _, test := range testCases { + + // Before the migration we must add the old format to the DB. + beforeMigrationFunc := func(d *DB) { + + // Get the old serialization format for this test's + // close summary, and it to the closed channel bucket. + old := test.oldSerialization(test.closeSummary) + err = d.Update(func(tx *bbolt.Tx) error { + closedChanBucket, err := tx.CreateBucketIfNotExists( + closedChannelBucket, + ) + if err != nil { + return err + } + return closedChanBucket.Put(chanID, old) + }) + if err != nil { + t.Fatalf("unable to add old serialization: %v", + err) + } + } + + // After the migration it should be found in the new format. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration wasn't applied") + } + + // We generate the new serialized version, to check + // against what is found in the DB. + var b bytes.Buffer + err = serializeChannelCloseSummary(&b, test.closeSummary) + if err != nil { + t.Fatalf("unable to serialize: %v", err) + } + newSerialization := b.Bytes() + + var dbSummary []byte + err = d.View(func(tx *bbolt.Tx) error { + closedChanBucket := tx.Bucket(closedChannelBucket) + if closedChanBucket == nil { + return errors.New("unable to find bucket") + } + + // Get the serialized verision from the DB and + // make sure it matches what we expected. + dbSummary = closedChanBucket.Get(chanID) + if !bytes.Equal(dbSummary, newSerialization) { + return fmt.Errorf("unexpected new " + + "serialization") + } + return nil + }) + if err != nil { + t.Fatalf("unable to view DB: %v", err) + } + + // Finally we fetch the deserialized summary from the + // DB and check that it is equal to our original one. + dbChannels, err := d.FetchClosedChannels(false) + if err != nil { + t.Fatalf("unable to fetch closed channels: %v", + err) + } + + if len(dbChannels) != 1 { + t.Fatalf("expected 1 closed channels, found %v", + len(dbChannels)) + } + + dbChan := dbChannels[0] + if !reflect.DeepEqual(dbChan, test.closeSummary) { + dbChan.RemotePub.Curve = nil + test.closeSummary.RemotePub.Curve = nil + t.Fatalf("not equal: %v vs %v", + spew.Sdump(dbChan), + spew.Sdump(test.closeSummary)) + } + + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrateOptionalChannelCloseSummaryFields, + false) + } +} + +// TestMigrateGossipMessageStoreKeys ensures that the migration to the new +// gossip message store key format is successful/unsuccessful under various +// scenarios. +func TestMigrateGossipMessageStoreKeys(t *testing.T) { + t.Parallel() + + // Construct the message which we'll use to test the migration, along + // with its old and new key formats. + shortChanID := lnwire.ShortChannelID{BlockHeight: 10} + msg := &lnwire.AnnounceSignatures{ShortChannelID: shortChanID} + + var oldMsgKey [33 + 8]byte + copy(oldMsgKey[:33], pubKey.SerializeCompressed()) + binary.BigEndian.PutUint64(oldMsgKey[33:41], shortChanID.ToUint64()) + + var newMsgKey [33 + 8 + 2]byte + copy(newMsgKey[:41], oldMsgKey[:]) + binary.BigEndian.PutUint16(newMsgKey[41:43], uint16(msg.MsgType())) + + // Before the migration, we'll create the bucket where the messages + // should live and insert them. + beforeMigration := func(db *DB) { + var b bytes.Buffer + if err := msg.Encode(&b, 0); err != nil { + t.Fatalf("unable to serialize message: %v", err) + } + + err := db.Update(func(tx *bbolt.Tx) error { + messageStore, err := tx.CreateBucketIfNotExists( + messageStoreBucket, + ) + if err != nil { + return err + } + + return messageStore.Put(oldMsgKey[:], b.Bytes()) + }) + if err != nil { + t.Fatal(err) + } + } + + // After the migration, we'll make sure that: + // 1. We cannot find the message under its old key. + // 2. We can find the message under its new key. + // 3. The message matches the original. + afterMigration := func(db *DB) { + meta, err := db.FetchMeta(nil) + if err != nil { + t.Fatalf("unable to fetch db version: %v", err) + } + if meta.DbVersionNumber != 1 { + t.Fatalf("migration should have succeeded but didn't") + } + + var rawMsg []byte + err = db.View(func(tx *bbolt.Tx) error { + messageStore := tx.Bucket(messageStoreBucket) + if messageStore == nil { + return errors.New("message store bucket not " + + "found") + } + rawMsg = messageStore.Get(oldMsgKey[:]) + if rawMsg != nil { + t.Fatal("expected to not find message under " + + "old key, but did") + } + rawMsg = messageStore.Get(newMsgKey[:]) + if rawMsg == nil { + return fmt.Errorf("expected to find message " + + "under new key, but didn't") + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + + gotMsg, err := lnwire.ReadMessage(bytes.NewReader(rawMsg), 0) + if err != nil { + t.Fatalf("unable to deserialize raw message: %v", err) + } + if !reflect.DeepEqual(msg, gotMsg) { + t.Fatalf("expected message: %v\ngot message: %v", + spew.Sdump(msg), spew.Sdump(gotMsg)) + } + } + + applyMigration( + t, beforeMigration, afterMigration, + migrateGossipMessageStoreKeys, false, + ) +} + +// TestOutgoingPaymentsMigration checks that OutgoingPayments are migrated to a +// new bucket structure after the migration. +func TestOutgoingPaymentsMigration(t *testing.T) { + t.Parallel() + + const numPayments = 4 + var oldPayments []*outgoingPayment + + // Add fake payments to test database, verifying that it was created. + beforeMigrationFunc := func(d *DB) { + for i := 0; i < numPayments; i++ { + var p *outgoingPayment + var err error + + // We fill the database with random payments. For the + // very last one we'll use a duplicate of the first, to + // ensure we are able to handle migration from a + // database that has copies. + if i < numPayments-1 { + p, err = makeRandomFakePayment() + if err != nil { + t.Fatalf("unable to create payment: %v", + err) + } + } else { + p = oldPayments[0] + } + + if err := d.addPayment(p); err != nil { + t.Fatalf("unable to add payment: %v", err) + } + + oldPayments = append(oldPayments, p) + } + + payments, err := d.fetchAllPayments() + if err != nil { + t.Fatalf("unable to fetch payments: %v", err) + } + + if len(payments) != numPayments { + t.Fatalf("wrong qty of paymets: expected %d got %v", + numPayments, len(payments)) + } + } + + // Verify that all payments were migrated. + afterMigrationFunc := func(d *DB) { + meta, err := d.FetchMeta(nil) + if err != nil { + t.Fatal(err) + } + + if meta.DbVersionNumber != 1 { + t.Fatal("migration 'paymentStatusesMigration' wasn't applied") + } + + sentPayments, err := d.fetchPaymentsMigration9() + if err != nil { + t.Fatalf("unable to fetch sent payments: %v", err) + } + + if len(sentPayments) != numPayments { + t.Fatalf("expected %d payments, got %d", numPayments, + len(sentPayments)) + } + + graph := d.ChannelGraph() + sourceNode, err := graph.SourceNode() + if err != nil { + t.Fatalf("unable to fetch source node: %v", err) + } + + for i, p := range sentPayments { + // The payment status should be Completed. + if p.Status != StatusSucceeded { + t.Fatalf("expected Completed, got %v", p.Status) + } + + // Check that the sequence number is preserved. They + // start counting at 1. + if p.sequenceNum != uint64(i+1) { + t.Fatalf("expected seqnum %d, got %d", i, + p.sequenceNum) + } + + // Order of payments should be be preserved. + old := oldPayments[i] + + // Check the individial fields. + if p.Info.Value != old.Terms.Value { + t.Fatalf("value mismatch") + } + + if p.Info.CreationDate != old.CreationDate { + t.Fatalf("date mismatch") + } + + if !bytes.Equal(p.Info.PaymentRequest, old.PaymentRequest) { + t.Fatalf("payreq mismatch") + } + + if *p.PaymentPreimage != old.PaymentPreimage { + t.Fatalf("preimage mismatch") + } + + if p.Attempt.Route.TotalFees() != old.Fee { + t.Fatalf("Fee mismatch") + } + + if p.Attempt.Route.TotalAmount != old.Fee+old.Terms.Value { + t.Fatalf("Total amount mismatch") + } + + if p.Attempt.Route.TotalTimeLock != old.TimeLockLength { + t.Fatalf("timelock mismatch") + } + + if p.Attempt.Route.SourcePubKey != sourceNode.PubKeyBytes { + t.Fatalf("source mismatch: %x vs %x", + p.Attempt.Route.SourcePubKey[:], + sourceNode.PubKeyBytes[:]) + } + + for i, hop := range old.Path { + if hop != p.Attempt.Route.Hops[i].PubKeyBytes { + t.Fatalf("path mismatch") + } + } + } + + // Finally, check that the payment sequence number is updated + // to reflect the migrated payments. + err = d.View(func(tx *bbolt.Tx) error { + payments := tx.Bucket(paymentsRootBucket) + if payments == nil { + return fmt.Errorf("payments bucket not found") + } + + seq := payments.Sequence() + if seq != numPayments { + return fmt.Errorf("expected sequence to be "+ + "%d, got %d", numPayments, seq) + } + + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + applyMigration(t, + beforeMigrationFunc, + afterMigrationFunc, + migrateOutgoingPayments, + false) +} + func makeRandPaymentCreationInfo() (*PaymentCreationInfo, error) { var payHash lntypes.Hash if _, err := rand.Read(payHash[:]); err != nil { diff --git a/channeldb/payments_test.go b/channeldb/payments_test.go index 4ea40888..8cc036fc 100644 --- a/channeldb/payments_test.go +++ b/channeldb/payments_test.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "math/rand" "reflect" "testing" "time" @@ -11,6 +12,7 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/tlv" ) @@ -51,6 +53,34 @@ var ( } ) +func makeFakePayment() *outgoingPayment { + fakeInvoice := &Invoice{ + // Use single second precision to avoid false positive test + // failures due to the monotonic time component. + CreationDate: time.Unix(time.Now().Unix(), 0), + Memo: []byte("fake memo"), + Receipt: []byte("fake receipt"), + PaymentRequest: []byte(""), + } + + copy(fakeInvoice.Terms.PaymentPreimage[:], rev[:]) + fakeInvoice.Terms.Value = lnwire.NewMSatFromSatoshis(10000) + + fakePath := make([][33]byte, 3) + for i := 0; i < 3; i++ { + copy(fakePath[i][:], bytes.Repeat([]byte{byte(i)}, 33)) + } + + fakePayment := &outgoingPayment{ + Invoice: *fakeInvoice, + Fee: 101, + Path: fakePath, + TimeLockLength: 1000, + } + copy(fakePayment.PaymentPreimage[:], rev[:]) + return fakePayment +} + func makeFakeInfo() (*PaymentCreationInfo, *PaymentAttemptInfo) { var preimg lntypes.Preimage copy(preimg[:], rev[:]) @@ -72,6 +102,70 @@ func makeFakeInfo() (*PaymentCreationInfo, *PaymentAttemptInfo) { return c, a } +// randomBytes creates random []byte with length in range [minLen, maxLen) +func randomBytes(minLen, maxLen int) ([]byte, error) { + randBuf := make([]byte, minLen+rand.Intn(maxLen-minLen)) + + if _, err := rand.Read(randBuf); err != nil { + return nil, fmt.Errorf("Internal error. "+ + "Cannot generate random string: %v", err) + } + + return randBuf, nil +} + +func makeRandomFakePayment() (*outgoingPayment, error) { + var err error + fakeInvoice := &Invoice{ + // Use single second precision to avoid false positive test + // failures due to the monotonic time component. + CreationDate: time.Unix(time.Now().Unix(), 0), + } + + fakeInvoice.Memo, err = randomBytes(1, 50) + if err != nil { + return nil, err + } + + fakeInvoice.Receipt, err = randomBytes(1, 50) + if err != nil { + return nil, err + } + + fakeInvoice.PaymentRequest, err = randomBytes(1, 50) + if err != nil { + return nil, err + } + + preImg, err := randomBytes(32, 33) + if err != nil { + return nil, err + } + copy(fakeInvoice.Terms.PaymentPreimage[:], preImg) + + fakeInvoice.Terms.Value = lnwire.MilliSatoshi(rand.Intn(10000)) + + fakePathLen := 1 + rand.Intn(5) + fakePath := make([][33]byte, fakePathLen) + for i := 0; i < fakePathLen; i++ { + b, err := randomBytes(33, 34) + if err != nil { + return nil, err + } + copy(fakePath[i][:], b) + } + + fakePayment := &outgoingPayment{ + Invoice: *fakeInvoice, + Fee: lnwire.MilliSatoshi(rand.Intn(1001)), + Path: fakePath, + TimeLockLength: uint32(rand.Intn(10000)), + } + copy(fakePayment.PaymentPreimage[:], fakeInvoice.Terms.PaymentPreimage[:]) + + return fakePayment, nil +} + func TestSentPaymentSerialization(t *testing.T) { t.Parallel()