Merge pull request #3173 from cfromknecht/aggregate-router-logging

routing/router: log aggregate graph processing stats
This commit is contained in:
Olaoluwa Osuntokun 2019-06-14 06:27:11 +02:00 committed by GitHub
commit 866fa2b8f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 3 deletions

@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/multimutex" "github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing/chainview" "github.com/lightningnetwork/lnd/routing/chainview"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"github.com/lightningnetwork/lnd/zpay32" "github.com/lightningnetwork/lnd/zpay32"
) )
@ -37,6 +38,11 @@ const (
// DefaultChannelPruneExpiry is the default duration used to determine // DefaultChannelPruneExpiry is the default duration used to determine
// if a channel should be pruned or not. // if a channel should be pruned or not.
DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14) DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14)
// defaultStatInterval governs how often the router will log non-empty
// stats related to processing new channels, updates, or node
// announcements.
defaultStatInterval = 30 * time.Second
) )
var ( var (
@ -381,6 +387,14 @@ type ChannelRouter struct {
// consistency between the various database accesses. // consistency between the various database accesses.
channelEdgeMtx *multimutex.Mutex channelEdgeMtx *multimutex.Mutex
// statTicker is a resumable ticker that logs the router's progress as
// it discovers channels or receives updates.
statTicker ticker.Ticker
// stats tracks newly processed channels, updates, and node
// announcements over a window of defaultStatInterval.
stats *routerStats
sync.RWMutex sync.RWMutex
quit chan struct{} quit chan struct{}
@ -410,6 +424,8 @@ func New(cfg Config) (*ChannelRouter, error) {
ntfnClientUpdates: make(chan *topologyClientUpdate), ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex(), channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode, selfNode: selfNode,
statTicker: ticker.New(defaultStatInterval),
stats: new(routerStats),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -845,6 +861,11 @@ func (r *ChannelRouter) networkHandler() {
graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval) graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval)
defer graphPruneTicker.Stop() defer graphPruneTicker.Stop()
r.statTicker.Resume()
defer r.statTicker.Stop()
r.stats.Reset()
// We'll use this validation barrier to ensure that we process all jobs // We'll use this validation barrier to ensure that we process all jobs
// in the proper order during parallel validation. // in the proper order during parallel validation.
validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit) validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit)
@ -1044,6 +1065,18 @@ func (r *ChannelRouter) networkHandler() {
log.Errorf("Unable to prune zombies: %v", err) log.Errorf("Unable to prune zombies: %v", err)
} }
// Log any stats if we've processed a non-empty number of
// channels, updates, or nodes. We'll only pause the ticker if
// the last window contained no updates to avoid resuming and
// pausing while consecutive windows contain new info.
case <-r.statTicker.Ticks():
if !r.stats.Empty() {
log.Infof(r.stats.String())
} else {
r.statTicker.Pause()
}
r.stats.Reset()
// The router has been signalled to exit, so we exit our main // The router has been signalled to exit, so we exit our main
// loop so the wait group can be decremented. // loop so the wait group can be decremented.
case <-r.quit: case <-r.quit:
@ -1108,7 +1141,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
"graph: %v", msg.PubKeyBytes, err) "graph: %v", msg.PubKeyBytes, err)
} }
log.Infof("Updated vertex data for node=%x", msg.PubKeyBytes) log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
r.stats.incNumNodeUpdates()
case *channeldb.ChannelEdgeInfo: case *channeldb.ChannelEdgeInfo:
// Prior to processing the announcement we first check if we // Prior to processing the announcement we first check if we
@ -1137,10 +1171,12 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
return fmt.Errorf("unable to add edge: %v", err) return fmt.Errorf("unable to add edge: %v", err)
} }
log.Infof("New channel discovered! Link "+ log.Tracef("New channel discovered! Link "+
"connects %x and %x with ChannelID(%v)", "connects %x and %x with ChannelID(%v)",
msg.NodeKey1Bytes, msg.NodeKey2Bytes, msg.NodeKey1Bytes, msg.NodeKey2Bytes,
msg.ChannelID) msg.ChannelID)
r.stats.incNumEdgesDiscovered()
break break
} }
@ -1199,11 +1235,12 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
return errors.Errorf("unable to add edge: %v", err) return errors.Errorf("unable to add edge: %v", err)
} }
log.Infof("New channel discovered! Link "+ log.Tracef("New channel discovered! Link "+
"connects %x and %x with ChannelPoint(%v): "+ "connects %x and %x with ChannelPoint(%v): "+
"chan_id=%v, capacity=%v", "chan_id=%v, capacity=%v",
msg.NodeKey1Bytes, msg.NodeKey2Bytes, msg.NodeKey1Bytes, msg.NodeKey2Bytes,
fundingPoint, msg.ChannelID, msg.Capacity) fundingPoint, msg.ChannelID, msg.Capacity)
r.stats.incNumEdgesDiscovered()
// As a new edge has been added to the channel graph, we'll // As a new edge has been added to the channel graph, we'll
// update the current UTXO filter within our active // update the current UTXO filter within our active
@ -1301,11 +1338,14 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
log.Tracef("New channel update applied: %v", log.Tracef("New channel update applied: %v",
newLogClosure(func() string { return spew.Sdump(msg) })) newLogClosure(func() string { return spew.Sdump(msg) }))
r.stats.incNumChannelUpdates()
default: default:
return errors.Errorf("wrong routing update message type") return errors.Errorf("wrong routing update message type")
} }
r.statTicker.Resume()
return nil return nil
} }

