Merge pull request #3197 from breez/optimize_prune_zombie_channels

Optimize prune zombie channels
This commit is contained in:
Olaoluwa Osuntokun 2019-07-18 20:56:56 -07:00 committed by GitHub
commit 8c389d13f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 256 additions and 39 deletions

@ -118,6 +118,18 @@ var (
// edge's participants. // edge's participants.
zombieBucket = []byte("zombie-index") zombieBucket = []byte("zombie-index")
// disabledEdgePolicyBucket is a sub-bucket of the main edgeBucket bucket
// responsible for maintaining an index of disabled edge policies. Each
// entry exists within the bucket as follows:
//
// maps: <chanID><direction> -> []byte{}
//
// The chanID represents the channel ID of the edge and the direction is
// one byte representing the direction of the edge. The main purpose of
// this index is to allow pruning disabled channels in a fast way without
// the need to iterate all over the graph.
disabledEdgePolicyBucket = []byte("disabled-edge-policy-index")
// graphMetaBucket is a top-level bucket which stores various meta-deta // graphMetaBucket is a top-level bucket which stores various meta-deta
// related to the on-disk channel graph. Data stored in this bucket // related to the on-disk channel graph. Data stored in this bucket
// includes the block to which the graph has been synced to, the total // includes the block to which the graph has been synced to, the total
@ -258,6 +270,46 @@ func (c *ChannelGraph) ForEachNodeChannel(tx *bbolt.Tx, nodePub []byte,
return nodeTraversal(tx, nodePub, db, cb) return nodeTraversal(tx, nodePub, db, cb)
} }
// DisabledChannelIDs returns the channel ids of disabled channels.
// A channel is disabled when two of the associated ChanelEdgePolicies
// have their disabled bit on.
func (c *ChannelGraph) DisabledChannelIDs() ([]uint64, error) {
var disabledChanIDs []uint64
chanEdgeFound := make(map[uint64]struct{})
err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
disabledEdgePolicyIndex := edges.Bucket(disabledEdgePolicyBucket)
if disabledEdgePolicyIndex == nil {
return nil
}
// We iterate over all disabled policies and we add each channel that
// has more than one disabled policy to disabledChanIDs array.
return disabledEdgePolicyIndex.ForEach(func(k, v []byte) error {
chanID := byteOrder.Uint64(k[:8])
_, edgeFound := chanEdgeFound[chanID]
if edgeFound {
delete(chanEdgeFound, chanID)
disabledChanIDs = append(disabledChanIDs, chanID)
return nil
}
chanEdgeFound[chanID] = struct{}{}
return nil
})
})
if err != nil {
return nil, err
}
return disabledChanIDs, nil
}
// ForEachNode iterates through all the stored vertices/nodes in the graph, // ForEachNode iterates through all the stored vertices/nodes in the graph,
// executing the passed callback with each node encountered. If the callback // executing the passed callback with each node encountered. If the callback
// returns an error, then the transaction is aborted and the iteration stops // returns an error, then the transaction is aborted and the iteration stops
@ -1822,6 +1874,11 @@ func delChannelEdge(edges, edgeIndex, chanIndex, zombieIndex,
} }
} }
// As part of deleting the edge we also remove all disabled entries
// from the edgePolicyDisabledIndex bucket. We do that for both directions.
updateEdgePolicyDisabledIndex(edges, cid, false, false)
updateEdgePolicyDisabledIndex(edges, cid, true, false)
// With the edge data deleted, we can purge the information from the two // With the edge data deleted, we can purge the information from the two
// edge indexes. // edge indexes.
if err := edgeIndex.Delete(chanID); err != nil { if err := edgeIndex.Delete(chanID); err != nil {
@ -3672,9 +3729,47 @@ func putChanEdgePolicy(edges, nodes *bbolt.Bucket, edge *ChannelEdgePolicy,
return err return err
} }
updateEdgePolicyDisabledIndex(
edges, edge.ChannelID,
edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
edge.IsDisabled(),
)
return edges.Put(edgeKey[:], b.Bytes()[:]) return edges.Put(edgeKey[:], b.Bytes()[:])
} }
// updateEdgePolicyDisabledIndex is used to update the disabledEdgePolicyIndex
// bucket by either add a new disabled ChannelEdgePolicy or remove an existing
// one.
// The direction represents the direction of the edge and disabled is used for
// deciding whether to remove or add an entry to the bucket.
// In general a channel is disabled if two entries for the same chanID exist
// in this bucket.
// Maintaining the bucket this way allows a fast retrieval of disabled
// channels, for example when prune is needed.
func updateEdgePolicyDisabledIndex(edges *bbolt.Bucket, chanID uint64,
direction bool, disabled bool) error {
var disabledEdgeKey [8 + 1]byte
byteOrder.PutUint64(disabledEdgeKey[0:], chanID)
if direction {
disabledEdgeKey[8] = 1
}
disabledEdgePolicyIndex, err := edges.CreateBucketIfNotExists(
disabledEdgePolicyBucket,
)
if err != nil {
return err
}
if disabled {
return disabledEdgePolicyIndex.Put(disabledEdgeKey[:], []byte{})
}
return disabledEdgePolicyIndex.Delete(disabledEdgeKey[:])
}
// putChanEdgePolicyUnknown marks the edge policy as unknown // putChanEdgePolicyUnknown marks the edge policy as unknown
// in the edges bucket. // in the edges bucket.
func putChanEdgePolicyUnknown(edges *bbolt.Bucket, channelID uint64, func putChanEdgePolicyUnknown(edges *bbolt.Bucket, channelID uint64,

@ -2680,6 +2680,102 @@ func TestNodeIsPublic(t *testing.T) {
) )
} }
// TestDisabledChannelIDs ensures that the disabled channels within the
// disabledEdgePolicyBucket are managed properly and the list returned from
// DisabledChannelIDs is correct.
func TestDisabledChannelIDs(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// Create first node and add it to the graph.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Create second node and add it to the graph.
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Adding a new channel edge to the graph.
edgeInfo, edge1, edge2 := createChannelEdge(db, node1, node2)
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
if err := graph.AddChannelEdge(edgeInfo); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
// Ensure no disabled channels exist in the bucket on start.
disabledChanIds, err := graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
// Add one disabled policy and ensure the channel is still not in the
// disabled list.
edge1.ChannelFlags |= lnwire.ChanUpdateDisabled
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
// Add second disabled policy and ensure the channel is now in the
// disabled list.
edge2.ChannelFlags |= lnwire.ChanUpdateDisabled
if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) != 1 || disabledChanIds[0] != edgeInfo.ChannelID {
t.Fatalf("expected disabled channel with id %v, "+
"got %v", edgeInfo.ChannelID, disabledChanIds)
}
// Delete the channel edge and ensure it is removed from the disabled list.
if err = graph.DeleteChannelEdges(edgeInfo.ChannelID); err != nil {
t.Fatalf("unable to delete channel edge: %v", err)
}
disabledChanIds, err = graph.DisabledChannelIDs()
if err != nil {
t.Fatalf("unable to get disabled channel ids: %v", err)
}
if len(disabledChanIds) > 0 {
t.Fatalf("expected empty disabled channels, got %v disabled channels",
len(disabledChanIds))
}
}
// TestEdgePolicyMissingMaxHtcl tests that if we find a ChannelEdgePolicy in // TestEdgePolicyMissingMaxHtcl tests that if we find a ChannelEdgePolicy in
// the DB that indicates that it should support the htlc_maximum_value_msat // the DB that indicates that it should support the htlc_maximum_value_msat
// field, but it is not part of the opaque data, then we'll handle it as it is // field, but it is not part of the opaque data, then we'll handle it as it is

