routing: extract zombie pruning to distinct method

Olaoluwa Osuntokun 2018-01-30 20:34:40 -08:00
parent 1f3124f48a
commit 6d05cb5aae
2 changed files with 94 additions and 77 deletions

@@ -504,6 +504,94 @@ func (r *ChannelRouter) syncGraphWithChain() error {
return nil
}
// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. We do this periodically to keep a
// healthy, lively routing table.
func (r *ChannelRouter) pruneZombieChans() error {
var chansToPrune []wire.OutPoint
chanExpiry := r.cfg.ChannelPruneExpiry
log.Infof("Examining Channel Graph for zombie channels")
// First, we'll collect all the channels which are eligible for garbage
// collection due to being zombies.
filterPruneChans := func(info *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
// We'll ensure that we don't attempt to prune our *own*
// channels from the graph, as in any case this should be
// re-advertised by the sub-system above us.
if info.NodeKey1Bytes == r.selfNode.PubKeyBytes ||
info.NodeKey2Bytes == r.selfNode.PubKeyBytes {
return nil
}
// If *both* edges haven't been updated for a period of
// chanExpiry, then we'll mark the channel itself as eligible
// for graph pruning.
e1Zombie, e2Zombie := true, true
if e1 != nil {
e1Zombie = time.Since(e1.LastUpdate) >= chanExpiry
if e1Zombie {
log.Tracef("Edge #1 of ChannelPoint(%v) "+
"last update: %v",
info.ChannelPoint, e1.LastUpdate)
}
}
if e2 != nil {
e2Zombie = time.Since(e2.LastUpdate) >= chanExpiry
if e2Zombie {
log.Tracef("Edge #2 of ChannelPoint(%v) "+
"last update: %v",
info.ChannelPoint, e2.LastUpdate)
}
}
if e1Zombie && e2Zombie {
log.Debugf("ChannelPoint(%v) is a zombie, collecting "+
"to prune", info.ChannelPoint)
// TODO(roasbeef): add ability to delete single
// directional edge
chansToPrune = append(chansToPrune, info.ChannelPoint)
// As we're detecting this as a zombie channel, we'll
// add this to the set of recently rejected items so we
// don't re-accept it shortly after.
r.rejectCache[info.ChannelID] = struct{}{}
}
return nil
}
r.rejectMtx.Lock()
err := r.cfg.Graph.ForEachChannel(filterPruneChans)
if err != nil {
r.rejectMtx.Unlock()
return fmt.Errorf("Unable to filter local zombie "+
"chans: %v", err)
}
log.Infof("Pruning %v Zombie Channels", len(chansToPrune))
// With the set of zombie-like channels obtained, we'll do another pass to
// delete all zombie channels from the channel graph.
for _, chanToPrune := range chansToPrune {
log.Tracef("Pruning zombie chan ChannelPoint(%v)", chanToPrune)
err := r.cfg.Graph.DeleteChannelEdge(&chanToPrune)
if err != nil {
r.rejectMtx.Unlock()
return fmt.Errorf("Unable to prune zombie "+
"chans: %v", err)
}
}
r.rejectMtx.Unlock()
return nil
}
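
For illustration only (not part of the commit): the zombie test above reduces to comparing each edge's last update against the prune horizon, treating a missing edge as stale, and flagging the channel only when *both* directions are stale. A minimal standalone sketch of that predicate, using hypothetical stand-in types rather than lnd's channeldb structs:

package main

import (
	"fmt"
	"time"
)

// edgePolicy is a hypothetical stand-in for channeldb.ChannelEdgePolicy,
// keeping only the field the zombie check inspects.
type edgePolicy struct {
	LastUpdate time.Time
}

// isZombie reports whether a channel counts as a zombie: an edge is stale if
// it is missing or hasn't been updated within chanExpiry, and the channel is
// a zombie only when *both* directions are stale.
func isZombie(e1, e2 *edgePolicy, chanExpiry time.Duration) bool {
	e1Zombie := e1 == nil || time.Since(e1.LastUpdate) >= chanExpiry
	e2Zombie := e2 == nil || time.Since(e2.LastUpdate) >= chanExpiry
	return e1Zombie && e2Zombie
}

func main() {
	const horizon = 14 * 24 * time.Hour // assumed two-week horizon for the example
	fresh := &edgePolicy{LastUpdate: time.Now()}
	stale := &edgePolicy{LastUpdate: time.Now().Add(-30 * 24 * time.Hour)}

	fmt.Println(isZombie(fresh, stale, horizon)) // false: one direction is still live
	fmt.Println(isZombie(stale, nil, horizon))   // true: both directions stale or missing
}
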
// networkHandler is the primary goroutine for the ChannelRouter. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
@@ -716,79 +804,8 @@ func (r *ChannelRouter) networkHandler() {
// state of the known graph to filter out any zombie channels
// for pruning.
case <-graphPruneTicker.C:
- var chansToPrune []wire.OutPoint
- chanExpiry := r.cfg.ChannelPruneExpiry
- log.Infof("Examining Channel Graph for zombie channels")
- // First, we'll collect all the channels which are
- // eligible for garbage collection due to being
- // zombies.
- filterPruneChans := func(info *channeldb.ChannelEdgeInfo,
- e1, e2 *channeldb.ChannelEdgePolicy) error {
- // We'll ensure that we don't attempt to prune
- // our *own* channels from the graph, as in any
- // case this should be re-advertised by the
- // sub-system above us.
- if info.NodeKey1.IsEqual(r.selfNode.PubKey) ||
- info.NodeKey2.IsEqual(r.selfNode.PubKey) {
- return nil
- }
- // If *both* edges haven't been updated for a
- // period of chanExpiry, then we'll mark the
- // channel itself as eligible for graph
- // pruning.
- e1Zombie, e2Zombie := true, true
- if e1 != nil {
- e1Zombie = time.Since(e1.LastUpdate) >= chanExpiry
- log.Tracef("Edge #1 of ChannelPoint(%v) "+
- "last update: %v",
- info.ChannelPoint, e1.LastUpdate)
- }
- if e2 != nil {
- e2Zombie = time.Since(e2.LastUpdate) >= chanExpiry
- log.Tracef("Edge #2 of ChannelPoint(%v) "+
- "last update: %v",
- info.ChannelPoint, e2.LastUpdate)
- }
- if e1Zombie && e2Zombie {
- log.Infof("ChannelPoint(%v) is a "+
- "zombie, collecting to prune",
- info.ChannelPoint)
- // TODO(roasbeef): add ability to
- // delete single directional edge
- chansToPrune = append(chansToPrune,
- info.ChannelPoint)
- }
- return nil
- }
- err := r.cfg.Graph.ForEachChannel(filterPruneChans)
- if err != nil {
- log.Errorf("Unable to local zombie chans: %v", err)
- continue
- }
- log.Infof("Pruning %v Zombie Channels", len(chansToPrune))
- // With the set zombie-like channels obtained, we'll do
- // another pass to delete al zombie channels from the
- // channel graph.
- for _, chanToPrune := range chansToPrune {
- log.Tracef("Pruning zombie chan ChannelPoint(%v)",
- chanToPrune)
- err := r.cfg.Graph.DeleteChannelEdge(&chanToPrune)
- if err != nil {
- log.Errorf("Unable to prune zombie "+
- "chans: %v", err)
- continue
- }
- }
- }
+ if err := r.pruneZombieChans(); err != nil {
+ log.Errorf("unable to prune zombies: %v", err)
+ }
// The router has been signalled to exit, to we exit our main
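
Again purely as illustration, the calling convention in this hunk — log the error and keep the goroutine alive rather than returning — typically sits inside a for/select loop driven by the prune ticker. A rough sketch under assumed names (the interface, interval, and quit channel below are not lnd's actual wiring):

package sketch

import (
	"log"
	"time"
)

// pruner is an assumed interface capturing just the method the loop needs.
type pruner interface {
	pruneZombieChans() error
}

// pruneLoop fires a prune pass on every tick. Errors are only logged so a
// single failed pass doesn't tear down the goroutine; the next tick retries.
func pruneLoop(p pruner, interval time.Duration, quit <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := p.pruneZombieChans(); err != nil {
				log.Printf("unable to prune zombies: %v", err)
			}
		case <-quit:
			return
		}
	}
}
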

@@ -116,8 +116,8 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
v.chanAnnFinSignal[shortID] = annFinCond
v.chanEdgeDependencies[shortID] = annFinCond
- v.nodeAnnDependencies[Vertex(msg.NodeKey1)] = annFinCond
- v.nodeAnnDependencies[Vertex(msg.NodeKey2)] = annFinCond
+ v.nodeAnnDependencies[Vertex(msg.NodeKey1Bytes)] = annFinCond
+ v.nodeAnnDependencies[Vertex(msg.NodeKey2Bytes)] = annFinCond
}
// These other types don't have any dependants, so no further
@@ -168,7 +168,7 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) {
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
signal, ok = v.chanEdgeDependencies[shortID]
case *channeldb.LightningNode:
- vertex := Vertex(msg.PubKey)
+ vertex := Vertex(msg.PubKeyBytes)
signal, ok = v.nodeAnnDependencies[vertex]
case *lnwire.ChannelUpdate:
signal, ok = v.chanEdgeDependencies[msg.ShortChannelID]
@@ -234,7 +234,7 @@ func (v *ValidationBarrier) SignalDependants(job interface{}) {
// map, as if we reach this point, then all dependants have already
// finished executing and we can proceed.
case *channeldb.LightningNode:
- delete(v.nodeAnnDependencies, Vertex(msg.PubKey))
+ delete(v.nodeAnnDependencies, Vertex(msg.PubKeyBytes))
case *lnwire.NodeAnnouncement:
delete(v.nodeAnnDependencies, Vertex(msg.NodeID))
case *lnwire.ChannelUpdate:
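
The validation barrier hunks simply follow the switch from parsed public keys to the raw serialized key fields (NodeKey1Bytes, NodeKey2Bytes, PubKeyBytes). The convenience is that a fixed-size byte array is comparable, so it can key the dependency maps directly without parsing or re-serializing a key. A small sketch of that idea, with types assumed for illustration rather than copied from lnd:

package sketch

// vertex mirrors the idea behind routing.Vertex: the 33 compressed public key
// bytes as a fixed-size array, which is comparable and therefore a valid map key.
type vertex [33]byte

// markDependency indexes the dependency map straight off the raw key bytes
// carried on the gossip/database structs; no key parsing is required just to
// build the map key.
func markDependency(deps map[vertex]chan struct{}, keyBytes [33]byte, signal chan struct{}) {
	deps[vertex(keyBytes)] = signal
}
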