69
routing/stats.go Normal file

@ -0,0 +1,69 @@
package routing
import (
"fmt"
"sync"
"time"
)
// routerStats accumulates counts of the graph changes processed since the
// last call to Reset so they can be reported as a single aggregate log line.
// Every method acquires mu, so a routerStats must be shared by pointer and
// never copied.
type routerStats struct {
	// numChannels counts newly discovered channel edges.
	numChannels uint32

	// numUpdates counts processed channel updates.
	numUpdates uint32

	// numNodes counts processed node updates.
	numNodes uint32

	// lastReset is the instant at which Reset was last called. String
	// reports the time elapsed since this instant.
	lastReset time.Time

	// mu guards all of the fields above.
	mu sync.RWMutex
}

// incNumEdgesDiscovered increments the number of discovered edges.
func (s *routerStats) incNumEdgesDiscovered() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.numChannels++
}

// incNumChannelUpdates increments the number of channel updates processed.
func (s *routerStats) incNumChannelUpdates() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.numUpdates++
}

// incNumNodeUpdates increments the number of node updates processed.
func (s *routerStats) incNumNodeUpdates() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.numNodes++
}

// Empty returns true if all stats are zero, i.e. no counter has been
// incremented since the last Reset.
func (s *routerStats) Empty() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	anyCounted := s.numChannels != 0 ||
		s.numUpdates != 0 ||
		s.numNodes != 0

	return !anyCounted
}

// Reset zeroes every counter and sets the lastReset field to the current
// time, starting a new reporting window.
func (s *routerStats) Reset() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.numChannels, s.numUpdates, s.numNodes = 0, 0, 0
	s.lastReset = time.Now()
}

// String returns a human-readable description of the router stats: the three
// counters followed by the time elapsed since lastReset. If Reset has never
// been called, lastReset is the zero time.Time and the elapsed duration is
// measured from that instant.
func (s *routerStats) String() string {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// Sample the elapsed time while holding the lock so it is consistent
	// with the counters being reported.
	elapsed := time.Since(s.lastReset)

	return fmt.Sprintf("Processed channels=%d updates=%d nodes=%d in "+
		"last %v", s.numChannels, s.numUpdates, s.numNodes, elapsed)
}