router: Optimize pruneZombieChannels.

The current approach iterates over all channels in the graph in order to
filter the ones that need pruning. This is time consuming, several
seconds on my mobile device for ~40,000 channels, and the db stays
locked in a transaction for the whole scan.

The proposed change is to use existing functionality that takes
advantage of the fact that channel updates are stored indexed by date
(ChanUpdatesInHorizon). This lets us walk only a small subset of the
channels, those whose last update predates the "channel expiry"
horizon, and filter just those for pruning. Channels that are currently
disabled are fetched separately (via DisabledChannelIDs) and passed
through the same filter.
The same graph that took several seconds to prune is now, after the
change, pruned in several milliseconds.
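
For illustration, here is a minimal, self-contained sketch of the
approach. ChanUpdatesInHorizon is the real graph call used in the diff
below; the stand-in edge type, the interface and the helper function are
assumptions made only for this example:

package zombiesketch

import (
	"fmt"
	"time"
)

// edgeUpdate stands in for the edge record the graph returns; only the
// field needed by the sketch is kept.
type edgeUpdate struct {
	ChannelID uint64
}

// staleSource models the date-indexed lookup: querying the horizon
// [epoch, now-expiry] returns only channels whose latest update is older
// than the expiry, rather than every channel in the graph.
type staleSource interface {
	ChanUpdatesInHorizon(start, end time.Time) ([]edgeUpdate, error)
}

// collectStaleCandidates fetches only the potential zombies and marks
// them for further filtering.
func collectStaleCandidates(g staleSource, expiry time.Duration) (map[uint64]struct{}, error) {
	startTime := time.Unix(0, 0)
	endTime := time.Now().Add(-1 * expiry)

	oldEdges, err := g.ChanUpdatesInHorizon(startTime, endTime)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch stale edges: %v", err)
	}

	candidates := make(map[uint64]struct{})
	for _, e := range oldEdges {
		candidates[e.ChannelID] = struct{}{}
	}
	return candidates, nil
}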

In addition, for testing purposes I added a Direction field to the
testChannelPolicy structure to reflect the ChannelEdgePolicy direction.
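
A minimal sketch of how the test helpers turn that flag into wire-level
channel flags; the lnwire bit constants are the real ones, while the
helper function itself is illustrative only:

package testsketch

import "github.com/lightningnetwork/lnd/lnwire"

// policyFlags folds the Disabled and Direction booleans of a test policy
// into the lnwire.ChanUpdateChanFlags bit field, mirroring what
// createTestGraphFromChannels does in the diff below.
func policyFlags(disabled, direction bool) lnwire.ChanUpdateChanFlags {
	flags := lnwire.ChanUpdateChanFlags(0)
	if disabled {
		flags |= lnwire.ChanUpdateDisabled
	}
	if direction {
		flags |= lnwire.ChanUpdateDirection
	}
	return flags
}

symmetricTestChannel then copies the policy for the second node and
flips Direction, so the two edges of a test channel carry opposite
direction bits.
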
Roei Erez 2019-07-10 15:20:42 +03:00
parent 7300f33fe2
commit 9781ea0082
3 changed files with 47 additions and 10 deletions

@@ -314,6 +314,7 @@ type testChannelPolicy struct {
 	FeeRate     lnwire.MilliSatoshi
 	LastUpdate  time.Time
 	Disabled    bool
+	Direction   bool
 }

 type testChannelEnd struct {
@@ -346,6 +347,9 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount,
 		id = chanID[0]
 	}

+	node2Policy := *policy
+	node2Policy.Direction = !policy.Direction
+
 	return &testChannel{
 		Capacity: capacity,
 		Node1: &testChannelEnd{
@@ -354,7 +358,7 @@ func symmetricTestChannel(alias1 string, alias2 string, capacity btcutil.Amount,
 		},
 		Node2: &testChannelEnd{
 			Alias:             alias2,
-			testChannelPolicy: policy,
+			testChannelPolicy: &node2Policy,
 		},
 		ChannelID: id,
 	}
@@ -527,6 +531,9 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
 		if testChannel.Node1.Disabled {
 			channelFlags |= lnwire.ChanUpdateDisabled
 		}
+		if testChannel.Node1.Direction {
+			channelFlags |= lnwire.ChanUpdateDirection
+		}
 		edgePolicy := &channeldb.ChannelEdgePolicy{
 			SigBytes:     testSig.Serialize(),
 			MessageFlags: msgFlags,
@@ -549,10 +556,13 @@ func createTestGraphFromChannels(testChannels []*testChannel, source string) (
 		if testChannel.Node2.MaxHTLC != 0 {
 			msgFlags |= lnwire.ChanUpdateOptionMaxHtlc
 		}
-		channelFlags := lnwire.ChanUpdateDirection
+		channelFlags := lnwire.ChanUpdateChanFlags(0)
 		if testChannel.Node2.Disabled {
 			channelFlags |= lnwire.ChanUpdateDisabled
 		}
+		if testChannel.Node2.Direction {
+			channelFlags |= lnwire.ChanUpdateDirection
+		}
 		edgePolicy := &channeldb.ChannelEdgePolicy{
 			SigBytes:     testSig.Serialize(),
 			MessageFlags: msgFlags,

@@ -734,7 +734,7 @@ func (r *ChannelRouter) syncGraphWithChain() error {
 // usually signals that a channel has been closed on-chain. We do this
 // periodically to keep a healthy, lively routing table.
 func (r *ChannelRouter) pruneZombieChans() error {
-	var chansToPrune []uint64
+	chansToPrune := make(map[uint64]struct{})
 	chanExpiry := r.cfg.ChannelPruneExpiry

 	log.Infof("Examining channel graph for zombie channels")
@ -744,6 +744,11 @@ func (r *ChannelRouter) pruneZombieChans() error {
filterPruneChans := func(info *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
// Exit early in case this channel is already marked to be pruned
if _, markedToPrune := chansToPrune[info.ChannelID]; markedToPrune {
return nil
}
// We'll ensure that we don't attempt to prune our *own*
// channels from the graph, as in any case this should be
// re-advertised by the sub-system above us.
@@ -809,25 +814,47 @@ func (r *ChannelRouter) pruneZombieChans() error {
 			info.ChannelID)

 		// TODO(roasbeef): add ability to delete single directional edge
-		chansToPrune = append(chansToPrune, info.ChannelID)
+		chansToPrune[info.ChannelID] = struct{}{}

 		return nil
 	}

-	err := r.cfg.Graph.ForEachChannel(filterPruneChans)
+	startTime := time.Unix(0, 0)
+	endTime := time.Now().Add(-1 * chanExpiry)
+	oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
 	if err != nil {
-		return fmt.Errorf("unable to filter local zombie channels: "+
-			"%v", err)
+		return fmt.Errorf("unable to filter local zombie "+
+			"chans: %v", err)
 	}

+	disabledChanIDs, err := r.cfg.Graph.DisabledChannelIDs()
+	if err != nil {
+		return fmt.Errorf("unable to filter local zombie "+
+			"chans: %v", err)
+	}
+
+	disabledEdges, err := r.cfg.Graph.FetchChanInfos(disabledChanIDs)
+	if err != nil {
+		return fmt.Errorf("unable to filter local zombie "+
+			"chans: %v", err)
+	}
+
+	edgesToFilter := append(oldEdges, disabledEdges...)
+	for _, u := range edgesToFilter {
+		filterPruneChans(u.Info, u.Policy1, u.Policy2)
+	}
+
 	log.Infof("Pruning %v zombie channels", len(chansToPrune))

 	// With the set of zombie-like channels obtained, we'll do another pass
 	// to delete them from the channel graph.
-	for _, chanID := range chansToPrune {
+	var toPrune []uint64
+	for chanID := range chansToPrune {
+		toPrune = append(toPrune, chanID)
 		log.Tracef("Pruning zombie channel with ChannelID(%v)", chanID)
 	}

-	if err := r.cfg.Graph.DeleteChannelEdges(chansToPrune...); err != nil {
+	if err := r.cfg.Graph.DeleteChannelEdges(toPrune...); err != nil {
 		return fmt.Errorf("unable to delete zombie channels: %v", err)
 	}

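As a side note on the hunk above: prune candidates now come from two
queries (stale updates and disabled channels) and the same channel can
appear in both, which is why chansToPrune became a set and
filterPruneChans exits early for channels already marked. A small,
self-contained sketch of that dedup pattern, with illustrative names:

package prunesketch

// dedupeCandidates merges candidate channel IDs from multiple queries,
// keeping each channel only once before a single variadic delete call.
func dedupeCandidates(lists ...[]uint64) []uint64 {
	seen := make(map[uint64]struct{})
	var toPrune []uint64
	for _, list := range lists {
		for _, chanID := range list {
			if _, marked := seen[chanID]; marked {
				continue // already marked by an earlier query
			}
			seen[chanID] = struct{}{}
			toPrune = append(toPrune, chanID)
		}
	}
	return toPrune
}
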
@@ -1899,7 +1899,7 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) {
 	t.Parallel()

 	freshTimestamp := time.Now()
-	staleTimestamp := time.Time{}
+	staleTimestamp := time.Unix(0, 0)

 	// We'll create the following test graph so that only the last channel
 	// is pruned.