Merge pull request #1821 from wpaulino/prune-edge-update-index

channeldb: properly prune stale entries within edge update index
This commit is contained in:
Olaoluwa Osuntokun 2018-09-04 20:49:44 -07:00 committed by GitHub
commit 554bc314fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 236 additions and 106 deletions

@ -74,6 +74,12 @@ var (
number: 5,
migration: paymentStatusesMigration,
},
{
// The DB version that properly prunes stale entries
// from the edge update index.
number: 6,
migration: migratePruneEdgeUpdateIndex,
},
}
// Big endian is the preferred byte order, due to cursor scans over

@ -1222,7 +1222,9 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
// First, we'll fetch the static edge information.
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
return err
chanID := byteOrder.Uint64(chanID)
return fmt.Errorf("unable to fetch info for "+
"edge with chan_id=%v: %v", chanID, err)
}
edgeInfo.db = c.db
@ -1232,7 +1234,10 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
edgeIndex, edges, nodes, chanID, c.db,
)
if err != nil {
return err
chanID := byteOrder.Uint64(chanID)
return fmt.Errorf("unable to fetch policies "+
"for edge with chan_id=%v: %v", chanID,
err)
}
// Finally, we'll collate this edge with the rest of
@ -1612,36 +1617,48 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
if err != nil {
return err
}
// Create the channelID key be converting the channel ID
// integer into a byte slice.
var chanID [8]byte
byteOrder.PutUint64(chanID[:], edge.ChannelID)
// With the channel ID, we then fetch the value storing the two
// nodes which connect this channel edge.
nodeInfo := edgeIndex.Get(chanID[:])
if nodeInfo == nil {
return ErrEdgeNotFound
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
// Depending on the flags value passed above, either the first
// or second edge policy is being updated.
var fromNode, toNode []byte
if edge.Flags&lnwire.ChanUpdateDirection == 0 {
fromNode = nodeInfo[:33]
toNode = nodeInfo[33:67]
} else {
fromNode = nodeInfo[33:67]
toNode = nodeInfo[:33]
}
// Finally, with the direction of the edge being updated
// identified, we update the on-disk edge representation.
return putChanEdgePolicy(edges, edge, fromNode, toNode)
return updateEdgePolicy(edges, edgeIndex, nodes, edge)
})
}
// updateEdgePolicy attempts to update an edge's policy within the relevant
// buckets using an existing database transaction.
func updateEdgePolicy(edges, edgeIndex, nodes *bolt.Bucket,
edge *ChannelEdgePolicy) error {
// Create the channelID key be converting the channel ID
// integer into a byte slice.
var chanID [8]byte
byteOrder.PutUint64(chanID[:], edge.ChannelID)
// With the channel ID, we then fetch the value storing the two
// nodes which connect this channel edge.
nodeInfo := edgeIndex.Get(chanID[:])
if nodeInfo == nil {
return ErrEdgeNotFound
}
// Depending on the flags value passed above, either the first
// or second edge policy is being updated.
var fromNode, toNode []byte
if edge.Flags&lnwire.ChanUpdateDirection == 0 {
fromNode = nodeInfo[:33]
toNode = nodeInfo[33:66]
} else {
fromNode = nodeInfo[33:66]
toNode = nodeInfo[:33]
}
// Finally, with the direction of the edge being updated
// identified, we update the on-disk edge representation.
return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode)
}
// LightningNode represents an individual vertex/node within the channel graph.
// A node is connected to other nodes by one or more channel edges emanating
// from it. As the graph is directed, a node will also have an incoming edge
@ -2936,7 +2953,9 @@ func deserializeChanEdgeInfo(r io.Reader) (ChannelEdgeInfo, error) {
return edgeInfo, nil
}
func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []byte) error {
func putChanEdgePolicy(edges, nodes *bolt.Bucket, edge *ChannelEdgePolicy,
from, to []byte) error {
var edgeKey [33 + 8]byte
copy(edgeKey[:], from)
byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
@ -2999,17 +3018,20 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b
// In order to delete the old entry, we'll need to obtain the
// *prior* update time in order to delete it. To do this, we'll
// create an offset to slice in. Starting backwards, we'll
// create an offset than puts us right after the flags
// variable:
//
// * pubkeySize + fee+policySize + timelockSize + flagSize
updateEnd := 33 + (8 * 3) + 2 + 1
updateStart := updateEnd - 8
oldUpdateTime := edgeBytes[updateStart:updateEnd]
// need to deserialize the existing policy within the database
// (now outdated by the new one), and delete its corresponding
// entry within the update index.
oldEdgePolicy, err := deserializeChanEdgePolicy(
bytes.NewReader(edgeBytes), nodes,
)
if err != nil {
return err
}
oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix())
var oldIndexKey [8 + 8]byte
copy(oldIndexKey[:], oldUpdateTime)
byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
@ -3090,7 +3112,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket,
// Similarly, the second node is contained within the latter
// half of the edge information.
node2Pub := edgeInfo[33:67]
node2Pub := edgeInfo[33:66]
edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes)
if err != nil {
return nil, nil, err

@ -2130,44 +2130,74 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
t.Fatalf("unable to update edge: %v", err)
}
// Now that both edges have been updated, if we manually check the
// update index, we should have an entry for both edges.
if err := db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
// checkIndexTimestamps is a helper function that checks the edge update
// index only includes the given timestamps.
checkIndexTimestamps := func(timestamps ...uint64) {
timestampSet := make(map[uint64]struct{})
for _, t := range timestamps {
timestampSet[t] = struct{}{}
}
var edgeKey [8 + 8]byte
err := db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID)
numEntries := edgeUpdateIndex.Stats().KeyN
expectedEntries := len(timestampSet)
if numEntries != expectedEntries {
return fmt.Errorf("expected %v entries in the "+
"update index, got %v", expectedEntries,
numEntries)
}
if edgeUpdateIndex.Get(edgeKey[:]) == nil {
return fmt.Errorf("first edge not found in update " +
"index")
return edgeUpdateIndex.ForEach(func(k, _ []byte) error {
t := byteOrder.Uint64(k[:8])
if _, ok := timestampSet[t]; !ok {
return fmt.Errorf("found unexpected "+
"timestamp "+"%d", t)
}
return nil
})
})
if err != nil {
t.Fatal(err)
}
byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) == nil {
return fmt.Errorf("second edge not found in update " +
"index")
}
return nil
}); err != nil {
t.Fatalf("unable to read update index: %v", err)
}
// With both edges policies added, we'll make sure to check they exist
// within the edge update index.
checkIndexTimestamps(
uint64(edge1.LastUpdate.Unix()),
uint64(edge2.LastUpdate.Unix()),
)
// Now, we'll update the edge policies to ensure the old timestamps are
// removed from the update index.
edge1.Flags = 2
edge1.LastUpdate = time.Now()
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
edge2.Flags = 3
edge2.LastUpdate = edge1.LastUpdate.Add(time.Hour)
if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// With the policies updated, we should now be able to find their
// updated entries within the update index.
checkIndexTimestamps(
uint64(edge1.LastUpdate.Unix()),
uint64(edge2.LastUpdate.Unix()),
)
// Now we'll prune the graph, removing the edges, and also the update
// index entries from the database all together.
var blockHash chainhash.Hash
@ -2179,43 +2209,10 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
t.Fatalf("unable to prune graph: %v", err)
}
// We'll now check the database state again, at this point, we should
// no longer be able to locate the entries within the edge update
// index.
if err := db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
var edgeKey [8 + 8]byte
byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) != nil {
return fmt.Errorf("first edge still found in update " +
"index")
}
byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) != nil {
return fmt.Errorf("second edge still found in update " +
"index")
}
return nil
}); err != nil {
t.Fatalf("unable to read update index: %v", err)
}
// Finally, we'll check the database state one last time to conclude
// that we should no longer be able to locate _any_ entries within the
// edge update index.
checkIndexTimestamps()
}
// TestPruneGraphNodes tests that unconnected vertexes are pruned via the

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"github.com/coreos/bbolt"
@ -458,3 +459,107 @@ func paymentStatusesMigration(tx *bolt.Tx) error {
return nil
}
// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve
// some lingering bugs with regards to edge policies and their update index.
// Stale entries within the edge update index were not being properly pruned due
// to a miscalculation on the offset of an edge's policy last update. This
// migration also fixes the case where the public keys within edge policies were
// being serialized with an extra byte, causing an even greater error when
// attempting to perform the offset calculation described earlier.
func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error {
// To begin the migration, we'll retrieve the update index bucket. If it
// does not exist, we have nothing left to do so we can simply exit.
edges := tx.Bucket(edgeBucket)
if edges == nil {
return nil
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return nil
}
// Retrieve some buckets that will be needed later on. These should
// already exist given the assumption that the buckets above do as well.
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return errors.New("edge index should exist but does not")
}
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return errors.New("node bucket should exist but does not")
}
log.Info("Migrating database to properly prune edge update index")
// We'll need to properly prune all the outdated entries within the edge
// update index. To do so, we'll gather all of the existing policies
// within the graph to re-populate them later on.
var edgeKeys [][]byte
err := edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error {
// All valid entries are indexed by a public key (33 bytes)
// followed by a channel ID (8 bytes), so we'll skip any entries
// with keys that do not match this.
if len(edgeKey) != 33+8 {
return nil
}
edgeKeys = append(edgeKeys, edgeKey)
return nil
})
if err != nil {
return fmt.Errorf("unable to gather existing edge policies: %v",
err)
}
// With the existing edge policies gathered, we'll recreate the index
// and populate it with the correct entries.
oldNumEntries := edgeUpdateIndex.Stats().KeyN
if err := edges.DeleteBucket(edgeUpdateIndexBucket); err != nil {
return fmt.Errorf("unable to remove existing edge update "+
"index: %v", err)
}
edgeUpdateIndex, err = edges.CreateBucketIfNotExists(
edgeUpdateIndexBucket,
)
if err != nil {
return fmt.Errorf("unable to recreate edge update index: %v",
err)
}
// For each edge key, we'll retrieve the policy, deserialize it, and
// re-add it to the different buckets. By doing so, we'll ensure that
// all existing edge policies are serialized correctly within their
// respective buckets and that the correct entries are populated within
// the edge update index.
for _, edgeKey := range edgeKeys {
edgePolicyBytes := edges.Get(edgeKey)
// Skip any entries with unknown policies as there will not be
// any entries for them in the edge update index.
if bytes.Equal(edgePolicyBytes[:], unknownPolicy) {
continue
}
edgePolicy, err := deserializeChanEdgePolicy(
bytes.NewReader(edgePolicyBytes), nodes,
)
if err != nil {
return err
}
err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy)
if err != nil {
return err
}
}
newNumEntries := edgeUpdateIndex.Stats().KeyN
log.Infof("Pruned %d stale entries from the edge update index",
oldNumEntries-newNumEntries)
log.Info("Migration to properly prune edge update index complete!")
return nil
}