diff --git a/nursery_store.go b/nursery_store.go index 3fdc5598..64f56069 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -3,6 +3,7 @@ package main import ( "bytes" "errors" + "fmt" "github.com/boltdb/bolt" "github.com/lightningnetwork/lnd/channeldb" @@ -38,7 +39,6 @@ type NurseryStore interface { // channel indexes. If this method detects that all outputs for a // particular contract have been incubated, it returns the channel // points that are ready to be marked as fully closed. - // TODO: make this handle one output at a time? GraduateKinder([]kidOutput) error // FinalizeHeight accepts a block height as a parameter and purges its @@ -310,32 +310,10 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { return err } - // Next, retrieve the height-channel bucket located in the - // height bucket corresponding to the baby output's CLTV expiry - // height. This bucket should always exist, but if it doesn't - // then we have nothing to clean up. - hghtChanBucketCltv := ns.getHeightChanBucket(tx, bby.expiry, - chanPoint) - if hghtChanBucketCltv != nil { - // We successfully located an existing height chan - // bucket at this babyOutput's expiry height, proceed by - // removing it from the index. - err := hghtChanBucketCltv.Delete(pfxOutputKey) - if err != nil { - return err - } - - // Since we removed a crib output from the height index, - // we opportunistically prune the height bucket - // corresponding to the babyOutput's CLTV delay. This - // allows us to clean up any persistent state as outputs - // are progressed through the incubation process. - pruned, err := ns.pruneHeight(tx, bby.expiry) - if err != nil && err != ErrBucketNotEmpty { - return err - } else if err == nil && pruned { - utxnLog.Infof("Height bucket %d pruned", bby.expiry) - } + err = ns.removeOutputFromHeight(tx, bby.expiry, chanPoint, + pfxOutputKey) + if err != nil { + return err } // Since we are moving this output from the crib bucket to the @@ -447,68 +425,46 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { return err } - // Finally, we touch a bucket in the height-channel created - // above. The bucket is named using a kindergarten prefixed - // key, signaling that this CSV delayed output will be ready to - // broadcast at the maturity height, after a brief period of - // incubation. - _, err = hghtChanBucket.CreateBucketIfNotExists(pfxOutputKey) - - return err + // Finally, we touch a key in the height-channel created above. + // The key is named using a kindergarten prefixed key, signaling + // that this CSV delayed output will be ready to broadcast at + // the maturity height, after a brief period of incubation. + return hghtChanBucket.Put(pfxOutputKey, []byte{}) }) } -// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket, -// removing their corresponding entries from the height and channel indexes. -// If this method detects that all outputs for a particular contract have been -// incubated, it returns the channel points that are ready to be marked as -// fully closed. This method will iterate through the provided kidOutputs and do -// the following: -// 1) Prune the kid height bucket at the kid's confirmation height, if it is -// empty. -// 2) Prune the channel bucket belonging to the kid's origin channel point, if -// it is empty. +// GraduateKinder accepts a list of kidOutputs in the kindergarten bucket and +// marks them as graduated. This method also removes their corresponding entries +// from the height and channel indexes corresponding to their kindergarten +// status. func (ns *nurseryStore) GraduateKinder(kids []kidOutput) error { - if err := ns.db.Update(func(tx *bolt.Tx) error { for _, kid := range kids { confHeight := kid.ConfHeight() - outpoint := kid.OutPoint() chanPoint := kid.OriginChanPoint() // Construct the key under which the output is currently // stored height and channel indexes. - pfxOutputKey, err := prefixOutputKey(kndrPrefix, outpoint) + pfxOutputKey, err := prefixOutputKey(kndrPrefix, + outpoint) if err != nil { return err } - // Load the height-channel bucket, remove this output, - // and attempt to prune the bucket if it empty. - hghtChanBucket := ns.getHeightChanBucket(tx, confHeight, chanPoint) - if hghtChanBucket != nil { - if err := hghtChanBucket.Delete(pfxOutputKey); err != nil { - return err - } - - // Attempt to prune the height bucket matching the kid - // output's confirmation height if it contains no active - // outputs. - pruned, err := ns.pruneHeight(tx, confHeight) - if err != nil && err != ErrBucketNotEmpty { - return err - } else if err == nil && pruned { - utxnLog.Infof("Height bucket %d pruned", confHeight) - } - } - - chanBucket, err := ns.createChannelBucket(tx, chanPoint) + // Remove the grad output's entry in the height index. + err = ns.removeOutputFromHeight(tx, confHeight, chanPoint, + pfxOutputKey) if err != nil { return err } + chanBucket := ns.getChannelBucket(tx, chanPoint) + if chanBucket == nil { + return ErrContractNotFound + } + // Remove previous output with kindergarten prefix. if err := chanBucket.Delete(pfxOutputKey); err != nil { return err @@ -528,7 +484,6 @@ func (ns *nurseryStore) GraduateKinder(kids []kidOutput) error { if err != nil { return err } - } return nil @@ -555,52 +510,54 @@ func (ns *nurseryStore) FinalizeHeight(height uint32) error { // FetchClass returns a list of babyOutputs in the crib bucket whose CLTV // delay expires at the provided block height. -func (ns *nurseryStore) FetchClass(height uint32) ([]kidOutput, []babyOutput, error) { +func (ns *nurseryStore) FetchClass( + height uint32) ([]kidOutput, []babyOutput, error) { + // Construct list of all crib and kindergarten outputs that need TLC at // the provided block height. var kids []kidOutput var babies []babyOutput if err := ns.db.View(func(tx *bolt.Tx) error { - if err := ns.forEachHeightPrefix(tx, cribPrefix, height, func(buf []byte) error { + // Append each crib output to our list of babyOutputs. + if err := ns.forEachHeightPrefix(tx, cribPrefix, height, + func(buf []byte) error { - // We will attempt to deserialize all outputs - // stored with the crib prefix into babyOutputs, - // since this is the expected type that would - // have been serialized previously. - var baby babyOutput - babyReader := bytes.NewReader(buf) - if err := baby.Decode(babyReader); err != nil { - return err - } + // We will attempt to deserialize all outputs + // stored with the crib prefix into babyOutputs, + // since this is the expected type that would + // have been serialized previously. + var baby babyOutput + babyReader := bytes.NewReader(buf) + if err := baby.Decode(babyReader); err != nil { + return err + } - // Append the deserialized object to our list of - // babyOutputs. - babies = append(babies, baby) + babies = append(babies, baby) - return nil + return nil - }); err != nil { + }); err != nil { return err } - return ns.forEachHeightPrefix(tx, kndrPrefix, height, func(buf []byte) error { - // We will attempt to deserialize all outputs stored - // with the kindergarten prefix into kidOutputs, since - // this is the expected type that would have been - // serialized previously. - var kid kidOutput - kidReader := bytes.NewReader(buf) - if err := kid.Decode(kidReader); err != nil { - return err - } + // Append each kindergarten output to our list of kidOutputs. + return ns.forEachHeightPrefix(tx, kndrPrefix, height, + func(buf []byte) error { + // We will attempt to deserialize all outputs + // stored with the kindergarten prefix into + // kidOutputs, since this is the expected type + // that would have been serialized previously. + var kid kidOutput + kidReader := bytes.NewReader(buf) + if err := kid.Decode(kidReader); err != nil { + return err + } - // Append the deserialized object to our list of - // kidOutputs. - kids = append(kids, kid) + kids = append(kids, kid) - return nil + return nil - }) + }) }); err != nil { return nil, nil, err @@ -675,8 +632,6 @@ func (ns *nurseryStore) FetchPreschools() ([]kidOutput, error) { // preschool outputs. kids = append(kids, psclOutput) - // Advance to the subsequent key-value pair of - // the prefix scan. pfxOutputKey, kidBytes = c.Next() } } @@ -711,28 +666,31 @@ var errImmatureChannel = errors.New("channel has non-graduated outputs") // IsMatureChannel determines the whether or not all of the outputs in a // particular channel bucket have been marked as graduated. func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) { - if err := ns.db.View(func(tx *bolt.Tx) error { + err := ns.db.View(func(tx *bolt.Tx) error { // Iterate over the contents of the channel bucket, computing // both total number of outputs, and those that have the grad // prefix. - return ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error { - if string(pfxKey[:4]) != string(gradPrefix) { - utxnLog.Infof("Found non-graduated output: %x", pfxKey) - return errImmatureChannel - } - return nil - }) + return ns.forChanOutputs(tx, chanPoint, + func(pfxKey, _ []byte) error { + if string(pfxKey[:4]) != string(gradPrefix) { + utxnLog.Infof("Found non-graduated "+ + "output: %x", pfxKey) + return errImmatureChannel + } + return nil + }) - }); err != nil && err != errImmatureChannel { + }) + if err != nil && err != errImmatureChannel { return false, err - } else { - return err == nil, nil } + return err == nil, nil } // RemoveChannel channel erases all entries from the channel bucket for the // provided channel point. +// NOTE: The channel's entries in the height index are assumed to be removed. func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { return ns.db.Update(func(tx *bolt.Tx) error { // Retrieve the existing chain bucket for this nursery store. @@ -753,13 +711,42 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { return err } + chanBytes := chanBuffer.Bytes() - err := chanIndex.DeleteBucket(chanBuffer.Bytes()) + utxnLog.Infof("Pruning and removing channel: %v", chanPoint) + + err := ns.forChanOutputs(tx, chanPoint, func(k, v []byte) error { + if string(k[:4]) != string(gradPrefix) { + return errors.New("expected grad output") + } + + // Construct a kindergarten prefixed key, since this + // would have been the preceding state for a grad + // output. + kndrKey := make([]byte, len(k)) + copy(kndrKey, k) + copy(kndrKey[:4], kndrPrefix) + + // Decode each to retrieve the output's maturity height. + var kid kidOutput + if err := kid.Decode(bytes.NewReader(v)); err != nil { + return err + } + + maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity() + + hghtBucket := ns.getHeightBucket(tx, maturityHeight) + if hghtBucket == nil { + return nil + } + + return removeBucketIfExists(hghtBucket, chanBytes) + }) if err != nil { return err } - return nil + return removeBucketIfExists(chanIndex, chanBytes) }) } @@ -942,9 +929,9 @@ func (ns *nurseryStore) createHeightBucket(tx *bolt.Tx, return hghtIndex.CreateBucketIfNotExists(heightBytes[:]) } -// getHeightBucketPath retrieves an existing height bucket from the nursery store, -// using the provided block height. If the bucket does not exist, or any bucket -// along its path does not exist, a nil value is returned. +// getHeightBucketPath retrieves an existing height bucket from the nursery +// store, using the provided block height. If the bucket does not exist, or any +// bucket along its path does not exist, a nil value is returned. func (ns *nurseryStore) getHeightBucketPath(tx *bolt.Tx, height uint32) (*bolt.Bucket, *bolt.Bucket, *bolt.Bucket) { @@ -981,18 +968,13 @@ func (ns *nurseryStore) getHeightBucket(tx *bolt.Tx, // deleteHeightBucket ensures that the height bucket at the provided index is // purged from the nursery store. func (ns *nurseryStore) deleteHeightBucket(tx *bolt.Tx, height uint32) error { - // Ensure that the chain bucket for this nursery store exists. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { + // Ensure that the height bucket already exists. + _, hghtIndex, hghtBucket := ns.getHeightBucketPath(tx, height) + if hghtBucket == nil { return nil } - // Ensure that the height index has been properly initialized for this - // chain. - hghtIndex := chainBucket.Bucket(heightIndexKey) - if hghtIndex == nil { - return nil - } + utxnLog.Infof("Deleting height bucket %d", height) // Serialize the provided height, as this will form the name of the // bucket. @@ -1000,12 +982,7 @@ func (ns *nurseryStore) deleteHeightBucket(tx *bolt.Tx, height uint32) error { byteOrder.PutUint32(heightBytes[:], height) // Finally, delete the bucket in question. - err := hghtIndex.DeleteBucket(heightBytes[:]) - if err != nil && err != bolt.ErrBucketNotFound { - return err - } - - return nil + return removeBucketIfExists(hghtIndex, heightBytes[:]) } // createHeightChanBucket creates or retrieves an existing height-channel bucket @@ -1105,7 +1082,8 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, // holds a sub-bucket for all outputs maturing at this height. hghtChanBucket := hghtBucket.Bucket(chanBytes) if hghtChanBucket == nil { - return errors.New("unable to retrieve height-channel bucket") + return fmt.Errorf("unable to retrieve height-channel "+ + "bucket at height %d for %x", height, chanBytes) } // Load the appropriate channel bucket from the channel index, @@ -1113,7 +1091,7 @@ func (ns *nurseryStore) forEachHeightPrefix(tx *bolt.Tx, prefix []byte, // outputs. chanBucket := chanIndex.Bucket(chanBytes) if chanBucket == nil { - return errors.New("unable to retrieve channel bucket") + return fmt.Errorf("unable to retrieve channel bucket: '%x'", chanBytes) } // Since all of the outputs of interest will start with the same @@ -1199,8 +1177,6 @@ func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx, return err } - // TODO(conner): purge all state below reorg depth. - // Serialize the provided last-finalized height, and store it in the // top-level chain bucket for this nursery store. var lastHeightBytes [4]byte @@ -1219,6 +1195,60 @@ var ( ErrBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") ) +// removeOutputFromHeight will delete the given output from the specified +// height-channel bucket, and attempt to prune the upstream directories if they +// are empty. +func (ns *nurseryStore) removeOutputFromHeight(tx *bolt.Tx, height uint32, + chanPoint *wire.OutPoint, pfxKey []byte) error { + + // Retrieve the height-channel bucket and delete the prefixed output. + hghtChanBucket := ns.getHeightChanBucket(tx, height, chanPoint) + if hghtChanBucket == nil { + // Height-channel bucket already removed. + return nil + } + + // Try to delete the prefixed output key if it still exists. The output + // may have already been removed after confirmation, but a final pass is + // done when removing a channel as well. + if hghtChanBucket.Get(pfxKey) != nil { + if err := hghtChanBucket.Delete(pfxKey); err != nil { + return err + } + } + + // Retrieve the height bucket that contains the height-channel bucket. + hghtBucket := ns.getHeightBucket(tx, height) + if hghtBucket == nil { + return errors.New("height bucket not found") + } + + var chanBuffer bytes.Buffer + if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { + return err + } + + // Try to remove the channel-height bucket if it this was the last + // output in the bucket. + err := removeBucketIfEmpty(hghtBucket, chanBuffer.Bytes()) + if err != nil && err != ErrBucketNotEmpty { + return err + } else if err == ErrBucketNotEmpty { + return nil + } + + // Attempt to prune the height bucket matching the kid output's + // confirmation height in case that was the last height-chan bucket. + pruned, err := ns.pruneHeight(tx, height) + if err != nil && err != ErrBucketNotEmpty { + return err + } else if err == nil && pruned { + utxnLog.Infof("Height bucket %d pruned", height) + } + + return nil +} + // pruneHeight removes the height bucket at the provided height if and only if // all active outputs at this height have been removed from their respective // height-channel buckets. @@ -1228,35 +1258,26 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) { if hghtBucket == nil { return false, nil } + utxnLog.Infof("pruning height %d", height) - // TODO(conner): fix this comment - // this block height. We will attempt to remove each one if they are - // empty, keeping track of the number of height-channel buckets that - // still have active outputs. - var nActiveBuckets int + // Iterate over all channels stored at this block height. We will + // attempt to remove each one if they are empty, keeping track of the + // number of height-channel buckets that still have active outputs. if err := hghtBucket.ForEach(func(chanBytes, _ []byte) error { // Attempt to each height-channel bucket from the height bucket // located above. - _, err := ns.removeBucketIfEmpty(hghtBucket, chanBytes) - if err != nil && err != ErrBucketNotEmpty { - return err - } else if err == ErrBucketNotEmpty { - nActiveBuckets++ + + hghtChanBucket := hghtBucket.Bucket(chanBytes) + if hghtChanBucket == nil { + return errors.New("unable to find height-channel bucket") } - return nil + return isBucketEmpty(hghtChanBucket) }); err != nil { return false, err } - // If we located any height-channel buckets that still have active - // outputs, it is unsafe to delete this height bucket. Signal this event - // to the caller so that they can determine the appropriate action. - if nActiveBuckets > 0 { - return false, ErrBucketNotEmpty - } - // Serialize the provided block height, such that it can be used as the // key to delete desired height bucket. var heightBytes [4]byte @@ -1265,7 +1286,7 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) { // All of the height-channel buckets are empty or have been previously // removed, proceed by removing the height bucket // altogether. - if err := hghtIndex.DeleteBucket(heightBytes[:]); err != nil { + if err := removeBucketIfExists(hghtIndex, heightBytes[:]); err != nil { return false, err } @@ -1274,49 +1295,41 @@ func (ns *nurseryStore) pruneHeight(tx *bolt.Tx, height uint32) (bool, error) { // removeBucketIfEmpty attempts to delete a bucket specified by name from the // provided parent bucket. -func (ns *nurseryStore) removeBucketIfEmpty(parent *bolt.Bucket, - bktName []byte) (bool, error) { - +func removeBucketIfEmpty(parent *bolt.Bucket, bktName []byte) error { // Attempt to fetch the named bucket from its parent. bkt := parent.Bucket(bktName) if bkt == nil { - // No bucket was found, signal this to the caller. - return false, nil + // No bucket was found, already removed? + return nil } - // The bucket exists, now compute how many children *it* has. - nChildren, err := ns.numChildrenInBucket(bkt) - if err != nil { - return false, nil + // The bucket exists, fail if it still has children. + if err := isBucketEmpty(bkt); err != nil { + return err } - // If the number of children is non-zero, alert the caller that the - // named bucket is not being removed. - if nChildren > 0 { - return false, nil - } - - // Otherwise, remove the empty bucket from its parent. - err = parent.DeleteBucket(bktName) - if err != nil { - return false, err - } - - return true, nil + return parent.DeleteBucket(bktName) } -// numChildrenInBucket computes the number of children contained in the given -// boltdb bucket. -func (ns *nurseryStore) numChildrenInBucket(parent *bolt.Bucket) (int, error) { - var nChildren int - if err := parent.ForEach(func(_, _ []byte) error { - nChildren++ +// removeBucketIfExists safely deletes the named bucket by first checking +// that it exists in the parent bucket. +func removeBucketIfExists(parent *bolt.Bucket, bktName []byte) error { + // Attempt to fetch the named bucket from its parent. + bkt := parent.Bucket(bktName) + if bkt == nil { + // No bucket was found, already removed? return nil - }); err != nil { - return 0, err } - return nChildren, nil + return parent.DeleteBucket(bktName) +} + +// isBucketEmpty returns ErrBucketNotEmpty if the bucket has a non-zero number +// of children. +func isBucketEmpty(parent *bolt.Bucket) error { + return parent.ForEach(func(_, _ []byte) error { + return ErrBucketNotEmpty + }) } // Compile-time constraint to ensure nurseryStore implements NurseryStore.