diff --git a/channeldb/db.go b/channeldb/db.go index 939bbdcb..df611326 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -74,6 +74,12 @@ var ( number: 5, migration: paymentStatusesMigration, }, + { + // The DB version that properly prunes stale entries + // from the edge update index. + number: 6, + migration: migratePruneEdgeUpdateIndex, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/graph.go b/channeldb/graph.go index dc481cf7..1d7625ba 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1222,7 +1222,9 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha // First, we'll fetch the static edge information. edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID) if err != nil { - return err + chanID := byteOrder.Uint64(chanID) + return fmt.Errorf("unable to fetch info for "+ + "edge with chan_id=%v: %v", chanID, err) } edgeInfo.db = c.db @@ -1232,7 +1234,10 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha edgeIndex, edges, nodes, chanID, c.db, ) if err != nil { - return err + chanID := byteOrder.Uint64(chanID) + return fmt.Errorf("unable to fetch policies "+ + "for edge with chan_id=%v: %v", chanID, + err) } // Finally, we'll collate this edge with the rest of @@ -1612,36 +1617,48 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { if err != nil { return err } - - // Create the channelID key be converting the channel ID - // integer into a byte slice. - var chanID [8]byte - byteOrder.PutUint64(chanID[:], edge.ChannelID) - - // With the channel ID, we then fetch the value storing the two - // nodes which connect this channel edge. - nodeInfo := edgeIndex.Get(chanID[:]) - if nodeInfo == nil { - return ErrEdgeNotFound + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err } - // Depending on the flags value passed above, either the first - // or second edge policy is being updated. - var fromNode, toNode []byte - if edge.Flags&lnwire.ChanUpdateDirection == 0 { - fromNode = nodeInfo[:33] - toNode = nodeInfo[33:67] - } else { - fromNode = nodeInfo[33:67] - toNode = nodeInfo[:33] - } - - // Finally, with the direction of the edge being updated - // identified, we update the on-disk edge representation. - return putChanEdgePolicy(edges, edge, fromNode, toNode) + return updateEdgePolicy(edges, edgeIndex, nodes, edge) }) } +// updateEdgePolicy attempts to update an edge's policy within the relevant +// buckets using an existing database transaction. +func updateEdgePolicy(edges, edgeIndex, nodes *bolt.Bucket, + edge *ChannelEdgePolicy) error { + + // Create the channelID key be converting the channel ID + // integer into a byte slice. + var chanID [8]byte + byteOrder.PutUint64(chanID[:], edge.ChannelID) + + // With the channel ID, we then fetch the value storing the two + // nodes which connect this channel edge. + nodeInfo := edgeIndex.Get(chanID[:]) + if nodeInfo == nil { + return ErrEdgeNotFound + } + + // Depending on the flags value passed above, either the first + // or second edge policy is being updated. + var fromNode, toNode []byte + if edge.Flags&lnwire.ChanUpdateDirection == 0 { + fromNode = nodeInfo[:33] + toNode = nodeInfo[33:66] + } else { + fromNode = nodeInfo[33:66] + toNode = nodeInfo[:33] + } + + // Finally, with the direction of the edge being updated + // identified, we update the on-disk edge representation. + return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode) +} + // LightningNode represents an individual vertex/node within the channel graph. // A node is connected to other nodes by one or more channel edges emanating // from it. As the graph is directed, a node will also have an incoming edge @@ -2936,7 +2953,9 @@ func deserializeChanEdgeInfo(r io.Reader) (ChannelEdgeInfo, error) { return edgeInfo, nil } -func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []byte) error { +func putChanEdgePolicy(edges, nodes *bolt.Bucket, edge *ChannelEdgePolicy, + from, to []byte) error { + var edgeKey [33 + 8]byte copy(edgeKey[:], from) byteOrder.PutUint64(edgeKey[33:], edge.ChannelID) @@ -2999,17 +3018,20 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b // In order to delete the old entry, we'll need to obtain the // *prior* update time in order to delete it. To do this, we'll - // create an offset to slice in. Starting backwards, we'll - // create an offset than puts us right after the flags - // variable: - // - // * pubkeySize + fee+policySize + timelockSize + flagSize - updateEnd := 33 + (8 * 3) + 2 + 1 - updateStart := updateEnd - 8 - oldUpdateTime := edgeBytes[updateStart:updateEnd] + // need to deserialize the existing policy within the database + // (now outdated by the new one), and delete its corresponding + // entry within the update index. + oldEdgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgeBytes), nodes, + ) + if err != nil { + return err + } + + oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix()) var oldIndexKey [8 + 8]byte - copy(oldIndexKey[:], oldUpdateTime) + byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime) byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID) if err := updateIndex.Delete(oldIndexKey[:]); err != nil { @@ -3090,7 +3112,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket, // Similarly, the second node is contained within the latter // half of the edge information. - node2Pub := edgeInfo[33:67] + node2Pub := edgeInfo[33:66] edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes) if err != nil { return nil, nil, err diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 56b4dd38..4e1d10f7 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2130,44 +2130,74 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { t.Fatalf("unable to update edge: %v", err) } - // Now that both edges have been updated, if we manually check the - // update index, we should have an entry for both edges. - if err := db.View(func(tx *bolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrGraphNoEdgesFound - } - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrGraphNoEdgesFound - } - edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) - if edgeUpdateIndex == nil { - return ErrGraphNoEdgesFound + // checkIndexTimestamps is a helper function that checks the edge update + // index only includes the given timestamps. + checkIndexTimestamps := func(timestamps ...uint64) { + timestampSet := make(map[uint64]struct{}) + for _, t := range timestamps { + timestampSet[t] = struct{}{} } - var edgeKey [8 + 8]byte + err := db.View(func(tx *bolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return ErrGraphNoEdgesFound + } - byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID) + numEntries := edgeUpdateIndex.Stats().KeyN + expectedEntries := len(timestampSet) + if numEntries != expectedEntries { + return fmt.Errorf("expected %v entries in the "+ + "update index, got %v", expectedEntries, + numEntries) + } - if edgeUpdateIndex.Get(edgeKey[:]) == nil { - return fmt.Errorf("first edge not found in update " + - "index") + return edgeUpdateIndex.ForEach(func(k, _ []byte) error { + t := byteOrder.Uint64(k[:8]) + if _, ok := timestampSet[t]; !ok { + return fmt.Errorf("found unexpected "+ + "timestamp "+"%d", t) + } + + return nil + }) + }) + if err != nil { + t.Fatal(err) } - - byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) == nil { - return fmt.Errorf("second edge not found in update " + - "index") - } - - return nil - }); err != nil { - t.Fatalf("unable to read update index: %v", err) } + // With both edges policies added, we'll make sure to check they exist + // within the edge update index. + checkIndexTimestamps( + uint64(edge1.LastUpdate.Unix()), + uint64(edge2.LastUpdate.Unix()), + ) + + // Now, we'll update the edge policies to ensure the old timestamps are + // removed from the update index. + edge1.Flags = 2 + edge1.LastUpdate = time.Now() + if err := graph.UpdateEdgePolicy(edge1); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + edge2.Flags = 3 + edge2.LastUpdate = edge1.LastUpdate.Add(time.Hour) + if err := graph.UpdateEdgePolicy(edge2); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + // With the policies updated, we should now be able to find their + // updated entries within the update index. + checkIndexTimestamps( + uint64(edge1.LastUpdate.Unix()), + uint64(edge2.LastUpdate.Unix()), + ) + // Now we'll prune the graph, removing the edges, and also the update // index entries from the database all together. var blockHash chainhash.Hash @@ -2179,43 +2209,10 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { t.Fatalf("unable to prune graph: %v", err) } - // We'll now check the database state again, at this point, we should - // no longer be able to locate the entries within the edge update - // index. - if err := db.View(func(tx *bolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrGraphNoEdgesFound - } - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrGraphNoEdgesFound - } - edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) - if edgeUpdateIndex == nil { - return ErrGraphNoEdgesFound - } - - var edgeKey [8 + 8]byte - - byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) != nil { - return fmt.Errorf("first edge still found in update " + - "index") - } - - byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) != nil { - return fmt.Errorf("second edge still found in update " + - "index") - } - - return nil - }); err != nil { - t.Fatalf("unable to read update index: %v", err) - } + // Finally, we'll check the database state one last time to conclude + // that we should no longer be able to locate _any_ entries within the + // edge update index. + checkIndexTimestamps() } // TestPruneGraphNodes tests that unconnected vertexes are pruned via the diff --git a/channeldb/migrations.go b/channeldb/migrations.go index c7beb638..810e14d9 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "errors" "fmt" "github.com/coreos/bbolt" @@ -458,3 +459,107 @@ func paymentStatusesMigration(tx *bolt.Tx) error { return nil } + +// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve +// some lingering bugs with regards to edge policies and their update index. +// Stale entries within the edge update index were not being properly pruned due +// to a miscalculation on the offset of an edge's policy last update. This +// migration also fixes the case where the public keys within edge policies were +// being serialized with an extra byte, causing an even greater error when +// attempting to perform the offset calculation described earlier. +func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error { + // To begin the migration, we'll retrieve the update index bucket. If it + // does not exist, we have nothing left to do so we can simply exit. + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return nil + } + + // Retrieve some buckets that will be needed later on. These should + // already exist given the assumption that the buckets above do as well. + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return errors.New("edge index should exist but does not") + } + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return errors.New("node bucket should exist but does not") + } + + log.Info("Migrating database to properly prune edge update index") + + // We'll need to properly prune all the outdated entries within the edge + // update index. To do so, we'll gather all of the existing policies + // within the graph to re-populate them later on. + var edgeKeys [][]byte + err := edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error { + // All valid entries are indexed by a public key (33 bytes) + // followed by a channel ID (8 bytes), so we'll skip any entries + // with keys that do not match this. + if len(edgeKey) != 33+8 { + return nil + } + + edgeKeys = append(edgeKeys, edgeKey) + + return nil + }) + if err != nil { + return fmt.Errorf("unable to gather existing edge policies: %v", + err) + } + + // With the existing edge policies gathered, we'll recreate the index + // and populate it with the correct entries. + oldNumEntries := edgeUpdateIndex.Stats().KeyN + if err := edges.DeleteBucket(edgeUpdateIndexBucket); err != nil { + return fmt.Errorf("unable to remove existing edge update "+ + "index: %v", err) + } + edgeUpdateIndex, err = edges.CreateBucketIfNotExists( + edgeUpdateIndexBucket, + ) + if err != nil { + return fmt.Errorf("unable to recreate edge update index: %v", + err) + } + + // For each edge key, we'll retrieve the policy, deserialize it, and + // re-add it to the different buckets. By doing so, we'll ensure that + // all existing edge policies are serialized correctly within their + // respective buckets and that the correct entries are populated within + // the edge update index. + for _, edgeKey := range edgeKeys { + edgePolicyBytes := edges.Get(edgeKey) + + // Skip any entries with unknown policies as there will not be + // any entries for them in the edge update index. + if bytes.Equal(edgePolicyBytes[:], unknownPolicy) { + continue + } + + edgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgePolicyBytes), nodes, + ) + if err != nil { + return err + } + + err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy) + if err != nil { + return err + } + } + + newNumEntries := edgeUpdateIndex.Stats().KeyN + log.Infof("Pruned %d stale entries from the edge update index", + oldNumEntries-newNumEntries) + + log.Info("Migration to properly prune edge update index complete!") + + return nil +}