channeldb: add new methods to allow adding a new edge+policy w/ existing db transaction

This commit is contained in:
Olaoluwa Osuntokun 2018-12-09 19:30:26 -08:00
parent 11c6887ffa
commit c656788b0b
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2
2 changed files with 107 additions and 105 deletions

@ -483,101 +483,105 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.addChannelEdge(tx, edge)
})
}
// addChannelEdge is the private form of AddChannelEdge that allows callers to
// utilize an existing db transaction.
func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error {
// Construct the channel's primary key which is the 8-byte channel ID.
var chanKey [8]byte
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Update(func(tx *bbolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
}
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
}
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
// First, attempt to check if this edge has already been created. If
// so, then we can exit early as this method is meant to be idempotent.
if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
return ErrEdgeAlreadyExist
}
// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
// doesn't, then we'll insert a "shell" node that just includes its
// public key, so subsequent validation and queries can work properly.
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
switch {
case node1Err == ErrGraphNodeNotFound:
node1Shell := LightningNode{
PubKeyBytes: edge.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node1Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey1Bytes)
}
case node1Err != nil:
return err
}
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
switch {
case node2Err == ErrGraphNodeNotFound:
node2Shell := LightningNode{
PubKeyBytes: edge.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node2Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey2Bytes)
}
case node2Err != nil:
return err
}
// If the edge hasn't been created yet, then we'll first add it to the
// edge index in order to associate the edge between two nodes and also
// store the static components of the channel.
if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
return err
}
// Mark edge policies for both sides as unknown. This is to enable
// efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {
err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}
// First, attempt to check if this edge has already been
// created. If so, then we can exit early as this method is
// meant to be idempotent.
if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo != nil {
return ErrEdgeAlreadyExist
}
// Before we insert the channel into the database, we'll ensure
// that both nodes already exist in the channel graph. If
// either node doesn't, then we'll insert a "shell" node that
// just includes its public key, so subsequent validation and
// queries can work properly.
_, node1Err := fetchLightningNode(nodes, edge.NodeKey1Bytes[:])
switch {
case node1Err == ErrGraphNodeNotFound:
node1Shell := LightningNode{
PubKeyBytes: edge.NodeKey1Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node1Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey1Bytes)
}
case node1Err != nil:
return err
}
_, node2Err := fetchLightningNode(nodes, edge.NodeKey2Bytes[:])
switch {
case node2Err == ErrGraphNodeNotFound:
node2Shell := LightningNode{
PubKeyBytes: edge.NodeKey2Bytes,
HaveNodeAnnouncement: false,
}
err := addLightningNode(tx, &node2Shell)
if err != nil {
return fmt.Errorf("unable to create shell node "+
"for: %x", edge.NodeKey2Bytes)
}
case node2Err != nil:
return err
}
// If the edge hasn't been created yet, then we'll first add it
// to the edge index in order to associate the edge between two
// nodes and also store the static components of the channel.
if err := putChanEdgeInfo(edgeIndex, edge, chanKey); err != nil {
return err
}
// Mark edge policies for both sides as unknown. This is to
// enable efficient incoming channel lookup for a node.
for _, key := range []*[33]byte{&edge.NodeKey1Bytes,
&edge.NodeKey2Bytes} {
err := putChanEdgePolicyUnknown(edges, edge.ChannelID,
key[:])
if err != nil {
return err
}
}
// Finally we add it to the channel index which maps channel
// points (outpoints) to the shorter channel ID's.
var b bytes.Buffer
if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil {
return err
}
return chanIndex.Put(b.Bytes(), chanKey[:])
})
// Finally we add it to the channel index which maps channel points
// (outpoints) to the shorter channel ID's.
var b bytes.Buffer
if err := writeOutpoint(&b, &edge.ChannelPoint); err != nil {
return err
}
return chanIndex.Put(b.Bytes(), chanKey[:])
}
// HasChannelEdge returns true if the database knows of a channel edge with the
@ -1639,28 +1643,26 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket,
// the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
return updateEdgePolicy(edges, edgeIndex, nodes, edge)
return updateEdgePolicy(tx, edge)
})
}
// updateEdgePolicy attempts to update an edge's policy within the relevant
// buckets using an existing database transaction.
func updateEdgePolicy(edges, edgeIndex, nodes *bbolt.Bucket,
edge *ChannelEdgePolicy) error {
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrEdgeNotFound
}
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
}
// Create the channelID key be converting the channel ID
// integer into a byte slice.

@ -563,7 +563,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error {
return err
}
err = updateEdgePolicy(edges, edgeIndex, nodes, edgePolicy)
err = updateEdgePolicy(tx, edgePolicy)
if err != nil {
return err
}