From aa3e2b6ba4b3ddb1d255ce47e48c9e708c6dabea Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:32 -0700 Subject: [PATCH 1/7] channeldb/graph: identify edge chan id on failure --- channeldb/graph.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index dc481cf7..34da4391 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1222,7 +1222,9 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha // First, we'll fetch the static edge information. edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID) if err != nil { - return err + chanID := byteOrder.Uint64(chanID) + return fmt.Errorf("unable to fetch info for "+ + "edge with chan_id=%v: %v", chanID, err) } edgeInfo.db = c.db @@ -1232,7 +1234,10 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha edgeIndex, edges, nodes, chanID, c.db, ) if err != nil { - return err + chanID := byteOrder.Uint64(chanID) + return fmt.Errorf("unable to fetch policies "+ + "for edge with chan_id=%v: %v", chanID, + err) } // Finally, we'll collate this edge with the rest of From 06344da62e46e49cd3d0bc6756a1a4b83cde48c5 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:33 -0700 Subject: [PATCH 2/7] channeldb/graph: refactor UpdateEdgePolicy to use existing db transaction --- channeldb/graph.go | 60 ++++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 34da4391..e5057bc2 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1618,35 +1618,43 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { return err } - // Create the channelID key be converting the channel ID - // integer into a byte slice. 
- var chanID [8]byte - byteOrder.PutUint64(chanID[:], edge.ChannelID) - - // With the channel ID, we then fetch the value storing the two - // nodes which connect this channel edge. - nodeInfo := edgeIndex.Get(chanID[:]) - if nodeInfo == nil { - return ErrEdgeNotFound - } - - // Depending on the flags value passed above, either the first - // or second edge policy is being updated. - var fromNode, toNode []byte - if edge.Flags&lnwire.ChanUpdateDirection == 0 { - fromNode = nodeInfo[:33] - toNode = nodeInfo[33:67] - } else { - fromNode = nodeInfo[33:67] - toNode = nodeInfo[:33] - } - - // Finally, with the direction of the edge being updated - // identified, we update the on-disk edge representation. - return putChanEdgePolicy(edges, edge, fromNode, toNode) + return updateEdgePolicy(edges, edgeIndex, edge) }) } +// updateEdgePolicy attempts to update an edge's policy within the relevant +// buckets using an existing database transaction. +func updateEdgePolicy(edges, edgeIndex *bolt.Bucket, + edge *ChannelEdgePolicy) error { + + // Create the channelID key be converting the channel ID + // integer into a byte slice. + var chanID [8]byte + byteOrder.PutUint64(chanID[:], edge.ChannelID) + + // With the channel ID, we then fetch the value storing the two + // nodes which connect this channel edge. + nodeInfo := edgeIndex.Get(chanID[:]) + if nodeInfo == nil { + return ErrEdgeNotFound + } + + // Depending on the flags value passed above, either the first + // or second edge policy is being updated. + var fromNode, toNode []byte + if edge.Flags&lnwire.ChanUpdateDirection == 0 { + fromNode = nodeInfo[:33] + toNode = nodeInfo[33:67] + } else { + fromNode = nodeInfo[33:67] + toNode = nodeInfo[:33] + } + + // Finally, with the direction of the edge being updated + // identified, we update the on-disk edge representation. + return putChanEdgePolicy(edges, edge, fromNode, toNode) +} + // LightningNode represents an individual vertex/node within the channel graph. 
// A node is connected to other nodes by one or more channel edges emanating // from it. As the graph is directed, a node will also have an incoming edge From 492d581df632fac6c9fe6753efb3f5b3b42db220 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:34 -0700 Subject: [PATCH 3/7] channeldb/graph: fix off-by-one public key slice In this commit, we fix an off-by-one error when slicing the public key from the serialized node info byte slice. This would cause us to write an extra byte to all edge policies. Even though the values were read correctly, when attempting to calculate the offset of an edge's update time going backwards, we'd always be incorrect, causing us to not properly prune the edge update index. --- channeldb/graph.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index e5057bc2..b53bf344 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1644,9 +1644,9 @@ func updateEdgePolicy(edges, edgeIndex *bolt.Bucket, var fromNode, toNode []byte if edge.Flags&lnwire.ChanUpdateDirection == 0 { fromNode = nodeInfo[:33] - toNode = nodeInfo[33:67] + toNode = nodeInfo[33:66] } else { - fromNode = nodeInfo[33:67] + fromNode = nodeInfo[33:66] toNode = nodeInfo[:33] } @@ -3103,7 +3103,7 @@ func fetchChanEdgePolicies(edgeIndex *bolt.Bucket, edges *bolt.Bucket, // Similarly, the second node is contained within the latter // half of the edge information. 
- node2Pub := edgeInfo[33:67] + node2Pub := edgeInfo[33:66] edge2, err := fetchChanEdgePolicy(edges, chanID, node2Pub, nodes) if err != nil { return nil, nil, err From 2f22e6c35f08c34ae1b86525f2bde8e3d98291ef Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:35 -0700 Subject: [PATCH 4/7] channeldb/graph: properly determine old update timestamp for an edge In this commit, we fix a lingering issue within the edge update index where entries were not being properly pruned due to an incorrect calculation of the offset of an edge's last update time. Since the offset is being determined from the end to the start, we need to subtract all the fields after an edge policy's last update time from the total amount of bytes of the serialized edge policy to determine the correct offset. This was also slightly off as the edge policy included an extra byte, which has been fixed in the previous commit. Instead of continuing the slicing approach however, we'll switch to deserializing the raw bytes of an edge's policy to ensure this doesn't happen in the future when/if the serialization methods change or extra data is included. --- channeldb/graph.go | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index b53bf344..1d7625ba 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -1617,14 +1617,18 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { if err != nil { return err } + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } - return updateEdgePolicy(edges, edgeIndex, edge) + return updateEdgePolicy(edges, edgeIndex, nodes, edge) }) } // updateEdgePolicy attempts to update an edge's policy within the relevant // buckets using an existing database transaction. 
-func updateEdgePolicy(edges, edgeIndex *bolt.Bucket, +func updateEdgePolicy(edges, edgeIndex, nodes *bolt.Bucket, edge *ChannelEdgePolicy) error { // Create the channelID key be converting the channel ID @@ -1652,7 +1656,7 @@ func updateEdgePolicy(edges, edgeIndex *bolt.Bucket, // Finally, with the direction of the edge being updated // identified, we update the on-disk edge representation. - return putChanEdgePolicy(edges, edge, fromNode, toNode) + return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode) } // LightningNode represents an individual vertex/node within the channel graph. @@ -2949,7 +2953,9 @@ func deserializeChanEdgeInfo(r io.Reader) (ChannelEdgeInfo, error) { return edgeInfo, nil } -func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []byte) error { +func putChanEdgePolicy(edges, nodes *bolt.Bucket, edge *ChannelEdgePolicy, + from, to []byte) error { + var edgeKey [33 + 8]byte copy(edgeKey[:], from) byteOrder.PutUint64(edgeKey[33:], edge.ChannelID) @@ -3012,17 +3018,20 @@ func putChanEdgePolicy(edges *bolt.Bucket, edge *ChannelEdgePolicy, from, to []b // In order to delete the old entry, we'll need to obtain the // *prior* update time in order to delete it. To do this, we'll - // create an offset to slice in. Starting backwards, we'll - // create an offset than puts us right after the flags - // variable: - // - // * pubkeySize + fee+policySize + timelockSize + flagSize - updateEnd := 33 + (8 * 3) + 2 + 1 - updateStart := updateEnd - 8 - oldUpdateTime := edgeBytes[updateStart:updateEnd] + // need to deserialize the existing policy within the database + // (now outdated by the new one), and delete its corresponding + // entry within the update index. 
+ oldEdgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgeBytes), nodes, + ) + if err != nil { + return err + } + + oldUpdateTime := uint64(oldEdgePolicy.LastUpdate.Unix()) var oldIndexKey [8 + 8]byte - copy(oldIndexKey[:], oldUpdateTime) + byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime) byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID) if err := updateIndex.Delete(oldIndexKey[:]); err != nil { From 8dec659e10b3bcefd489786286d114289e500525 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:35 -0700 Subject: [PATCH 5/7] channeldb/graph_test: properly check entries within edge update index Due to entries within the edge update index having a nil value, the tests need to be modified to account for this. Previously, we'd assume that if we were unable to retrieve a value for a certain key that the entry was non-existent, which is why the improper pruning bug was not caught. Instead, we'll assert the number of entries to be the expected value and populate a lookup map to determine whether the correct entries exist within it. --- channeldb/graph_test.go | 113 ++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 68 deletions(-) diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 56b4dd38..ef274b09 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2130,44 +2130,54 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { t.Fatalf("unable to update edge: %v", err) } - // Now that both edges have been updated, if we manually check the - // update index, we should have an entry for both edges. 
- if err := db.View(func(tx *bolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrGraphNoEdgesFound - } - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrGraphNoEdgesFound - } - edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) - if edgeUpdateIndex == nil { - return ErrGraphNoEdgesFound + // checkIndexTimestamps is a helper function that checks the edge update + // index only includes the given timestamps. + checkIndexTimestamps := func(timestamps ...uint64) { + timestampSet := make(map[uint64]struct{}) + for _, t := range timestamps { + timestampSet[t] = struct{}{} } - var edgeKey [8 + 8]byte + err := db.View(func(tx *bolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return ErrGraphNoEdgesFound + } - byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID) + numEntries := edgeUpdateIndex.Stats().KeyN + expectedEntries := len(timestampSet) + if numEntries != expectedEntries { + return fmt.Errorf("expected %v entries in the "+ + "update index, got %v", expectedEntries, + numEntries) + } - if edgeUpdateIndex.Get(edgeKey[:]) == nil { - return fmt.Errorf("first edge not found in update " + - "index") + return edgeUpdateIndex.ForEach(func(k, _ []byte) error { + t := byteOrder.Uint64(k[:8]) + if _, ok := timestampSet[t]; !ok { + return fmt.Errorf("found unexpected "+ + "timestamp "+"%d", t) + } + + return nil + }) + }) + if err != nil { + t.Fatal(err) } - - byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) == nil { - return fmt.Errorf("second edge not found in update " + - "index") - } - - return nil - }); err != nil { - t.Fatalf("unable to read update index: %v", err) } + // With both edges policies 
added, we'll make sure to check they exist + // within the edge update index. + checkIndexTimestamps( + uint64(edge1.LastUpdate.Unix()), + uint64(edge2.LastUpdate.Unix()), + ) + // Now we'll prune the graph, removing the edges, and also the update // index entries from the database all together. var blockHash chainhash.Hash @@ -2179,43 +2189,10 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { t.Fatalf("unable to prune graph: %v", err) } - // We'll now check the database state again, at this point, we should - // no longer be able to locate the entries within the edge update - // index. - if err := db.View(func(tx *bolt.Tx) error { - edges := tx.Bucket(edgeBucket) - if edges == nil { - return ErrGraphNoEdgesFound - } - edgeIndex := edges.Bucket(edgeIndexBucket) - if edgeIndex == nil { - return ErrGraphNoEdgesFound - } - edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) - if edgeUpdateIndex == nil { - return ErrGraphNoEdgesFound - } - - var edgeKey [8 + 8]byte - - byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) != nil { - return fmt.Errorf("first edge still found in update " + - "index") - } - - byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix())) - byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID) - if edgeUpdateIndex.Get(edgeKey[:]) != nil { - return fmt.Errorf("second edge still found in update " + - "index") - } - - return nil - }); err != nil { - t.Fatalf("unable to read update index: %v", err) - } + // Finally, we'll check the database state one last time to conclude + // that we should no longer be able to locate _any_ entries within the + // edge update index. 
+ checkIndexTimestamps() } // TestPruneGraphNodes tests that unconnected vertexes are pruned via the From c1633da2522a37b27b3dc2e5cd4824898fe187dd Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:36 -0700 Subject: [PATCH 6/7] channeldb/graph_test: extend prune edge update index test to update edges In this commit, we extend TestChannelEdgePruningUpdateIndexDeletion test to include one more update for each edge. By doing this, we can correctly determine whether old entries were properly pruned from the index once a new update has arrived. --- channeldb/graph_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index ef274b09..4e1d10f7 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2178,6 +2178,26 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { uint64(edge2.LastUpdate.Unix()), ) + // Now, we'll update the edge policies to ensure the old timestamps are + // removed from the update index. + edge1.Flags = 2 + edge1.LastUpdate = time.Now() + if err := graph.UpdateEdgePolicy(edge1); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + edge2.Flags = 3 + edge2.LastUpdate = edge1.LastUpdate.Add(time.Hour) + if err := graph.UpdateEdgePolicy(edge2); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + // With the policies updated, we should now be able to find their + // updated entries within the update index. + checkIndexTimestamps( + uint64(edge1.LastUpdate.Unix()), + uint64(edge2.LastUpdate.Unix()), + ) + // Now we'll prune the graph, removing the edges, and also the update // index entries from the database all together. 
var blockHash chainhash.Hash From 85ea08fd1769ed168072193d9747a3ce9e41cc6b Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Fri, 31 Aug 2018 14:59:37 -0700 Subject: [PATCH 7/7] channeldb: add migration to properly prune edge update index In this commit, we introduce a migration to fix some of the recent issues found w.r.t. the edge update index. The migration attempts to fix two things: 1) Edge policies include an extra byte at the end due to reading an extra byte for the node's public key from the serialized node info. 2) Properly prune all stale entries within the edge update index. As a result of this migration, nodes will have a slightly smaller channeldb. We will also no longer send stale edges to our peers in response to their gossip queries, which should also fix the issue of fetching channel announcements for closed channels. --- channeldb/db.go | 6 +++ channeldb/migrations.go | 105 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/channeldb/db.go b/channeldb/db.go index 939bbdcb..df611326 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -74,6 +74,12 @@ var ( number: 5, migration: paymentStatusesMigration, }, + { + // The DB version that properly prunes stale entries + // from the edge update index. + number: 6, + migration: migratePruneEdgeUpdateIndex, + }, } // Big endian is the preferred byte order, due to cursor scans over diff --git a/channeldb/migrations.go b/channeldb/migrations.go index c7beb638..810e14d9 100644 --- a/channeldb/migrations.go +++ b/channeldb/migrations.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/sha256" "encoding/binary" + "errors" "fmt" "github.com/coreos/bbolt" @@ -458,3 +459,107 @@ func paymentStatusesMigration(tx *bolt.Tx) error { return nil } + +// migratePruneEdgeUpdateIndex is a database migration that attempts to resolve +// some lingering bugs with regards to edge policies and their update index. 
+// Stale entries within the edge update index were not being properly pruned due +// to a miscalculation on the offset of an edge's policy last update. This +// migration also fixes the case where the public keys within edge policies were +// being serialized with an extra byte, causing an even greater error when +// attempting to perform the offset calculation described earlier. +func migratePruneEdgeUpdateIndex(tx *bolt.Tx) error { + // To begin the migration, we'll retrieve the update index bucket. If it + // does not exist, we have nothing left to do so we can simply exit. + edges := tx.Bucket(edgeBucket) + if edges == nil { + return nil + } + edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket) + if edgeUpdateIndex == nil { + return nil + } + + // Retrieve some buckets that will be needed later on. These should + // already exist given the assumption that the buckets above do as well. + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return errors.New("edge index should exist but does not") + } + nodes := tx.Bucket(nodeBucket) + if nodes == nil { + return errors.New("node bucket should exist but does not") + } + + log.Info("Migrating database to properly prune edge update index") + + // We'll need to properly prune all the outdated entries within the edge + // update index. To do so, we'll gather all of the existing policies + // within the graph to re-populate them later on. + var edgeKeys [][]byte + err := edges.ForEach(func(edgeKey, edgePolicyBytes []byte) error { + // All valid entries are indexed by a public key (33 bytes) + // followed by a channel ID (8 bytes), so we'll skip any entries + // with keys that do not match this. 
+ if len(edgeKey) != 33+8 { + return nil + } + + edgeKeys = append(edgeKeys, edgeKey) + + return nil + }) + if err != nil { + return fmt.Errorf("unable to gather existing edge policies: %v", + err) + } + + // With the existing edge policies gathered, we'll recreate the index + // and populate it with the correct entries. + oldNumEntries := edgeUpdateIndex.Stats().KeyN + if err := edges.DeleteBucket(edgeUpdateIndexBucket); err != nil { + return fmt.Errorf("unable to remove existing edge update "+ + "index: %v", err) + } + edgeUpdateIndex, err = edges.CreateBucketIfNotExists( + edgeUpdateIndexBucket, + ) + if err != nil { + return fmt.Errorf("unable to recreate edge update index: %v", + err) + } + + // For each edge key, we'll retrieve the policy, deserialize it, and + // re-add it to the different buckets. By doing so, we'll ensure that + // all existing edge policies are serialized correctly within their + // respective buckets and that the correct entries are populated within + // the edge update index. + for _, edgeKey := range edgeKeys { + edgePolicyBytes := edges.Get(edgeKey) + + // Skip any entries with unknown policies as there will not be + // any entries for them in the edge update index. + if bytes.Equal(edgePolicyBytes[:], unknownPolicy) { + continue + } + + edgePolicy, err := deserializeChanEdgePolicy( + bytes.NewReader(edgePolicyBytes), nodes, + ) + if err != nil { + return err + } + + err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy) + if err != nil { + return err + } + } + + newNumEntries := edgeUpdateIndex.Stats().KeyN + log.Infof("Pruned %d stale entries from the edge update index", + oldNumEntries-newNumEntries) + + log.Info("Migration to properly prune edge update index complete!") + + return nil +}