channeldb: convert all Update calls to use Batch

In this commit, we convert all the `Update` calls which are serial, to
use `Batch` calls which are optimistically batched together for
concurrent writers. This should increase performance slightly during the
initial graph sync, and also updates at tip as we can coalesce more of
these individual transactions into a single transaction.
This commit is contained in:
Olaoluwa Osuntokun 2019-03-13 13:13:36 -07:00
parent abda447edb
commit 3555d3dbd4

@ -326,7 +326,7 @@ func (c *ChannelGraph) sourceNode(nodes *bbolt.Bucket) (*LightningNode, error) {
func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
nodePubBytes := node.PubKeyBytes[:]
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
// First grab the nodes bucket which stores the mapping from
// pubKey to node information.
nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
@ -355,7 +355,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error {
//
// TODO(roasbeef): also need sig of announcement
func (c *ChannelGraph) AddLightningNode(node *LightningNode) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return addLightningNode(tx, node)
})
}
@ -419,7 +419,7 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) {
// from the database according to the node's public key.
func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error {
// TODO(roasbeef): ensure dangling edges are removed...
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodeNotFound
@ -483,7 +483,7 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return c.addChannelEdge(tx, edge)
})
}
@ -657,7 +657,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error {
var chanKey [8]byte
binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID)
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edge == nil {
return ErrEdgeNotFound
@ -697,7 +697,9 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
var chansClosed []*ChannelEdgeInfo
err := c.db.Update(func(tx *bbolt.Tx) error {
err := c.db.Batch(func(tx *bbolt.Tx) error {
chansClosed = nil
// First grab the edges bucket which houses the information
// we'd like to delete
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
@ -801,7 +803,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
// that we only maintain a graph of reachable nodes. In the event that a pruned
// node gains more channels, it will be re-added back to the graph.
func (c *ChannelGraph) PruneGraphNodes() error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
nodes := tx.Bucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
@ -946,7 +948,9 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
// Keep track of the channels that are removed from the graph.
var removedChans []*ChannelEdgeInfo
if err := c.db.Update(func(tx *bbolt.Tx) error {
if err := c.db.Batch(func(tx *bbolt.Tx) error {
removedChans = nil
edges, err := tx.CreateBucketIfNotExists(edgeBucket)
if err != nil {
return err
@ -1070,7 +1074,7 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// channels
// TODO(roasbeef): don't delete both edges?
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
// First grab the edges bucket which houses the information
// we'd like to delete
edges := tx.Bucket(edgeBucket)
@ -1642,7 +1646,7 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket,
// determined by the lexicographical ordering of the identity public keys of
// the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Update(func(tx *bbolt.Tx) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
return updateEdgePolicy(tx, edge)
})
}