Merge pull request #1745 from halseth/channeldb-avoid-create-buckets

channeldb: avoid creating empty buckets
This commit is contained in:
Olaoluwa Osuntokun 2018-12-17 20:01:29 -08:00 committed by GitHub
commit 11c24d3e0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 87 additions and 77 deletions

@ -277,7 +277,14 @@ func (c *ChannelGraph) ForEachNode(tx *bbolt.Tx, cb func(*bbolt.Tx, *LightningNo
func (c *ChannelGraph) SourceNode() (*LightningNode, error) { func (c *ChannelGraph) SourceNode() (*LightningNode, error) {
var source *LightningNode var source *LightningNode
err := c.db.View(func(tx *bbolt.Tx) error { err := c.db.View(func(tx *bbolt.Tx) error {
node, err := c.sourceNode(tx) // First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}
node, err := c.sourceNode(nodes)
if err != nil { if err != nil {
return err return err
} }
@ -296,14 +303,7 @@ func (c *ChannelGraph) SourceNode() (*LightningNode, error) {
// of the graph. The source node is treated as the center node within a // of the graph. The source node is treated as the center node within a
// star-graph. This method may be used to kick off a path finding algorithm in // star-graph. This method may be used to kick off a path finding algorithm in
// order to explore the reachability of another node based off the source node. // order to explore the reachability of another node based off the source node.
func (c *ChannelGraph) sourceNode(tx *bbolt.Tx) (*LightningNode, error) { func (c *ChannelGraph) sourceNode(nodes *bbolt.Bucket) (*LightningNode, error) {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return nil, ErrGraphNotFound
}
selfPub := nodes.Get(sourceKey) selfPub := nodes.Get(sourceKey)
if selfPub == nil { if selfPub == nil {
return nil, ErrSourceNodeNotSet return nil, ErrSourceNodeNotSet
@ -420,20 +420,22 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error { func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error {
// TODO(roasbeef): ensure dangling edges are removed... // TODO(roasbeef): ensure dangling edges are removed...
return c.db.Update(func(tx *bbolt.Tx) error { return c.db.Update(func(tx *bbolt.Tx) error {
return c.deleteLightningNode(tx, nodePub.SerializeCompressed()) nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
}
return c.deleteLightningNode(
nodes, nodePub.SerializeCompressed(),
)
}) })
} }
// deleteLightningNode uses an existing database transaction to remove a // deleteLightningNode uses an existing database transaction to remove a
// vertex/node from the database according to the node's public key. // vertex/node from the database according to the node's public key.
func (c *ChannelGraph) deleteLightningNode(tx *bbolt.Tx, func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
compressedPubKey []byte) error { compressedPubKey []byte) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
}
aliases := nodes.Bucket(aliasIndexBucket) aliases := nodes.Bucket(aliasIndexBucket)
if aliases == nil { if aliases == nil {
return ErrGraphNodesNotFound return ErrGraphNodesNotFound
@ -652,13 +654,14 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Update(func(tx *bbolt.Tx) error { return c.db.Update(func(tx *bbolt.Tx) error {
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges := tx.Bucket(edgeBucket)
if err != nil { if edge == nil {
return err return ErrEdgeNotFound
} }
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil { edgeIndex := edges.Bucket(edgeIndexBucket)
return err if edgeIndex == nil {
return ErrEdgeNotFound
} }
if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo == nil { if edgeInfo := edgeIndex.Get(chanKey[:]); edgeInfo == nil {
@ -707,9 +710,9 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
if err != nil { if err != nil {
return err return err
} }
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes := tx.Bucket(nodeBucket)
if err != nil { if nodes == nil {
return err return ErrSourceNodeNotSet
} }
// For each of the outpoints that have been spent within the // For each of the outpoints that have been spent within the
@ -780,7 +783,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// Now that the graph has been pruned, we'll also attempt to // Now that the graph has been pruned, we'll also attempt to
// prune any nodes that have had a channel closed within the // prune any nodes that have had a channel closed within the
// latest block. // latest block.
return c.pruneGraphNodes(tx, nodes, edgeIndex) return c.pruneGraphNodes(nodes, edgeIndex)
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -795,9 +798,9 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// node gains more channels, it will be re-added back to the graph. // node gains more channels, it will be re-added back to the graph.
func (c *ChannelGraph) PruneGraphNodes() error { func (c *ChannelGraph) PruneGraphNodes() error {
return c.db.Update(func(tx *bbolt.Tx) error { return c.db.Update(func(tx *bbolt.Tx) error {
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes := tx.Bucket(nodeBucket)
if err != nil { if nodes == nil {
return err return ErrGraphNodesNotFound
} }
edges := tx.Bucket(edgeBucket) edges := tx.Bucket(edgeBucket)
if edges == nil { if edges == nil {
@ -808,21 +811,21 @@ func (c *ChannelGraph) PruneGraphNodes() error {
return ErrGraphNoEdgesFound return ErrGraphNoEdgesFound
} }
return c.pruneGraphNodes(tx, nodes, edgeIndex) return c.pruneGraphNodes(nodes, edgeIndex)
}) })
} }
// pruneGraphNodes attempts to remove any nodes from the graph who have had a // pruneGraphNodes attempts to remove any nodes from the graph who have had a
// channel closed within the current block. If the node still has existing // channel closed within the current block. If the node still has existing
// channels in the graph, this will act as a no-op. // channels in the graph, this will act as a no-op.
func (c *ChannelGraph) pruneGraphNodes(tx *bbolt.Tx, nodes *bbolt.Bucket, func (c *ChannelGraph) pruneGraphNodes(nodes *bbolt.Bucket,
edgeIndex *bbolt.Bucket) error { edgeIndex *bbolt.Bucket) error {
log.Trace("Pruning nodes from graph with no open channels") log.Trace("Pruning nodes from graph with no open channels")
// We'll retrieve the graph's source node to ensure we don't remove it // We'll retrieve the graph's source node to ensure we don't remove it
// even if it no longer has any open channels. // even if it no longer has any open channels.
sourceNode, err := c.sourceNode(tx) sourceNode, err := c.sourceNode(nodes)
if err != nil { if err != nil {
return err return err
} }
@ -888,7 +891,7 @@ func (c *ChannelGraph) pruneGraphNodes(tx *bbolt.Tx, nodes *bbolt.Bucket,
// If we reach this point, then there are no longer any edges // If we reach this point, then there are no longer any edges
// that connect this node, so we can delete it. // that connect this node, so we can delete it.
if err := c.deleteLightningNode(tx, nodePubKey[:]); err != nil { if err := c.deleteLightningNode(nodes, nodePubKey[:]); err != nil {
log.Warnf("Unable to prune node %x from the "+ log.Warnf("Unable to prune node %x from the "+
"graph: %v", nodePubKey, err) "graph: %v", nodePubKey, err)
continue continue
@ -1066,25 +1069,31 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
return c.db.Update(func(tx *bbolt.Tx) error { return c.db.Update(func(tx *bbolt.Tx) error {
// First grab the edges bucket which houses the information // First grab the edges bucket which houses the information
// we'd like to delete // we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges := tx.Bucket(edgeBucket)
if err != nil { if edges == nil {
return err return ErrEdgeNotFound
}
// Next grab the two edge indexes which will also need to be updated.
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil {
return err
}
chanIndex, err := edges.CreateBucketIfNotExists(channelPointBucket)
if err != nil {
return err
}
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil {
return err
} }
return delChannelByEdge(edges, edgeIndex, chanIndex, nodes, chanPoint) // Next grab the two edge indexes which will also need to be
// updated.
edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrEdgeNotFound
}
chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil {
return ErrEdgeNotFound
}
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
}
return delChannelByEdge(
edges, edgeIndex, chanIndex, nodes, chanPoint,
)
}) })
} }
@ -1521,11 +1530,10 @@ func delEdgeUpdateIndexEntry(edgesBucket *bbolt.Bucket, chanID uint64,
// First, we'll fetch the edge update index bucket which currently // First, we'll fetch the edge update index bucket which currently
// stores an entry for the channel we're about to delete. // stores an entry for the channel we're about to delete.
updateIndex, err := edgesBucket.CreateBucketIfNotExists( updateIndex := edgesBucket.Bucket(edgeUpdateIndexBucket)
edgeUpdateIndexBucket, if updateIndex == nil {
) // No edges in bucket, return early.
if err != nil { return nil
return err
} }
// Now that we have the bucket, we'll attempt to construct a template // Now that we have the bucket, we'll attempt to construct a template
@ -1631,13 +1639,14 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket,
// the nodes on either side of the channel. // the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Update(func(tx *bbolt.Tx) error { return c.db.Update(func(tx *bbolt.Tx) error {
edges, err := tx.CreateBucketIfNotExists(edgeBucket) edges := tx.Bucket(edgeBucket)
if err != nil { if edge == nil {
return err return ErrEdgeNotFound
} }
edgeIndex, err := edges.CreateBucketIfNotExists(edgeIndexBucket)
if err != nil { edgeIndex := edges.Bucket(edgeIndexBucket)
return err if edgeIndex == nil {
return ErrEdgeNotFound
} }
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil { if err != nil {

@ -445,7 +445,8 @@ func (r *ChannelRouter) Start() error {
// Finally, before we proceed, we'll prune any unconnected nodes from // Finally, before we proceed, we'll prune any unconnected nodes from
// the graph in order to ensure we maintain a tight graph of "useful" // the graph in order to ensure we maintain a tight graph of "useful"
// nodes. // nodes.
if err := r.cfg.Graph.PruneGraphNodes(); err != nil { err = r.cfg.Graph.PruneGraphNodes()
if err != nil && err != channeldb.ErrGraphNodesNotFound {
return err return err
} }

@ -170,10 +170,10 @@ func TestFindRoutesFeeSorting(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// In this test we'd like to ensure proper integration of the various // In this test we'd like to ensure proper integration of the various
// functions that are involved in path finding, and also route // functions that are involved in path finding, and also route
@ -225,10 +225,10 @@ func TestFindRoutesWithFeeLimit(t *testing.T) {
ctx, cleanUp, err := createTestCtxFromFile( ctx, cleanUp, err := createTestCtxFromFile(
startingBlockHeight, basicGraphFilePath, startingBlockHeight, basicGraphFilePath,
) )
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// This test will attempt to find routes from roasbeef to sophon for 100 // This test will attempt to find routes from roasbeef to sophon for 100
// satoshis with a fee limit of 10 satoshis. There are two routes from // satoshis with a fee limit of 10 satoshis. There are two routes from
@ -280,10 +280,10 @@ func TestSendPaymentRouteFailureFallback(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// Craft a LightningPayment struct that'll send a payment from roasbeef // Craft a LightningPayment struct that'll send a payment from roasbeef
// to luo ji for 1000 satoshis, with a maximum of 1000 satoshis in fees. // to luo ji for 1000 satoshis, with a maximum of 1000 satoshis in fees.
@ -519,10 +519,10 @@ func TestSendPaymentErrorRepeatedFeeInsufficient(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// Craft a LightningPayment struct that'll send a payment from roasbeef // Craft a LightningPayment struct that'll send a payment from roasbeef
// to luo ji for 100 satoshis. // to luo ji for 100 satoshis.
@ -623,10 +623,10 @@ func TestSendPaymentErrorNonFinalTimeLockErrors(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// Craft a LightningPayment struct that'll send a payment from roasbeef // Craft a LightningPayment struct that'll send a payment from roasbeef
// to sophon for 1k satoshis. // to sophon for 1k satoshis.
@ -758,10 +758,10 @@ func TestSendPaymentErrorPathPruning(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// Craft a LightningPayment struct that'll send a payment from roasbeef // Craft a LightningPayment struct that'll send a payment from roasbeef
// to luo ji for 1000 satoshis, with a maximum of 1000 satoshis in fees. // to luo ji for 1000 satoshis, with a maximum of 1000 satoshis in fees.
@ -1000,10 +1000,10 @@ func TestIgnoreNodeAnnouncement(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight,
basicGraphFilePath) basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
pub := priv1.PubKey() pub := priv1.PubKey()
node := &channeldb.LightningNode{ node := &channeldb.LightningNode{
@ -1033,10 +1033,10 @@ func TestAddEdgeUnknownVertexes(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight,
basicGraphFilePath) basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
var pub1 [33]byte var pub1 [33]byte
copy(pub1[:], priv1.PubKey().SerializeCompressed()) copy(pub1[:], priv1.PubKey().SerializeCompressed())
@ -1302,10 +1302,10 @@ func TestWakeUpOnStaleBranch(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight) ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
const chanValue = 10000 const chanValue = 10000
@ -1505,10 +1505,10 @@ func TestDisconnectedBlocks(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight) ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
const chanValue = 10000 const chanValue = 10000
@ -1695,10 +1695,10 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight) ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
const chanValue = 10000 const chanValue = 10000
@ -1848,10 +1848,10 @@ func TestFindPathFeeWeighting(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath) ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
var preImage [32]byte var preImage [32]byte
copy(preImage[:], bytes.Repeat([]byte{9}, 32)) copy(preImage[:], bytes.Repeat([]byte{9}, 32))
@ -1906,10 +1906,10 @@ func TestIsStaleNode(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight) ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// Before we can insert a node in to the database, we need to create a // Before we can insert a node in to the database, we need to create a
// channel that it's linked to. // channel that it's linked to.
@ -1988,10 +1988,10 @@ func TestIsKnownEdge(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight) ctx, cleanUp, err := createTestCtxSingleNode(startingBlockHeight)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// First, we'll create a new channel edge (just the info) and insert it // First, we'll create a new channel edge (just the info) and insert it
// into the database. // into the database.
@ -2041,10 +2041,10 @@ func TestIsStaleEdgePolicy(t *testing.T) {
const startingBlockHeight = 101 const startingBlockHeight = 101
ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight, ctx, cleanUp, err := createTestCtxFromFile(startingBlockHeight,
basicGraphFilePath) basicGraphFilePath)
defer cleanUp()
if err != nil { if err != nil {
t.Fatalf("unable to create router: %v", err) t.Fatalf("unable to create router: %v", err)
} }
defer cleanUp()
// First, we'll create a new channel edge (just the info) and insert it // First, we'll create a new channel edge (just the info) and insert it
// into the database. // into the database.