diff --git a/channeldb/graph.go b/channeldb/graph.go index 920193aa..49b73aad 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -10,12 +10,12 @@ import ( "net" "time" - "github.com/coreos/bbolt" - "github.com/lightningnetwork/lnd/lnwire" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" + "github.com/coreos/bbolt" + "github.com/lightningnetwork/lnd/lnwire" ) var ( @@ -29,7 +29,7 @@ var ( // traversals. The graph is formed as a star-graph with the source node // at the center. // - // maps: pubKey -> nofInfo + // maps: pubKey -> nodeInfo // maps: source -> selfPubKey nodeBucket = []byte("graph-node") @@ -439,6 +439,10 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) return c.db.Update(func(tx *bolt.Tx) error { + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } edges, err := tx.CreateBucketIfNotExists(edgeBucket) if err != nil { return err @@ -459,6 +463,45 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { return ErrEdgeAlreadyExist } + // Before we insert the channel into the database, we'll ensure + // that both nodes already exist in the channel graph. If + // either node doesn't, then we'll insert a "shell" node that + // just includes its public key, so subsequent validation and + // queries can work properly. + _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) + switch { + case node1Err == ErrGraphNodeNotFound: + node1Shell := LightningNode{ + PubKeyBytes: edge.NodeKey1Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node1Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey1Bytes) + + } + case node1Err != nil: + return err + } + + _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) + switch { + case node2Err == ErrGraphNodeNotFound: + node2Shell := LightningNode{ + PubKeyBytes: edge.NodeKey2Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node2Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey2Bytes) + + } + case node2Err != nil: + return err + } + // If the edge hasn't been created yet, then we'll first add it // to the edge index in order to associate the edge between two // nodes and also store the static components of the channel. @@ -477,7 +520,7 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { } // HasChannelEdge returns true if the database knows of a channel edge with the -// passed channel ID, and false otherwise. If the an edge with that ID is found +// passed channel ID, and false otherwise. If an edge with that ID is found // within the graph, then two time stamps representing the last time the edge // was updated for both directed edges are returned along with the boolean. func (c *ChannelGraph) HasChannelEdge(chanID uint64) (time.Time, time.Time, bool, error) { @@ -588,13 +631,6 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, var chansClosed []*ChannelEdgeInfo - // nodesWithChansClosed is the set of nodes, each identified by their - // compressed public key, who had a channel closed within the latest - // block. We'll use this later on to determine whether we should prune - // them from the channel graph due to no longer having any other open - // channels. - nodesWithChansClosed := make(map[[33]byte]struct{}) - err := c.db.Update(func(tx *bolt.Tx) error { // First grab the edges bucket which houses the information // we'd like to delete @@ -655,11 +691,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, return err } - // Include this channel in our list of closed channels - // and collect the node public keys at each end. chansClosed = append(chansClosed, &edgeInfo) - nodesWithChansClosed[edgeInfo.NodeKey1Bytes] = struct{}{} - nodesWithChansClosed[edgeInfo.NodeKey2Bytes] = struct{}{} } metaBucket, err := tx.CreateBucketIfNotExists(graphMetaBucket) @@ -689,7 +721,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // Now that the graph has been pruned, we'll also attempt to // prune any nodes that have had a channel closed within the // latest block. - return c.pruneGraphNodes(tx, nodes, nodesWithChansClosed) + return c.pruneGraphNodes(tx, nodes, edgeIndex) }) if err != nil { return nil, err @@ -698,11 +730,34 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, return chansClosed, nil } +// PruneGraphNodes is a garbage collection method which attempts to prune out +// any nodes from the channel graph that are currently unconnected. This ensure +// that we only maintain a graph of reachable nodes. In the event that a pruned +// node gains more channels, it will be re-added back to the graph. +func (c *ChannelGraph) PruneGraphNodes() error { + return c.db.Update(func(tx *bolt.Tx) error { + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } + edges := tx.Bucket(edgeBucket) + if edges == nil { + return ErrGraphNotFound + } + edgeIndex := edges.Bucket(edgeIndexBucket) + if edgeIndex == nil { + return ErrGraphNoEdgesFound + } + + return c.pruneGraphNodes(tx, nodes, edgeIndex) + }) +} + // pruneGraphNodes attempts to remove any nodes from the graph who have had a // channel closed within the current block. If the node still has existing // channels in the graph, this will act as a no-op. func (c *ChannelGraph) pruneGraphNodes(tx *bolt.Tx, nodes *bolt.Bucket, - nodePubKeys map[[33]byte]struct{}) error { + edgeIndex *bolt.Bucket) error { log.Trace("Pruning nodes from graph with no open channels") @@ -713,38 +768,82 @@ func (c *ChannelGraph) pruneGraphNodes(tx *bolt.Tx, nodes *bolt.Bucket, return err } - // We'll now iterate over every node which had a channel closed and - // check whether they have any other open channels left within the - // graph. If they don't, they'll be pruned from the channel graph. - for nodePubKey := range nodePubKeys { - if bytes.Equal(nodePubKey[:], sourceNode.PubKeyBytes[:]) { - continue - } - - node, err := fetchLightningNode(nodes, nodePubKey[:]) - if err != nil { - continue - } - node.db = c.db - - numChansLeft := 0 - err = node.ForEachChannel(tx, func(*bolt.Tx, *ChannelEdgeInfo, - *ChannelEdgePolicy, *ChannelEdgePolicy) error { - - numChansLeft++ + // We'll use this map to keep count the number of references to a node + // in the graph. A node should only be removed once it has no more + // references in the graph. + nodeRefCounts := make(map[[33]byte]int) + err = nodes.ForEach(func(pubKey, nodeBytes []byte) error { + // If this is the source key, then we skip this + // iteration as the value for this key is a pubKey + // rather than raw node information. + if bytes.Equal(pubKey, sourceKey) || len(pubKey) != 33 { return nil - }) - if err != nil { + } + + var nodePub [33]byte + copy(nodePub[:], pubKey) + nodeRefCounts[nodePub] = 0 + + return nil + }) + if err != nil { + return err + } + + // To ensure we never delete the source node, we'll start off by + // bumping its ref count to 1. + nodeRefCounts[sourceNode.PubKeyBytes] = 1 + + // Next, we'll run through the edgeIndex which maps a channel ID to the + // edge info. We'll use this scan to populate our reference count map + // above. + err = edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error { + // The first 66 bytes of the edge info contain the pubkeys of + // the nodes that this edge attaches. We'll extract them, and + // add them to the ref count map. + var node1, node2 [33]byte + copy(node1[:], edgeInfoBytes[:33]) + copy(node2[:], edgeInfoBytes[33:]) + + // With the nodes extracted, we'll increase the ref count of + // each of the nodes. + nodeRefCounts[node1] += 1 + nodeRefCounts[node2] += 1 + + return nil + }) + if err != nil { + return err + } + + // Finally, we'll make a second pass over the set of nodes, and delete + // any nodes that have a ref count of zero. + var numNodesPruned int + for nodePubKey, refCount := range nodeRefCounts { + // If the ref count of the node isn't zero, then we can safely + // skip it as it still has edges to or from it within the + // graph. + if refCount != 0 { continue } - if numChansLeft == 0 { - err := c.deleteLightningNode(tx, nodePubKey[:]) - if err != nil { - log.Tracef("Unable to prune node %x from the "+ - "graph: %v", nodePubKey, err) - } + // If we reach this point, then there are no longer any edges + // that connect this node, so we can delete it. + if err := c.deleteLightningNode(tx, nodePubKey[:]); err != nil { + log.Warnf("Unable to prune node %x from the "+ + "graph: %v", nodePubKey, err) + continue } + + log.Infof("Pruned unconnected node %x from channel graph", + nodePubKey[:]) + + numNodesPruned++ + } + + if numNodesPruned > 0 { + log.Infof("Pruned %v unconnected nodes from the channel graph", + numNodesPruned) } return nil @@ -2173,8 +2272,9 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE // Once we have the information about the channels' parameters, // we'll fetch the routing policies for each for the directed // edges. - e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes, - chanID, c.db) + e1, e2, err := fetchChanEdgePolicies( + edgeIndex, edges, nodes, chanID, c.db, + ) if err != nil { return err } @@ -2232,8 +2332,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * } edgeInfo = &edge - e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes, - channelID[:], c.db) + e1, e2, err := fetchChanEdgePolicies( + edgeIndex, edges, nodes, channelID[:], c.db, + ) if err != nil { return err } @@ -2833,7 +2934,8 @@ func deserializeChanEdgePolicy(r io.Reader, node, err := fetchLightningNode(nodes, pub[:]) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to fetch node: %x, %v", + pub[:], err) } edge.Node = &node diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go index 2533a66a..cc3055b3 100644 --- a/channeldb/graph_test.go +++ b/channeldb/graph_test.go @@ -14,12 +14,12 @@ import ( "testing" "time" - "github.com/coreos/bbolt" - "github.com/davecgh/go-spew/spew" - "github.com/lightningnetwork/lnd/lnwire" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnwire" ) var ( @@ -2058,6 +2058,155 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { } } +// TestPruneGraphNodes tests that unconnected vertexes are pruned via the +// PruneSyncState method. +func TestPruneGraphNodes(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + + // We'll start off by inserting our source node, to ensure that it's + // the only node left after we prune the graph. + graph := db.ChannelGraph() + sourceNode, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create source node: %v", err) + } + if err := graph.SetSourceNode(sourceNode); err != nil { + t.Fatalf("unable to set source node: %v", err) + } + + // With the source node inserted, we'll now add three nodes to the + // channel graph, at the end of the scenario, only two of these nodes + // should still be in the graph. + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node1); err != nil { + t.Fatalf("unable to add node: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node2); err != nil { + t.Fatalf("unable to add node: %v", err) + } + node3, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node3); err != nil { + t.Fatalf("unable to add node: %v", err) + } + + // We'll now add a new edge to the graph, but only actually advertise + // the edge of *one* of the nodes. + edgeInfo, chanID := createEdge(100, 0, 0, 0, node1, node2) + if err := graph.AddChannelEdge(&edgeInfo); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + // We'll now insert an advertised edge, but it'll only be the edge that + // points from the first to the second node. + edge1 := randEdgePolicy(chanID.ToUint64(), edgeInfo.ChannelPoint, db) + edge1.Flags = 0 + edge1.Node = node1 + edge1.SigBytes = testSig.Serialize() + if err := graph.UpdateEdgePolicy(edge1); err != nil { + t.Fatalf("unable to update edge: %v", err) + } + + // We'll now initiate a around of graph pruning. + if err := graph.PruneGraphNodes(); err != nil { + t.Fatalf("unable to prune graph nodes: %v", err) + } + + // At this point, there should be 3 nodes left in the graph still: the + // source node (which can't be pruned), and node 1+2. Nodes 1 and two + // should still be left in the graph as there's half of an advertised + // edge between them. + assertNumNodes(t, graph, 3) + + // Finally, we'll ensure that node3, the only fully unconnected node as + // properly deleted from the graph and not another node in its place. + node3Pub, err := node3.PubKey() + if err != nil { + t.Fatalf("unable to fetch the pubkey of node3: %v", err) + } + if _, err := graph.FetchLightningNode(node3Pub); err == nil { + t.Fatalf("node 3 should have been deleted!") + } +} + +// TestAddChannelEdgeShellNodes tests that when we attempt to add a ChannelEdge +// to the graph, one or both of the nodes the edge involves aren't found in the +// database, then shell edges are created for each node if needed. +func TestAddChannelEdgeShellNodes(t *testing.T) { + t.Parallel() + + db, cleanUp, err := makeTestDB() + defer cleanUp() + if err != nil { + t.Fatalf("unable to make test database: %v", err) + } + + graph := db.ChannelGraph() + + // To start, we'll create two nodes, and only add one of them to the + // channel graph. + node1, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + if err := graph.AddLightningNode(node1); err != nil { + t.Fatalf("unable to add node: %v", err) + } + node2, err := createTestVertex(db) + if err != nil { + t.Fatalf("unable to create test node: %v", err) + } + + // We'll now create an edge between the two nodes, as a result, node2 + // should be inserted into the database as a shell node. + edgeInfo, _ := createEdge(100, 0, 0, 0, node1, node2) + if err := graph.AddChannelEdge(&edgeInfo); err != nil { + t.Fatalf("unable to add edge: %v", err) + } + + node1Pub, err := node1.PubKey() + if err != nil { + t.Fatalf("unable to parse node 1 pub: %v", err) + } + node2Pub, err := node2.PubKey() + if err != nil { + t.Fatalf("unable to parse node 2 pub: %v", err) + } + + // Ensure that node1 was inserted as a full node, while node2 only has + // a shell node present. + node1, err = graph.FetchLightningNode(node1Pub) + if err != nil { + t.Fatalf("unable to fetch node1: %v", err) + } + if !node1.HaveNodeAnnouncement { + t.Fatalf("have shell announcement for node1, shouldn't") + } + + node2, err = graph.FetchLightningNode(node2Pub) + if err != nil { + t.Fatalf("unable to fetch node2: %v", err) + } + if node2.HaveNodeAnnouncement { + t.Fatalf("should have shell announcement for node2, but is full") + } +} + // compareNodes is used to compare two LightningNodes while excluding the // Features struct, which cannot be compared as the semantics for reserializing // the featuresMap have not been defined. diff --git a/routing/router.go b/routing/router.go index 4d62de28..0e5b6e24 100644 --- a/routing/router.go +++ b/routing/router.go @@ -9,6 +9,9 @@ import ( "sync/atomic" "time" + "github.com/btcsuite/btcd/btcec" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btcutil" "github.com/coreos/bbolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" @@ -17,9 +20,6 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" "github.com/lightningnetwork/lnd/routing/chainview" - "github.com/btcsuite/btcd/btcec" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" "crypto/sha256" @@ -382,6 +382,13 @@ func (r *ChannelRouter) Start() error { return err } + // Finally, before we proceed, we'll prune any unconnected nodes from + // the graph in order to ensure we maintain a tight graph of "useful" + // nodes. + if err := r.cfg.Graph.PruneGraphNodes(); err != nil { + return err + } + r.wg.Add(1) go r.networkHandler() @@ -949,34 +956,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { "chan_id=%v", msg.ChannelID) } - // Query the database for the existence of the two nodes in this - // channel. If not found, add a partial node to the database, - // containing only the node keys. - _, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey1Bytes) - if !exists { - node1 := &channeldb.LightningNode{ - PubKeyBytes: msg.NodeKey1Bytes, - HaveNodeAnnouncement: false, - } - err := r.cfg.Graph.AddLightningNode(node1) - if err != nil { - return errors.Errorf("unable to add node %v to"+ - " the graph: %v", node1.PubKeyBytes, err) - } - } - _, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey2Bytes) - if !exists { - node2 := &channeldb.LightningNode{ - PubKeyBytes: msg.NodeKey2Bytes, - HaveNodeAnnouncement: false, - } - err := r.cfg.Graph.AddLightningNode(node2) - if err != nil { - return errors.Errorf("unable to add node %v to"+ - " the graph: %v", node2.PubKeyBytes, err) - } - } - // Before we can add the channel to the channel graph, we need // to obtain the full funding outpoint that's encoded within // the channel ID.