From 79ceaca7476eee77b2cab730835f7e940baef131 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 4 May 2017 15:51:57 -0700 Subject: [PATCH] utxonursery: add to new index buckets to enable querying nursery state This commit adds to new index buckets to the undo nursery: one that allows searching the preschool bucket by origin chan point, and another that allows querying the kindergarten bucket using an identical key. These index buckets are updated in tandem with their regular buckets. A note has been named in this commit to re-work the bucket structure for a time in the near future in which we have implemented full on-chain HTLC handling. --- utxonursery.go | 119 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 110 insertions(+), 9 deletions(-) diff --git a/utxonursery.go b/utxonursery.go index 7c2bc93b..46eec657 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "sync" "sync/atomic" @@ -25,20 +26,41 @@ var ( // persisted in case the system is shut down between the time when the // commitment has been broadcast and the time the transaction has been // confirmed on the blockchain. + // TODO(roasbeef): modify schema later to be: + // * chanPoint -> + // {outpoint1} -> info + // {outpoint2} -> info preschoolBucket = []byte("psc") + // preschoolIndex is an index that maps original chanPoint that created + // the channel to all the active time-locked outpoints for that + // channel. + preschoolIndex = []byte("preschool-index") + // kindergartenBucket stores outputs from commitment transactions that // have received an initial confirmation, but which aren't yet // spendable because they require additional confirmations enforced by - // Check Sequence Verify. Once required additional confirmations have + // CheckSequenceVerify. Once required additional confirmations have // been reported, a sweep transaction will be created to move the funds // out of these outputs. After a further six confirmations have been // reported, the outputs will be deleted from this bucket. The purpose // of this additional wait time is to ensure that a block // reorganization doesn't result in the sweep transaction getting // re-organized out of the chain. + // TODO(roasbeef): modify schema later to be: + // * height -> + // {chanPoint} -> info kindergartenBucket = []byte("kdg") + // contractIndex is an index that maps a contract's channel point to + // the current information pertaining to the maturity of outputs within + // that contract. Items are inserted into this index once they've been + // accepted to pre-school and deleted after the output has been fully + // swept. + // + // mapping: chanPoint -> graduationHeight || byte-offset-in-kindergartenBucket + contractIndex = []byte("contract-index") + // lastGraduatedHeightKey is used to persist the last block height that // has been checked for graduating outputs. When the nursery is // restarted, lastGraduatedHeightKey is used to determine the point @@ -331,6 +353,7 @@ func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) { out: for { select { + case preschoolRequest := <-u.requests: utxnLog.Infof("Incubating %v new outputs", len(preschoolRequest.outputs)) @@ -368,6 +391,7 @@ out: // transaction has been confirmed. go output.waitForPromotion(u.db, confChan) } + case epoch, ok := <-newBlockChan.Epochs: // If the epoch channel has been closed, then the // ChainNotifier is exiting which means the daemon is @@ -407,18 +431,36 @@ func (k *kidOutput) enterPreschool(db *channeldb.DB) error { if err != nil { return err } + psclIndex, err := tx.CreateBucketIfNotExists(preschoolIndex) + if err != nil { + return err + } + // Once we have the buckets we can insert the raw bytes of the + // immature outpoint into the preschool bucket. var outpointBytes bytes.Buffer if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil { return err } - var kidBytes bytes.Buffer if err := serializeKidOutput(&kidBytes, k); err != nil { return err } + err = psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()) + if err != nil { + return err + } - if err := psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()); err != nil { + // Additionally, we'll populate the preschool index so we can + // track all the immature outpoints for a particular channel's + // chanPoint. + var b bytes.Buffer + err = writeOutpoint(&b, &k.originChanPoint) + if err != nil { + return err + } + err = psclIndex.Put(b.Bytes(), outpointBytes.Bytes()) + if err != nil { return err } @@ -453,11 +495,23 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf // keyed by block height. Keys and values are serialized into byte // array form prior to database insertion. err := db.Update(func(tx *bolt.Tx) error { + var originPoint bytes.Buffer + if err := writeOutpoint(&originPoint, &k.originChanPoint); err != nil { + return err + } + psclBucket := tx.Bucket(preschoolBucket) if psclBucket == nil { return errors.New("unable to open preschool bucket") } + psclIndex := tx.Bucket(preschoolIndex) + if psclIndex == nil { + return errors.New("unable to open preschool index") + } + // Now that the entry has been confirmed, in order to move it + // along in the maturity pipeline we first delete the entry + // from the preschool bucket, as well as the secondary index. var outpointBytes bytes.Buffer if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil { return err @@ -467,7 +521,14 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf "preschool bucket: %v", k.outPoint) return err } + if err := psclIndex.Delete(originPoint.Bytes()); err != nil { + utxnLog.Errorf("unable to delete kindergarten output from "+ + "preschool index: %v", k.outPoint) + return err + } + // Next, fetch the kindergarten bucket. This output will remain + // in this bucket until it's fully mature. kgtnBucket, err := tx.CreateBucketIfNotExists(kindergartenBucket) if err != nil { return err @@ -486,6 +547,10 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf existingOutputs = results } + // We'll grab the output's offset in the value for its maturity + // height so we can add this to the contract index. + outputOffset := len(existingOutputs) + b := bytes.NewBuffer(existingOutputs) if err := serializeKidOutput(b, k); err != nil { return err @@ -494,6 +559,22 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf return err } + // Finally, we'll insert a new entry into the contract index. + // The entry itself consists of 4 bytes for the height, and 4 + // bytes for the offset within the value for the height. + var indexEntry [4 + 4]byte + copy(indexEntry[:4], heightBytes) + byteOrder.PutUint32(indexEntry[4:], uint32(outputOffset)) + + indexBucket, err := tx.CreateBucketIfNotExists(contractIndex) + if err != nil { + return err + } + err = indexBucket.Put(originPoint.Bytes(), indexEntry[:]) + if err != nil { + return err + } + utxnLog.Infof("Outpoint %v now in kindergarten, will mature "+ "at height %v (delay of %v)", k.outPoint, maturityHeight, k.blocksToMaturity) @@ -511,6 +592,7 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf // is called both when a new block notification has been received and also at // startup in order to process graduations from blocks missed while the UTXO // nursery was offline. +// TODO(roasbeef): single db transaction for the below func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error { // First fetch the set of outputs that we can "graduate" at this // particular block height. We can graduate an output once we've @@ -523,7 +605,8 @@ func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error { // If we're able to graduate any outputs, then create a single // transaction which sweeps them all into the wallet. if len(kgtnOutputs) > 0 { - if err := sweepGraduatingOutputs(u.wallet, kgtnOutputs); err != nil { + err := sweepGraduatingOutputs(u.wallet, kgtnOutputs) + if err != nil { return err } } @@ -695,18 +778,36 @@ func deleteGraduatedOutputs(db *channeldb.DB, deleteHeight uint32) error { return nil } - sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results)) - if err != nil { - return err - } - + // Delete the row for this height within the kindergarten bucket.k if err := kgtnBucket.Delete(heightBytes); err != nil { return err } + sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results)) + if err != nil { + return err + } utxnLog.Infof("Deleting %v swept outputs from kindergarten bucket "+ "at block height: %v", len(sweptOutputs), deleteHeight) + // Additionally, for each output that has now been fully swept, + // we'll also remove the index entry for that output. + indexBucket := tx.Bucket(contractIndex) + if indexBucket == nil { + return nil + } + for _, sweptOutput := range sweptOutputs { + var chanPoint bytes.Buffer + err := writeOutpoint(&chanPoint, &sweptOutput.originChanPoint) + if err != nil { + return err + } + + if err := indexBucket.Delete(chanPoint.Bytes()); err != nil { + return err + } + } + return nil }) }