utxonursery: add to new index buckets to enable querying nursery state

This commit adds to new index buckets to the undo nursery: one that
allows searching the preschool bucket by origin chan point, and another
that allows querying the kindergarten bucket using an identical key.
These index buckets are updated in tandem with their regular buckets.

A note has been named in this commit to re-work the bucket structure
for a time in the near future in which we have implemented full
on-chain HTLC handling.
This commit is contained in:
Olaoluwa Osuntokun 2017-05-04 15:51:57 -07:00
parent ed7e4ad715
commit 79ceaca747
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
@ -25,20 +26,41 @@ var (
// persisted in case the system is shut down between the time when the
// commitment has been broadcast and the time the transaction has been
// confirmed on the blockchain.
// TODO(roasbeef): modify schema later to be:
// * chanPoint ->
// {outpoint1} -> info
// {outpoint2} -> info
preschoolBucket = []byte("psc")
// preschoolIndex is an index that maps original chanPoint that created
// the channel to all the active time-locked outpoints for that
// channel.
preschoolIndex = []byte("preschool-index")
// kindergartenBucket stores outputs from commitment transactions that
// have received an initial confirmation, but which aren't yet
// spendable because they require additional confirmations enforced by
// Check Sequence Verify. Once required additional confirmations have
// CheckSequenceVerify. Once required additional confirmations have
// been reported, a sweep transaction will be created to move the funds
// out of these outputs. After a further six confirmations have been
// reported, the outputs will be deleted from this bucket. The purpose
// of this additional wait time is to ensure that a block
// reorganization doesn't result in the sweep transaction getting
// re-organized out of the chain.
// TODO(roasbeef): modify schema later to be:
// * height ->
// {chanPoint} -> info
kindergartenBucket = []byte("kdg")
// contractIndex is an index that maps a contract's channel point to
// the current information pertaining to the maturity of outputs within
// that contract. Items are inserted into this index once they've been
// accepted to pre-school and deleted after the output has been fully
// swept.
//
// mapping: chanPoint -> graduationHeight || byte-offset-in-kindergartenBucket
contractIndex = []byte("contract-index")
// lastGraduatedHeightKey is used to persist the last block height that
// has been checked for graduating outputs. When the nursery is
// restarted, lastGraduatedHeightKey is used to determine the point
@ -331,6 +353,7 @@ func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) {
out:
for {
select {
case preschoolRequest := <-u.requests:
utxnLog.Infof("Incubating %v new outputs",
len(preschoolRequest.outputs))
@ -368,6 +391,7 @@ out:
// transaction has been confirmed.
go output.waitForPromotion(u.db, confChan)
}
case epoch, ok := <-newBlockChan.Epochs:
// If the epoch channel has been closed, then the
// ChainNotifier is exiting which means the daemon is
@ -407,18 +431,36 @@ func (k *kidOutput) enterPreschool(db *channeldb.DB) error {
if err != nil {
return err
}
psclIndex, err := tx.CreateBucketIfNotExists(preschoolIndex)
if err != nil {
return err
}
// Once we have the buckets we can insert the raw bytes of the
// immature outpoint into the preschool bucket.
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil {
return err
}
var kidBytes bytes.Buffer
if err := serializeKidOutput(&kidBytes, k); err != nil {
return err
}
err = psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes())
if err != nil {
return err
}
if err := psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()); err != nil {
// Additionally, we'll populate the preschool index so we can
// track all the immature outpoints for a particular channel's
// chanPoint.
var b bytes.Buffer
err = writeOutpoint(&b, &k.originChanPoint)
if err != nil {
return err
}
err = psclIndex.Put(b.Bytes(), outpointBytes.Bytes())
if err != nil {
return err
}
@ -453,11 +495,23 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf
// keyed by block height. Keys and values are serialized into byte
// array form prior to database insertion.
err := db.Update(func(tx *bolt.Tx) error {
var originPoint bytes.Buffer
if err := writeOutpoint(&originPoint, &k.originChanPoint); err != nil {
return err
}
psclBucket := tx.Bucket(preschoolBucket)
if psclBucket == nil {
return errors.New("unable to open preschool bucket")
}
psclIndex := tx.Bucket(preschoolIndex)
if psclIndex == nil {
return errors.New("unable to open preschool index")
}
// Now that the entry has been confirmed, in order to move it
// along in the maturity pipeline we first delete the entry
// from the preschool bucket, as well as the secondary index.
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, &k.outPoint); err != nil {
return err
@ -467,7 +521,14 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf
"preschool bucket: %v", k.outPoint)
return err
}
if err := psclIndex.Delete(originPoint.Bytes()); err != nil {
utxnLog.Errorf("unable to delete kindergarten output from "+
"preschool index: %v", k.outPoint)
return err
}
// Next, fetch the kindergarten bucket. This output will remain
// in this bucket until it's fully mature.
kgtnBucket, err := tx.CreateBucketIfNotExists(kindergartenBucket)
if err != nil {
return err
@ -486,6 +547,10 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf
existingOutputs = results
}
// We'll grab the output's offset in the value for its maturity
// height so we can add this to the contract index.
outputOffset := len(existingOutputs)
b := bytes.NewBuffer(existingOutputs)
if err := serializeKidOutput(b, k); err != nil {
return err
@ -494,6 +559,22 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf
return err
}
// Finally, we'll insert a new entry into the contract index.
// The entry itself consists of 4 bytes for the height, and 4
// bytes for the offset within the value for the height.
var indexEntry [4 + 4]byte
copy(indexEntry[:4], heightBytes)
byteOrder.PutUint32(indexEntry[4:], uint32(outputOffset))
indexBucket, err := tx.CreateBucketIfNotExists(contractIndex)
if err != nil {
return err
}
err = indexBucket.Put(originPoint.Bytes(), indexEntry[:])
if err != nil {
return err
}
utxnLog.Infof("Outpoint %v now in kindergarten, will mature "+
"at height %v (delay of %v)", k.outPoint,
maturityHeight, k.blocksToMaturity)
@ -511,6 +592,7 @@ func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.Conf
// is called both when a new block notification has been received and also at
// startup in order to process graduations from blocks missed while the UTXO
// nursery was offline.
// TODO(roasbeef): single db transaction for the below
func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error {
// First fetch the set of outputs that we can "graduate" at this
// particular block height. We can graduate an output once we've
@ -523,7 +605,8 @@ func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error {
// If we're able to graduate any outputs, then create a single
// transaction which sweeps them all into the wallet.
if len(kgtnOutputs) > 0 {
if err := sweepGraduatingOutputs(u.wallet, kgtnOutputs); err != nil {
err := sweepGraduatingOutputs(u.wallet, kgtnOutputs)
if err != nil {
return err
}
}
@ -695,18 +778,36 @@ func deleteGraduatedOutputs(db *channeldb.DB, deleteHeight uint32) error {
return nil
}
sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results))
if err != nil {
return err
}
// Delete the row for this height within the kindergarten bucket.k
if err := kgtnBucket.Delete(heightBytes); err != nil {
return err
}
sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results))
if err != nil {
return err
}
utxnLog.Infof("Deleting %v swept outputs from kindergarten bucket "+
"at block height: %v", len(sweptOutputs), deleteHeight)
// Additionally, for each output that has now been fully swept,
// we'll also remove the index entry for that output.
indexBucket := tx.Bucket(contractIndex)
if indexBucket == nil {
return nil
}
for _, sweptOutput := range sweptOutputs {
var chanPoint bytes.Buffer
err := writeOutpoint(&chanPoint, &sweptOutput.originChanPoint)
if err != nil {
return err
}
if err := indexBucket.Delete(chanPoint.Bytes()); err != nil {
return err
}
}
return nil
})
}