Merge pull request #1600 from Roasbeef/prune-unconnected-graph-verticies

channeldb+routing: prune unconnected graph vertexes on start up
This commit is contained in:
Olaoluwa Osuntokun 2018-07-23 16:49:00 -07:00 committed by GitHub
commit 1a13e44cde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 313 additions and 83 deletions

@ -10,12 +10,12 @@ import (
"net"
"time"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
@ -29,7 +29,7 @@ var (
// traversals. The graph is formed as a star-graph with the source node
// at the center.
//
// maps: pubKey -> nofInfo
// maps: pubKey -> nodeInfo
// maps: source -> selfPubKey
nodeBucket = []byte("graph-node")
@ -439,6 +439,10 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Update(func(tx *bolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
@ -459,6 +463,45 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return ErrEdgeAlreadyExist
}
// Before we insert the channel into the database, we'll ensure
// that both nodes already exist in the channel graph. If
// either node doesn't, then we'll insert a "shell" node that
// just includes its public key, so subsequent validation and
// queries can work properly.
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
switch {
case node1Err == ErrGraphNodeNotFound:
node1Shell := LightningNode{
PubKeyBytes: edge.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node1Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey1Bytes)
}
case node1Err != nil:
return err
}
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
switch {
case node2Err == ErrGraphNodeNotFound:
node2Shell := LightningNode{
PubKeyBytes: edge.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node2Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey2Bytes)
}
case node2Err != nil:
return err
}
// If the edge hasn't been created yet, then we'll first add it
// to the edge index in order to associate the edge between two
// nodes and also store the static components of the channel.
@ -477,7 +520,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
}
// HasChannelEdge returns true if the database knows of a channel edge with the
// passed channel ID, and false otherwise. If the an edge with that ID is found
// passed channel ID, and false otherwise. If an edge with that ID is found
// within the graph, then two time stamps representing the last time the edge
// was updated for both directed edges are returned along with the boolean.
func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) {
@ -588,13 +631,6 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
var chansClosed []*ChannelEdgeInfo
// nodesWithChansClosed is the set of nodes, each identified by their
// compressed public key, who had a channel closed within the latest
// block. We'll use this later on to determine whether we should prune
// them from the channel graph due to no longer having any other open
// channels.
nodesWithChansClosed := make(map[[33]byte]struct{})
err := c.db.Update(func(tx *bolt.Tx) error {
// First grab the edges bucket which houses the information
// we'd like to delete
@ -655,11 +691,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
return err
}
// Include this channel in our list of closed channels
// and collect the node public keys at each end.
chansClosed = append(chansClosed, &edgeInfo)
nodesWithChansClosed[edgeInfo.NodeKey1Bytes] = struct{}{}
nodesWithChansClosed[edgeInfo.NodeKey2Bytes] = struct{}{}
}
metaBucket, err := tx.CreateBucketIfNotExists(graphMetaBucket)
@ -689,7 +721,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// Now that the graph has been pruned, we'll also attempt to
// prune any nodes that have had a channel closed within the
// latest block.
return c.pruneGraphNodes(tx, nodes, nodesWithChansClosed)
return c.pruneGraphNodes(tx, nodes, edgeIndex)
})
if err != nil {
return nil, err
@ -698,11 +730,34 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
return chansClosed, nil
}
// PruneGraphNodes is a garbage collection method which attempts to prune out
// any nodes from the channel graph that are currently unconnected. This ensure
// that we only maintain a graph of reachable nodes. In the event that a pruned
// node gains more channels, it will be re-added back to the graph.
func (c *ChannelGraph) PruneGraphNodes() error {
return c.db.Update(func(tx *bolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}
return c.pruneGraphNodes(tx, nodes, edgeIndex)
})
}
// pruneGraphNodes attempts to remove any nodes from the graph who have had a
// channel closed within the current block. If the node still has existing
// channels in the graph, this will act as a no-op.
func (c *ChannelGraph) pruneGraphNodes(tx *bolt.Tx, nodes *bolt.Bucket,
nodePubKeys map[[33]byte]struct{}) error {
edgeIndex *bolt.Bucket) error {
log.Trace("Pruning nodes from graph with no open channels")
@ -713,38 +768,82 @@ func (c *ChannelGraph) pruneGraphNodes(tx *bolt.Tx, nodes *bolt.Bucket,
return err
}
// We'll now iterate over every node which had a channel closed and
// check whether they have any other open channels left within the
// graph. If they don't, they'll be pruned from the channel graph.
for nodePubKey := range nodePubKeys {
if bytes.Equal(nodePubKey[:], sourceNode.PubKeyBytes[:]) {
continue
}
node, err := fetchLightningNode(nodes, nodePubKey[:])
if err != nil {
continue
}
node.db = c.db
numChansLeft := 0
err = node.ForEachChannel(tx, func(*bolt.Tx, *ChannelEdgeInfo,
*ChannelEdgePolicy, *ChannelEdgePolicy) error {
numChansLeft++
// We'll use this map to keep count the number of references to a node
// in the graph. A node should only be removed once it has no more
// references in the graph.
nodeRefCounts := make(map[[33]byte]int)
err = nodes.ForEach(func(pubKey, nodeBytes []byte) error {
// If this is the source key, then we skip this
// iteration as the value for this key is a pubKey
// rather than raw node information.
if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 {
return nil
})
if err != nil {
}
var nodePub [33]byte
copy(nodePub[:], pubKey)
nodeRefCounts[nodePub] = 0
return nil
})
if err != nil {
return err
}
// To ensure we never delete the source node, we'll start off by
// bumping its ref count to 1.
nodeRefCounts[sourceNode.PubKeyBytes] = 1
// Next, we'll run through the edgeIndex which maps a channel ID to the
// edge info. We'll use this scan to populate our reference count map
// above.
err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
// The first 66 bytes of the edge info contain the pubkeys of
// the nodes that this edge attaches. We'll extract them, and
// add them to the ref count map.
var node1, node2 [33]byte
copy(node1[:], edgeInfoBytes[:33])
copy(node2[:], edgeInfoBytes[33:])
// With the nodes extracted, we'll increase the ref count of
// each of the nodes.
nodeRefCounts[node1] += 1
nodeRefCounts[node2] += 1
return nil
})
if err != nil {
return err
}
// Finally, we'll make a second pass over the set of nodes, and delete
// any nodes that have a ref count of zero.
var numNodesPruned int
for nodePubKey, refCount := range nodeRefCounts {
// If the ref count of the node isn't zero, then we can safely
// skip it as it still has edges to or from it within the
// graph.
if refCount != 0 {
continue
}
if numChansLeft == 0 {
err := c.deleteLightningNode(tx, nodePubKey[:])
if err != nil {
log.Tracef("Unable to prune node %x from the "+
"graph: %v", nodePubKey, err)
}
// If we reach this point, then there are no longer any edges
// that connect this node, so we can delete it.
if err := c.deleteLightningNode(tx, nodePubKey[:]); err != nil {
log.Warnf("Unable to prune node %x from the "+
"graph: %v", nodePubKey, err)
continue
}
log.Infof("Pruned unconnected node %x from channel graph",
nodePubKey[:])
numNodesPruned++
}
if numNodesPruned > 0 {
log.Infof("Pruned %v unconnected nodes from the channel graph",
numNodesPruned)
}
return nil
@ -2173,8 +2272,9 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE
// Once we have the information about the channels' parameters,
// we'll fetch the routing policies for each for the directed
// edges.
e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes,
chanID, c.db)
e1, e2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, chanID, c.db,
)
if err != nil {
return err
}
@ -2232,8 +2332,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *
}
edgeInfo = &edge
e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes,
channelID[:], c.db)
e1, e2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, channelID[:], c.db,
)
if err != nil {
return err
}
@ -2833,7 +2934,8 @@ func deserializeChanEdgePolicy(r io.Reader,
node, err := fetchLightningNode(nodes, pub[:])
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to fetch node: %x, %v",
pub[:], err)
}
edge.Node = &node

