diff --git a/utxonursery.go b/utxonursery.go index 7c2bc93b..46eec657 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "sync" "sync/atomic" @@ -25,20 +26,41 @@ var ( // persisted in case the system is shut down between the time when the // commitment has been broadcast and the time the transaction has been // confirmed on the blockchain. + // TODO(roasbeef): modify schema later to be: + // * chanPoint -> + // {outpoint1} -> info + // {outpoint2} -> info preschoolBucket = []byte("psc") + // preschoolIndex is an index that maps original chanPoint that created + // the channel to all the active time-locked outpoints for that + // channel. + preschoolIndex = []byte("preschool-index") + // kindergartenBucket stores outputs from commitment transactions that // have received an initial confirmation, but which aren't yet // spendable because they require additional confirmations enforced by - // Check Sequence Verify. Once required additional confirmations have + // CheckSequenceVerify. Once required additional confirmations have // been reported, a sweep transaction will be created to move the funds // out of these outputs. After a further six confirmations have been // reported, the outputs will be deleted from this bucket. The purpose // of this additional wait time is to ensure that a block // reorganization doesn't result in the sweep transaction getting // re-organized out of the chain. + // TODO(roasbeef): modify schema later to be: + // * height -> + // {chanPoint} -> info kindergartenBucket = []byte("kdg") + // contractIndex is an index that maps a contract's channel point to + // the current information pertaining to the maturity of outputs within + // that contract. Items are inserted into this index once they've been + // accepted to pre-school and deleted after the output has been fully + // swept. + // + // mapping: chanPoint -> graduationHeight || byte-offset-in-kindergartenBucket + contractIndex = []byte("contract-index") + // lastGraduatedHeightKey is used to persist the last block height that // has been checked for graduating outputs. When the nursery is // restarted, lastGraduatedHeightKey is used to determine the point @@ -331,6 +353,7 @@ func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) { out: for { select { + case preschoolRequest := <-u.requests: utxnLog.Infof("Incubating %v new outputs", len(preschoolRequest.outputs)) @@ -368,6 +391,7 @@ out: // transaction has been confirmed. go output.waitForPromotion(u.db, confChan) } + case epoch, ok := <-newBlockChan.Epochs: // If the epoch channel has been closed, then the // ChainNotifier is exiting which means the daemon is @@ -407,18 +431,36 @@ func (k *kidOutput) enterPreschool(db *channeldb.DB) error { if err != nil { return err } + psclIndex, err := tx.CreateBucketIfNotExists(preschoolIndex) + if err != nil { + return err + } + // Once we have the buckets we can insert the raw bytes of the + // immature outpoint into the preschool bucket. var outpointBytes bytes.Buffer if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil { return err } - var kidBytes bytes.Buffer if err := serializeKidOutput(&kidBytes, k); err != nil { return err } + err = psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()) + if err != nil { + return err + } - if err := psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()); err != nil { + // Additionally, we'll populate the preschool index so we can + // track all the immature outpoints for a particular channel's + // chanPoint. + var b bytes.Buffer + err = writeOutpoint(&b, &k.originChanPoint) + if err != nil { + return err + } + err = psclIndex.Put(b.Bytes(), outpointBytes.Bytes()) + if err != nil { return err } @@ -453,11 +495,23 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf // keyed by block height. Keys and values are serialized into byte // array form prior to database insertion. err := db.Update(func(tx *bolt.Tx) error { + var originPoint bytes.Buffer + if err := writeOutpoint(&originPoint, &k.originChanPoint); err != nil { + return err + } + psclBucket := tx.Bucket(preschoolBucket) if psclBucket == nil { return errors.New("unable to open preschool bucket") } + psclIndex := tx.Bucket(preschoolIndex) + if psclIndex == nil { + return errors.New("unable to open preschool index") + } + // Now that the entry has been confirmed, in order to move it + // along in the maturity pipeline we first delete the entry + // from the preschool bucket, as well as the secondary index. var outpointBytes bytes.Buffer if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil { return err @@ -467,7 +521,14 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf "preschool bucket: %v", k.outPoint) return err } + if err := psclIndex.Delete(originPoint.Bytes()); err != nil { + utxnLog.Errorf("unable to delete kindergarten output from "+ + "preschool index: %v", k.outPoint) + return err + } + // Next, fetch the kindergarten bucket. This output will remain + // in this bucket until it's fully mature. kgtnBucket, err := tx.CreateBucketIfNotExists(kindergartenBucket) if err != nil { return err @@ -486,6 +547,10 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf existingOutputs = results } + // We'll grab the output's offset in the value for its maturity + // height so we can add this to the contract index. + outputOffset := len(existingOutputs) + b := bytes.NewBuffer(existingOutputs) if err := serializeKidOutput(b, k); err != nil { return err @@ -494,6 +559,22 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf return err } + // Finally, we'll insert a new entry into the contract index. + // The entry itself consists of 4 bytes for the height, and 4 + // bytes for the offset within the value for the height. + var indexEntry [4 + 4]byte + copy(indexEntry[:4], heightBytes) + byteOrder.PutUint32(indexEntry[4:], uint32(outputOffset)) + + indexBucket, err := tx.CreateBucketIfNotExists(contractIndex) + if err != nil { + return err + } + err = indexBucket.Put(originPoint.Bytes(), indexEntry[:]) + if err != nil { + return err + } + utxnLog.Infof("Outpoint %v now in kindergarten, will mature "+ "at height %v (delay of %v)", k.outPoint, maturityHeight, k.blocksToMaturity) @@ -511,6 +592,7 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf // is called both when a new block notification has been received and also at // startup in order to process graduations from blocks missed while the UTXO // nursery was offline. +// TODO(roasbeef): single db transaction for the below func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error { // First fetch the set of outputs that we can "graduate" at this // particular block height. We can graduate an output once we've @@ -523,7 +605,8 @@ func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error { // If we're able to graduate any outputs, then create a single // transaction which sweeps them all into the wallet. if len(kgtnOutputs) > 0 { - if err := sweepGraduatingOutputs(u.wallet, kgtnOutputs); err != nil { + err := sweepGraduatingOutputs(u.wallet, kgtnOutputs) + if err != nil { return err } } @@ -695,18 +778,36 @@ func deleteGraduatedOutputs(db *channeldb.DB, deleteHeight uint32) error { return nil } - sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results)) - if err != nil { - return err - } - + // Delete the row for this height within the kindergarten bucket.k if err := kgtnBucket.Delete(heightBytes); err != nil { return err } + sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results)) + if err != nil { + return err + } utxnLog.Infof("Deleting %v swept outputs from kindergarten bucket "+ "at block height: %v", len(sweptOutputs), deleteHeight) + // Additionally, for each output that has now been fully swept, + // we'll also remove the index entry for that output. + indexBucket := tx.Bucket(contractIndex) + if indexBucket == nil { + return nil + } + for _, sweptOutput := range sweptOutputs { + var chanPoint bytes.Buffer + err := writeOutpoint(&chanPoint, &sweptOutput.originChanPoint) + if err != nil { + return err + } + + if err := indexBucket.Delete(chanPoint.Bytes()); err != nil { + return err + } + } + return nil }) }