channeldb: ensure items from the update index are actually deleted

In this commit, we fix an existing bug in the new graph query sync
feature. Before this commit, when a block is pruned, we would never
actually delete the update index entries. This is due to the fact that
we would attempt to delete the entries from the update index _after_ we
had already removed the edges from the update index.

We fix this by simply swapping the order: first we delete from the
update index, then we delete the edges themselves. A test ensuring that
the entires are cleared (which failed before this commit), has been
added.
This commit is contained in:
Olaoluwa Osuntokun 2018-06-05 17:30:50 -07:00
parent 4af9da40c5
commit dd4cab054d
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
2 changed files with 171 additions and 24 deletions

@ -1309,6 +1309,21 @@ func delChannelByEdge(edges *bolt.Bucket, edgeIndex *bolt.Bucket,
chanID) chanID)
} }
// We'll also remove the entry in the edge update index bucket before
// we delete the edges themselves so we can access their last update
// times.
cid := byteOrder.Uint64(chanID)
edge1, edge2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, chanID, nil,
)
if err != nil {
return err
}
err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
if err != nil {
return err
}
// The edge key is of the format pubKey || chanID. First we construct // The edge key is of the format pubKey || chanID. First we construct
// the latter half, populating the channel ID. // the latter half, populating the channel ID.
var edgeKey [33 + 8]byte var edgeKey [33 + 8]byte
@ -1330,19 +1345,6 @@ func delChannelByEdge(edges *bolt.Bucket, edgeIndex *bolt.Bucket,
} }
} }
// We'll also remove the entry in the edge update index bucket.
cid := byteOrder.Uint64(chanID)
edge1, edge2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, chanID, nil,
)
if err != nil {
return err
}
err = delEdgeUpdateIndexEntry(edges, cid, edge1, edge2)
if err != nil {
return err
}
// Finally, with the edge data deleted, we can purge the information // Finally, with the edge data deleted, we can purge the information
// from the two edge indexes. // from the two edge indexes.
if err := edgeIndex.Delete(chanID); err != nil { if err := edgeIndex.Delete(chanID); err != nil {

@ -1098,8 +1098,8 @@ func TestGraphPruning(t *testing.T) {
t.Fatalf("unable to prune graph: %v", err) t.Fatalf("unable to prune graph: %v", err)
} }
if len(prunedChans) != 2 { if len(prunedChans) != 2 {
t.Fatalf("incorrect number of channels pruned: expected %v, got %v", t.Fatalf("incorrect number of channels pruned: "+
2, prunedChans) "expected %v, got %v", 2, prunedChans)
} }
// Now ensure that the prune tip has been updated. // Now ensure that the prune tip has been updated.
@ -1125,8 +1125,9 @@ func TestGraphPruning(t *testing.T) {
} }
blockHash = sha256.Sum256(blockHash[:]) blockHash = sha256.Sum256(blockHash[:])
blockHeight = 2 blockHeight = 2
prunedChans, err = graph.PruneGraph([]*wire.OutPoint{nonChannel}, prunedChans, err = graph.PruneGraph(
&blockHash, blockHeight) []*wire.OutPoint{nonChannel}, &blockHash, blockHeight,
)
if err != nil { if err != nil {
t.Fatalf("unable to prune graph: %v", err) t.Fatalf("unable to prune graph: %v", err)
} }
@ -1144,16 +1145,18 @@ func TestGraphPruning(t *testing.T) {
// from the graph. // from the graph.
blockHash = sha256.Sum256(blockHash[:]) blockHash = sha256.Sum256(blockHash[:])
blockHeight = 3 blockHeight = 3
prunedChans, err = graph.PruneGraph(channelPoints[2:], &blockHash, prunedChans, err = graph.PruneGraph(
blockHeight) channelPoints[2:], &blockHash, blockHeight,
)
if err != nil { if err != nil {
t.Fatalf("unable to prune graph: %v", err) t.Fatalf("unable to prune graph: %v", err)
} }
// The remainder of the channels should have been pruned from the graph. // The remainder of the channels should have been pruned from the
// graph.
if len(prunedChans) != 2 { if len(prunedChans) != 2 {
t.Fatalf("incorrect number of channels pruned: expected %v, got %v", t.Fatalf("incorrect number of channels pruned: "+
2, len(prunedChans)) "expected %v, got %v", 2, len(prunedChans))
} }
// The prune tip should be updated, and no channels should be found // The prune tip should be updated, and no channels should be found
@ -1162,8 +1165,8 @@ func TestGraphPruning(t *testing.T) {
assertNumChans(t, graph, 0) assertNumChans(t, graph, 0)
// Finally, the channel view at this point in the graph should now be // Finally, the channel view at this point in the graph should now be
// completely empty. // completely empty. Those channels should also be missing from the
// Those channels should also be missing from the channel view. // channel view.
channelView, err = graph.ChannelView() channelView, err = graph.ChannelView()
if err != nil { if err != nil {
t.Fatalf("unable to get graph channel view: %v", err) t.Fatalf("unable to get graph channel view: %v", err)
@ -1872,6 +1875,148 @@ func TestFetchChanInfos(t *testing.T) {
} }
} }
// TestChannelEdgePruningUpdateIndexDeletion tests that once edges are deleted
// from the graph, then their entries within the update index are also cleaned
// up.
func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// We'll first populate our graph with two nodes. All channels created
// below will be made between these two nodes.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// With the two nodes created, we'll now create a random channel, as
// well as two edges in the database with distinct update times.
edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2)
if err := graph.AddChannelEdge(&edgeInfo); err != nil {
t.Fatalf("unable to add edge: %v", err)
}
edge1 := randEdgePolicy(chanID.ToUint64(), edgeInfo.ChannelPoint, db)
edge1.Flags = 0
edge1.Node = node1
edge1.SigBytes = testSig.Serialize()
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
edge2 := randEdgePolicy(chanID.ToUint64(), edgeInfo.ChannelPoint, db)
edge2.Flags = 1
edge2.Node = node2
edge2.SigBytes = testSig.Serialize()
if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// Now that both edges have been updated, if we manually check the
// update index, we should have an entry for both edges.
if err := db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
var edgeKey [8 + 8]byte
byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) == nil {
return fmt.Errorf("first edge not found in update " +
"index")
}
byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) == nil {
return fmt.Errorf("second edge not found in update " +
"index")
}
return nil
}); err != nil {
t.Fatalf("unable to read update index: %v", err)
}
// Now we'll prune the graph, removing the edges, and also the update
// index entries from the database all together.
var blockHash chainhash.Hash
copy(blockHash[:], bytes.Repeat([]byte{2}, 32))
_, err = graph.PruneGraph(
[]*wire.OutPoint{&edgeInfo.ChannelPoint}, &blockHash, 101,
)
if err != nil {
t.Fatalf("unable to prune graph: %v", err)
}
// We'll now check the database state again, at this point, we should
// no longer be able to locate the entries within the edge update
// index.
if err := db.View(func(tx *bolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
edgeUpdateIndex := edges.Bucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
var edgeKey [8 + 8]byte
byteOrder.PutUint64(edgeKey[:8], uint64(edge1.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge1.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) != nil {
return fmt.Errorf("first edge still found in update " +
"index")
}
byteOrder.PutUint64(edgeKey[:8], uint64(edge2.LastUpdate.Unix()))
byteOrder.PutUint64(edgeKey[8:], edge2.ChannelID)
if edgeUpdateIndex.Get(edgeKey[:]) != nil {
return fmt.Errorf("second edge still found in update " +
"index")
}
return nil
}); err != nil {
t.Fatalf("unable to read update index: %v", err)
}
}
// compareNodes is used to compare two LightningNodes while excluding the // compareNodes is used to compare two LightningNodes while excluding the
// Features struct, which cannot be compared as the semantics for reserializing // Features struct, which cannot be compared as the semantics for reserializing
// the featuresMap have not been defined. // the featuresMap have not been defined.