From 7300f33fe2bc82cc63d1aac5ec92f3c663272b96 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 12 Jun 2019 13:40:02 +0300 Subject: [PATCH 1/3] graph: implement index for disabled channels. This commit adds an index bucket, disabledEdgePolicyBucket, for those ChannelEdgePolicy with disabled bit on. The main purpose is to be able to iterate over these fast when prune is needed without the need for iterating the whole graph. The entry points for accessing this index are: 1. When updating ChannelEdgePolicy - insert an entry. 2. When deleting ChannelEdge - delete the associated entries. 3. When querying for disabled channels - implemented DisabledChannelIDs function --- channeldb/graph.go | 95 ++++++++++++++++++++++++++++++++++++++++ channeldb/graph_test.go | 96 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+) diff --git a/channeldb/graph.go b/channeldb/graph.go index df233f1d..b78ba20c 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -118,6 +118,18 @@ var ( // edge's participants. zombieBucket = []byte("zombie-index") + // disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket bucket + // responsible for maintaining an index of disabled edge policies. Each + // entry exists within the bucket as follows: + // + // maps: -> []byte{} + // + // The chanID represents the channel ID of the edge and the direction is + // one byte representing the direction of the edge. The main purpose of + // this index is to allow pruning disabled channels in a fast way without + // the need to iterate all over the graph. + disabledEdgePolicyBucket = []byte("disabled-edge-policy-index") + // graphMetaBucket is a top-level bucket which stores various meta-deta // related to the on-disk channel graph. Data stored in this bucket // includes the block to which the graph has been synced to, the total @@ -258,6 +270,46 @@ func (c *ChannelGraph) ForEachNodeChannel(tx *bbolt.Tx, nodePub []byte, return nodeTraversal(tx, nodePub, db, cb) } +// DisabledChannelIDs returns the channel ids of disabled channels. +// A channel is disabled when two of the associated ChanelEdgePolicies +// have their disabled bit on. +func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) { + var disabledChanIDs []uint64 + chanEdgeFound := make(map[uint64]struct{}) + + err := c.db.View(func(tx *bbolt.Tx) error { + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNoEdgesFound + } + + disabledEdgePolicyIndex := edges.Bucket(disabledEdgePolicyBucket) + if disabledEdgePolicyIndex == nil { + return nil + } + + // We iterate over all disabled policies and we add each channel that + // has more than one disabled policy to disabledChanIDs array. + return disabledEdgePolicyIndex.ForEach(func(k, v []byte) error { + chanID := byteOrder.Uint64(k[:8]) + _, edgeFound := chanEdgeFound[chanID] + if edgeFound { + delete(chanEdgeFound, chanID) + disabledChanIDs = append(disabledChanIDs, chanID) + return nil + } + + chanEdgeFound[chanID] = struct{}{} + return nil + }) + }) + if err != nil { + return nil, err + } + + return disabledChanIDs, nil +} + // ForEachNode iterates through all the stored vertices/nodes in the graph, // executing the passed callback with each node encountered. If the callback // returns an error, then the transaction is aborted and the iteration stops @@ -1822,6 +1874,11 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex, } } + // As part of deleting the edge we also remove all disabled entries + // from the edgePolicyDisabledIndex bucket. 
We do that for both directions. + updateEdgePolicyDisabledIndex(edges, cid, false, false) + updateEdgePolicyDisabledIndex(edges, cid, true, false) + // With the edge data deleted, we can purge the information from the two // edge indexes. if err := edgeIndex.Delete(chanID); err != nil { @@ -3647,9 +3704,47 @@ func putChanEdgePolicy(edges, nodes *bbolt.Bucket, edge *ChannelEdgePolicy, return err } + updateEdgePolicyDisabledIndex( + edges, edge.ChannelID, + edge.ChannelFlags&lnwire.ChanUpdateDirection > 0, + edge.IsDisabled(), + ) + return edges.Put(edgeKey[:], b.Bytes()[:]) } +// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex +// bucket by either add a new disabled ChannelEdgePolicy or remove an existing +// one. +// The direction represents the direction of the edge and disabled is used for +// deciding whether to remove or add an entry to the bucket. +// In general a channel is disabled if two entries for the same chanID exist +// in this bucket. +// Maintaining the bucket this way allows a fast retrieval of disabled +// channels, for example when prune is needed. +func updateEdgePolicyDisabledIndex(edges *bbolt.Bucket, chanID uint64, + direction bool, disabled bool) error { + + var disabledEdgeKey [8 + 1]byte + byteOrder.PutUint64(disabledEdgeKey[0:], chanID) + if direction { + disabledEdgeKey[8] = 1 + } + + disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists( + disabledEdgePolicyBucket, + ) + if err != nil { + return err + } + + if disabled { + return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{}) + } + + return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:]) +} + // putChanEdgePolicyUnknown marks the edge policy as unknown // in the edges bucket. func putChanEdgePolicyUnknown(edges *bbolt.Bucket, channelID uint64, diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index fd471231..05969dfc 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -2680,6 +2680,102 @@ func TestNodeIsPublic(t *testing.T) { ) } +// TestDisabledChannelIDs ensures that the disabled channels within the +// disabledEdgePolicyBucket are managed properly and the list returned from +// DisabledChannelIDs is correct. +func TestDisabledChannelIDs(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + defer cleanUp() + + graph := db.ChannelGraph() + + // Create first node and add it to the graph. + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node1); err != nil { + t.Fatalf("unable to add node: %v", err) + } + + // Create second node and add it to the graph. + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node2); err != nil { + t.Fatalf("unable to add node: %v", err) + } + + // Adding a new channel edge to the graph. + edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2) + if err := graph.AddLightningNode(node2); err != nil { + t.Fatalf("unable to add node: %v", err) + } + + if err := graph.AddChannelEdge(edgeInfo); err != nil { + t.Fatalf("unable to create channel edge: %v", err) + } + + // Ensure no disabled channels exist in the bucket on start. 
+ disabledChanIds, err := graph.DisabledChannelIDs() + if err != nil { + t.Fatalf("unable to get disabled channel ids: %v", err) + } + if len(disabledChanIds) > 0 { + t.Fatalf("expected empty disabled channels, got %v disabled channels", + len(disabledChanIds)) + } + + // Add one disabled policy and ensure the channel is still not in the + // disabled list. + edge1.ChannelFlags |= lnwire.ChanUpdateDisabled + if err := graph.UpdateEdgePolicy(edge1); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + disabledChanIds, err = graph.DisabledChannelIDs() + if err != nil { + t.Fatalf("unable to get disabled channel ids: %v", err) + } + if len(disabledChanIds) > 0 { + t.Fatalf("expected empty disabled channels, got %v disabled channels", + len(disabledChanIds)) + } + + // Add second disabled policy and ensure the channel is now in the + // disabled list. + edge2.ChannelFlags |= lnwire.ChanUpdateDisabled + if err := graph.UpdateEdgePolicy(edge2); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + disabledChanIds, err = graph.DisabledChannelIDs() + if err != nil { + t.Fatalf("unable to get disabled channel ids: %v", err) + } + if len(disabledChanIds) != 1 || disabledChanIds[0] != edgeInfo.ChannelID { + t.Fatalf("expected disabled channel with id %v, "+ + "got %v", edgeInfo.ChannelID, disabledChanIds) + } + + // Delete the channel edge and ensure it is removed from the disabled list. + if err = graph.DeleteChannelEdges(edgeInfo.ChannelID); err != nil { + t.Fatalf("unable to delete channel edge: %v", err) + } + disabledChanIds, err = graph.DisabledChannelIDs() + if err != nil { + t.Fatalf("unable to get disabled channel ids: %v", err) + } + if len(disabledChanIds) > 0 { + t.Fatalf("expected empty disabled channels, got %v disabled channels", + len(disabledChanIds)) + } +} + // TestEdgePolicyMissingMaxHtcl tests that if we find a ChannelEdgePolicy in // the DB that indicates that it should support the htlc_maximum_value_msat // field, but it is not part of the opaque data, then we'll handle it as it is From 9781ea0082ac48f4434ed8f15f98f2da6a623a8c Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Wed, 10 Jul 2019 15:20:42 +0300 Subject: [PATCH 2/3] router: Optimize pruneZombieChannels. The current approach iterates all channels in the graph in order to filter those in need. This approach is time consuming, several seconds on my mobile device for ~40,000 channels, while during this time the db is locked in a transaction. The proposed change is to use an existing functionality that utilize the fact that channel update are saved indexed by date. This method enables us to go over only a small subset of the channels, only those that were updated before the "channel expiry" time and further filter them for our need. The same graph that took several seconds to prune was pruned, after the change, in several milliseconds. In addition for testing purposes I added Initiator field to the testChannel structure to reflect the channeldEdgePolicy direction. 
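
For illustration, the lookup described above can be sketched as a small helper built on the graph APIs the diff below relies on: ChanUpdatesInHorizon for channels whose last update predates the expiry horizon, and DisabledChannelIDs plus FetchChanInfos for channels recorded in the disabled-policy index. The wrapper function and its name are hypothetical and only exist to make the flow explicit; the actual change lives inside pruneZombieChans.

package routing

import (
	"time"

	"github.com/lightningnetwork/lnd/channeldb"
)

// collectPruneCandidates is an illustrative sketch, not part of this patch.
// Instead of walking every channel in the graph, it fetches only channels
// whose last update predates the expiry horizon plus channels present in
// the disabled-policy index, and hands just that small set to the caller's
// zombie filter.
func collectPruneCandidates(graph *channeldb.ChannelGraph,
	chanExpiry time.Duration,
	filter func(*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy,
		*channeldb.ChannelEdgePolicy) error) error {

	// Channels not updated since the horizon are zombie candidates.
	startTime := time.Unix(0, 0)
	endTime := time.Now().Add(-chanExpiry)
	oldEdges, err := graph.ChanUpdatesInHorizon(startTime, endTime)
	if err != nil {
		return err
	}

	// Channels with both policies disabled are served by the
	// disabledEdgePolicyBucket index added in the previous commit.
	disabledIDs, err := graph.DisabledChannelIDs()
	if err != nil {
		return err
	}
	disabledEdges, err := graph.FetchChanInfos(disabledIDs)
	if err != nil {
		return err
	}

	// Only this candidate set is passed through the zombie filter.
	for _, u := range append(oldEdges, disabledEdges...) {
		if err := filter(u.Info, u.Policy1, u.Policy2); err != nil {
			return err
		}
	}
	return nil
}

Because both lookups are index-backed, the candidate set scales with the number of stale or disabled channels rather than with the size of the whole graph.
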
--- routing/pathfind_test.go | 14 ++++++++++++-- routing/router.go | 41 +++++++++++++++++++++++++++++++++------- routing/router_test.go | 2 +- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index 88adade4..96f6dc6e 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -314,6 +314,7 @@ type testChannelPolicy struct { FeeRate lnwire.MilliSatoshi LastUpdate time.Time Disabled bool + Direction bool } type testChannelEnd struct { @@ -346,6 +347,9 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount, id = chanID[0] } + node2Policy := *policy + node2Policy.Direction = !policy.Direction + return &testChannel{ Capacity: capacity, Node1: &testChannelEnd{ @@ -354,7 +358,7 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount, }, Node2: &testChannelEnd{ Alias: alias2, - testChannelPolicy: policy, + testChannelPolicy: &node2Policy, }, ChannelID: id, } @@ -527,6 +531,9 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( if testChannel.Node1.Disabled { channelFlags |= lnwire.ChanUpdateDisabled } + if testChannel.Node1.Direction { + channelFlags |= lnwire.ChanUpdateDirection + } edgePolicy := &channeldb.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), MessageFlags: msgFlags, @@ -549,10 +556,13 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) ( if testChannel.Node2.MaxHTLC != 0 { msgFlags |= lnwire.ChanUpdateOptionMaxHtlc } - channelFlags := lnwire.ChanUpdateDirection + channelFlags := lnwire.ChanUpdateChanFlags(0) if testChannel.Node2.Disabled { channelFlags |= lnwire.ChanUpdateDisabled } + if testChannel.Node2.Direction { + channelFlags |= lnwire.ChanUpdateDirection + } edgePolicy := &channeldb.ChannelEdgePolicy{ SigBytes: testSig.Serialize(), MessageFlags: msgFlags, diff --git a/routing/router.go b/routing/router.go index 87bb5c75..d10d7df8 100644 --- a/routing/router.go +++ b/routing/router.go @@ -734,7 +734,7 @@ func (r *ChannelRouter) syncGraphWithChain() error { // usually signals that a channel has been closed on-chain. We do this // periodically to keep a healthy, lively routing table. func (r *ChannelRouter) pruneZombieChans() error { - var chansToPrune []uint64 + chansToPrune := make(map[uint64]struct{}) chanExpiry := r.cfg.ChannelPruneExpiry log.Infof("Examining channel graph for zombie channels") @@ -744,6 +744,11 @@ func (r *ChannelRouter) pruneZombieChans() error { filterPruneChans := func(info *channeldb.ChannelEdgeInfo, e1, e2 *channeldb.ChannelEdgePolicy) error { + // Exit early in case this channel is already marked to be pruned + if _, markedToPrune := chansToPrune[info.ChannelID]; markedToPrune { + return nil + } + // We'll ensure that we don't attempt to prune our *own* // channels from the graph, as in any case this should be // re-advertised by the sub-system above us. 
@@ -809,25 +814,47 @@ func (r *ChannelRouter) pruneZombieChans() error { info.ChannelID) // TODO(roasbeef): add ability to delete single directional edge - chansToPrune = append(chansToPrune, info.ChannelID) + chansToPrune[info.ChannelID] = struct{}{} return nil } - err := r.cfg.Graph.ForEachChannel(filterPruneChans) + startTime := time.Unix(0, 0) + endTime := time.Now().Add(-1 * chanExpiry) + oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime) if err != nil { - return fmt.Errorf("unable to filter local zombie channels: "+ - "%v", err) + return fmt.Errorf("unable to filter local zombie "+ + "chans: %v", err) + } + + disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs() + if err != nil { + return fmt.Errorf("unable to filter local zombie "+ + "chans: %v", err) + } + + disabledEdges, err := r.cfg.Graph.FetchChanInfos(disabledChanIDs) + if err != nil { + return fmt.Errorf("unable to filter local zombie "+ + "chans: %v", err) + } + + edgesToFilter := append(oldEdges, disabledEdges...) + + for _, u := range edgesToFilter { + filterPruneChans(u.Info, u.Policy1, u.Policy2) } log.Infof("Pruning %v zombie channels", len(chansToPrune)) // With the set of zombie-like channels obtained, we'll do another pass // to delete them from the channel graph. - for _, chanID := range chansToPrune { + var toPrune []uint64 + for chanID := range chansToPrune { + toPrune = append(toPrune, chanID) log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID) } - if err := r.cfg.Graph.DeleteChannelEdges(chansToPrune...); err != nil { + if err := r.cfg.Graph.DeleteChannelEdges(toPrune...); err != nil { return fmt.Errorf("unable to delete zombie channels: %v", err) } diff --git a/routing/router_test.go b/routing/router_test.go index c50dba1c..5efe3ea1 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -1899,7 +1899,7 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) { t.Parallel() freshTimestamp := time.Now() - staleTimestamp := time.Time{} + staleTimestamp := time.Unix(0, 0) // We'll create the following test graph so that only the last channel // is pruned. From da9edc876acc8bb499e073c19244aaaa4c740206 Mon Sep 17 00:00:00 2001 From: Roei Erez Date: Tue, 16 Jul 2019 17:39:57 +0300 Subject: [PATCH 3/3] router: only prune disabled channels when AssumeChannelValid=true. --- routing/router.go | 81 ++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/routing/router.go b/routing/router.go index d10d7df8..149f7b89 100644 --- a/routing/router.go +++ b/routing/router.go @@ -739,6 +739,12 @@ func (r *ChannelRouter) pruneZombieChans() error { log.Infof("Examining channel graph for zombie channels") + // A helper method to detect if the channel belongs to this node + isSelfChannelEdge := func(info *channeldb.ChannelEdgeInfo) bool { + return info.NodeKey1Bytes == r.selfNode.PubKeyBytes || + info.NodeKey2Bytes == r.selfNode.PubKeyBytes + } + // First, we'll collect all the channels which are eligible for garbage // collection due to being zombies. filterPruneChans := func(info *channeldb.ChannelEdgeInfo, @@ -752,9 +758,7 @@ func (r *ChannelRouter) pruneZombieChans() error { // We'll ensure that we don't attempt to prune our *own* // channels from the graph, as in any case this should be // re-advertised by the sub-system above us. 
- if info.NodeKey1Bytes == r.selfNode.PubKeyBytes || - info.NodeKey2Bytes == r.selfNode.PubKeyBytes { - + if isSelfChannelEdge(info) { return nil } @@ -779,34 +783,9 @@ func (r *ChannelRouter) pruneZombieChans() error { } } - isZombieChan := e1Zombie && e2Zombie - - // If AssumeChannelValid is present and we've determined the - // channel is not a zombie, we'll look at the disabled bit for - // both edges. If they're both disabled, then we can interpret - // this as the channel being closed and can prune it from our - // graph. - if r.cfg.AssumeChannelValid && !isZombieChan { - var e1Disabled, e2Disabled bool - if e1 != nil { - e1Disabled = e1.IsDisabled() - log.Tracef("Edge #1 of ChannelID(%v) "+ - "disabled=%v", info.ChannelID, - e1Disabled) - } - if e2 != nil { - e2Disabled = e2.IsDisabled() - log.Tracef("Edge #2 of ChannelID(%v) "+ - "disabled=%v", info.ChannelID, - e2Disabled) - } - - isZombieChan = e1Disabled && e2Disabled - } - // If the channel is not considered zombie, we can move on to // the next. - if !isZombieChan { + if !e1Zombie || !e2Zombie { return nil } @@ -819,29 +798,39 @@ func (r *ChannelRouter) pruneZombieChans() error { return nil } + // If AssumeChannelValid is present we'll look at the disabled bit for both + // edges. If they're both disabled, then we can interpret this as the + // channel being closed and can prune it from our graph. + if r.cfg.AssumeChannelValid { + disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs() + if err != nil { + return fmt.Errorf("unable to get disabled channels ids "+ + "chans: %v", err) + } + + disabledEdges, err := r.cfg.Graph.FetchChanInfos(disabledChanIDs) + if err != nil { + return fmt.Errorf("unable to fetch disabled channels edges "+ + "chans: %v", err) + } + + // Ensuring we won't prune our own channel from the graph. + for _, disabledEdge := range disabledEdges { + if !isSelfChannelEdge(disabledEdge.Info) { + chansToPrune[disabledEdge.Info.ChannelID] = struct{}{} + } + } + } + startTime := time.Unix(0, 0) endTime := time.Now().Add(-1 * chanExpiry) oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime) if err != nil { - return fmt.Errorf("unable to filter local zombie "+ + return fmt.Errorf("unable to fetch expired channel updates "+ "chans: %v", err) } - disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs() - if err != nil { - return fmt.Errorf("unable to filter local zombie "+ - "chans: %v", err) - } - - disabledEdges, err := r.cfg.Graph.FetchChanInfos(disabledChanIDs) - if err != nil { - return fmt.Errorf("unable to filter local zombie "+ - "chans: %v", err) - } - - edgesToFilter := append(oldEdges, disabledEdges...) - - for _, u := range edgesToFilter { + for _, u := range oldEdges { filterPruneChans(u.Info, u.Policy1, u.Policy2) } @@ -849,7 +838,7 @@ func (r *ChannelRouter) pruneZombieChans() error { // With the set of zombie-like channels obtained, we'll do another pass // to delete them from the channel graph. - var toPrune []uint64 + toPrune := make([]uint64, 0, len(chansToPrune)) for chanID := range chansToPrune { toPrune = append(toPrune, chanID) log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)