channeldb: modify updateChanBucket to no longer auto-create buckets

In this commit, we modify the existing updateChanBucket function to no
longer auto-create buckets if they don't exist. We do this in order to
fix a class of bug that could arise wherein after a channel has actually
be closed (and the parent buckets removed) a method that mutates the
channel state is called, which then re-creates the relevant set of
buckets. As a result, subsequent calls to any RPCs which need to read
all the channels will fail as most of the fields won't actually be
populated.

After this commit, the fullSync method is the only one that's able to
create the full bucket hierarchy.
This commit is contained in:
Olaoluwa Osuntokun 2018-05-22 16:32:28 -07:00
parent 95293f5102
commit 3f2aa1c368
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -504,29 +504,32 @@ func (c *OpenChannel) RefreshShortChanID() error {
// updateChanBucket is a helper function that returns a writable bucket that a // updateChanBucket is a helper function that returns a writable bucket that a
// channel's data resides in given: the public key for the node, the outpoint, // channel's data resides in given: the public key for the node, the outpoint,
// and the chainhash that the channel resides on. // and the chainhash that the channel resides on.
//
// NOTE: This function assumes that all the relevant descendent buckets already
// exist.
func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey, func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey,
outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) { outPoint *wire.OutPoint, chainHash chainhash.Hash) (*bolt.Bucket, error) {
// First fetch the top level bucket which stores all data related to // First fetch the top level bucket which stores all data related to
// current, active channels. // current, active channels.
openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) openChanBucket := tx.Bucket(openChannelBucket)
if err != nil { if openChanBucket == nil {
return nil, err return nil, ErrNoChanDBExists
} }
// Within this top level bucket, fetch the bucket dedicated to storing // Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node. // open channel data specific to the remote node.
nodePub := nodeKey.SerializeCompressed() nodePub := nodeKey.SerializeCompressed()
nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub) nodeChanBucket := openChanBucket.Bucket(nodePub)
if err != nil { if nodeChanBucket == nil {
return nil, err return nil, ErrNoActiveChannels
} }
// We'll then recurse down an additional layer in order to fetch the // We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain. // bucket for this particular chain.
chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(chainHash[:]) chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(chainHash[:])
if err != nil { if err != nil {
return nil, err return nil, ErrNodeNotFound
} }
// With the bucket for the node fetched, we can now go down another // With the bucket for the node fetched, we can now go down another
@ -535,13 +538,14 @@ func updateChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey,
var chanPointBuf bytes.Buffer var chanPointBuf bytes.Buffer
chanPointBuf.Grow(outPointSize) chanPointBuf.Grow(outPointSize)
if err := writeOutpoint(&chanPointBuf, outPoint); err != nil { if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
return nil, err return nil, fmt.Errorf("unable to write outpoint: %v", err)
} }
chanBucket, err := chainBucket.CreateBucketIfNotExists( chanBucket, err := chainBucket.CreateBucketIfNotExists(
chanPointBuf.Bytes(), chanPointBuf.Bytes(),
) )
if err != nil { if chanBucket == nil {
return nil, err return nil, fmt.Errorf("unable to find bucket for "+
"chan_point=%v", outPoint)
} }
return chanBucket, nil return chanBucket, nil
@ -595,8 +599,39 @@ func readChanBucket(tx *bolt.Tx, nodeKey *btcec.PublicKey,
// to sync the contents of an OpenChannel while re-using an existing database // to sync the contents of an OpenChannel while re-using an existing database
// transaction. // transaction.
func (c *OpenChannel) fullSync(tx *bolt.Tx) error { func (c *OpenChannel) fullSync(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, // First fetch the top level bucket which stores all data related to
&c.FundingOutpoint, c.ChainHash) // current, active channels.
openChanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodePub := c.IdentityPub.SerializeCompressed()
nodeChanBucket, err := openChanBucket.CreateBucketIfNotExists(nodePub)
if err != nil {
return err
}
// We'll then recurse down an additional layer in order to fetch the
// bucket for this particular chain.
chainBucket, err := nodeChanBucket.CreateBucketIfNotExists(c.ChainHash[:])
if err != nil {
return err
}
// With the bucket for the node fetched, we can now go down another
// level, creating the bucket (if it doesn't exist), for this channel
// itself.
var chanPointBuf bytes.Buffer
chanPointBuf.Grow(outPointSize)
if err := writeOutpoint(&chanPointBuf, &c.FundingOutpoint); err != nil {
return err
}
chanBucket, err := chainBucket.CreateBucketIfNotExists(
chanPointBuf.Bytes(),
)
if err != nil { if err != nil {
return err return err
} }
@ -611,8 +646,9 @@ func (c *OpenChannel) MarkAsOpen(openLoc lnwire.ShortChannelID) error {
defer c.Unlock() defer c.Unlock()
if err := c.Db.Update(func(tx *bolt.Tx) error { if err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }
@ -658,8 +694,9 @@ func (c *OpenChannel) MarkCommitmentBroadcasted() error {
func (c *OpenChannel) putChanStatus(status ChannelStatus) error { func (c *OpenChannel) putChanStatus(status ChannelStatus) error {
if err := c.Db.Update(func(tx *bolt.Tx) error { if err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }
@ -795,8 +832,9 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *ChannelCommitment) error {
defer c.Unlock() defer c.Unlock()
err := c.Db.Update(func(tx *bolt.Tx) error { err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }
@ -1215,8 +1253,9 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error {
return c.Db.Update(func(tx *bolt.Tx) error { return c.Db.Update(func(tx *bolt.Tx) error {
// First, we'll grab the writable bucket where this channel's // First, we'll grab the writable bucket where this channel's
// data resides. // data resides.
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }
@ -1305,8 +1344,9 @@ func (c *OpenChannel) InsertNextRevocation(revKey *btcec.PublicKey) error {
c.RemoteNextRevocation = revKey c.RemoteNextRevocation = revKey
err := c.Db.Update(func(tx *bolt.Tx) error { err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }
@ -1334,8 +1374,9 @@ func (c *OpenChannel) AdvanceCommitChainTail(fwdPkg *FwdPkg) error {
var newRemoteCommit *ChannelCommitment var newRemoteCommit *ChannelCommitment
err := c.Db.Update(func(tx *bolt.Tx) error { err := c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := updateChanBucket(tx, c.IdentityPub, chanBucket, err := updateChanBucket(
&c.FundingOutpoint, c.ChainHash) tx, c.IdentityPub, &c.FundingOutpoint, c.ChainHash,
)
if err != nil { if err != nil {
return err return err
} }