From 2e75499787f284d491e1fe533bc2a03882664997 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 22 Jul 2018 21:02:46 -0700 Subject: [PATCH] channeldb+routing: move adding shell nodes into db txn of AddChannelEdge In this commit, we fix a slight race condition that can occur when we go to add a shell node for a node announcement, but then right afterwards, a new block arrives that causes us to prune an unconnected node. To ensure this doesn't happen, we now add shell nodes within the same db transaction as AddChannelEdge. This ensures that the state is fully consistent and shell nodes will be added atomically along with the new channel edge. As a result of this change, we no longer need to add shell nodes within the ChannelRouter, as the database will take care of this operation as it should. --- channeldb/graph.go | 56 +++++++++++++++++++++++++++++++++++++++++----- routing/router.go | 28 ----------------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index 11431192..49b73aad 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -439,6 +439,10 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) return c.db.Update(func(tx *bolt.Tx) error { + nodes, err := tx.CreateBucketIfNotExists(nodeBucket) + if err != nil { + return err + } edges, err := tx.CreateBucketIfNotExists(edgeBucket) if err != nil { return err @@ -459,6 +463,45 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { return ErrEdgeAlreadyExist } + // Before we insert the channel into the database, we'll ensure + // that both nodes already exist in the channel graph. If + // either node doesn't, then we'll insert a "shell" node that + // just includes its public key, so subsequent validation and + // queries can work properly. + _, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:]) + switch { + case node1Err == ErrGraphNodeNotFound: + node1Shell := LightningNode{ + PubKeyBytes: edge.NodeKey1Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node1Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey1Bytes) + + } + case node1Err != nil: + return err + } + + _, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:]) + switch { + case node2Err == ErrGraphNodeNotFound: + node2Shell := LightningNode{ + PubKeyBytes: edge.NodeKey2Bytes, + HaveNodeAnnouncement: false, + } + err := addLightningNode(tx, &node2Shell) + if err != nil { + return fmt.Errorf("unable to create shell node "+ + "for: %x", edge.NodeKey2Bytes) + + } + case node2Err != nil: + return err + } + // If the edge hasn't been created yet, then we'll first add it // to the edge index in order to associate the edge between two // nodes and also store the static components of the channel. @@ -2229,8 +2272,9 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE // Once we have the information about the channels' parameters, // we'll fetch the routing policies for each for the directed // edges. - e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes, - chanID, c.db) + e1, e2, err := fetchChanEdgePolicies( + edgeIndex, edges, nodes, chanID, c.db, + ) if err != nil { return err } @@ -2288,8 +2332,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, * } edgeInfo = &edge - e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes, - channelID[:], c.db) + e1, e2, err := fetchChanEdgePolicies( + edgeIndex, edges, nodes, channelID[:], c.db, + ) if err != nil { return err } @@ -2889,7 +2934,8 @@ func deserializeChanEdgePolicy(r io.Reader, node, err := fetchLightningNode(nodes, pub[:]) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to fetch node: %x, %v", + pub[:], err) } edge.Node = &node diff --git a/routing/router.go b/routing/router.go index de465325..0e5b6e24 100644 --- a/routing/router.go +++ b/routing/router.go @@ -956,34 +956,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error { "chan_id=%v", msg.ChannelID) } - // Query the database for the existence of the two nodes in this - // channel. If not found, add a partial node to the database, - // containing only the node keys. - _, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey1Bytes) - if !exists { - node1 := &channeldb.LightningNode{ - PubKeyBytes: msg.NodeKey1Bytes, - HaveNodeAnnouncement: false, - } - err := r.cfg.Graph.AddLightningNode(node1) - if err != nil { - return errors.Errorf("unable to add node %v to"+ - " the graph: %v", node1.PubKeyBytes, err) - } - } - _, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey2Bytes) - if !exists { - node2 := &channeldb.LightningNode{ - PubKeyBytes: msg.NodeKey2Bytes, - HaveNodeAnnouncement: false, - } - err := r.cfg.Graph.AddLightningNode(node2) - if err != nil { - return errors.Errorf("unable to add node %v to"+ - " the graph: %v", node2.PubKeyBytes, err) - } - } - // Before we can add the channel to the channel graph, we need // to obtain the full funding outpoint that's encoded within // the channel ID.