diff --git a/nursery_store.go b/nursery_store.go index d32a1cad..f346a831 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -52,13 +52,15 @@ type NurseryStore interface { // TODO: make this handle one output at a time? AwardDiplomas(...kidOutput) ([]wire.OutPoint, error) - // FinalizeClass accepts a block height as a parameter and purges its + IsMatureChannel(*wire.OutPoint) (bool, error) + + // TryFinalizeClass accepts a block height as a parameter and purges its // persistent state for all outputs at that height. During a restart, // the utxo nursery will begin it's recovery procedure from the next // height that has yet to be finalized. This block height should lag // beyond the best height for this chain as a measure of reorg // protection. - FinalizeClass(height uint32) error + TryFinalizeClass(height uint32) error // State Bucket Enumeration. @@ -83,6 +85,8 @@ type NurseryStore interface { // whose type should be inferred from the key's prefix. ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error + RemoveChannel(*wire.OutPoint) error + // The Point of No Return. // LastFinalizedHeight returns the last block height for which the @@ -164,6 +168,8 @@ var ( // and will be swept into the wallet after waiting out the relative // timelock. kndrPrefix = []byte("kndr") + + gradPrefix = []byte("grad") ) // Overview of Nursery Store Storage Hierarchy @@ -563,62 +569,71 @@ func (ns *nurseryStore) AwardDiplomas( // proceed to mark the channels as closed. // TODO(conner): write list of closed channels to separate bucket so // that they can be replayed on restart? - var closedChannelSet = make(map[wire.OutPoint]struct{}) + var possibleCloseSet = make(map[wire.OutPoint]struct{}) if err := ns.db.Update(func(tx *bolt.Tx) error { for _, kid := range kids { - // Attempt to prune the height bucket matching the kid - // output's confirmation height if it contains no active - // outputs. - err := ns.pruneHeight(tx, kid.ConfHeight()) - switch err { - case ErrBucketNotEmpty: - // Bucket still has active outputs, proceed to - // prune channel bucket. - case ErrBucketDoesNotExist: - // Bucket was previously pruned by another - // graduating output. - - case nil: - // Bucket was pruned successfully and no errors - // were encounter. - utxnLog.Infof("Height bucket %d pruned", - kid.ConfHeight()) - - default: - // Unexpected database error. - return err - } + confHeight := kid.ConfHeight() outpoint := kid.OutPoint() chanPoint := kid.OriginChanPoint() - // Remove the outpoint belonging to the kid output from - // it's channel bucket, then attempt to prune the - // channel bucket if it is now empty. - err = ns.deleteAndPruneChannel(tx, chanPoint, outpoint) - switch err { - case ErrBucketNotEmpty: - // Bucket still has active outputs, continue to - // next kid to avoid adding this channel point - // to the set of channels to be closed. - continue + // Remove output from kindergarten bucket. - case ErrBucketDoesNotExist: - // Bucket may have been removed previously, - // allow this to fall through and ensure the - // channel point is added to the set channels to - // be closed. + pfxOutputKey, err := prefixOutputKey(kndrPrefix, outpoint) + if err != nil { + return err + } - case nil: - // Channel bucket was successfully pruned, - // proceed to add to set of channels to be - // closed. - utxnLog.Infof("Height bucket %d pruned", - kid.ConfHeight()) + hghtChanBucket := ns.getHeightChanBucket(tx, confHeight, chanPoint) + if hghtChanBucket != nil { + if err := hghtChanBucket.DeleteBucket(pfxOutputKey); err != nil { + return err + } - default: - // Uh oh, database error. + // Attempt to prune the height bucket matching the kid + // output's confirmation height if it contains no active + // outputs. + err := ns.pruneHeight(tx, confHeight) + switch err { + case ErrBucketNotEmpty: + // Bucket still has active outputs, proceed to + // prune channel bucket. + + case ErrBucketDoesNotExist: + // Bucket was previously pruned by another + // graduating output. + + case nil: + // Bucket was pruned successfully and no errors + // were encounter. + utxnLog.Infof("Height bucket %d pruned", confHeight) + + default: + // Unexpected database error. + return err + } + } + + chanBucket, err := ns.createChannelBucket(tx, chanPoint) + if err != nil { + return err + } + + if err := chanBucket.Delete(pfxOutputKey); err != nil { + return err + } + + // Convert kindergarten key to graduate key. + copy(pfxOutputKey, gradPrefix) + + var gradBuffer bytes.Buffer + if err := kid.Encode(&gradBuffer); err != nil { + return err + } + + err = chanBucket.Put(pfxOutputKey, gradBuffer.Bytes()) + if err != nil { return err } @@ -628,7 +643,7 @@ func (ns *nurseryStore) AwardDiplomas( // to our set of closed channels to be closed, since // these may need to be replayed to ensure the channel // database is aware that incubation has completed. - closedChannelSet[*chanPoint] = struct{}{} + possibleCloseSet[*chanPoint] = struct{}{} } return nil @@ -637,25 +652,31 @@ func (ns *nurseryStore) AwardDiplomas( } // Convert our set of channels to be closed into a list. - channelsToBeClosed := make([]wire.OutPoint, 0, len(closedChannelSet)) - for chanPoint := range closedChannelSet { - channelsToBeClosed = append(channelsToBeClosed, chanPoint) + possibleCloses := make([]wire.OutPoint, 0, len(possibleCloseSet)) + for chanPoint := range possibleCloseSet { + possibleCloses = append(possibleCloses, chanPoint) } - utxnLog.Infof("Channels to be marked fully closed: %x", - channelsToBeClosed) + utxnLog.Infof("Possible channels to be marked fully closed: %v", + possibleCloses) - return channelsToBeClosed, nil + return possibleCloses, nil } -// FinalizeClass accepts a block height as a parameter and purges its +// TryFinalizeClass accepts a block height as a parameter and purges its // persistent state for all outputs at that height. During a restart, the utxo // nursery will begin it's recovery procedure from the next height that has // yet to be finalized. -func (ns *nurseryStore) FinalizeClass(height uint32) error { - utxnLog.Infof("Finalizing class at height %v", height) +func (ns *nurseryStore) TryFinalizeClass(height uint32) error { return ns.db.Update(func(tx *bolt.Tx) error { - return ns.putLastFinalizedHeight(tx, height) + utxnLog.Infof("Attempting to finalize class at height %v", height) + lastHeight, err := ns.getNextLastFinalizedHeight(tx, height) + if err != nil { + return err + } + utxnLog.Infof("Finalizing class at height %v", lastHeight) + + return ns.putLastFinalizedHeight(tx, lastHeight) }) } @@ -697,9 +718,11 @@ func (ns *nurseryStore) FetchCribs(height uint32) ([]babyOutput, error) { func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) { // Construct a list of all kidOutputs that mature at the provided block // height. + utxnLog.Infof("Fetching kinders") var kids []kidOutput if err := ns.forEachHeightPrefix(kndrPrefix, height, func(buf []byte) error { + utxnLog.Infof("Inside kinder") // We will attempt to deserialize all outputs stored // with the kindergarten prefix into kidOutputs, since @@ -719,6 +742,7 @@ func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) { }); err != nil { return nil, err } + utxnLog.Infof("Returning kinders") return kids, nil } @@ -816,13 +840,49 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint, callback func([]byte, []byte) error) error { return ns.db.View(func(tx *bolt.Tx) error { - chanBucket := ns.getChannelBucket(tx, chanPoint) - if chanBucket == nil { - return ErrContractNotFound + return ns.forChanOutputs(tx, chanPoint, callback) + }) +} +func (ns *nurseryStore) forChanOutputs(tx *bolt.Tx, chanPoint *wire.OutPoint, + callback func([]byte, []byte) error) error { + + chanBucket := ns.getChannelBucket(tx, chanPoint) + if chanBucket == nil { + return ErrContractNotFound + } + + return chanBucket.ForEach(callback) +} + +func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) { + var isMature bool + if err := ns.db.View(func(tx *bolt.Tx) error { + var nOutputs, nGrads int + if err := ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error { + // Count total and number of grad outputs + if string(pfxKey[:4]) == string(gradPrefix) { + nGrads++ + } + nOutputs++ + + return nil + + }); err != nil { + return err } - return chanBucket.ForEach(callback) - }) + utxnLog.Infof("Found %d graduated outputs out of %d", nGrads, nOutputs) + // Channel is mature if all outputs are graduated. + if nGrads == nOutputs { + isMature = true + } + + return nil + }); err != nil { + return false, err + } + + return isMature, nil } // The Point of No Return. @@ -867,6 +927,49 @@ func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) { return byteOrder.Uint32(heightBytes), nil } +func (ns *nurseryStore) getNextLastFinalizedHeight(tx *bolt.Tx, height uint32) (uint32, error) { + // Retrieve the previous last finalized height. + lastHeight, err := ns.getLastFinalizedHeight(tx) + if err != nil { + return 0, err + } + + // TODO(conner): add lower bound after which all state is purged. + + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { + return lastHeight, nil + } + + // Retrieve the existing height index. + hghtIndex := chainBucket.Bucket(heightIndexKey) + if hghtIndex == nil { + return lastHeight, nil + } + + for curHeight := lastHeight + 1; curHeight <= height; curHeight++ { + var curHeightBytes [4]byte + byteOrder.PutUint32(curHeightBytes[:], curHeight) + + hghtBucket := hghtIndex.Bucket(curHeightBytes[:]) + if hghtBucket == nil { + continue + } + + nChildren, err := ns.numChildrenInBucket(hghtBucket) + if err != nil { + return 0, err + } + + if nChildren > 0 { + return curHeight - 1, nil + } + } + + return height, nil +} + // pubLastFinalizedHeight is a helper method that writes the provided height // under the last finalized height key. func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx, @@ -1135,6 +1238,7 @@ func (ns *nurseryStore) forEachHeightPrefix(prefix []byte, height uint32, // the appropriate channel bucket. outputBytes := chanBucket.Get(pfxOutputKey) if outputBytes == nil { + pfxOutputKey, _ = c.Next() continue } @@ -1228,6 +1332,31 @@ func (ns *nurseryStore) deleteAndPruneChannel(tx *bolt.Tx, return ns.removeBucketIfEmpty(chanIndex, chanBytes) } +func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { + return ns.db.Update(func(tx *bolt.Tx) error { + // Retrieve the existing chain bucket for this nursery store. + chainBucket := tx.Bucket(ns.pfxChainKey) + if chainBucket == nil { + return nil + } + + // Retrieve the channel index stored in the chain bucket. + chanIndex := chainBucket.Bucket(channelIndexKey) + if chanIndex == nil { + return nil + } + + // Serialize the provided channel point, such that we can delete + // the mature channel bucket. + var chanBuffer bytes.Buffer + if err := writeOutpoint(&chanBuffer, chanPoint); err != nil { + return err + } + + return chanIndex.DeleteBucket(chanBuffer.Bytes()) + }) +} + // pruneHeight // NOTE: This method returns two concrete errors apart from those returned by // the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These