graph: implement index for disabled channels.

This commit adds an index bucket, disabledEdgePolicyBucket, for those
ChannelEdgePolicy with disabled bit on.
The main purpose is to be able to iterate over these fast when prune is
needed without the need for iterating the whole graph.

The entry points for accessing this index are:
1. When updating ChannelEdgePolicy - insert an entry.
2. When deleting ChannelEdge - delete the associated entries.
3. When querying for disabled channels - implemented DisabledChannelIDs
function
This commit is contained in:
Roei Erez 2019-06-12 13:40:02 +03:00
parent a7152efca4
commit 7300f33fe2
2 changed files with 191 additions and 0 deletions

@ -118,6 +118,18 @@ var (
// edge's participants.
zombieBucket = []byte("zombie-index")
// disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket bucket
// responsible for maintaining an index of disabled edge policies. Each
// entry exists within the bucket as follows:
//
// maps: <chanID><direction> -> []byte{}
//
// The chanID represents the channel ID of the edge and the direction is
// one byte representing the direction of the edge. The main purpose of
// this index is to allow pruning disabled channels in a fast way without
// the need to iterate all over the graph.
disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
// graphMetaBucket is a top-level bucket which stores various meta-deta
// related to the on-disk channel graph. Data stored in this bucket
// includes the block to which the graph has been synced to, the total
@ -258,6 +270,46 @@ func (c *ChannelGraph) ForEachNodeChannel(tx *bbolt.Tx, nodePub []byte,
return nodeTraversal(tx, nodePub, db, cb)
}
// DisabledChannelIDs returns the channel ids of disabled channels.
// A channel is disabled when two of the associated ChanelEdgePolicies
// have their disabled bit on.
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
var disabledChanIDs []uint64
chanEdgeFound := make(map[uint64]struct{})
err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
disabledEdgePolicyIndex := edges.Bucket(disabledEdgePolicyBucket)
if disabledEdgePolicyIndex == nil {
return nil
}
// We iterate over all disabled policies and we add each channel that
// has more than one disabled policy to disabledChanIDs array.
return disabledEdgePolicyIndex.ForEach(func(k, v []byte) error {
chanID := byteOrder.Uint64(k[:8])
_, edgeFound := chanEdgeFound[chanID]
if edgeFound {
delete(chanEdgeFound, chanID)
disabledChanIDs = append(disabledChanIDs, chanID)
return nil
}
chanEdgeFound[chanID] = struct{}{}
return nil
})
})
if err != nil {
return nil, err
}
return disabledChanIDs, nil
}
// ForEachNode iterates through all the stored vertices/nodes in the graph,
// executing the passed callback with each node encountered. If the callback
// returns an error, then the transaction is aborted and the iteration stops
@ -1822,6 +1874,11 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
}
}
// As part of deleting the edge we also remove all disabled entries
// from the edgePolicyDisabledIndex bucket. We do that for both directions.
updateEdgePolicyDisabledIndex(edges, cid, false, false)
updateEdgePolicyDisabledIndex(edges, cid, true, false)
// With the edge data deleted, we can purge the information from the two
// edge indexes.
if err := edgeIndex.Delete(chanID); err != nil {
@ -3647,9 +3704,47 @@ func putChanEdgePolicy(edges, nodes *bbolt.Bucket, edge *ChannelEdgePolicy,
return err
}
updateEdgePolicyDisabledIndex(
edges, edge.ChannelID,
edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
edge.IsDisabled(),
)
return edges.Put(edgeKey[:], b.Bytes()[:])
}
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
// one.
// The direction represents the direction of the edge and disabled is used for
// deciding whether to remove or add an entry to the bucket.
// In general a channel is disabled if two entries for the same chanID exist
// in this bucket.
// Maintaining the bucket this way allows a fast retrieval of disabled
// channels, for example when prune is needed.
func updateEdgePolicyDisabledIndex(edges *bbolt.Bucket, chanID uint64,
direction bool, disabled bool) error {
var disabledEdgeKey [8 + 1]byte
byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
if direction {
disabledEdgeKey[8] = 1
}
disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
disabledEdgePolicyBucket,
)
if err != nil {
return err
}
if disabled {
return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
}
return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
}
// putChanEdgePolicyUnknown marks the edge policy as unknown
// in the edges bucket.
func putChanEdgePolicyUnknown(edges *bbolt.Bucket, channelID uint64,

@ -2680,6 +2680,102 @@ func TestNodeIsPublic(t *testing.T) {
)
}
// TestDisabledChannelIDs ensures that the disabled channels within the
// disabledEdgePolicyBucket are managed properly and the list returned from
// DisabledChannelIDs is correct.
func TestDisabledChannelIDs(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// Create first node and add it to the graph.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Create second node and add it to the graph.
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Adding a new channel edge to the graph.
edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
if err := graph.AddChannelEdge(edgeInfo); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
// Ensure no disabled channels exist in the bucket on start.
disabledChanIds, err := graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
// Add one disabled policy and ensure the channel is still not in the
// disabled list.
edge1.ChannelFlags |= lnwire.ChanUpdateDisabled
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
// Add second disabled policy and ensure the channel is now in the
// disabled list.
edge2.ChannelFlags |= lnwire.ChanUpdateDisabled
if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) != 1 || disabledChanIds[0] != edgeInfo.ChannelID {
t.Fatalf("expected disabled channel with id %v, "+
"got %v", edgeInfo.ChannelID, disabledChanIds)
}
// Delete the channel edge and ensure it is removed from the disabled list.
if err = graph.DeleteChannelEdges(edgeInfo.ChannelID); err != nil {
t.Fatalf("unable to delete channel edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
}
// TestEdgePolicyMissingMaxHtcl tests that if we find a ChannelEdgePolicy in
// the DB that indicates that it should support the htlc_maximum_value_msat
// field, but it is not part of the opaque data, then we'll handle it as it is