nursery_store: tightens up height index pruning and formatting

This commit is contained in:
Conner Fromknecht 2017-10-17 17:58:49 -07:00
parent 23e36a58f0
commit ef81f0064c
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -3,6 +3,7 @@ package main
import (
"bytes"
"errors"
"fmt"
"github.com/boltdb/bolt"
"github.com/lightningnetwork/lnd/channeldb"
@ -38,7 +39,6 @@ type NurseryStore interface {
// channel indexes. If this method detects that all outputs for a
// particular contract have been incubated, it returns the channel
// points that are ready to be marked as fully closed.
// TODO: make this handle one output at a time?
GraduateKinder([]kidOutput) error
// FinalizeHeight accepts a block height as a parameter and purges its
@ -310,34 +310,12 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
return err
}
// Next, retrieve the height-channel bucket located in the
// height bucket corresponding to the baby output's CLTV expiry
// height. This bucket should always exist, but if it doesn't
// then we have nothing to clean up.
hghtChanBucketCltv := ns.getHeightChanBucket(tx, bby.expiry,
chanPoint)
if hghtChanBucketCltv != nil {
// We successfully located an existing height chan
// bucket at this babyOutput's expiry height, proceed by
// removing it from the index.
err := hghtChanBucketCltv.Delete(pfxOutputKey)
err = ns.removeOutputFromHeight(tx, bby.expiry, chanPoint,
pfxOutputKey)
if err != nil {
return err
}
// Since we removed a crib output from the height index,
// we opportunistically prune the height bucket
// corresponding to the babyOutput's CLTV delay. This
// allows us to clean up any persistent state as outputs
// are progressed through the incubation process.
pruned, err := ns.pruneHeight(tx, bby.expiry)
if err != nil && err != ErrBucketNotEmpty {
return err
} else if err == nil && pruned {
utxnLog.Infof("Height bucket %d pruned", bby.expiry)
}
}
// Since we are moving this output from the crib bucket to the
// kindergarten bucket, we overwrite the existing prefix of this
// key with the kindergarten prefix.
@ -447,68 +425,46 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error {
return err
}
// Finally, we touch a bucket in the height-channel created
// above. The bucket is named using a kindergarten prefixed
// key, signaling that this CSV delayed output will be ready to
// broadcast at the maturity height, after a brief period of
// incubation.
_, err = hghtChanBucket.CreateBucketIfNotExists(pfxOutputKey)
return err
// Finally, we touch a key in the height-channel created above.
// The key is named using a kindergarten prefixed key, signaling
// that this CSV delayed output will be ready to broadcast at
// the maturity height, after a brief period of incubation.
return hghtChanBucket.Put(pfxOutputKey, []byte{})
})
}
// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket,
// removing their corresponding entries from the height and channel indexes.
// If this method detects that all outputs for a particular contract have been
// incubated, it returns the channel points that are ready to be marked as
// fully closed. This method will iterate through the provided kidOutputs and do
// the following:
// 1) Prune the kid height bucket at the kid's confirmation height, if it is
// empty.
// 2) Prune the channel bucket belonging to the kid's origin channel point, if
// it is empty.
// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket and
// marks them as graduated. This method also removes their corresponding entries
// from the height and channel indexes corresponding to their kindergarten
// status.
func (ns *nurseryStore) GraduateKinder(kids []kidOutput) error {
if err := ns.db.Update(func(tx *bolt.Tx) error {
for _, kid := range kids {
confHeight := kid.ConfHeight()
outpoint := kid.OutPoint()
chanPoint := kid.OriginChanPoint()
// Construct the key under which the output is currently
// stored height and channel indexes.
pfxOutputKey, err := prefixOutputKey(kndrPrefix, outpoint)
pfxOutputKey, err := prefixOutputKey(kndrPrefix,
outpoint)
if err != nil {
return err
}
// Load the height-channel bucket, remove this output,
// and attempt to prune the bucket if it empty.
hghtChanBucket := ns.getHeightChanBucket(tx, confHeight, chanPoint)
if hghtChanBucket != nil {
if err := hghtChanBucket.Delete(pfxOutputKey); err != nil {
return err
}
// Attempt to prune the height bucket matching the kid
// output's confirmation height if it contains no active
// outputs.
pruned, err := ns.pruneHeight(tx, confHeight)
if err != nil && err != ErrBucketNotEmpty {
return err
} else if err == nil && pruned {
utxnLog.Infof("Height bucket %d pruned", confHeight)
}
}
chanBucket, err := ns.createChannelBucket(tx, chanPoint)
// Remove the grad output's entry in the height index.
err = ns.removeOutputFromHeight(tx, confHeight, chanPoint,
pfxOutputKey)
if err != nil {
return err
}
chanBucket := ns.getChannelBucket(tx, chanPoint)
if chanBucket == nil {
return ErrContractNotFound
}
// Remove previous output with kindergarten prefix.
if err := chanBucket.Delete(pfxOutputKey); err != nil {
return err
@ -528,7 +484,6 @@ func (ns *nurseryStore) GraduateKinder(kids []kidOutput) error {
if err != nil {
return err
}
}
return nil
@ -555,13 +510,17 @@ func (ns *nurseryStore) FinalizeHeight(height uint32) error {
// FetchClass returns a list of babyOutputs in the crib bucket whose CLTV
// delay expires at the provided block height.
func (ns *nurseryStore) FetchClass(height uint32) ([]kidOutput, []babyOutput, error) {
func (ns *nurseryStore) FetchClass(
height uint32) ([]kidOutput, []babyOutput, error) {
// Construct list of all crib and kindergarten outputs that need TLC at
// the provided block height.
var kids []kidOutput
var babies []babyOutput
if err := ns.db.View(func(tx *bolt.Tx) error {
if err := ns.forEachHeightPrefix(tx, cribPrefix, height, func(buf []byte) error {
// Append each crib output to our list of babyOutputs.
if err := ns.forEachHeightPrefix(tx, cribPrefix, height,
func(buf []byte) error {
// We will attempt to deserialize all outputs
// stored with the crib prefix into babyOutputs,
@ -573,8 +532,6 @@ func (ns *nurseryStore) FetchClass(height uint32) ([]kidOutput, []babyOutput, er
return err
}
// Append the deserialized object to our list of
// babyOutputs.
babies = append(babies, baby)
return nil
@ -583,19 +540,19 @@ func (ns *nurseryStore) FetchClass(height uint32) ([]kidOutput, []babyOutput, er
return err
}
return ns.forEachHeightPrefix(tx, kndrPrefix, height, func(buf []byte) error {
// We will attempt to deserialize all outputs stored
// with the kindergarten prefix into kidOutputs, since
// this is the expected type that would have been
// serialized previously.
// Append each kindergarten output to our list of kidOutputs.
return ns.forEachHeightPrefix(tx, kndrPrefix, height,
func(buf []byte) error {
// We will attempt to deserialize all outputs
// stored with the kindergarten prefix into
// kidOutputs, since this is the expected type
// that would have been serialized previously.
var kid kidOutput
kidReader := bytes.NewReader(buf)
if err := kid.Decode(kidReader); err != nil {
return err
}
// Append the deserialized object to our list of
// kidOutputs.
kids = append(kids, kid)
return nil
@ -675,8 +632,6 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
// preschool outputs.
kids = append(kids, psclOutput)
// Advance to the subsequent key-value pair of
// the prefix scan.
pfxOutputKey, kidBytes = c.Next()
}
}
@ -711,28 +666,31 @@ var errImmatureChannel = errors.New("channel has non-graduated outputs")
// IsMatureChannel determines the whether or not all of the outputs in a
// particular channel bucket have been marked as graduated.
func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) {
if err := ns.db.View(func(tx *bolt.Tx) error {
err := ns.db.View(func(tx *bolt.Tx) error {
// Iterate over the contents of the channel bucket, computing
// both total number of outputs, and those that have the grad
// prefix.
return ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error {
return ns.forChanOutputs(tx, chanPoint,
func(pfxKey, _ []byte) error {
if string(pfxKey[:4]) != string(gradPrefix) {
utxnLog.Infof("Found non-graduated output: %x", pfxKey)
utxnLog.Infof("Found non-graduated "+
"output: %x", pfxKey)
return errImmatureChannel
}
return nil
})
}); err != nil && err != errImmatureChannel {
})
if err != nil && err != errImmatureChannel {
return false, err
} else {
return err == nil, nil
}
return err == nil, nil
}
// RemoveChannel channel erases all entries from the channel bucket for the
// provided channel point.
// NOTE: The channel's entries in the height index are assumed to be removed.
func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
return ns.db.Update(func(tx *bolt.Tx) error {
// Retrieve the existing chain bucket for this nursery store.
@ -753,13 +711,42 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
if err := writeOutpoint(&chanBuffer, chanPoint); err != nil {
return err
}
chanBytes := chanBuffer.Bytes()
err := chanIndex.DeleteBucket(chanBuffer.Bytes())
utxnLog.Infof("Pruning and removing channel: %v", chanPoint)
err := ns.forChanOutputs(tx, chanPoint, func(k, v []byte) error {
if string(k[:4]) != string(gradPrefix) {
return errors.New("expected grad output")
}
// Construct a kindergarten prefixed key, since this
// would have been the preceding state for a grad
// output.
kndrKey := make([]byte, len(k))
copy(kndrKey, k)
copy(kndrKey[:4], kndrPrefix)
// Decode each to retrieve the output's maturity height.
var kid kidOutput
if err := kid.Decode(bytes.NewReader(v)); err != nil {
return err
}
maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity()
hghtBucket := ns.getHeightBucket(tx, maturityHeight)
if hghtBucket == nil {
return nil
}
return removeBucketIfExists(hghtBucket, chanBytes)
})
if err != nil {
return err
}
return nil
return removeBucketIfExists(chanIndex, chanBytes)
})
}
@ -942,9 +929,9 @@ func (ns *nurseryStore) createHeightBucket(tx *bolt.Tx,
return hghtIndex.CreateBucketIfNotExists(heightBytes[:])
}
// getHeightBucketPath retrieves an existing height bucket from the nursery store,
// using the provided block height. If the bucket does not exist, or any bucket
// along its path does not exist, a nil value is returned.
// getHeightBucketPath retrieves an existing height bucket from the nursery
// store, using the provided block height. If the bucket does not exist, or any
// bucket along its path does not exist, a nil value is returned.
func (ns *nurseryStore) getHeightBucketPath(tx *bolt.Tx,
height uint32) (*bolt.Bucket, *bolt.Bucket, *bolt.Bucket) {
@ -981,18 +968,13 @@ func (ns *nurseryStore) getHeightBucket(tx *bolt.Tx,
// deleteHeightBucket ensures that the height bucket at the provided index is
// purged from the nursery store.
func (ns *nurseryStore) deleteHeightBucket(tx *bolt.Tx, height uint32) error {
// Ensure that the chain bucket for this nursery store exists.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
// Ensure that the height bucket already exists.
_, hghtIndex, hghtBucket := ns.getHeightBucketPath(tx, height)
if hghtBucket == nil {
return nil
}
// Ensure that the height index has been properly initialized for this
// chain.
hghtIndex := chainBucket.Bucket(heightIndexKey)
if hghtIndex == nil {
return nil
}
utxnLog.Infof("Deleting height bucket %d", height)
// Serialize the provided height, as this will form the name of the
// bucket.
@ -1000,12 +982,7 @@ func (ns *nurseryStore) deleteHeightBucket(tx *bolt.Tx, height uint32) error {
byteOrder.PutUint32(heightBytes[:], height)
// Finally, delete the bucket in question.
err := hghtIndex.DeleteBucket(heightBytes[:])
if err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
return removeBucketIfExists(hghtIndex, heightBytes[:])
}
// createHeightChanBucket creates or retrieves an existing height-channel bucket
@ -1105,7 +1082,8 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte,
// holds a sub-bucket for all outputs maturing at this height.
hghtChanBucket := hghtBucket.Bucket(chanBytes)
if hghtChanBucket == nil {
return errors.New("unable to retrieve height-channel bucket")
return fmt.Errorf("unable to retrieve height-channel "+
"bucket at height %d for %x", height, chanBytes)
}
// Load the appropriate channel bucket from the channel index,
@ -1113,7 +1091,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte,
// outputs.
chanBucket := chanIndex.Bucket(chanBytes)
if chanBucket == nil {
return errors.New("unable to retrieve channel bucket")
return fmt.Errorf("unable to retrieve channel bucket: '%x'", chanBytes)
}
// Since all of the outputs of interest will start with the same
@ -1199,8 +1177,6 @@ func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx,
return err
}
// TODO(conner): purge all state below reorg depth.
// Serialize the provided last-finalized height, and store it in the
// top-level chain bucket for this nursery store.
var lastHeightBytes [4]byte
@ -1219,6 +1195,60 @@ var (
ErrBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned")
)
// removeOutputFromHeight will delete the given output from the specified
// height-channel bucket, and attempt to prune the upstream directories if they
// are empty.
func (ns *nurseryStore) removeOutputFromHeight(tx *bolt.Tx, height uint32,
chanPoint *wire.OutPoint, pfxKey []byte) error {
// Retrieve the height-channel bucket and delete the prefixed output.
hghtChanBucket := ns.getHeightChanBucket(tx, height, chanPoint)
if hghtChanBucket == nil {
// Height-channel bucket already removed.
return nil
}
// Try to delete the prefixed output key if it still exists. The output
// may have already been removed after confirmation, but a final pass is
// done when removing a channel as well.
if hghtChanBucket.Get(pfxKey) != nil {
if err := hghtChanBucket.Delete(pfxKey); err != nil {
return err
}
}
// Retrieve the height bucket that contains the height-channel bucket.
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
return errors.New("height bucket not found")
}
var chanBuffer bytes.Buffer
if err := writeOutpoint(&chanBuffer, chanPoint); err != nil {
return err
}
// Try to remove the channel-height bucket if it this was the last
// output in the bucket.
err := removeBucketIfEmpty(hghtBucket, chanBuffer.Bytes())
if err != nil && err != ErrBucketNotEmpty {
return err
} else if err == ErrBucketNotEmpty {
return nil
}
// Attempt to prune the height bucket matching the kid output's
// confirmation height in case that was the last height-chan bucket.
pruned, err := ns.pruneHeight(tx, height)
if err != nil && err != ErrBucketNotEmpty {
return err
} else if err == nil && pruned {
utxnLog.Infof("Height bucket %d pruned", height)
}
return nil
}
// pruneHeight removes the height bucket at the provided height if and only if
// all active outputs at this height have been removed from their respective
// height-channel buckets.
@ -1228,35 +1258,26 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) {
if hghtBucket == nil {
return false, nil
}
utxnLog.Infof("pruning height %d", height)
// TODO(conner): fix this comment
// this block height. We will attempt to remove each one if they are
// empty, keeping track of the number of height-channel buckets that
// still have active outputs.
var nActiveBuckets int
// Iterate over all channels stored at this block height. We will
// attempt to remove each one if they are empty, keeping track of the
// number of height-channel buckets that still have active outputs.
if err := hghtBucket.ForEach(func(chanBytes, _ []byte) error {
// Attempt to each height-channel bucket from the height bucket
// located above.
_, err := ns.removeBucketIfEmpty(hghtBucket, chanBytes)
if err != nil && err != ErrBucketNotEmpty {
return err
} else if err == ErrBucketNotEmpty {
nActiveBuckets++
hghtChanBucket := hghtBucket.Bucket(chanBytes)
if hghtChanBucket == nil {
return errors.New("unable to find height-channel bucket")
}
return nil
return isBucketEmpty(hghtChanBucket)
}); err != nil {
return false, err
}
// If we located any height-channel buckets that still have active
// outputs, it is unsafe to delete this height bucket. Signal this event
// to the caller so that they can determine the appropriate action.
if nActiveBuckets > 0 {
return false, ErrBucketNotEmpty
}
// Serialize the provided block height, such that it can be used as the
// key to delete desired height bucket.
var heightBytes [4]byte
@ -1265,7 +1286,7 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) {
// All of the height-channel buckets are empty or have been previously
// removed, proceed by removing the height bucket
// altogether.
if err := hghtIndex.DeleteBucket(heightBytes[:]); err != nil {
if err := removeBucketIfExists(hghtIndex, heightBytes[:]); err != nil {
return false, err
}
@ -1274,49 +1295,41 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) {
// removeBucketIfEmpty attempts to delete a bucket specified by name from the
// provided parent bucket.
func (ns *nurseryStore) removeBucketIfEmpty(parent *bolt.Bucket,
bktName []byte) (bool, error) {
func removeBucketIfEmpty(parent *bolt.Bucket, bktName []byte) error {
// Attempt to fetch the named bucket from its parent.
bkt := parent.Bucket(bktName)
if bkt == nil {
// No bucket was found, signal this to the caller.
return false, nil
}
// The bucket exists, now compute how many children *it* has.
nChildren, err := ns.numChildrenInBucket(bkt)
if err != nil {
return false, nil
}
// If the number of children is non-zero, alert the caller that the
// named bucket is not being removed.
if nChildren > 0 {
return false, nil
}
// Otherwise, remove the empty bucket from its parent.
err = parent.DeleteBucket(bktName)
if err != nil {
return false, err
}
return true, nil
}
// numChildrenInBucket computes the number of children contained in the given
// boltdb bucket.
func (ns *nurseryStore) numChildrenInBucket(parent *bolt.Bucket) (int, error) {
var nChildren int
if err := parent.ForEach(func(_, _ []byte) error {
nChildren++
// No bucket was found, already removed?
return nil
}); err != nil {
return 0, err
}
return nChildren, nil
// The bucket exists, fail if it still has children.
if err := isBucketEmpty(bkt); err != nil {
return err
}
return parent.DeleteBucket(bktName)
}
// removeBucketIfExists safely deletes the named bucket by first checking
// that it exists in the parent bucket.
func removeBucketIfExists(parent *bolt.Bucket, bktName []byte) error {
// Attempt to fetch the named bucket from its parent.
bkt := parent.Bucket(bktName)
if bkt == nil {
// No bucket was found, already removed?
return nil
}
return parent.DeleteBucket(bktName)
}
// isBucketEmpty returns ErrBucketNotEmpty if the bucket has a non-zero number
// of children.
func isBucketEmpty(parent *bolt.Bucket) error {
return parent.ForEach(func(_, _ []byte) error {
return ErrBucketNotEmpty
})
}
// Compile-time constraint to ensure nurseryStore implements NurseryStore.