3b2c807288
In this commit, we fix a bug in the latest database migration when migrating from 0.4 to 0.5. There's an issue in bolt db where if one deletes a bucket that has a key with a nil value, it thinks that's a sub bucket and attempts a bucket deletion. This will fail as it's not actually a sub-bucket. We get around this by using a cursor to manually delete items in the bucket. Fixes #1907.
561 lines
17 KiB
Go
561 lines
17 KiB
Go
package channeldb
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"encoding/binary"
|
|
"fmt"
|
|
|
|
"github.com/coreos/bbolt"
|
|
)
|
|
|
|
// migrateNodeAndEdgeUpdateIndex is a migration function that will update the
|
|
// database from version 0 to version 1. In version 1, we add two new indexes
|
|
// (one for nodes and one for edges) to keep track of the last time a node or
|
|
// edge was updated on the network. These new indexes allow us to implement the
|
|
// new graph sync protocol added.
|
|
func migrateNodeAndEdgeUpdateIndex(tx *bolt.Tx) error {
|
|
// First, we'll populating the node portion of the new index. Before we
|
|
// can add new values to the index, we'll first create the new bucket
|
|
// where these items will be housed.
|
|
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create node bucket: %v", err)
|
|
}
|
|
nodeUpdateIndex, err := nodes.CreateBucketIfNotExists(
|
|
nodeUpdateIndexBucket,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create node update index: %v", err)
|
|
}
|
|
|
|
log.Infof("Populating new node update index bucket")
|
|
|
|
// Now that we know the bucket has been created, we'll iterate over the
|
|
// entire node bucket so we can add the (updateTime || nodePub) key
|
|
// into the node update index.
|
|
err = nodes.ForEach(func(nodePub, nodeInfo []byte) error {
|
|
if len(nodePub) != 33 {
|
|
return nil
|
|
}
|
|
|
|
log.Tracef("Adding %x to node update index", nodePub)
|
|
|
|
// The first 8 bytes of a node's serialize data is the update
|
|
// time, so we can extract that without decoding the entire
|
|
// structure.
|
|
updateTime := nodeInfo[:8]
|
|
|
|
// Now that we have the update time, we can construct the key
|
|
// to insert into the index.
|
|
var indexKey [8 + 33]byte
|
|
copy(indexKey[:8], updateTime)
|
|
copy(indexKey[8:], nodePub)
|
|
|
|
return nodeUpdateIndex.Put(indexKey[:], nil)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to update node indexes: %v", err)
|
|
}
|
|
|
|
log.Infof("Populating new edge update index bucket")
|
|
|
|
// With the set of nodes updated, we'll now update all edges to have a
|
|
// corresponding entry in the edge update index.
|
|
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create edge bucket: %v", err)
|
|
}
|
|
edgeUpdateIndex, err := edges.CreateBucketIfNotExists(
|
|
edgeUpdateIndexBucket,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to create edge update index: %v", err)
|
|
}
|
|
|
|
// We'll now run through each edge policy in the database, and update
|
|
// the index to ensure each edge has the proper record.
|
|
err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
|
|
if len(edgeKey) != 41 {
|
|
return nil
|
|
}
|
|
|
|
// Now that we know this is the proper record, we'll grab the
|
|
// channel ID (last 8 bytes of the key), and then decode the
|
|
// edge policy so we can access the update time.
|
|
chanID := edgeKey[33:]
|
|
edgePolicyReader := bytes.NewReader(edgePolicyBytes)
|
|
|
|
edgePolicy, err := deserializeChanEdgePolicy(
|
|
edgePolicyReader, nodes,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Tracef("Adding chan_id=%v to edge update index",
|
|
edgePolicy.ChannelID)
|
|
|
|
// We'll now construct the index key using the channel ID, and
|
|
// the last time it was updated: (updateTime || chanID).
|
|
var indexKey [8 + 8]byte
|
|
byteOrder.PutUint64(
|
|
indexKey[:], uint64(edgePolicy.LastUpdate.Unix()),
|
|
)
|
|
copy(indexKey[8:], chanID)
|
|
|
|
return edgeUpdateIndex.Put(indexKey[:], nil)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to update edge indexes: %v", err)
|
|
}
|
|
|
|
log.Infof("Migration to node and edge update indexes complete!")
|
|
|
|
return nil
|
|
}
|
|
|
|
// migrateInvoiceTimeSeries is a database migration that assigns all existing
|
|
// invoices an index in the add and/or the settle index. Additionally, all
|
|
// existing invoices will have their bytes padded out in order to encode the
|
|
// add+settle index as well as the amount paid.
|
|
func migrateInvoiceTimeSeries(tx *bolt.Tx) error {
|
|
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
addIndex, err := invoices.CreateBucketIfNotExists(
|
|
addIndexBucket,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
settleIndex, err := invoices.CreateBucketIfNotExists(
|
|
settleIndexBucket,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Migrating invoice database to new time series format")
|
|
|
|
// Now that we have all the buckets we need, we'll run through each
|
|
// invoice in the database, and update it to reflect the new format
|
|
// expected post migration.
|
|
err = invoices.ForEach(func(invoiceNum, invoiceBytes []byte) error {
|
|
// If this is a sub bucket, then we'll skip it.
|
|
if invoiceBytes == nil {
|
|
return nil
|
|
}
|
|
|
|
// First, we'll make a copy of the encoded invoice bytes.
|
|
invoiceBytesCopy := make([]byte, len(invoiceBytes))
|
|
copy(invoiceBytesCopy, invoiceBytes)
|
|
|
|
// With the bytes copied over, we'll append 24 additional
|
|
// bytes. We do this so we can decode the invoice under the new
|
|
// serialization format.
|
|
padding := bytes.Repeat([]byte{0}, 24)
|
|
invoiceBytesCopy = append(invoiceBytesCopy, padding...)
|
|
|
|
invoiceReader := bytes.NewReader(invoiceBytesCopy)
|
|
invoice, err := deserializeInvoice(invoiceReader)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to decode invoice: %v", err)
|
|
}
|
|
|
|
// Now that we have the fully decoded invoice, we can update
|
|
// the various indexes that we're added, and finally the
|
|
// invoice itself before re-inserting it.
|
|
|
|
// First, we'll get the new sequence in the addIndex in order
|
|
// to create the proper mapping.
|
|
nextAddSeqNo, err := addIndex.NextSequence()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var seqNoBytes [8]byte
|
|
byteOrder.PutUint64(seqNoBytes[:], nextAddSeqNo)
|
|
err = addIndex.Put(seqNoBytes[:], invoiceNum[:])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Tracef("Adding invoice (preimage=%x, add_index=%v) to add "+
|
|
"time series", invoice.Terms.PaymentPreimage[:],
|
|
nextAddSeqNo)
|
|
|
|
// Next, we'll check if the invoice has been settled or not. If
|
|
// so, then we'll also add it to the settle index.
|
|
var nextSettleSeqNo uint64
|
|
if invoice.Terms.Settled {
|
|
nextSettleSeqNo, err = settleIndex.NextSequence()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var seqNoBytes [8]byte
|
|
byteOrder.PutUint64(seqNoBytes[:], nextSettleSeqNo)
|
|
err := settleIndex.Put(seqNoBytes[:], invoiceNum)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
invoice.AmtPaid = invoice.Terms.Value
|
|
|
|
log.Tracef("Adding invoice (preimage=%x, "+
|
|
"settle_index=%v) to add time series",
|
|
invoice.Terms.PaymentPreimage[:],
|
|
nextSettleSeqNo)
|
|
}
|
|
|
|
// Finally, we'll update the invoice itself with the new
|
|
// indexing information as well as the amount paid if it has
|
|
// been settled or not.
|
|
invoice.AddIndex = nextAddSeqNo
|
|
invoice.SettleIndex = nextSettleSeqNo
|
|
|
|
// We've fully migrated an invoice, so we'll now update the
|
|
// invoice in-place.
|
|
var b bytes.Buffer
|
|
if err := serializeInvoice(&b, &invoice); err != nil {
|
|
return err
|
|
}
|
|
|
|
return invoices.Put(invoiceNum, b.Bytes())
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Migration to invoice time series index complete!")
|
|
|
|
return nil
|
|
}
|
|
|
|
// migrateInvoiceTimeSeriesOutgoingPayments is a follow up to the
|
|
// migrateInvoiceTimeSeries migration. As at the time of writing, the
|
|
// OutgoingPayment struct embeddeds an instance of the Invoice struct. As a
|
|
// result, we also need to migrate the internal invoice to the new format.
|
|
func migrateInvoiceTimeSeriesOutgoingPayments(tx *bolt.Tx) error {
|
|
payBucket := tx.Bucket(paymentBucket)
|
|
if payBucket == nil {
|
|
return nil
|
|
}
|
|
|
|
log.Infof("Migrating invoice database to new outgoing payment format")
|
|
|
|
err := payBucket.ForEach(func(payID, paymentBytes []byte) error {
|
|
log.Tracef("Migrating payment %x", payID[:])
|
|
|
|
// The internal invoices for each payment only contain a
|
|
// populated contract term, and creation date, as a result,
|
|
// most of the bytes will be "empty".
|
|
|
|
// We'll calculate the end of the invoice index assuming a
|
|
// "minimal" index that's embedded within the greater
|
|
// OutgoingPayment. The breakdown is:
|
|
// 3 bytes empty var bytes, 16 bytes creation date, 16 bytes
|
|
// settled date, 32 bytes payment pre-image, 8 bytes value, 1
|
|
// byte settled.
|
|
endOfInvoiceIndex := 1 + 1 + 1 + 16 + 16 + 32 + 8 + 1
|
|
|
|
// We'll now extract the prefix of the pure invoice embedded
|
|
// within.
|
|
invoiceBytes := paymentBytes[:endOfInvoiceIndex]
|
|
|
|
// With the prefix extracted, we'll copy over the invoice, and
|
|
// also add padding for the new 24 bytes of fields, and finally
|
|
// append the remainder of the outgoing payment.
|
|
paymentCopy := make([]byte, len(invoiceBytes))
|
|
copy(paymentCopy[:], invoiceBytes)
|
|
|
|
padding := bytes.Repeat([]byte{0}, 24)
|
|
paymentCopy = append(paymentCopy, padding...)
|
|
paymentCopy = append(
|
|
paymentCopy, paymentBytes[endOfInvoiceIndex:]...,
|
|
)
|
|
|
|
// At this point, we now have the new format of the outgoing
|
|
// payments, we'll attempt to deserialize it to ensure the
|
|
// bytes are properly formatted.
|
|
paymentReader := bytes.NewReader(paymentCopy)
|
|
_, err := deserializeOutgoingPayment(paymentReader)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to deserialize payment: %v", err)
|
|
}
|
|
|
|
// Now that we know the modifications was successful, we'll
|
|
// write it back to disk in the new format.
|
|
if err := payBucket.Put(payID, paymentCopy); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Migration to outgoing payment invoices complete!")
|
|
|
|
return nil
|
|
}
|
|
|
|
// migrateEdgePolicies is a migration function that will update the edges
|
|
// bucket. It ensure that edges with unknown policies will also have an entry
|
|
// in the bucket. After the migration, there will be two edge entries for
|
|
// every channel, regardless of whether the policies are known.
|
|
func migrateEdgePolicies(tx *bolt.Tx) error {
|
|
nodes := tx.Bucket(nodeBucket)
|
|
if nodes == nil {
|
|
return nil
|
|
}
|
|
|
|
edges := tx.Bucket(edgeBucket)
|
|
if edges == nil {
|
|
return nil
|
|
}
|
|
|
|
edgeIndex := edges.Bucket(edgeIndexBucket)
|
|
if edgeIndex == nil {
|
|
return nil
|
|
}
|
|
|
|
// checkKey gets the policy from the database with a low-level call
|
|
// so that it is still possible to distinguish between unknown and
|
|
// not present.
|
|
checkKey := func(channelId uint64, keyBytes []byte) error {
|
|
var channelID [8]byte
|
|
byteOrder.PutUint64(channelID[:], channelId)
|
|
|
|
_, err := fetchChanEdgePolicy(edges,
|
|
channelID[:], keyBytes, nodes)
|
|
|
|
if err == ErrEdgeNotFound {
|
|
log.Tracef("Adding unknown edge policy present for node %x, channel %v",
|
|
keyBytes, channelId)
|
|
|
|
err := putChanEdgePolicyUnknown(edges, channelId, keyBytes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Iterate over all channels and check both edge policies.
|
|
err := edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
|
|
infoReader := bytes.NewReader(edgeInfoBytes)
|
|
edgeInfo, err := deserializeChanEdgeInfo(infoReader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, key := range [][]byte{edgeInfo.NodeKey1Bytes[:],
|
|
edgeInfo.NodeKey2Bytes[:]} {
|
|
|
|
if err := checkKey(edgeInfo.ChannelID, key); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("unable to update edge policies: %v", err)
|
|
}
|
|
|
|
log.Infof("Migration of edge policies complete!")
|
|
|
|
return nil
|
|
}
|
|
|
|
// paymentStatusesMigration is a database migration intended for adding payment
|
|
// statuses for each existing payment entity in bucket to be able control
|
|
// transitions of statuses and prevent cases such as double payment
|
|
func paymentStatusesMigration(tx *bolt.Tx) error {
|
|
// Get the bucket dedicated to storing statuses of payments,
|
|
// where a key is payment hash, value is payment status.
|
|
paymentStatuses, err := tx.CreateBucketIfNotExists(paymentStatusBucket)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Migrating database to support payment statuses")
|
|
|
|
circuitAddKey := []byte("circuit-adds")
|
|
circuits := tx.Bucket(circuitAddKey)
|
|
if circuits != nil {
|
|
log.Infof("Marking all known circuits with status InFlight")
|
|
|
|
err = circuits.ForEach(func(k, v []byte) error {
|
|
// Parse the first 8 bytes as the short chan ID for the
|
|
// circuit. We'll skip all short chan IDs are not
|
|
// locally initiated, which includes all non-zero short
|
|
// chan ids.
|
|
chanID := binary.BigEndian.Uint64(k[:8])
|
|
if chanID != 0 {
|
|
return nil
|
|
}
|
|
|
|
// The payment hash is the third item in the serialized
|
|
// payment circuit. The first two items are an AddRef
|
|
// (10 bytes) and the incoming circuit key (16 bytes).
|
|
const payHashOffset = 10 + 16
|
|
|
|
paymentHash := v[payHashOffset : payHashOffset+32]
|
|
|
|
return paymentStatuses.Put(
|
|
paymentHash[:], StatusInFlight.Bytes(),
|
|
)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Infof("Marking all existing payments with status Completed")
|
|
|
|
// Get the bucket dedicated to storing payments
|
|
bucket := tx.Bucket(paymentBucket)
|
|
if bucket == nil {
|
|
return nil
|
|
}
|
|
|
|
// For each payment in the bucket, deserialize the payment and mark it
|
|
// as completed.
|
|
err = bucket.ForEach(func(k, v []byte) error {
|
|
// Ignores if it is sub-bucket.
|
|
if v == nil {
|
|
return nil
|
|
}
|
|
|
|
r := bytes.NewReader(v)
|
|
payment, err := deserializeOutgoingPayment(r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Calculate payment hash for current payment.
|
|
paymentHash := sha256.Sum256(payment.PaymentPreimage[:])
|
|
|
|
// Update status for current payment to completed. If it fails,
|
|
// the migration is aborted and the payment bucket is returned
|
|
// to its previous state.
|
|
return paymentStatuses.Put(paymentHash[:], StatusCompleted.Bytes())
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Migration of payment statuses complete!")
|
|
|
|
return nil
|
|
}
|
|
|
|
// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve
|
|
// some lingering bugs with regards to edge policies and their update index.
|
|
// Stale entries within the edge update index were not being properly pruned due
|
|
// to a miscalculation on the offset of an edge's policy last update. This
|
|
// migration also fixes the case where the public keys within edge policies were
|
|
// being serialized with an extra byte, causing an even greater error when
|
|
// attempting to perform the offset calculation described earlier.
|
|
func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error {
|
|
// To begin the migration, we'll retrieve the update index bucket. If it
|
|
// does not exist, we have nothing left to do so we can simply exit.
|
|
edges := tx.Bucket(edgeBucket)
|
|
if edges == nil {
|
|
return nil
|
|
}
|
|
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
|
|
if edgeUpdateIndex == nil {
|
|
return nil
|
|
}
|
|
|
|
// Retrieve some buckets that will be needed later on. These should
|
|
// already exist given the assumption that the buckets above do as
|
|
// well.
|
|
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
|
|
if edgeIndex == nil {
|
|
return fmt.Errorf("unable to create/fetch edge index " +
|
|
"bucket")
|
|
}
|
|
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to make node bucket")
|
|
}
|
|
|
|
log.Info("Migrating database to properly prune edge update index")
|
|
|
|
// We'll need to properly prune all the outdated entries within the edge
|
|
// update index. To do so, we'll gather all of the existing policies
|
|
// within the graph to re-populate them later on.
|
|
var edgeKeys [][]byte
|
|
err = edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
|
|
// All valid entries are indexed by a public key (33 bytes)
|
|
// followed by a channel ID (8 bytes), so we'll skip any entries
|
|
// with keys that do not match this.
|
|
if len(edgeKey) != 33+8 {
|
|
return nil
|
|
}
|
|
|
|
edgeKeys = append(edgeKeys, edgeKey)
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("unable to gather existing edge policies: %v",
|
|
err)
|
|
}
|
|
|
|
// With the existing edge policies gathered, we'll recreate the index
|
|
// and populate it with the correct entries.
|
|
//
|
|
// NOTE: In bolt DB, calling Delete() on an iterator will actually move
|
|
// it to the NEXT item. As a result, we call First() here again to go
|
|
// back to the front after the item has been deleted.
|
|
updateCursor := edgeUpdateIndex.Cursor()
|
|
for k, _ := updateCursor.First(); k != nil; k, _ = updateCursor.First() {
|
|
if err := updateCursor.Delete(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// For each edge key, we'll retrieve the policy, deserialize it, and
|
|
// re-add it to the different buckets. By doing so, we'll ensure that
|
|
// all existing edge policies are serialized correctly within their
|
|
// respective buckets and that the correct entries are populated within
|
|
// the edge update index.
|
|
for _, edgeKey := range edgeKeys {
|
|
edgePolicyBytes := edges.Get(edgeKey)
|
|
|
|
// Skip any entries with unknown policies as there will not be
|
|
// any entries for them in the edge update index.
|
|
if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
|
|
continue
|
|
}
|
|
|
|
edgePolicy, err := deserializeChanEdgePolicy(
|
|
bytes.NewReader(edgePolicyBytes), nodes,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
log.Info("Migration to properly prune edge update index complete!")
|
|
|
|
return nil
|
|
}
|