From 3555d3dbd431fe3ddf2660d4dc270b5a595fe517 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 13 Mar 2019 13:13:36 -0700 Subject: [PATCH 1/3] channeldb: convert all Update calls to use Batch In this commit, we convert all the `Update` calls which are serial, to use `Batch` calls which are optimistically batched together for concurrent writers. This should increase performance slightly during the initial graph sync, and also updates at tip as we can coalesce more of these individual transactions into a single transaction. --- channeldb/graph.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/channeldb/graph.go b/channeldb/graph.go index c13e1879..2d924d94 100644 --- a/channeldb/graph.go +++ b/channeldb/graph.go @@ -326,7 +326,7 @@ func (c *ChannelGraph) sourceNode(nodes *bbolt.Bucket) (*LightningNode, error) { func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { nodePubBytes := node.PubKeyBytes[:] - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { // First grab the nodes bucket which stores the mapping from // pubKey to node information. nodes, err := tx.CreateBucketIfNotExists(nodeBucket) @@ -355,7 +355,7 @@ func (c *ChannelGraph) SetSourceNode(node *LightningNode) error { // // TODO(roasbeef): also need sig of announcement func (c *ChannelGraph) AddLightningNode(node *LightningNode) error { - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { return addLightningNode(tx, node) }) } @@ -419,7 +419,7 @@ func (c *ChannelGraph) LookupAlias(pub *btcec.PublicKey) (string, error) { // from the database according to the node's public key. func (c *ChannelGraph) DeleteLightningNode(nodePub *btcec.PublicKey) error { // TODO(roasbeef): ensure dangling edges are removed... - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodeNotFound @@ -483,7 +483,7 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket, // the channel supports. The chanPoint and chanID are used to uniquely identify // the edge globally within the database. func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error { - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { return c.addChannelEdge(tx, edge) }) } @@ -657,7 +657,7 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *ChannelEdgeInfo) error { var chanKey [8]byte binary.BigEndian.PutUint64(chanKey[:], edge.ChannelID) - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { edges := tx.Bucket(edgeBucket) if edge == nil { return ErrEdgeNotFound @@ -697,7 +697,9 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, var chansClosed []*ChannelEdgeInfo - err := c.db.Update(func(tx *bbolt.Tx) error { + err := c.db.Batch(func(tx *bbolt.Tx) error { + chansClosed = nil + // First grab the edges bucket which houses the information // we'd like to delete edges, err := tx.CreateBucketIfNotExists(edgeBucket) @@ -801,7 +803,7 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, // that we only maintain a graph of reachable nodes. In the event that a pruned // node gains more channels, it will be re-added back to the graph. func (c *ChannelGraph) PruneGraphNodes() error { - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { nodes := tx.Bucket(nodeBucket) if nodes == nil { return ErrGraphNodesNotFound @@ -946,7 +948,9 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf // Keep track of the channels that are removed from the graph. var removedChans []*ChannelEdgeInfo - if err := c.db.Update(func(tx *bbolt.Tx) error { + if err := c.db.Batch(func(tx *bbolt.Tx) error { + removedChans = nil + edges, err := tx.CreateBucketIfNotExists(edgeBucket) if err != nil { return err @@ -1070,7 +1074,7 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error { // channels // TODO(roasbeef): don't delete both edges? - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { // First grab the edges bucket which houses the information // we'd like to delete edges := tx.Bucket(edgeBucket) @@ -1642,7 +1646,7 @@ func delChannelByEdge(edges *bbolt.Bucket, edgeIndex *bbolt.Bucket, // determined by the lexicographical ordering of the identity public keys of // the nodes on either side of the channel. func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error { - return c.db.Update(func(tx *bbolt.Tx) error { + return c.db.Batch(func(tx *bbolt.Tx) error { return updateEdgePolicy(tx, edge) }) } From e8da6dd0b48843a48cecdcff894f48e8020d940e Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 13 Mar 2019 13:32:45 -0700 Subject: [PATCH 2/3] channeldb: convert concurrent channel state machine calls to use Batch --- channeldb/channel.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 486b66fd..2903eca9 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -1027,7 +1027,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { return ErrNoRestoredChannelMutation } - err := c.Db.Update(func(tx *bbolt.Tx) error { + err := c.Db.Batch(func(tx *bbolt.Tx) error { chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, ) @@ -1465,7 +1465,7 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return ErrNoRestoredChannelMutation } - return c.Db.Update(func(tx *bbolt.Tx) error { + return c.Db.Batch(func(tx *bbolt.Tx) error { // First, we'll grab the writable bucket where this channel's // data resides. chanBucket, err := fetchChanBucket( @@ -1608,7 +1608,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { var newRemoteCommit *ChannelCommitment - err := c.Db.Update(func(tx *bbolt.Tx) error { + err := c.Db.Batch(func(tx *bbolt.Tx) error { + newRemoteCommit = nil + chanBucket, err := fetchChanBucket( tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, ) @@ -1746,7 +1748,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { c.Lock() defer c.Unlock() - return c.Db.Update(func(tx *bbolt.Tx) error { + return c.Db.Batch(func(tx *bbolt.Tx) error { return c.Packager.AckAddHtlcs(tx, addRefs...) }) } @@ -1759,7 +1761,7 @@ func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { c.Lock() defer c.Unlock() - return c.Db.Update(func(tx *bbolt.Tx) error { + return c.Db.Batch(func(tx *bbolt.Tx) error { return c.Packager.AckSettleFails(tx, settleFailRefs...) }) } @@ -1770,7 +1772,7 @@ func (c *OpenChannel) SetFwdFilter(height uint64, fwdFilter *PkgFilter) error { c.Lock() defer c.Unlock() - return c.Db.Update(func(tx *bbolt.Tx) error { + return c.Db.Batch(func(tx *bbolt.Tx) error { return c.Packager.SetFwdFilter(tx, height, fwdFilter) }) } @@ -1783,14 +1785,15 @@ func (c *OpenChannel) RemoveFwdPkg(height uint64) error { c.Lock() defer c.Unlock() - return c.Db.Update(func(tx *bbolt.Tx) error { + return c.Db.Batch(func(tx *bbolt.Tx) error { return c.Packager.RemovePkg(tx, height) }) } // RevocationLogTail returns the "tail", or the end of the current revocation // log. This entry represents the last previous state for the remote node's -// commitment chain. The ChannelDelta returned by this method will always lag one state behind the most current (unrevoked) state of the remote node's +// commitment chain. The ChannelDelta returned by this method will always lag +// one state behind the most current (unrevoked) state of the remote node's // commitment chain. func (c *OpenChannel) RevocationLogTail() (*ChannelCommitment, error) { c.RLock() From da76c34418a40761fdc99e11200755661699f7eb Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 13 Mar 2019 13:34:19 -0700 Subject: [PATCH 3/3] channeldb: convert invoice settle/cancel calls to use Batch --- channeldb/invoices.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/channeldb/invoices.go b/channeldb/invoices.go index 47a8e199..5e3c6202 100644 --- a/channeldb/invoices.go +++ b/channeldb/invoices.go @@ -242,7 +242,9 @@ func (d *DB) AddInvoice(newInvoice *Invoice, paymentHash lntypes.Hash) ( } var invoiceAddIndex uint64 - err := d.Update(func(tx *bbolt.Tx) error { + err := d.Batch(func(tx *bbolt.Tx) error { + invoiceAddIndex = 0 + invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err @@ -635,7 +637,9 @@ func (d *DB) AcceptOrSettleInvoice(paymentHash [32]byte, amtPaid lnwire.MilliSatoshi) (*Invoice, error) { var settledInvoice *Invoice - err := d.Update(func(tx *bbolt.Tx) error { + err := d.Batch(func(tx *bbolt.Tx) error { + settledInvoice = nil + invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err @@ -714,7 +718,9 @@ func (d *DB) SettleHoldInvoice(preimage lntypes.Preimage) (*Invoice, error) { // payment hash. func (d *DB) CancelInvoice(paymentHash lntypes.Hash) (*Invoice, error) { var canceledInvoice *Invoice - err := d.Update(func(tx *bbolt.Tx) error { + err := d.Batch(func(tx *bbolt.Tx) error { + canceledInvoice = nil + invoices, err := tx.CreateBucketIfNotExists(invoiceBucket) if err != nil { return err