@ -14,12 +14,12 @@ import (
"testing"
"time"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
@ -2058,6 +2058,155 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
}
}
// TestPruneGraphNodes tests that unconnected vertexes are pruned via the
// PruneSyncState method.
func TestPruneGraphNodes(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
// We'll start off by inserting our source node, to ensure that it's
// the only node left after we prune the graph.
graph := db.ChannelGraph()
sourceNode, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create source node: %v", err)
}
if err := graph.SetSourceNode(sourceNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
// With the source node inserted, we'll now add three nodes to the
// channel graph, at the end of the scenario, only two of these nodes
// should still be in the graph.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node3, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node3); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// We'll now add a new edge to the graph, but only actually advertise
// the edge of *one* of the nodes.
edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2)
if err := graph.AddChannelEdge(&edgeInfo); err != nil {
t.Fatalf("unable to add edge: %v", err)
}
// We'll now insert an advertised edge, but it'll only be the edge that
// points from the first to the second node.
edge1 := randEdgePolicy(chanID.ToUint64(), edgeInfo.ChannelPoint, db)
edge1.Flags = 0
edge1.Node = node1
edge1.SigBytes = testSig.Serialize()
if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// We'll now initiate a around of graph pruning.
if err := graph.PruneGraphNodes(); err != nil {
t.Fatalf("unable to prune graph nodes: %v", err)
}
// At this point, there should be 3 nodes left in the graph still: the
// source node (which can't be pruned), and node 1+2. Nodes 1 and two
// should still be left in the graph as there's half of an advertised
// edge between them.
assertNumNodes(t, graph, 3)
// Finally, we'll ensure that node3, the only fully unconnected node as
// properly deleted from the graph and not another node in its place.
node3Pub, err := node3.PubKey()
if err != nil {
t.Fatalf("unable to fetch the pubkey of node3: %v", err)
}
if _, err := graph.FetchLightningNode(node3Pub); err == nil {
t.Fatalf("node 3 should have been deleted!")
}
}
// TestAddChannelEdgeShellNodes tests that when we attempt to add a ChannelEdge
// to the graph, one or both of the nodes the edge involves aren't found in the
// database, then shell edges are created for each node if needed.
func TestAddChannelEdgeShellNodes(t *testing.T) {
t.Parallel()
db, cleanUp, err := makeTestDB()
defer cleanUp()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
graph := db.ChannelGraph()
// To start, we'll create two nodes, and only add one of them to the
// channel graph.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
// We'll now create an edge between the two nodes, as a result, node2
// should be inserted into the database as a shell node.
edgeInfo, _ := createEdge(100, 0, 0, 0, node1, node2)
if err := graph.AddChannelEdge(&edgeInfo); err != nil {
t.Fatalf("unable to add edge: %v", err)
}
node1Pub, err := node1.PubKey()
if err != nil {
t.Fatalf("unable to parse node 1 pub: %v", err)
}
node2Pub, err := node2.PubKey()
if err != nil {
t.Fatalf("unable to parse node 2 pub: %v", err)
}
// Ensure that node1 was inserted as a full node, while node2 only has
// a shell node present.
node1, err = graph.FetchLightningNode(node1Pub)
if err != nil {
t.Fatalf("unable to fetch node1: %v", err)
}
if !node1.HaveNodeAnnouncement {
t.Fatalf("have shell announcement for node1, shouldn't")
}
node2, err = graph.FetchLightningNode(node2Pub)
if err != nil {
t.Fatalf("unable to fetch node2: %v", err)
}
if node2.HaveNodeAnnouncement {
t.Fatalf("should have shell announcement for node2, but is full")
}
}
// compareNodes is used to compare two LightningNodes while excluding the
// Features struct, which cannot be compared as the semantics for reserializing
// the featuresMap have not been defined.

