From 3f2aa1c36837ecde53366e02e77db3575f05bd67 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 22 May 2018 16:32:28 -0700 Subject: [PATCH] channeldb: modify updateChanBucket to no longer auto-create buckets In this commit, we modify the existing updateChanBucket function to no longer auto-create buckets if they don't exist. We do this in order to fix a class of bug that could arise wherein after a channel has actually be closed (and the parent buckets removed) a method that mutates the channel state is called, which then re-creates the relevant set of buckets. As a result, subsequent calls to any RPCs which need to read all the channels will fail as most of the fields won't actually be populated. After this commit, the fullSync method is the only one that's able to create the full bucket hierarchy. --- channeldb/channel.go | 89 ++++++++++++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 24 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index 2dec4741..36e30420 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -504,29 +504,32 @@ func (c *OpenChannel) RefreshShortChanID() error { // updateChanBucket is a helper function that returns a writable bucket that a // channel's data resides in given: the public key for the node, the outpoint, // and the chainhash that the channel resides on. +// +// NOTE: This function assumes that all the relevant descendent buckets already +// exist. func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) { // First fetch the top level bucket which stores all data related to // current, active channels. - openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) - if err != nil { - return nil, err + openChanBucket := tx.Bucket(openChannelBucket) + if openChanBucket == nil { + return nil, ErrNoChanDBExists } // Within this top level bucket, fetch the bucket dedicated to storing // open channel data specific to the remote node. nodePub := nodeKey.SerializeCompressed() - nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub) - if err != nil { - return nil, err + nodeChanBucket := openChanBucket.Bucket(nodePub) + if nodeChanBucket == nil { + return nil, ErrNoActiveChannels } // We'll then recurse down an additional layer in order to fetch the // bucket for this particular chain. chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(chainHash[:]) if err != nil { - return nil, err + return nil, ErrNodeNotFound } // With the bucket for the node fetched, we can now go down another @@ -535,13 +538,14 @@ func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, var chanPointBuf bytes.Buffer chanPointBuf.Grow(outPointSize) if err := writeOutpoint(&chanPointBuf, outPoint); err != nil { - return nil, err + return nil, fmt.Errorf("unable to write outpoint: %v", err) } chanBucket, err := chainBucket.CreateBucketIfNotExists( chanPointBuf.Bytes(), ) - if err != nil { - return nil, err + if chanBucket == nil { + return nil, fmt.Errorf("unable to find bucket for "+ + "chan_point=%v", outPoint) } return chanBucket, nil @@ -595,8 +599,39 @@ func readChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, // to sync the contents of an OpenChannel while re-using an existing database // transaction. func (c *OpenChannel) fullSync(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + // First fetch the top level bucket which stores all data related to + // current, active channels. + openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) + if err != nil { + return err + } + + // Within this top level bucket, fetch the bucket dedicated to storing + // open channel data specific to the remote node. + nodePub := c.IdentityPub.SerializeCompressed() + nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub) + if err != nil { + return err + } + + // We'll then recurse down an additional layer in order to fetch the + // bucket for this particular chain. + chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(c.ChainHash[:]) + if err != nil { + return err + } + + // With the bucket for the node fetched, we can now go down another + // level, creating the bucket (if it doesn't exist), for this channel + // itself. + var chanPointBuf bytes.Buffer + chanPointBuf.Grow(outPointSize) + if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil { + return err + } + chanBucket, err := chainBucket.CreateBucketIfNotExists( + chanPointBuf.Bytes(), + ) if err != nil { return err } @@ -611,8 +646,9 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error { defer c.Unlock() if err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err } @@ -658,8 +694,9 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error { func (c *OpenChannel) putChanStatus(status ChannelStatus) error { if err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err } @@ -795,8 +832,9 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error { defer c.Unlock() err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err } @@ -1215,8 +1253,9 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { return c.Db.Update(func(tx *bolt.Tx) error { // First, we'll grab the writable bucket where this channel's // data resides. - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err } @@ -1305,8 +1344,9 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error { c.RemoteNextRevocation = revKey err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err } @@ -1334,8 +1374,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error { var newRemoteCommit *ChannelCommitment err := c.Db.Update(func(tx *bolt.Tx) error { - chanBucket, err := updateChanBucket(tx, c.IdentityPub, - &c.FundingOutpoint, c.ChainHash) + chanBucket, err := updateChanBucket( + tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash, + ) if err != nil { return err }