channeldb+routing: move adding shell nodes into db txn of AddChannelEdge
In this commit, we fix a slight race condition that can occur when we go to add a shell node for a node announcement, but then right afterwards, a new block arrives that causes us to prune an unconnected node. To ensure this doesn't happen, we now add shell nodes within the same db transaction as AddChannelEdge. This ensures that the state is fully consistent and shell nodes will be added atomically along with the new channel edge. As a result of this change, we no longer need to add shell nodes within the ChannelRouter, as the database will take care of this operation as it should.
This commit is contained in:
parent
8519ea869d
commit
2e75499787
@ -439,6 +439,10 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
|
||||
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
|
||||
|
||||
return c.db.Update(func(tx *bolt.Tx) error {
|
||||
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -459,6 +463,45 @@ func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
|
||||
return ErrEdgeAlreadyExist
|
||||
}
|
||||
|
||||
// Before we insert the channel into the database, we'll ensure
|
||||
// that both nodes already exist in the channel graph. If
|
||||
// either node doesn't, then we'll insert a "shell" node that
|
||||
// just includes its public key, so subsequent validation and
|
||||
// queries can work properly.
|
||||
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
|
||||
switch {
|
||||
case node1Err == ErrGraphNodeNotFound:
|
||||
node1Shell := LightningNode{
|
||||
PubKeyBytes: edge.NodeKey1Bytes,
|
||||
HaveNodeAnnouncement: false,
|
||||
}
|
||||
err := addLightningNode(tx, &node1Shell)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create shell node "+
|
||||
"for: %x", edge.NodeKey1Bytes)
|
||||
|
||||
}
|
||||
case node1Err != nil:
|
||||
return err
|
||||
}
|
||||
|
||||
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
|
||||
switch {
|
||||
case node2Err == ErrGraphNodeNotFound:
|
||||
node2Shell := LightningNode{
|
||||
PubKeyBytes: edge.NodeKey2Bytes,
|
||||
HaveNodeAnnouncement: false,
|
||||
}
|
||||
err := addLightningNode(tx, &node2Shell)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create shell node "+
|
||||
"for: %x", edge.NodeKey2Bytes)
|
||||
|
||||
}
|
||||
case node2Err != nil:
|
||||
return err
|
||||
}
|
||||
|
||||
// If the edge hasn't been created yet, then we'll first add it
|
||||
// to the edge index in order to associate the edge between two
|
||||
// nodes and also store the static components of the channel.
|
||||
@ -2229,8 +2272,9 @@ func (c *ChannelGraph) FetchChannelEdgesByOutpoint(op *wire.OutPoint) (*ChannelE
|
||||
// Once we have the information about the channels' parameters,
|
||||
// we'll fetch the routing policies for each for the directed
|
||||
// edges.
|
||||
e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes,
|
||||
chanID, c.db)
|
||||
e1, e2, err := fetchChanEdgePolicies(
|
||||
edgeIndex, edges, nodes, chanID, c.db,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2288,8 +2332,9 @@ func (c *ChannelGraph) FetchChannelEdgesByID(chanID uint64) (*ChannelEdgeInfo, *
|
||||
}
|
||||
edgeInfo = &edge
|
||||
|
||||
e1, e2, err := fetchChanEdgePolicies(edgeIndex, edges, nodes,
|
||||
channelID[:], c.db)
|
||||
e1, e2, err := fetchChanEdgePolicies(
|
||||
edgeIndex, edges, nodes, channelID[:], c.db,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -2889,7 +2934,8 @@ func deserializeChanEdgePolicy(r io.Reader,
|
||||
|
||||
node, err := fetchLightningNode(nodes, pub[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("unable to fetch node: %x, %v",
|
||||
pub[:], err)
|
||||
}
|
||||
|
||||
edge.Node = &node
|
||||
|
@ -956,34 +956,6 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
|
||||
"chan_id=%v", msg.ChannelID)
|
||||
}
|
||||
|
||||
// Query the database for the existence of the two nodes in this
|
||||
// channel. If not found, add a partial node to the database,
|
||||
// containing only the node keys.
|
||||
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey1Bytes)
|
||||
if !exists {
|
||||
node1 := &channeldb.LightningNode{
|
||||
PubKeyBytes: msg.NodeKey1Bytes,
|
||||
HaveNodeAnnouncement: false,
|
||||
}
|
||||
err := r.cfg.Graph.AddLightningNode(node1)
|
||||
if err != nil {
|
||||
return errors.Errorf("unable to add node %v to"+
|
||||
" the graph: %v", node1.PubKeyBytes, err)
|
||||
}
|
||||
}
|
||||
_, exists, _ = r.cfg.Graph.HasLightningNode(msg.NodeKey2Bytes)
|
||||
if !exists {
|
||||
node2 := &channeldb.LightningNode{
|
||||
PubKeyBytes: msg.NodeKey2Bytes,
|
||||
HaveNodeAnnouncement: false,
|
||||
}
|
||||
err := r.cfg.Graph.AddLightningNode(node2)
|
||||
if err != nil {
|
||||
return errors.Errorf("unable to add node %v to"+
|
||||
" the graph: %v", node2.PubKeyBytes, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Before we can add the channel to the channel graph, we need
|
||||
// to obtain the full funding outpoint that's encoded within
|
||||
// the channel ID.
|
||||
|
Loading…
Reference in New Issue
Block a user