channeldb: add migration to properly prune edge update index

In this commit, we introduce a migration to fix some of the recent
issues found w.r.t. the edge update index. The migration attempts to fix
two things:

1) Edge policies include an extra byte at the end due to reading an
extra byte for the node's public key from the serialized node info.

2) Properly prune all stale entries within the edge update index.

As a result of this migration, nodes will have a slightly smaller in
size channeldb. We will also no longer send stale edges to our peers in
response to their gossip queries, which should also fix the fetching
channel announcement for closed channels issue.
This commit is contained in:
Wilmer Paulino 2018-08-31 14:59:37 -07:00
parent c1633da252
commit 85ea08fd17
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 111 additions and 0 deletions

@ -74,6 +74,12 @@ var (
number: 5,
migration: paymentStatusesMigration,
},
{
// The DB version that properly prunes stale entries
// from the edge update index.
number: 6,
migration: migratePruneEdgeUpdateIndex,
},
}
// Big endian is the preferred byte order, due to cursor scans over

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"github.com/coreos/bbolt"
@ -458,3 +459,107 @@ func paymentStatusesMigration(tx *bolt.Tx) error {
return nil
}
// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve
// some lingering bugs with regards to edge policies and their update index.
// Stale entries within the edge update index were not being properly pruned due
// to a miscalculation on the offset of an edge's policy last update. This
// migration also fixes the case where the public keys within edge policies were
// being serialized with an extra byte, causing an even greater error when
// attempting to perform the offset calculation described earlier.
func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error {
// To begin the migration, we'll retrieve the update index bucket. If it
// does not exist, we have nothing left to do so we can simply exit.
edges := tx.Bucket(edgeBucket)
if edges == nil {
return nil
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return nil
}
// Retrieve some buckets that will be needed later on. These should
// already exist given the assumption that the buckets above do as well.
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return errors.New("edge index should exist but does not")
}
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return errors.New("node bucket should exist but does not")
}
log.Info("Migrating database to properly prune edge update index")
// We'll need to properly prune all the outdated entries within the edge
// update index. To do so, we'll gather all of the existing policies
// within the graph to re-populate them later on.
var edgeKeys [][]byte
err := edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
// All valid entries are indexed by a public key (33 bytes)
// followed by a channel ID (8 bytes), so we'll skip any entries
// with keys that do not match this.
if len(edgeKey) != 33+8 {
return nil
}
edgeKeys = append(edgeKeys, edgeKey)
return nil
})
if err != nil {
return fmt.Errorf("unable to gather existing edge policies: %v",
err)
}
// With the existing edge policies gathered, we'll recreate the index
// and populate it with the correct entries.
oldNumEntries := edgeUpdateIndex.Stats().KeyN
if err := edges.DeleteBucket(edgeUpdateIndexBucket); err != nil {
return fmt.Errorf("unable to remove existing edge update "+
"index: %v", err)
}
edgeUpdateIndex, err = edges.CreateBucketIfNotExists(
edgeUpdateIndexBucket,
)
if err != nil {
return fmt.Errorf("unable to recreate edge update index: %v",
err)
}
// For each edge key, we'll retrieve the policy, deserialize it, and
// re-add it to the different buckets. By doing so, we'll ensure that
// all existing edge policies are serialized correctly within their
// respective buckets and that the correct entries are populated within
// the edge update index.
for _, edgeKey := range edgeKeys {
edgePolicyBytes := edges.Get(edgeKey)
// Skip any entries with unknown policies as there will not be
// any entries for them in the edge update index.
if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
continue
}
edgePolicy, err := deserializeChanEdgePolicy(
bytes.NewReader(edgePolicyBytes), nodes,
)
if err != nil {
return err
}
err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy)
if err != nil {
return err
}
}
newNumEntries := edgeUpdateIndex.Stats().KeyN
log.Infof("Pruned %d stale entries from the edge update index",
oldNumEntries-newNumEntries)
log.Info("Migration to properly prune edge update index complete!")
return nil
}