rpc: re-write GetNetworkInfo implementation to use single db transaction
This commit re-writes the GetNetworkInfo implenetaiton to use a single database transaction. We’re now able to do this due to the recent change in the API for the ChannelGraph struct and it’s related objects. The recent change allows the passed callback to accept a db transaction, with this, the callback is now able to issue another traversal routine _within_ the prior one.
This commit is contained in:
parent
79807022a5
commit
7ac50b060e
77
rpcserver.go
77
rpcserver.go
@ -16,6 +16,7 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/channeldb"
|
||||
"github.com/lightningnetwork/lnd/lnrpc"
|
||||
@ -1397,7 +1398,7 @@ func (r *rpcServer) DescribeGraph(context.Context,
|
||||
|
||||
resp := &lnrpc.ChannelGraph{}
|
||||
|
||||
// Obtain the pinter to the global singleton channel graph, this will
|
||||
// Obtain the pointer to the global singleton channel graph, this will
|
||||
// provide a consistent view of the graph due to bolt db's
|
||||
// transactional model.
|
||||
graph := r.server.chanDB.ChannelGraph()
|
||||
@ -1405,7 +1406,7 @@ func (r *rpcServer) DescribeGraph(context.Context,
|
||||
// First iterate through all the known nodes (connected or unconnected
|
||||
// within the graph), collating their current state into the RPC
|
||||
// response.
|
||||
err := graph.ForEachNode(func(node *channeldb.LightningNode) error {
|
||||
err := graph.ForEachNode(nil, func(_ *bolt.Tx, node *channeldb.LightningNode) error {
|
||||
nodeAddrs := make([]*lnrpc.NodeAddress, 0)
|
||||
for _, addr := range node.Addresses {
|
||||
nodeAddr := &lnrpc.NodeAddress{
|
||||
@ -1540,7 +1541,7 @@ func (r *rpcServer) GetNodeInfo(_ context.Context, in *lnrpc.NodeInfoRequest) (*
|
||||
numChannels uint32
|
||||
totalCapcity btcutil.Amount
|
||||
)
|
||||
if err := node.ForEachChannel(nil, func(edge *channeldb.ChannelEdgeInfo,
|
||||
if err := node.ForEachChannel(nil, func(_ *bolt.Tx, edge *channeldb.ChannelEdgeInfo,
|
||||
_ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
numChannels++
|
||||
@ -1649,49 +1650,43 @@ func (r *rpcServer) GetNetworkInfo(context.Context, *lnrpc.NetworkInfoRequest) (
|
||||
maxChannelSize btcutil.Amount
|
||||
)
|
||||
|
||||
// TODO(roasbeef): ideally all below is completed in a single
|
||||
// transaction
|
||||
// We'll use this map to de-duplicate channels during our traversal.
|
||||
// This is needed since channels are directional, so there will be two
|
||||
// edges for each channel within the graph.
|
||||
seenChans := make(map[uint64]struct{})
|
||||
|
||||
// First run through all the known nodes in the within our view of the
|
||||
// We'll run through all the known nodes in the within our view of the
|
||||
// network, tallying up the total number of nodes, and also gathering
|
||||
// each node so we can measure the graph diamter and degree stats
|
||||
// each node so we can measure the graph diameter and degree stats
|
||||
// below.
|
||||
var nodes []*channeldb.LightningNode
|
||||
if err := graph.ForEachNode(func(node *channeldb.LightningNode) error {
|
||||
if err := graph.ForEachNode(nil, func(tx *bolt.Tx, node *channeldb.LightningNode) error {
|
||||
// Increment the total number of nodes with each iteration.
|
||||
numNodes++
|
||||
nodes = append(nodes, node)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// With all the nodes gathered, we can now perform a basic traversal to
|
||||
// ascertain the graph's diameter, and also the max out-degree of a
|
||||
// node.
|
||||
for _, node := range nodes {
|
||||
// For each channel we'll compute the out degree of each node,
|
||||
// and also update our running tallies of the min/max channel
|
||||
// capacity, as well as the total channel capacity. We pass
|
||||
// through the db transaction from the outer view so we can
|
||||
// re-use it within this inner view.
|
||||
var outDegree uint32
|
||||
err := node.ForEachChannel(nil, func(_ *channeldb.ChannelEdgeInfo,
|
||||
_ *channeldb.ChannelEdgePolicy) error {
|
||||
if err := node.ForEachChannel(tx, func(_ *bolt.Tx,
|
||||
edge *channeldb.ChannelEdgeInfo, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
// Bump up the out degree for this node for each
|
||||
// channel encountered.
|
||||
outDegree++
|
||||
|
||||
// If we've already seen this channel, then we'll
|
||||
// return early to ensure that we don't double-count
|
||||
// stats.
|
||||
if _, ok := seenChans[edge.ChannelID]; ok {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if outDegree > maxChanOut {
|
||||
maxChanOut = outDegree
|
||||
}
|
||||
}
|
||||
|
||||
// Finally, we traverse each channel visiting both channel edges at
|
||||
// once to avoid double counting any stats we're attempting to gather.
|
||||
if err := graph.ForEachChannel(func(edge *channeldb.ChannelEdgeInfo,
|
||||
_, _ *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
// Compare the capacity of this channel against the
|
||||
// running min/max to see if we should update the
|
||||
// extrema.
|
||||
chanCapacity := edge.Capacity
|
||||
|
||||
if chanCapacity < minChannelSize {
|
||||
minChannelSize = chanCapacity
|
||||
}
|
||||
@ -1699,10 +1694,24 @@ func (r *rpcServer) GetNetworkInfo(context.Context, *lnrpc.NetworkInfoRequest) (
|
||||
maxChannelSize = chanCapacity
|
||||
}
|
||||
|
||||
// Accumulate the total capacity of this channel to the
|
||||
// network wide-capacity.
|
||||
totalNetworkCapacity += chanCapacity
|
||||
|
||||
numChannels++
|
||||
|
||||
seenChans[edge.ChannelID] = struct{}{}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Finally, if the out degree of this node is greater than what
|
||||
// we've seen so far, update the maxChanOut variable.
|
||||
if outDegree > maxChanOut {
|
||||
maxChanOut = outDegree
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/lightningnetwork/lightning-onion"
|
||||
"github.com/lightningnetwork/lnd/brontide"
|
||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||
@ -340,8 +341,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = sourceNode.ForEachChannel(nil, func(_ *channeldb.ChannelEdgeInfo,
|
||||
policy *channeldb.ChannelEdgePolicy) error {
|
||||
err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx,
|
||||
_ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error {
|
||||
|
||||
pubStr := string(policy.Node.PubKey.SerializeCompressed())
|
||||
|
||||
// Add addresses from channel graph/NodeAnnouncements to the
|
||||
|
Loading…
Reference in New Issue
Block a user