diff --git a/routing/router.go b/routing/router.go
index 4d468f90..31afb90e 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -25,6 +25,7 @@ import (
 	"github.com/lightningnetwork/lnd/multimutex"
 	"github.com/lightningnetwork/lnd/routing/chainview"
 	"github.com/lightningnetwork/lnd/routing/route"
+	"github.com/lightningnetwork/lnd/ticker"
 	"github.com/lightningnetwork/lnd/zpay32"
 )
 
@@ -37,6 +38,11 @@ const (
 	// DefaultChannelPruneExpiry is the default duration used to determine
 	// if a channel should be pruned or not.
 	DefaultChannelPruneExpiry = time.Duration(time.Hour * 24 * 14)
+
+	// defaultStatInterval governs how often the router will log non-empty
+	// stats related to processing new channels, updates, or node
+	// announcements.
+	defaultStatInterval = 30 * time.Second
 )
 
 var (
@@ -381,6 +387,14 @@ type ChannelRouter struct {
 	// consistency between the various database accesses.
 	channelEdgeMtx *multimutex.Mutex
 
+	// statTicker is a resumable ticker that logs the router's progress as
+	// it discovers channels or receives updates.
+	statTicker ticker.Ticker
+
+	// stats tracks newly processed channels, updates, and node
+	// announcements over a window of defaultStatInterval.
+	stats *routerStats
+
 	sync.RWMutex
 
 	quit chan struct{}
@@ -410,6 +424,8 @@ func New(cfg Config) (*ChannelRouter, error) {
 		ntfnClientUpdates: make(chan *topologyClientUpdate),
 		channelEdgeMtx:    multimutex.NewMutex(),
 		selfNode:          selfNode,
+		statTicker:        ticker.New(defaultStatInterval),
+		stats:             new(routerStats),
 		quit:              make(chan struct{}),
 	}
 
@@ -845,6 +861,11 @@ func (r *ChannelRouter) networkHandler() {
 	graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval)
 	defer graphPruneTicker.Stop()
 
+	r.statTicker.Resume()
+	defer r.statTicker.Stop()
+
+	r.stats.Reset()
+
 	// We'll use this validation barrier to ensure that we process all jobs
 	// in the proper order during parallel validation.
 	validationBarrier := NewValidationBarrier(runtime.NumCPU()*4, r.quit)
@@ -1044,6 +1065,18 @@ func (r *ChannelRouter) networkHandler() {
 				log.Errorf("Unable to prune zombies: %v", err)
 			}
 
+		// Log any stats if we've processed a non-empty number of
+		// channels, updates, or nodes. We'll only pause the ticker if
+		// the last window contained no updates to avoid resuming and
+		// pausing while consecutive windows contain new info.
+		case <-r.statTicker.Ticks():
+			if !r.stats.Empty() {
+				log.Infof(r.stats.String())
+			} else {
+				r.statTicker.Pause()
+			}
+			r.stats.Reset()
+
 		// The router has been signalled to exit, to we exit our main
 		// loop so the wait group can be decremented.
 		case <-r.quit:
@@ -1108,7 +1141,8 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
 				"graph: %v", msg.PubKeyBytes, err)
 		}
 
-		log.Infof("Updated vertex data for node=%x", msg.PubKeyBytes)
+		log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
+		r.stats.incNumNodeUpdates()
 
 	case *channeldb.ChannelEdgeInfo:
 		// Prior to processing the announcement we first check if we
@@ -1137,10 +1171,12 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
 			if err := r.cfg.Graph.AddChannelEdge(msg); err != nil {
 				return fmt.Errorf("unable to add edge: %v", err)
 			}
-			log.Infof("New channel discovered! Link "+
+			log.Tracef("New channel discovered! Link "+
Link "+ "connects %x and %x with ChannelID(%v)", msg.NodeKey1Bytes, msg.NodeKey2Bytes, msg.ChannelID) + r.stats.incNumEdgesDiscovered() + break } @@ -1199,11 +1235,12 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { return errors.Errorf("unable to add edge: %v", err) } - log.Infof("New channel discovered! Link "+ + log.Tracef("New channel discovered! Link "+ "connects %x and %x with ChannelPoint(%v): "+ "chan_id=%v, capacity=%v", msg.NodeKey1Bytes, msg.NodeKey2Bytes, fundingPoint, msg.ChannelID, msg.Capacity) + r.stats.incNumEdgesDiscovered() // As a new edge has been added to the channel graph, we'll // update the current UTXO filter within our active @@ -1301,11 +1338,14 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { log.Tracef("New channel update applied: %v", newLogClosure(func() string { return spew.Sdump(msg) })) + r.stats.incNumChannelUpdates() default: return errors.Errorf("wrong routing update message type") } + r.statTicker.Resume() + return nil } diff --git a/routing/stats.go b/routing/stats.go new file mode 100644 index 00000000..b960025c --- /dev/null +++ b/routing/stats.go @@ -0,0 +1,69 @@ +package routing + +import ( + "fmt" + "sync" + "time" +) + +// routerStats is a struct that tracks various updates to the graph and +// facilitates aggregate logging of the statistics. +type routerStats struct { + numChannels uint32 + numUpdates uint32 + numNodes uint32 + lastReset time.Time + + mu sync.RWMutex +} + +// incNumEdges increments the number of discovered edges. +func (g *routerStats) incNumEdgesDiscovered() { + g.mu.Lock() + g.numChannels++ + g.mu.Unlock() +} + +// incNumUpdates increments the number of channel updates processed. +func (g *routerStats) incNumChannelUpdates() { + g.mu.Lock() + g.numUpdates++ + g.mu.Unlock() +} + +// incNumNodeUpdates increments the number of node updates processed. +func (g *routerStats) incNumNodeUpdates() { + g.mu.Lock() + g.numNodes++ + g.mu.Unlock() +} + +// Empty returns true if all stats are zero. +func (g *routerStats) Empty() bool { + g.mu.RLock() + isEmpty := g.numChannels == 0 && + g.numUpdates == 0 && + g.numNodes == 0 + g.mu.RUnlock() + return isEmpty +} + +// Reset clears any router stats and sets the lastReset field to now. +func (g *routerStats) Reset() { + g.mu.Lock() + g.numChannels = 0 + g.numUpdates = 0 + g.numNodes = 0 + g.lastReset = time.Now() + g.mu.Unlock() +} + +// String returns a human-readable description of the router stats. +func (g *routerStats) String() string { + g.mu.RLock() + str := fmt.Sprintf("Processed channels=%d updates=%d nodes=%d in "+ + "last %v", g.numChannels, g.numUpdates, g.numNodes, + time.Since(g.lastReset)) + g.mu.RUnlock() + return str +}