nursery_store: make GraduateKinder only accept height + list channels

This commit is contained in:
Conner Fromknecht 2017-10-20 00:47:01 -07:00
parent 0ed5f83dce
commit d0155f3128
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

@ -114,11 +114,13 @@ type NurseryStore interface {
// transaction. // transaction.
PreschoolToKinder(*kidOutput) error PreschoolToKinder(*kidOutput) error
// GraduateKinder accepts a slice of kidOutputs from the kindergarten // GraduateKinder atomically moves the kindergarten class at the
// bucket and marks them graduated. It also removes their corresponding // provided height into the graduated status. This involves removing the
// entries from the height and channel indexes, pruning any intermediate // kindergarten entries from both the height and channel indexes, and
// buckets that become empty. // cleaning up the finalized kindergarten sweep txn. The height bucket
GraduateKinder(height uint32, kndrOutputs []kidOutput) error // will be opportunistically pruned from the height index as outputs are
// removed.
GraduateKinder(height uint32) error
// FetchPreschools returns a list of all outputs currently stored in the // FetchPreschools returns a list of all outputs currently stored in the
// preschool bucket. // preschool bucket.
@ -155,6 +157,9 @@ type NurseryStore interface {
// whose type should be inferred from the key's prefix. // whose type should be inferred from the key's prefix.
ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error
// ListChannels returns all channels the nursery is currently tracking.
ListChannels() ([]wire.OutPoint, error)
// IsMatureChannel determines the whether or not all of the outputs in a // IsMatureChannel determines the whether or not all of the outputs in a
// particular channel bucket have been marked as graduated. // particular channel bucket have been marked as graduated.
IsMatureChannel(*wire.OutPoint) (bool, error) IsMatureChannel(*wire.OutPoint) (bool, error)
@ -344,6 +349,7 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
return err return err
} }
// Remove the crib output's entry in the height index.
err = ns.removeOutputFromHeight(tx, bby.expiry, chanPoint, err = ns.removeOutputFromHeight(tx, bby.expiry, chanPoint,
pfxOutputKey) pfxOutputKey)
if err != nil { if err != nil {
@ -467,40 +473,70 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error {
}) })
} }
// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket and // GraduateKinder atomically moves the kindergarten class at the provided height
// marks them as graduated. This method also removes their corresponding entries // into the graduated status. This involves removing the kindergarten entries
// from the height and channel indexes corresponding to their kindergarten // from both the height and channel indexes, and cleaning up the finalized
// status. It also ensures that the finalized kindergarten sweep txn is removed // kindergarten sweep txn. The height bucket will be opportunistically pruned
// for this height. // from the height index as outputs are removed.
func (ns *nurseryStore) GraduateKinder(height uint32, kids []kidOutput) error { func (ns *nurseryStore) GraduateKinder(height uint32) error {
if err := ns.db.Update(func(tx *bolt.Tx) error { return ns.db.Update(func(tx *bolt.Tx) error {
for _, kid := range kids {
// Since all kindergarten outputs at a particular height are
// swept in a single txn, we can now safely delete the finalized
// txn, since it has already been broadcast and confirmed.
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
// Nothing to delete, bucket has already been removed.
return nil
}
// Remove the finalized kindergarten txn, we do this before
// removing the outputs so that the extra entry doesn't prevent
// the height bucket from being opportunistically pruned below.
if err := hghtBucket.Delete(finalizedKndrTxnKey); err != nil {
return err
}
// For each kindergarten found output, delete its entry from the
// height and channel index, and create a new grad output in the
// channel index.
return ns.forEachHeightPrefix(tx, kndrPrefix, height,
func(v []byte) error {
var kid kidOutput
err := kid.Decode(bytes.NewReader(v))
if err != nil {
return err
}
outpoint := kid.OutPoint() outpoint := kid.OutPoint()
chanPoint := kid.OriginChanPoint() chanPoint := kid.OriginChanPoint()
// Construct the key under which the output is currently // Construct the key under which the output is
// stored height and channel indexes. // currently stored height and channel indexes.
pfxOutputKey, err := prefixOutputKey(kndrPrefix, pfxOutputKey, err := prefixOutputKey(kndrPrefix,
outpoint) outpoint)
if err != nil { if err != nil {
return err return err
} }
// Remove the grad output's entry in the height index. // Remove the grad output's entry in the height
err = ns.removeOutputFromHeight(tx, height, chanPoint, // index.
pfxOutputKey) err = ns.removeOutputFromHeight(tx, height,
chanPoint, pfxOutputKey)
if err != nil { if err != nil {
return err return err
} }
chanBucket := ns.getChannelBucket(tx, chanPoint) chanBucket := ns.getChannelBucket(tx,
chanPoint)
if chanBucket == nil { if chanBucket == nil {
return ErrContractNotFound return ErrContractNotFound
} }
// Remove previous output with kindergarten prefix. // Remove previous output with kindergarten
if err := chanBucket.Delete(pfxOutputKey); err != nil { // prefix.
err = chanBucket.Delete(pfxOutputKey)
if err != nil {
return err return err
} }
@ -512,29 +548,13 @@ func (ns *nurseryStore) GraduateKinder(height uint32, kids []kidOutput) error {
return err return err
} }
// Insert serialized output into channel bucket using // Insert serialized output into channel bucket
// graduate-prefixed key. // using graduate-prefixed key.
err = chanBucket.Put(pfxOutputKey, gradBuffer.Bytes()) return chanBucket.Put(pfxOutputKey,
if err != nil { gradBuffer.Bytes())
return err },
} )
} })
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
return nil
}
// Since all kindergarten outputs at a particular height are
// swept in a single txn, we can now safely delete the finalized
// txn, since it has already been broadcast and confirmed.
return hghtBucket.Delete(finalizedKndrTxnKey)
}); err != nil {
return err
}
return nil
} }
// FinalizeKinder accepts a block height as a parameter and purges its // FinalizeKinder accepts a block height as a parameter and purges its
@ -555,7 +575,7 @@ func (ns *nurseryStore) FinalizeKinder(height uint32,
// finalized. // finalized.
func (ns *nurseryStore) PurgeHeight(height uint32) error { func (ns *nurseryStore) PurgeHeight(height uint32) error {
return ns.db.Update(func(tx *bolt.Tx) error { return ns.db.Update(func(tx *bolt.Tx) error {
if err := ns.deleteHeightBucket(tx, height); err != nil { if err := ns.purgeHeightBucket(tx, height); err != nil {
return err return err
} }
@ -601,7 +621,8 @@ func (ns *nurseryStore) FetchClass(
return nil return nil
}); err != nil { },
); err != nil {
return err return err
} }
@ -677,17 +698,14 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
// the channel bucket to efficiently enumerate all the // the channel bucket to efficiently enumerate all the
// desired outputs. // desired outputs.
c := chanBucket.Cursor() c := chanBucket.Cursor()
for k, v := c.Seek(psclPrefix); bytes.HasPrefix(
// Seek and iterate over all outputs starting with the k, psclPrefix); k, v = c.Next() {
// prefix "pscl".
pfxOutputKey, kidBytes := c.Seek(psclPrefix)
for bytes.HasPrefix(pfxOutputKey, psclPrefix) {
// Deserialize each output as a kidOutput, since // Deserialize each output as a kidOutput, since
// this should have been the type that was // this should have been the type that was
// serialized when it was written to disk. // serialized when it was written to disk.
var psclOutput kidOutput var psclOutput kidOutput
psclReader := bytes.NewReader(kidBytes) psclReader := bytes.NewReader(v)
err := psclOutput.Decode(psclReader) err := psclOutput.Decode(psclReader)
if err != nil { if err != nil {
return err return err
@ -696,8 +714,6 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) {
// Add the deserialized output to our list of // Add the deserialized output to our list of
// preschool outputs. // preschool outputs.
kids = append(kids, psclOutput) kids = append(kids, psclOutput)
pfxOutputKey, kidBytes = c.Next()
} }
} }
@ -724,9 +740,39 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
}) })
} }
// errImmatureChannel signals that not all outputs in a channel bucket have // ListChannels returns all channels the nursery is currently tracking.
// graduated. func (ns *nurseryStore) ListChannels() ([]wire.OutPoint, error) {
var errImmatureChannel = errors.New("channel has non-graduated outputs") var activeChannels []wire.OutPoint
if err := ns.db.View(func(tx *bolt.Tx) error {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the existing channel index.
chanIndex := chainBucket.Bucket(channelIndexKey)
if chanIndex == nil {
return nil
}
return chanIndex.ForEach(func(chanBytes, _ []byte) error {
var chanPoint wire.OutPoint
err := readOutpoint(bytes.NewReader(chanBytes), &chanPoint)
if err != nil {
return err
}
activeChannels = append(activeChannels, chanPoint)
return nil
})
}); err != nil {
return nil, err
}
return activeChannels, nil
}
// IsMatureChannel determines the whether or not all of the outputs in a // IsMatureChannel determines the whether or not all of the outputs in a
// particular channel bucket have been marked as graduated. // particular channel bucket have been marked as graduated.
@ -738,19 +784,24 @@ func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error)
return ns.forChanOutputs(tx, chanPoint, return ns.forChanOutputs(tx, chanPoint,
func(pfxKey, _ []byte) error { func(pfxKey, _ []byte) error {
if !bytes.HasPrefix(pfxKey, gradPrefix) { if !bytes.HasPrefix(pfxKey, gradPrefix) {
return errImmatureChannel return ErrImmatureChannel
} }
return nil return nil
}) })
}) })
if err != nil && err != errImmatureChannel { if err != nil && err != ErrImmatureChannel {
return false, err return false, err
} }
return err == nil, nil return err == nil, nil
} }
// ErrImmatureChannel signals a channel cannot be removed because not all of its
// outputs have graduated.
var ErrImmatureChannel = errors.New("cannot remove immature channel, " +
"still has ungraduated outputs")
// RemoveChannel channel erases all entries from the channel bucket for the // RemoveChannel channel erases all entries from the channel bucket for the
// provided channel point. // provided channel point.
// NOTE: The channel's entries in the height index are assumed to be removed. // NOTE: The channel's entries in the height index are assumed to be removed.
@ -778,7 +829,7 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
err := ns.forChanOutputs(tx, chanPoint, func(k, v []byte) error { err := ns.forChanOutputs(tx, chanPoint, func(k, v []byte) error {
if !bytes.HasPrefix(k, gradPrefix) { if !bytes.HasPrefix(k, gradPrefix) {
return errors.New("expected grad output") return ErrImmatureChannel
} }
// Construct a kindergarten prefixed key, since this // Construct a kindergarten prefixed key, since this
@ -1166,7 +1217,8 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte,
// outputs. // outputs.
chanBucket := chanIndex.Bucket(chanBytes) chanBucket := chanIndex.Bucket(chanBytes)
if chanBucket == nil { if chanBucket == nil {
return fmt.Errorf("unable to retrieve channel bucket: '%x'", chanBytes) return fmt.Errorf("unable to retrieve channel "+
"bucket: '%x'", chanBytes)
} }
// Since all of the outputs of interest will start with the same // Since all of the outputs of interest will start with the same
@ -1174,16 +1226,13 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte,
// contained in the height-channel bucket, efficiently // contained in the height-channel bucket, efficiently
// enumerating the desired outputs. // enumerating the desired outputs.
c := hghtChanBucket.Cursor() c := hghtChanBucket.Cursor()
for k, _ := c.Seek(prefix); bytes.HasPrefix(
// Seek to and iterate over all entries starting with the given k, prefix); k, _ = c.Next() {
// prefix.
pfxOutputKey, _ := c.Seek(prefix)
for bytes.HasPrefix(pfxOutputKey, prefix) {
// Use the prefix output key emitted from our scan to // Use the prefix output key emitted from our scan to
// load the serialized babyOutput from the appropriate // load the serialized babyOutput from the appropriate
// channel bucket. // channel bucket.
outputBytes := chanBucket.Get(pfxOutputKey) outputBytes := chanBucket.Get(k)
if outputBytes == nil { if outputBytes == nil {
return errors.New("unable to retrieve output") return errors.New("unable to retrieve output")
} }
@ -1194,10 +1243,6 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte,
if err := callback(outputBytes); err != nil { if err := callback(outputBytes); err != nil {
return err return err
} }
// Lastly, advance our prefix output key for the next
// iteration.
pfxOutputKey, _ = c.Next()
} }
} }
@ -1230,12 +1275,13 @@ func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) {
// Lookup the last finalized height in the top-level chain bucket. // Lookup the last finalized height in the top-level chain bucket.
heightBytes := chainBucket.Get(lastFinalizedHeightKey) heightBytes := chainBucket.Get(lastFinalizedHeightKey)
if heightBytes == nil {
// We have never finalized, return height 0.
return 0, nil
}
// If the resulting bytes are not sized like a uint32, then we have // If the resulting bytes are not sized like a uint32, then we have
// never finalized, so we return 0. // never finalized, so we return 0.
if len(heightBytes) != 4 {
return 0, nil
}
// Otherwise, parse the bytes and return the last finalized height. // Otherwise, parse the bytes and return the last finalized height.
return byteOrder.Uint32(heightBytes), nil return byteOrder.Uint32(heightBytes), nil
@ -1328,16 +1374,14 @@ func (ns *nurseryStore) getLastPurgedHeight(tx *bolt.Tx) (uint32, error) {
return 0, nil return 0, nil
} }
// Lookup the last finalized height in the top-level chain bucket. // Lookup the last purged height in the top-level chain bucket.
heightBytes := chainBucket.Get(lastPurgedHeightKey) heightBytes := chainBucket.Get(lastPurgedHeightKey)
if heightBytes == nil {
// If the resulting bytes are not sized like a uint32, then we have // We have never purged before, return height 0.
// never finalized, so we return 0.
if len(heightBytes) != 4 {
return 0, nil return 0, nil
} }
// Otherwise, parse the bytes and return the last finalized height. // Otherwise, parse the bytes and return the last purged height.
return byteOrder.Uint32(heightBytes), nil return byteOrder.Uint32(heightBytes), nil
} }
@ -1351,7 +1395,7 @@ func (ns *nurseryStore) putLastPurgedHeight(tx *bolt.Tx, height uint32) error {
return err return err
} }
// Serialize the provided last-finalized height, and store it in the // Serialize the provided last-purged height, and store it in the
// top-level chain bucket for this nursery store. // top-level chain bucket for this nursery store.
var lastHeightBytes [4]byte var lastHeightBytes [4]byte
byteOrder.PutUint32(lastHeightBytes[:], height) byteOrder.PutUint32(lastHeightBytes[:], height)
@ -1359,9 +1403,9 @@ func (ns *nurseryStore) putLastPurgedHeight(tx *bolt.Tx, height uint32) error {
return chainBucket.Put(lastPurgedHeightKey, lastHeightBytes[:]) return chainBucket.Put(lastPurgedHeightKey, lastHeightBytes[:])
} }
// ErrBucketNotEmpty signals that an attempt to prune a particular // errBucketNotEmpty signals that an attempt to prune a particular
// bucket failed because it still has active outputs. // bucket failed because it still has active outputs.
var ErrBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned")
// removeOutputFromHeight will delete the given output from the specified // removeOutputFromHeight will delete the given output from the specified
// height-channel bucket, and attempt to prune the upstream directories if they // height-channel bucket, and attempt to prune the upstream directories if they
@ -1396,16 +1440,16 @@ func (ns *nurseryStore) removeOutputFromHeight(tx *bolt.Tx, height uint32,
// Try to remove the channel-height bucket if it this was the last // Try to remove the channel-height bucket if it this was the last
// output in the bucket. // output in the bucket.
err := removeBucketIfEmpty(hghtBucket, chanBuffer.Bytes()) err := removeBucketIfEmpty(hghtBucket, chanBuffer.Bytes())
if err != nil && err != ErrBucketNotEmpty { if err != nil && err != errBucketNotEmpty {
return err return err
} else if err == ErrBucketNotEmpty { } else if err == errBucketNotEmpty {
return nil return nil
} }
// Attempt to prune the height bucket matching the kid output's // Attempt to prune the height bucket matching the kid output's
// confirmation height in case that was the last height-chan bucket. // confirmation height in case that was the last height-chan bucket.
pruned, err := ns.pruneHeight(tx, height) pruned, err := ns.pruneHeight(tx, height)
if err != nil && err != ErrBucketNotEmpty { if err != nil && err != errBucketNotEmpty {
return err return err
} else if err == nil && pruned { } else if err == nil && pruned {
utxnLog.Infof("Height bucket %d pruned", height) utxnLog.Infof("Height bucket %d pruned", height)
@ -1493,11 +1537,11 @@ func removeBucketIfExists(parent *bolt.Bucket, bktName []byte) error {
return parent.DeleteBucket(bktName) return parent.DeleteBucket(bktName)
} }
// isBucketEmpty returns ErrBucketNotEmpty if the bucket has a non-zero number // isBucketEmpty returns errBucketNotEmpty if the bucket has a non-zero number
// of children. // of children.
func isBucketEmpty(parent *bolt.Bucket) error { func isBucketEmpty(parent *bolt.Bucket) error {
return parent.ForEach(func(_, _ []byte) error { return parent.ForEach(func(_, _ []byte) error {
return ErrBucketNotEmpty return errBucketNotEmpty
}) })
} }