@ -316,6 +316,7 @@ type testChannelPolicy struct {
FeeRate lnwire.MilliSatoshi FeeRate lnwire.MilliSatoshi
LastUpdate time.Time LastUpdate time.Time
Disabled bool Disabled bool
Direction bool
} }
type testChannelEnd struct { type testChannelEnd struct {
@ -348,6 +349,9 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount,
id = chanID[0] id = chanID[0]
} }
node2Policy := *policy
node2Policy.Direction = !policy.Direction
return &testChannel{ return &testChannel{
Capacity: capacity, Capacity: capacity,
Node1: &testChannelEnd{ Node1: &testChannelEnd{
@ -356,7 +360,7 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount,
}, },
Node2: &testChannelEnd{ Node2: &testChannelEnd{
Alias: alias2, Alias: alias2,
testChannelPolicy: policy, testChannelPolicy: &node2Policy,
}, },
ChannelID: id, ChannelID: id,
} }
@ -529,6 +533,9 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
if testChannel.Node1.Disabled { if testChannel.Node1.Disabled {
channelFlags |= lnwire.ChanUpdateDisabled channelFlags |= lnwire.ChanUpdateDisabled
} }
if testChannel.Node1.Direction {
channelFlags |= lnwire.ChanUpdateDirection
}
edgePolicy := &channeldb.ChannelEdgePolicy{ edgePolicy := &channeldb.ChannelEdgePolicy{
SigBytes: testSig.Serialize(), SigBytes: testSig.Serialize(),
MessageFlags: msgFlags, MessageFlags: msgFlags,
@ -551,10 +558,13 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
if testChannel.Node2.MaxHTLC != 0 { if testChannel.Node2.MaxHTLC != 0 {
msgFlags |= lnwire.ChanUpdateOptionMaxHtlc msgFlags |= lnwire.ChanUpdateOptionMaxHtlc
} }
channelFlags := lnwire.ChanUpdateDirection channelFlags := lnwire.ChanUpdateChanFlags(0)
if testChannel.Node2.Disabled { if testChannel.Node2.Disabled {
channelFlags |= lnwire.ChanUpdateDisabled channelFlags |= lnwire.ChanUpdateDisabled
} }
if testChannel.Node2.Direction {
channelFlags |= lnwire.ChanUpdateDirection
}
edgePolicy := &channeldb.ChannelEdgePolicy{ edgePolicy := &channeldb.ChannelEdgePolicy{
SigBytes: testSig.Serialize(), SigBytes: testSig.Serialize(),
MessageFlags: msgFlags, MessageFlags: msgFlags,

@ -760,22 +760,31 @@ func (r *ChannelRouter) syncGraphWithChain() error {
// usually signals that a channel has been closed on-chain. We do this // usually signals that a channel has been closed on-chain. We do this
// periodically to keep a healthy, lively routing table. // periodically to keep a healthy, lively routing table.
func (r *ChannelRouter) pruneZombieChans() error { func (r *ChannelRouter) pruneZombieChans() error {
var chansToPrune []uint64 chansToPrune := make(map[uint64]struct{})
chanExpiry := r.cfg.ChannelPruneExpiry chanExpiry := r.cfg.ChannelPruneExpiry
log.Infof("Examining channel graph for zombie channels") log.Infof("Examining channel graph for zombie channels")
// A helper method to detect if the channel belongs to this node
isSelfChannelEdge := func(info *channeldb.ChannelEdgeInfo) bool {
return info.NodeKey1Bytes == r.selfNode.PubKeyBytes ||
info.NodeKey2Bytes == r.selfNode.PubKeyBytes
}
// First, we'll collect all the channels which are eligible for garbage // First, we'll collect all the channels which are eligible for garbage
// collection due to being zombies. // collection due to being zombies.
filterPruneChans := func(info *channeldb.ChannelEdgeInfo, filterPruneChans := func(info *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error { e1, e2 *channeldb.ChannelEdgePolicy) error {
// Exit early in case this channel is already marked to be pruned
if _, markedToPrune := chansToPrune[info.ChannelID]; markedToPrune {
return nil
}
// We'll ensure that we don't attempt to prune our *own* // We'll ensure that we don't attempt to prune our *own*
// channels from the graph, as in any case this should be // channels from the graph, as in any case this should be
// re-advertised by the sub-system above us. // re-advertised by the sub-system above us.
if info.NodeKey1Bytes == r.selfNode.PubKeyBytes || if isSelfChannelEdge(info) {
info.NodeKey2Bytes == r.selfNode.PubKeyBytes {
return nil return nil
} }
@ -800,34 +809,9 @@ func (r *ChannelRouter) pruneZombieChans() error {
} }
} }
isZombieChan := e1Zombie && e2Zombie
// If AssumeChannelValid is present and we've determined the
// channel is not a zombie, we'll look at the disabled bit for
// both edges. If they're both disabled, then we can interpret
// this as the channel being closed and can prune it from our
// graph.
if r.cfg.AssumeChannelValid && !isZombieChan {
var e1Disabled, e2Disabled bool
if e1 != nil {
e1Disabled = e1.IsDisabled()
log.Tracef("Edge #1 of ChannelID(%v) "+
"disabled=%v", info.ChannelID,
e1Disabled)
}
if e2 != nil {
e2Disabled = e2.IsDisabled()
log.Tracef("Edge #2 of ChannelID(%v) "+
"disabled=%v", info.ChannelID,
e2Disabled)
}
isZombieChan = e1Disabled && e2Disabled
}
// If the channel is not considered zombie, we can move on to // If the channel is not considered zombie, we can move on to
// the next. // the next.
if !isZombieChan { if !e1Zombie || !e2Zombie {
return nil return nil
} }
@ -835,25 +819,57 @@ func (r *ChannelRouter) pruneZombieChans() error {
info.ChannelID) info.ChannelID)
// TODO(roasbeef): add ability to delete single directional edge // TODO(roasbeef): add ability to delete single directional edge
chansToPrune = append(chansToPrune, info.ChannelID) chansToPrune[info.ChannelID] = struct{}{}
return nil return nil
} }
err := r.cfg.Graph.ForEachChannel(filterPruneChans) // If AssumeChannelValid is present we'll look at the disabled bit for both
// edges. If they're both disabled, then we can interpret this as the
// channel being closed and can prune it from our graph.
if r.cfg.AssumeChannelValid {
disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs()
if err != nil {
return fmt.Errorf("unable to get disabled channels ids "+
"chans: %v", err)
}
disabledEdges, err := r.cfg.Graph.FetchChanInfos(disabledChanIDs)
if err != nil {
return fmt.Errorf("unable to fetch disabled channels edges "+
"chans: %v", err)
}
// Ensuring we won't prune our own channel from the graph.
for _, disabledEdge := range disabledEdges {
if !isSelfChannelEdge(disabledEdge.Info) {
chansToPrune[disabledEdge.Info.ChannelID] = struct{}{}
}
}
}
startTime := time.Unix(0, 0)
endTime := time.Now().Add(-1 * chanExpiry)
oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
if err != nil { if err != nil {
return fmt.Errorf("unable to filter local zombie channels: "+ return fmt.Errorf("unable to fetch expired channel updates "+
"%v", err) "chans: %v", err)
}
for _, u := range oldEdges {
filterPruneChans(u.Info, u.Policy1, u.Policy2)
} }
log.Infof("Pruning %v zombie channels", len(chansToPrune)) log.Infof("Pruning %v zombie channels", len(chansToPrune))
// With the set of zombie-like channels obtained, we'll do another pass // With the set of zombie-like channels obtained, we'll do another pass
// to delete them from the channel graph. // to delete them from the channel graph.
for _, chanID := range chansToPrune { toPrune := make([]uint64, 0, len(chansToPrune))
for chanID := range chansToPrune {
toPrune = append(toPrune, chanID)
log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID) log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
} }
if err := r.cfg.Graph.DeleteChannelEdges(chansToPrune...); err != nil { if err := r.cfg.Graph.DeleteChannelEdges(toPrune...); err != nil {
return fmt.Errorf("unable to delete zombie channels: %v", err) return fmt.Errorf("unable to delete zombie channels: %v", err)
} }

@ -1916,7 +1916,7 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) {
t.Parallel() t.Parallel()
freshTimestamp := time.Now() freshTimestamp := time.Now()
staleTimestamp := time.Time{} staleTimestamp := time.Unix(0, 0)
// We'll create the following test graph so that only the last channel // We'll create the following test graph so that only the last channel
// is pruned. // is pruned.