@ -9,6 +9,9 @@ import (
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/channeldb"
@ -17,9 +20,6 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing/chainview"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"crypto/sha256"
@ -382,6 +382,13 @@ func (r *ChannelRouter) Start() error {
return err
}
// Finally, before we proceed, we'll prune any unconnected nodes from
// the graph in order to ensure we maintain a tight graph of "useful"
// nodes.
if err := r.cfg.Graph.PruneGraphNodes(); err != nil {
return err
}
r.wg.Add(1)
go r.networkHandler()
@ -949,34 +956,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
"chan_id=%v", msg.ChannelID)
}
// Query the database for the existence of the two nodes in this
// channel. If not found, add a partial node to the database,
// containing only the node keys.
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey1Bytes)
if !exists {
node1 := &channeldb.LightningNode{
PubKeyBytes: msg.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := r.cfg.Graph.AddLightningNode(node1)
if err != nil {
return errors.Errorf("unable to add node %v to"+
" the graph: %v", node1.PubKeyBytes, err)
}
}
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey2Bytes)
if !exists {
node2 := &channeldb.LightningNode{
PubKeyBytes: msg.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := r.cfg.Graph.AddLightningNode(node2)
if err != nil {
return errors.Errorf("unable to add node %v to"+
" the graph: %v", node2.PubKeyBytes, err)
}
}
// Before we can add the channel to the channel graph, we need
// to obtain the full funding outpoint that's encoded within
// the channel ID.