From 85ea08fd1769ed168072193d9747a3ce9e41cc6b Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:37 -0700 Subject: [PATCH] channeldb: add migration to properly prune edge update index In this commit, we introduce a migration to fix some of the recent issues found w.r.t. the edge update index. The migration attempts to fix two things: 1) Edge policies include an extra byte at the end due to reading an extra byte for the node's public key from the serialized node info. 2) Properly prune all stale entries within the edge update index. As a result of this migration, nodes will have a slightly smaller in size channeldb. We will also no longer send stale edges to our peers in response to their gossip queries, which should also fix the fetching channel announcement for closed channels issue. --- channeldb/db.go | 6 +++ channeldb/migrations.go | 105 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 939bbdcb..df611326 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -74,6 +74,12 @@ var ( number: 5, migration: paymentStatusesMigration, }, + { + // The DB version that properly prunes stale entries + // from the edge update index. + number: 6, + migration: migratePruneEdgeUpdateIndex, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/migrations.go b/channeldb/migrations.go index c7beb638..810e14d9 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "errors" "fmt" "github.com/coreos/bbolt" @@ -458,3 +459,107 @@ func paymentStatusesMigration(tx *bolt.Tx) error { return nil } + +// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve +// some lingering bugs with regards to edge policies and their update index. +// Stale entries within the edge update index were not being properly pruned due +// to a miscalculation on the offset of an edge's policy last update. This +// migration also fixes the case where the public keys within edge policies were +// being serialized with an extra byte, causing an even greater error when +// attempting to perform the offset calculation described earlier. +func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error { + // To begin the migration, we'll retrieve the update index bucket. If it + // does not exist, we have nothing left to do so we can simply exit. + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return nil + } + + // Retrieve some buckets that will be needed later on. These should + // already exist given the assumption that the buckets above do as well. + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return errors.New("edge index should exist but does not") + } + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return errors.New("node bucket should exist but does not") + } + + log.Info("Migrating database to properly prune edge update index") + + // We'll need to properly prune all the outdated entries within the edge + // update index. To do so, we'll gather all of the existing policies + // within the graph to re-populate them later on. + var edgeKeys [][]byte + err := edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error { + // All valid entries are indexed by a public key (33 bytes) + // followed by a channel ID (8 bytes), so we'll skip any entries + // with keys that do not match this. + if len(edgeKey) != 33+8 { + return nil + } + + edgeKeys = append(edgeKeys, edgeKey) + + return nil + }) + if err != nil { + return fmt.Errorf("unable to gather existing edge policies: %v", + err) + } + + // With the existing edge policies gathered, we'll recreate the index + // and populate it with the correct entries. + oldNumEntries := edgeUpdateIndex.Stats().KeyN + if err := edges.DeleteBucket(edgeUpdateIndexBucket); err != nil { + return fmt.Errorf("unable to remove existing edge update "+ + "index: %v", err) + } + edgeUpdateIndex, err = edges.CreateBucketIfNotExists( + edgeUpdateIndexBucket, + ) + if err != nil { + return fmt.Errorf("unable to recreate edge update index: %v", + err) + } + + // For each edge key, we'll retrieve the policy, deserialize it, and + // re-add it to the different buckets. By doing so, we'll ensure that + // all existing edge policies are serialized correctly within their + // respective buckets and that the correct entries are populated within + // the edge update index. + for _, edgeKey := range edgeKeys { + edgePolicyBytes := edges.Get(edgeKey) + + // Skip any entries with unknown policies as there will not be + // any entries for them in the edge update index. + if bytes.Equal(edgePolicyBytes[:], unknownPolicy) { + continue + } + + edgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgePolicyBytes), nodes, + ) + if err != nil { + return err + } + + err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy) + if err != nil { + return err + } + } + + newNumEntries := edgeUpdateIndex.Stats().KeyN + log.Infof("Pruned %d stale entries from the edge update index", + oldNumEntries-newNumEntries) + + log.Info("Migration to properly prune edge update index complete!") + + return nil +}