diff --git a/utxonursery.go b/utxonursery.go index 2bad107f..0051756f 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -3,71 +3,165 @@ package main import ( "bytes" "encoding/binary" - "errors" "fmt" "io" + "strings" "sync" "sync/atomic" - "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/roasbeef/btcd/blockchain" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) -var ( - // preschoolBucket stores outputs from commitment transactions that - // have been broadcast, but not yet confirmed. This set of outputs is - // persisted in case the system is shut down between the time when the - // commitment has been broadcast and the time the transaction has been - // confirmed on the blockchain. - // TODO(roasbeef): modify schema later to be: - // * chanPoint -> - // {outpoint1} -> info - // {outpoint2} -> info - preschoolBucket = []byte("psc") +// SUMMARY OF OUTPUT STATES +// +// - CRIB +// - SerializedType: babyOutput +// - OriginalOutputType: HTLC +// - Awaiting: First-stage HTLC CLTV expiry +// - HeightIndexEntry: Absolute block height of CLTV expiry. +// - NextState: KNDR +// - PSCL +// - SerializedType: kidOutput +// - OriginalOutputType: Commitment +// - Awaiting: Confirmation of commitment txn +// - HeightIndexEntry: None. +// - NextState: KNDR +// - KNDR +// - SerializedType: kidOutput +// - OriginalOutputType: Commitment or HTLC +// - Awaiting: Commitment CSV expiry or second-stage HTLC CSV expiry. +// - HeightIndexEntry: Input confirmation height + relative CSV delay +// - NextState: GRAD +// - GRAD: +// - SerializedType: kidOutput +// - OriginalOutputType: Commitment or HTLC +// - Awaiting: All other outputs in channel to become GRAD. +// - NextState: Mark channel fully closed in channeldb and remove. +// +// DESCRIPTION OF OUTPUT STATES +// +// - CRIB (babyOutput) outputs are two-stage htlc outputs that are initially +// locked using a CLTV delay, followed by a CSV delay. The first stage of a +// crib output requires broadcasting a presigned htlc timeout txn generated +// by the wallet after an absolute expiry height. Since the timeout txns are +// predetermined, they cannot be batched after-the-fact, meaning that all +// CRIB outputs are broadcast and confirmed independently. After the first +// stage is complete, a CRIB output is moved to the KNDR state, which will +// finishing sweeping the second-layer CSV delay. +// +// - PSCL (kidOutput) outputs are commitment outputs locked under a CSV delay. +// These outputs are stored temporarily in this state until the commitment +// transaction confirms, as this solidifies an absolute height that the +// relative time lock will expire. Once this maturity height is determined, +// the PSCL output is moved into KNDR. +// +// - KNDR (kidOutput) outputs are CSV delayed outputs for which the maturity +// height has been fully determined. This results from having received +// confirmation of the UTXO we are trying to spend, contained in either the +// commitment txn or htlc timeout txn. Once the maturity height is reached, +// the utxo nursery will sweep all KNDR outputs scheduled for that height +// using a single txn. +// +// NOTE: Due to the fact that KNDR outputs can be dynamically aggregated and +// swept, we make precautions to finalize the KNDR outputs at a particular +// height on our first attempt to sweep it. Finalizing involves signing the +// sweep transaction and persisting it in the nursery store, and recording +// the last finalized height. Any attempts to replay an already finalized +// height will result in broadcasting the already finalized txn, ensuring the +// nursery does not broadcast different txids for the same batch of KNDR +// outputs. The reason txids may change is due to the probabilistic nature of +// generating the pkscript in the sweep txn's output, even if the set of +// inputs remains static across attempts. +// +// - GRAD (kidOutput) outputs are KNDR outputs that have successfully been +// swept into the user's wallet. A channel is considered mature once all of +// its outputs, including two-stage htlcs, have entered the GRAD state, +// indicating that it safe to mark the channel as fully closed. +// +// +// OUTPUT STATE TRANSITIONS IN UTXO NURSERY +// +// ┌────────────────┐ ┌──────────────┐ +// │ Commit Outputs │ │ HTLC Outputs │ +// └────────────────┘ └──────────────┘ +// │ │ +// │ │ +// │ │ UTXO NURSERY +// ┌───────────┼────────────────┬───────────┼───────────────────────────────┐ +// │ │ │ │ +// │ │ │ │ │ +// │ │ │ CLTV-Delayed │ +// │ │ │ V babyOutputs │ +// │ │ ┌──────┐ │ +// │ │ │ │ CRIB │ │ +// │ │ └──────┘ │ +// │ │ │ │ │ +// │ │ │ │ +// │ │ │ | │ +// │ │ V Wait CLTV │ +// │ │ │ [ ] + │ +// │ │ | Publish Txn │ +// │ │ │ │ │ +// │ │ │ │ +// │ │ │ V ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐ │ +// │ │ ( ) waitForTimeoutConf │ +// │ │ │ | └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ │ +// │ │ │ │ +// │ │ │ │ │ +// │ │ │ │ +// │ V │ │ │ +// │ ┌──────┐ │ │ +// │ │ PSCL │ └ ── ── ─┼ ── ── ── ── ── ── ── ─┤ +// │ └──────┘ │ │ +// │ │ │ │ +// │ │ │ │ +// │ V ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐ │ CSV-Delayed │ +// │ ( ) waitForCommitConf │ kidOutputs │ +// │ | └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ │ │ +// │ │ │ │ +// │ │ │ │ +// │ │ V │ +// │ │ ┌──────┐ │ +// │ └─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─▶│ KNDR │ │ +// │ └──────┘ │ +// │ │ │ +// │ │ │ +// │ | │ +// │ V Wait CSV │ +// │ [ ] + │ +// │ | Publish Txn │ +// │ │ │ +// │ │ │ +// │ V ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │ +// │ ( ) waitForSweepConf │ +// │ | └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │ +// │ │ │ +// │ │ │ +// │ V │ +// │ ┌──────┐ │ +// │ │ GRAD │ │ +// │ └──────┘ │ +// │ │ │ +// │ │ │ +// │ │ │ +// └────────────────────────────────────────┼───────────────────────────────┘ +// │ +// │ +// │ +// │ +// V +// ┌────────────────┐ +// │ Wallet Outputs │ +// └────────────────┘ - // preschoolIndex is an index that maps original chanPoint that created - // the channel to all the active time-locked outpoints for that - // channel. - preschoolIndex = []byte("preschool-index") - - // kindergartenBucket stores outputs from commitment transactions that - // have received an initial confirmation, but which aren't yet - // spendable because they require additional confirmations enforced by - // CheckSequenceVerify. Once required additional confirmations have - // been reported, a sweep transaction will be created to move the funds - // out of these outputs. After a further six confirmations have been - // reported, the outputs will be deleted from this bucket. The purpose - // of this additional wait time is to ensure that a block - // reorganization doesn't result in the sweep transaction getting - // re-organized out of the chain. - // TODO(roasbeef): modify schema later to be: - // * height -> - // {chanPoint} -> info - kindergartenBucket = []byte("kdg") - - // contractIndex is an index that maps a contract's channel point to - // the current information pertaining to the maturity of outputs within - // that contract. Items are inserted into this index once they've been - // accepted to pre-school and deleted after the output has been fully - // swept. - // - // mapping: chanPoint -> graduationHeight || byte-offset-in-kindergartenBucket - contractIndex = []byte("contract-index") - - // lastGraduatedHeightKey is used to persist the last block height that - // has been checked for graduating outputs. When the nursery is - // restarted, lastGraduatedHeightKey is used to determine the point - // from which it's necessary to catch up. - lastGraduatedHeightKey = []byte("lgh") - - byteOrder = binary.BigEndian -) +var byteOrder = binary.BigEndian var ( // ErrContractNotFound is returned when the nursery is unable to @@ -75,6 +169,47 @@ var ( ErrContractNotFound = fmt.Errorf("unable to locate contract") ) +// NurseryConfig abstracts the required subsystems used by the utxo nursery. An +// instance of NurseryConfig is passed to newUtxoNursery during instantiationn. +type NurseryConfig struct { + // ChainIO is used by the utxo nursery to determine the current block + // height, which drives the incubation of the nursery's outputs. + ChainIO lnwallet.BlockChainIO + + // ConfDepth is the number of blocks the nursery store waits before + // determining outputs in the chain as confirmed. + ConfDepth uint32 + + // DB provides access to a user's channels, such that they can be marked + // fully closed after incubation has concluded. + DB *channeldb.DB + + // Estimator is used when crafting sweep transactions to estimate the + // necessary fee relative to the expected size of the sweep transaction. + Estimator lnwallet.FeeEstimator + + // GenSweepScript generates a P2WKH script belonging to the wallet where + // funds can be swept. + GenSweepScript func() ([]byte, error) + + // Notifier provides the utxo nursery the ability to subscribe to + // transaction confirmation events, which advance outputs through their + // persistence state transitions. + Notifier chainntnfs.ChainNotifier + + // PublishTransaction facilitates the process of broadcasting a signed + // transaction to the appropriate network. + PublishTransaction func(*wire.MsgTx) error + + // Signer is used by the utxo nursery to generate valid witnesses at the + // time the incubated outputs need to be spent. + Signer lnwallet.Signer + + // Store provides access to and modification of the persistent state + // maintained about the utxo nursery's incubating outputs. + Store NurseryStore +} + // utxoNursery is a system dedicated to incubating time-locked outputs created // by the broadcast of a commitment transaction either by us, or the remote // peer. The nursery accepts outputs and "incubates" them until they've reached @@ -84,32 +219,24 @@ var ( // the source wallet, returning the outputs so they can be used within future // channels, or regular Bitcoin transactions. type utxoNursery struct { - sync.RWMutex - - notifier chainntnfs.ChainNotifier - wallet *lnwallet.LightningWallet - - db *channeldb.DB - - requests chan *incubationRequest - started uint32 stopped uint32 - quit chan struct{} - wg sync.WaitGroup + + cfg *NurseryConfig + + mu sync.Mutex + bestHeight uint32 + + quit chan struct{} + wg sync.WaitGroup } // newUtxoNursery creates a new instance of the utxoNursery from a // ChainNotifier and LightningWallet instance. -func newUtxoNursery(db *channeldb.DB, notifier chainntnfs.ChainNotifier, - wallet *lnwallet.LightningWallet) *utxoNursery { - +func newUtxoNursery(cfg *NurseryConfig) *utxoNursery { return &utxoNursery{ - notifier: notifier, - wallet: wallet, - requests: make(chan *incubationRequest), - db: db, - quit: make(chan struct{}), + cfg: cfg, + quit: make(chan struct{}), } } @@ -122,115 +249,72 @@ func (u *utxoNursery) Start() error { utxnLog.Tracef("Starting UTXO nursery") - // Query the database for the most recently processed block. We'll use - // this to strict the search space when asking for confirmation - // notifications, and also to scan the chain to graduate now mature - // outputs. - var lastGraduatedHeight uint32 - err := u.db.View(func(tx *bolt.Tx) error { - kgtnBucket := tx.Bucket(kindergartenBucket) - if kgtnBucket == nil { - return nil - } - heightBytes := kgtnBucket.Get(lastGraduatedHeightKey) - if heightBytes == nil { - return nil - } - - lastGraduatedHeight = byteOrder.Uint32(heightBytes) - return nil - }) - if err != nil { - return err - } - - if err := u.reloadPreschool(lastGraduatedHeight); err != nil { - return err - } + // 1. Start watching for new blocks, as this will drive the nursery + // store's state machine. // Register with the notifier to receive notifications for each newly - // connected block. We register during startup to ensure that no blocks - // are missed while we are handling blocks that were missed during the - // time the UTXO nursery was unavailable. - newBlockChan, err := u.notifier.RegisterBlockEpochNtfn() - if err != nil { - return err - } - if err := u.catchUpKindergarten(lastGraduatedHeight); err != nil { - return err - } - - u.wg.Add(1) - go u.incubator(newBlockChan, lastGraduatedHeight) - - return nil -} - -// reloadPreschool re-initializes the chain notifier with all of the outputs -// that had been saved to the "preschool" database bucket prior to shutdown. -func (u *utxoNursery) reloadPreschool(heightHint uint32) error { - return u.db.View(func(tx *bolt.Tx) error { - psclBucket := tx.Bucket(preschoolBucket) - if psclBucket == nil { - return nil - } - - return psclBucket.ForEach(func(outputBytes, kidBytes []byte) error { - var psclOutput kidOutput - err := psclOutput.Decode(bytes.NewBuffer(kidBytes)) - if err != nil { - return err - } - - sourceTxid := psclOutput.OutPoint().Hash - - confChan, err := u.notifier.RegisterConfirmationsNtfn( - &sourceTxid, 1, heightHint, - ) - if err != nil { - return err - } - - utxnLog.Infof("Preschool outpoint %v re-registered for confirmation "+ - "notification.", psclOutput.OutPoint()) - go psclOutput.waitForPromotion(u.db, confChan) - return nil - }) - }) -} - -// catchUpKindergarten handles the graduation of kindergarten outputs from -// blocks that were missed while the UTXO Nursery was down or offline. -// graduateMissedBlocks is called during the startup of the UTXO Nursery. -func (u *utxoNursery) catchUpKindergarten(lastGraduatedHeight uint32) error { - // Get the most recently mined block - _, bestHeight, err := u.wallet.Cfg.ChainIO.GetBestBlock() + // connected block. We register immediately on startup to ensure that no + // blocks are missed while we are handling blocks that were missed + // during the time the UTXO nursery was unavailable. + newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn() if err != nil { return err } - // If we haven't yet seen any registered force closes, or we're already - // caught up with the current best chain, then we can exit early. - if lastGraduatedHeight == 0 || uint32(bestHeight) == lastGraduatedHeight { - return nil + // 2. Flush all fully-graduated channels from the pipeline. + + // Load any pending close channels, which represents the super set of + // all channels that may still be incubating. + pendingCloseChans, err := u.cfg.DB.FetchClosedChannels(true) + if err != nil { + newBlockChan.Cancel() + return err } - utxnLog.Infof("Processing outputs from missed blocks. Starting with "+ - "blockHeight: %v, to current blockHeight: %v", lastGraduatedHeight, - bestHeight) - - // Loop through and check for graduating outputs at each of the missed - // block heights. - for graduationHeight := lastGraduatedHeight + 1; graduationHeight <= uint32(bestHeight); graduationHeight++ { - utxnLog.Debugf("Attempting to graduate outputs at height=%v", - graduationHeight) - - if err := u.graduateKindergarten(graduationHeight); err != nil { + // Ensure that all mature channels have been marked as fully closed in + // the channeldb. + for _, pendingClose := range pendingCloseChans { + err := u.closeAndRemoveIfMature(&pendingClose.ChanPoint) + if err != nil { + newBlockChan.Cancel() return err } } - utxnLog.Infof("UTXO Nursery is now fully synced") + // TODO(conner): check if any fully closed channels can be removed from + // utxn. + + // Query the nursery store for the lowest block height we could be + // incubating, which is taken to be the last height for which the + // database was purged. + lastGraduatedHeight, err := u.cfg.Store.LastGraduatedHeight() + if err != nil { + newBlockChan.Cancel() + return err + } + + // 2. Restart spend ntfns for any preschool outputs, which are waiting + // for the force closed commitment txn to confirm. + // + // NOTE: The next two steps *may* spawn go routines, thus from this + // point forward, we must close the nursery's quit channel if we detect + // any failures during startup to ensure they terminate. + if err := u.reloadPreschool(lastGraduatedHeight); err != nil { + newBlockChan.Cancel() + close(u.quit) + return err + } + + // 3. Replay all crib and kindergarten outputs from last pruned to + // current best height. + if err := u.reloadClasses(lastGraduatedHeight); err != nil { + newBlockChan.Cancel() + close(u.quit) + return err + } + + u.wg.Add(1) + go u.incubator(newBlockChan) return nil } @@ -250,21 +334,23 @@ func (u *utxoNursery) Stop() error { return nil } -// incubationRequest is a request to the utxoNursery to incubate a set of -// outputs until their mature, finally sweeping them into the wallet once -// available. -type incubationRequest struct { - outputs []*kidOutput -} - -// incubateOutputs sends a request to utxoNursery to incubate the outputs +// IncubateOutputs sends a request to utxoNursery to incubate the outputs // defined within the summary of a closed channel. Individually, as all outputs -// reach maturity they'll be swept back into the wallet. -func (u *utxoNursery) IncubateOutputs(closeSummary *lnwallet.ForceCloseSummary) { - var incReq incubationRequest +// reach maturity, they'll be swept back into the wallet. +func (u *utxoNursery) IncubateOutputs( + closeSummary *lnwallet.ForceCloseSummary) error { - // It could be that our to-self output was below the dust limit. In - // that case the SignDescriptor would be nil and we would not have that + nHtlcs := len(closeSummary.HtlcResolutions) + + var ( + commOutput *kidOutput + htlcOutputs = make([]babyOutput, 0, nHtlcs) + ) + + // 1. Build all the spendable outputs that we will try to incubate. + + // It could be that our to-self output was below the dust limit. In that + // case the SignDescriptor would be nil and we would not have that // output to incubate. if closeSummary.SelfOutputSignDesc != nil { selfOutput := makeKidOutput( @@ -275,75 +361,319 @@ func (u *utxoNursery) IncubateOutputs(closeSummary *lnwallet.ForceCloseSummary) closeSummary.SelfOutputSignDesc, ) - incReq.outputs = append(incReq.outputs, &selfOutput) + // We'll skip any zero value'd outputs as this indicates we + // don't have a settled balance within the commitment + // transaction. + if selfOutput.Amount() > 0 { + commOutput = &selfOutput + } } - // If there are no outputs to incubate, there is nothing to send to the - // request channel. - if len(incReq.outputs) != 0 { - u.requests <- &incReq + for i := range closeSummary.HtlcResolutions { + htlcRes := closeSummary.HtlcResolutions[i] + + htlcOutpoint := &wire.OutPoint{ + Hash: htlcRes.SignedTimeoutTx.TxHash(), + Index: 0, + } + + htlcOutput := makeBabyOutput( + htlcOutpoint, + &closeSummary.ChanPoint, + closeSummary.SelfOutputMaturity, + lnwallet.HtlcOfferedTimeout, + &htlcRes, + ) + + if htlcOutput.Amount() > 0 { + htlcOutputs = append(htlcOutputs, htlcOutput) + } + } + + // If there are no outputs to incubate for this channel, we simply mark + // the channel as fully closed. + if commOutput == nil && len(htlcOutputs) == 0 { + utxnLog.Infof("Channel(%s) has no outputs to incubate, "+ + "marking fully closed.", &closeSummary.ChanPoint) + return u.cfg.DB.MarkChanFullyClosed(&closeSummary.ChanPoint) + } + + utxnLog.Infof("Incubating Channel(%s) has-commit=%v, num-htlcs=%d", + &closeSummary.ChanPoint, commOutput != nil, len(htlcOutputs)) + + u.mu.Lock() + defer u.mu.Unlock() + + // 2. Persist the outputs we intended to sweep in the nursery store + if err := u.cfg.Store.Incubate(commOutput, htlcOutputs); err != nil { + utxnLog.Errorf("unable to begin incubation of Channel(%s): %v", + &closeSummary.ChanPoint, err) + return err + } + + // 3. If we are incubating a preschool output, register for a + // confirmation notification that will transition it to the kindergarten + // bucket. + if commOutput != nil { + return u.registerCommitConf(commOutput, u.bestHeight) + } + + return nil } -// incubator is tasked with watching over all outputs from channel closes as -// they transition from being broadcast (at which point they move into the -// "preschool state"), then confirmed and waiting for the necessary number of -// blocks to be confirmed (as specified as kidOutput.blocksToMaturity and -// enforced by CheckSequenceVerify). When the necessary block height has been -// reached, the output has "matured" and the waitForGraduation function will -// generate a sweep transaction to move funds from the commitment transaction -// into the user's wallet. -func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent, - startingHeight uint32) { +// NurseryReport attempts to return a nursery report stored for the target +// outpoint. A nursery report details the maturity/sweeping progress for a +// contract that was previously force closed. If a report entry for the target +// chanPoint is unable to be constructed, then an error will be returned. +func (u *utxoNursery) NurseryReport( + chanPoint *wire.OutPoint) (*contractMaturityReport, error) { + u.mu.Lock() + defer u.mu.Unlock() + + utxnLog.Infof("NurseryReport: building nursery report for channel %v", + chanPoint) + + report := &contractMaturityReport{ + chanPoint: *chanPoint, + } + + if err := u.cfg.Store.ForChanOutputs(chanPoint, func(k, v []byte) error { + switch { + case bytes.HasPrefix(k, cribPrefix): + // Cribs outputs are the only kind currently stored as + // baby outputs. + var baby babyOutput + err := baby.Decode(bytes.NewReader(v)) + if err != nil { + return err + } + + // Each crib output represents a stage one htlc, and + // will contribute towards the limbo balance. + report.AddLimboStage1Htlc(&baby) + + case bytes.HasPrefix(k, psclPrefix), + bytes.HasPrefix(k, kndrPrefix), + bytes.HasPrefix(k, gradPrefix): + + // All others states can be deserialized as kid outputs. + var kid kidOutput + err := kid.Decode(bytes.NewReader(v)) + if err != nil { + return err + } + + // Now, use the state prefixes to determine how the this + // output should be represented in the nursery report. + // An output's funds are always in limbo until reaching + // the graduate state. + switch { + case bytes.HasPrefix(k, psclPrefix): + // Preschool outputs are awaiting the + // confirmation of the commitment transaction. + report.AddLimboCommitment(&kid) + + case bytes.HasPrefix(k, kndrPrefix): + // Kindergarten outputs may originate from + // either the commitment transaction or an htlc. + // We can distinguish them via their witness + // types. + switch kid.WitnessType() { + case lnwallet.CommitmentTimeLock: + // The commitment transaction has been + // confirmed, and we are waiting the CSV + // delay to expire. + report.AddLimboCommitment(&kid) + + case lnwallet.HtlcOfferedTimeout: + // The htlc timeout transaction has + // confirmed, and the CSV delay has + // begun ticking. + report.AddLimboStage2Htlc(&kid) + } + + case bytes.HasPrefix(k, gradPrefix): + // Graduate outputs are those whose funds have + // been swept back into the wallet. Each output + // will contribute towards the recovered + // balance. + switch kid.WitnessType() { + case lnwallet.CommitmentTimeLock: + // The commitment output was + // successfully swept back into a + // regular p2wkh output. + report.AddRecoveredCommitment(&kid) + + case lnwallet.HtlcOfferedTimeout: + // This htlc output successfully resides + // in a p2wkh output belonging to the + // user. + report.AddRecoveredHtlc(&kid) + } + } + + default: + } + + return nil + }); err != nil { + return nil, err + } + + return report, nil +} + +// reloadPreschool re-initializes the chain notifier with all of the outputs +// that had been saved to the "preschool" database bucket prior to shutdown. +func (u *utxoNursery) reloadPreschool(heightHint uint32) error { + psclOutputs, err := u.cfg.Store.FetchPreschools() + if err != nil { + return err + } + + for i := range psclOutputs { + err := u.registerCommitConf(&psclOutputs[i], heightHint) + if err != nil { + return err + } + } + + return nil +} + +// reloadClasses reinitializes any height-dependent state transitions for which +// the utxonursery has not recevied confirmation, and replays the graduation of +// all kindergarten and crib outputs for heights that have not been finalized. +// This allows the nursery to reinitialize all state to continue sweeping +// outputs, even in the event that we missed blocks while offline. reloadClasses +// is called during the startup of the UTXO Nursery. +func (u *utxoNursery) reloadClasses(lastGradHeight uint32) error { + // Begin by loading all of the still-active heights up to and including + // the last height we successfully graduated. + activeHeights, err := u.cfg.Store.HeightsBelowOrEqual(lastGradHeight) + if err != nil { + return err + } + + if len(activeHeights) > 0 { + utxnLog.Infof("Re-registering confirmations for %d already "+ + "graduated heights below height=%d", len(activeHeights), + lastGradHeight) + } + + // Attempt to re-register notifications for any outputs still at these + // heights. + for _, classHeight := range activeHeights { + utxnLog.Debugf("Attempting to regraduate outputs at height=%v", + classHeight) + + if err = u.regraduateClass(classHeight); err != nil { + utxnLog.Errorf("Failed to regraduate outputs at "+ + "height=%v: %v", classHeight, err) + return err + } + } + + // Get the most recently mined block. + _, bestHeight, err := u.cfg.ChainIO.GetBestBlock() + if err != nil { + return err + } + + // If we haven't yet seen any registered force closes, or we're already + // caught up with the current best chain, then we can exit early. + if lastGradHeight == 0 || uint32(bestHeight) == lastGradHeight { + return nil + } + + utxnLog.Infof("Processing outputs from missed blocks. Starting with "+ + "blockHeight=%v, to current blockHeight=%v", lastGradHeight, + bestHeight) + + // Loop through and check for graduating outputs at each of the missed + // block heights. + for curHeight := lastGradHeight + 1; curHeight <= uint32(bestHeight); curHeight++ { + utxnLog.Debugf("Attempting to graduate outputs at height=%v", + curHeight) + + if err := u.graduateClass(curHeight); err != nil { + utxnLog.Errorf("Failed to graduate outputs at "+ + "height=%v: %v", curHeight, err) + return err + } + } + + utxnLog.Infof("UTXO Nursery is now fully synced") + + return nil +} + +// regraduateClass handles the steps involved in re-registering for +// confirmations for all still-active outputs at a particular height. This is +// used during restarts to ensure that any still-pending state transitions are +// properly registered, so they can be driven by the chain notifier. No +// transactions or signing are done as a result of this step. +func (u *utxoNursery) regraduateClass(classHeight uint32) error { + // Fetch all information about the crib and kindergarten outputs at this + // height. In addition to the outputs, we also retrieve the finalized + // kindergarten sweep txn, which will be nil if we have not attempted + // this height before, or if no kindergarten outputs exist at this + // height. + finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass( + classHeight) + if err != nil { + return err + } + + if finalTx != nil { + utxnLog.Infof("Re-registering confirmation for kindergarten "+ + "sweep transaction at height=%d ", classHeight) + + err = u.registerSweepConf(finalTx, kgtnOutputs, classHeight) + if err != nil { + utxnLog.Errorf("Failed to re-register for kindergarten "+ + "sweep transaction at height=%d: %v", + classHeight, err) + return err + } + } + + if len(cribOutputs) == 0 { + return nil + } + + utxnLog.Infof("Re-registering confirmation for first-stage HTLC "+ + "outputs at height=%d ", classHeight) + + // Now, we broadcast all pre-signed htlc txns from the crib outputs at + // this height. There is no need to finalize these txns, since the txid + // is predetermined when signed in the wallet. + for i := range cribOutputs { + err = u.registerTimeoutConf(&cribOutputs[i], classHeight) + if err != nil { + utxnLog.Errorf("Failed to re-register first-stage "+ + "HTLC output %v", cribOutputs[i].OutPoint()) + return err + } + } + + return nil +} + +// incubator is tasked with driving all state transitions that are dependent on +// the current height of the blockchain. As new blocks arrive, the incubator +// will attempt spend outputs at the latest height. The asynchronous +// confirmation of these spends will either 1) move a crib output into the +// kindergarten bucket or 2) move a kindergarten output into the graduated +// bucket. +func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) { defer u.wg.Done() defer newBlockChan.Cancel() - currentHeight := startingHeight -out: for { select { - - case preschoolRequest := <-u.requests: - utxnLog.Infof("Incubating %v new outputs", - len(preschoolRequest.outputs)) - - for _, output := range preschoolRequest.outputs { - // We'll skip any zero value'd outputs as this - // indicates we don't have a settled balance - // within the commitment transaction. - if output.Amount() == 0 { - continue - } - - sourceTxid := output.OutPoint().Hash - - if err := output.enterPreschool(u.db); err != nil { - utxnLog.Errorf("unable to add kidOutput to preschool: %v, %v ", - output, err) - continue - } - - // Register for a notification that will - // trigger graduation from preschool to - // kindergarten when the channel close - // transaction has been confirmed. - confChan, err := u.notifier.RegisterConfirmationsNtfn( - &sourceTxid, 1, currentHeight, - ) - if err != nil { - utxnLog.Errorf("unable to register output for confirmation: %v", - sourceTxid) - continue - } - - // Launch a dedicated goroutine that will move - // the output from the preschool bucket to the - // kindergarten bucket once the channel close - // transaction has been confirmed. - go output.waitForPromotion(u.db, confChan) - } - case epoch, ok := <-newBlockChan.Epochs: // If the epoch channel has been closed, then the // ChainNotifier is exiting which means the daemon is @@ -358,22 +688,516 @@ out: // will give stale data // A new block has just been connected to the main - // chain which means we might be able to graduate some - // outputs out of the kindergarten bucket. Graduation - // entails successfully sweeping a time-locked output. + // chain, which means we might be able to graduate crib + // or kindergarten outputs at this height. This involves + // broadcasting any presigned htlc timeout txns, as well + // as signing and broadcasting a sweep txn that spends + // from all kindergarten outputs at this height. height := uint32(epoch.Height) - currentHeight = height - if err := u.graduateKindergarten(height); err != nil { + if err := u.graduateClass(height); err != nil { utxnLog.Errorf("error while graduating "+ - "kindergarten outputs: %v", err) + "class at height=%d: %v", height, err) + + // TODO(conner): signal fatal error to daemon } case <-u.quit: - break out + return } } } +// graduateClass handles the steps involved in spending outputs whose CSV or +// CLTV delay expires at the nursery's current height. This method is called +// each time a new block arrives, or during startup to catch up on heights we +// may have missed while the nursery was offline. +func (u *utxoNursery) graduateClass(classHeight uint32) error { + // Record this height as the nursery's current best height. + u.mu.Lock() + defer u.mu.Unlock() + + u.bestHeight = classHeight + + // Fetch all information about the crib and kindergarten outputs at this + // height. In addition to the outputs, we also retrieve the finalized + // kindergarten sweep txn, which will be nil if we have not attempted + // this height before, or if no kindergarten outputs exist at this + // height. + finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass( + classHeight) + if err != nil { + return err + } + + // Load the last finalized height, so we can determine if the + // kindergarten sweep txn should be crafted. + lastFinalizedHeight, err := u.cfg.Store.LastFinalizedHeight() + if err != nil { + return err + } + + // If we haven't processed this height before, we finalize the + // graduating kindergarten outputs, by signing a sweep transaction that + // spends from them. This txn is persisted such that we never broadcast + // a different txn for the same height. This allows us to recover from + // failures, and watch for the correct txid. + if classHeight > lastFinalizedHeight { + // If this height has never been finalized, we have never + // generated a sweep txn for this height. Generate one if there + // are kindergarten outputs to be spent. + if len(kgtnOutputs) > 0 { + finalTx, err = u.createSweepTx(kgtnOutputs) + if err != nil { + utxnLog.Errorf("Failed to create sweep txn at "+ + "height=%d", classHeight) + return err + } + } + + // Persist the kindergarten sweep txn to the nursery store. It + // is safe to store a nil finalTx, which happens if there are no + // graduating kindergarten outputs. + err = u.cfg.Store.FinalizeKinder(classHeight, finalTx) + if err != nil { + utxnLog.Errorf("Failed to finalize kindergarten at "+ + "height=%d", classHeight) + + return err + } + + // Log if the finalized transaction is non-trivial. + if finalTx != nil { + utxnLog.Infof("Finalized kindergarten at height=%d ", + classHeight) + } + } + + // Now that the kindergarten sweep txn has either been finalized or + // restored, broadcast the txn, and set up notifications that will + // transition the swept kindergarten outputs into graduated outputs. + if finalTx != nil { + err := u.sweepGraduatingKinders(classHeight, finalTx, + kgtnOutputs) + if err != nil { + utxnLog.Errorf("Failed to sweep %d kindergarten outputs "+ + "at height=%d: %v", len(kgtnOutputs), classHeight, + err) + return err + } + } + + // Now, we broadcast all pre-signed htlc txns from the crib outputs at + // this height. There is no need to finalize these txns, since the txid + // is predetermined when signed in the wallet. + for i := range cribOutputs { + err := u.sweepCribOutput(classHeight, &cribOutputs[i]) + if err != nil { + utxnLog.Errorf("Failed to sweep first-stage HTLC "+ + "(CLTV-delayed) output %v", + cribOutputs[i].OutPoint()) + return err + } + } + + return u.cfg.Store.GraduateHeight(classHeight) +} + +// craftSweepTx accepts accepts a list of kindergarten outputs, and signs and +// generates a signed txn that spends from them. This method also makes an +// accurate fee estimate before generating the required witnesses. +func (u *utxoNursery) createSweepTx(kgtnOutputs []kidOutput) (*wire.MsgTx, error) { + // Create a transaction which sweeps all the newly mature outputs into + // a output controlled by the wallet. + // TODO(roasbeef): can be more intelligent about buffering outputs to + // be more efficient on-chain. + + // Assemble the kindergarten class into a slice csv spendable outputs, + // while also computing an estimate for the total transaction weight. + var ( + csvSpendableOutputs []CsvSpendableOutput + weightEstimate lnwallet.TxWeightEstimator + ) + + // Allocate enough room for each of the kindergarten outputs. + csvSpendableOutputs = make([]CsvSpendableOutput, 0, len(kgtnOutputs)) + + // Our sweep transaction will pay to a single segwit p2wkh address, + // ensure it contributes to our weight estimate. + weightEstimate.AddP2WKHOutput() + + // For each kindergarten output, use its witness type to determine the + // estimate weight of its witness. + for i := range kgtnOutputs { + input := &kgtnOutputs[i] + + var witnessWeight int + switch input.WitnessType() { + case lnwallet.CommitmentTimeLock: + witnessWeight = lnwallet.ToLocalTimeoutWitnessSize + + case lnwallet.HtlcOfferedTimeout: + witnessWeight = lnwallet.OfferedHtlcTimeoutWitnessSize + + default: + utxnLog.Warnf("kindergarten output in nursery store "+ + "contains unexpected witness type: %v", + input.WitnessType()) + continue + } + + // Add the kindergarten output's input and witness to our + // running estimate. + weightEstimate.AddWitnessInput(witnessWeight) + + // Include this input in the transaction. + csvSpendableOutputs = append(csvSpendableOutputs, input) + } + + txWeight := uint64(weightEstimate.Weight()) + return u.sweepCsvSpendableOutputsTxn(txWeight, csvSpendableOutputs) +} + +// sweepCsvSpendableOutputsTxn creates a final sweeping transaction with all +// witnesses in place for all inputs using the provided txn fee. The created +// transaction has a single output sending all the funds back to the source +// wallet, after accounting for the fee estimate. +func (u *utxoNursery) sweepCsvSpendableOutputsTxn(txWeight uint64, + inputs []CsvSpendableOutput) (*wire.MsgTx, error) { + + // Generate the receiving script to which the funds will be swept. + pkScript, err := u.cfg.GenSweepScript() + if err != nil { + return nil, err + } + + // Sum up the total value contained in the inputs. + var totalSum btcutil.Amount + for _, o := range inputs { + totalSum += o.Amount() + } + + // Using the txn weight estimate, compute the required txn fee. + feePerWeight := u.cfg.Estimator.EstimateFeePerWeight(1) + txFee := btcutil.Amount(txWeight * feePerWeight) + + // Sweep as much possible, after subtracting txn fees. + sweepAmt := int64(totalSum - txFee) + + // Create the sweep transaction that we will be building. We use version + // 2 as it is required for CSV. The txn will sweep the amount after fees + // to the pkscript generated above. + sweepTx := wire.NewMsgTx(2) + sweepTx.AddTxOut(&wire.TxOut{ + PkScript: pkScript, + Value: sweepAmt, + }) + + // Add all of our inputs, including the respective CSV delays. + for _, input := range inputs { + sweepTx.AddTxIn(&wire.TxIn{ + PreviousOutPoint: *input.OutPoint(), + // TODO(roasbeef): assumes pure block delays + Sequence: input.BlocksToMaturity(), + }) + } + + // Before signing the transaction, check to ensure that it meets some + // basic validity requirements. + // TODO(conner): add more control to sanity checks, allowing us to delay + // spending "problem" outputs, e.g. possibly batching with other classes + // if fees are too low. + btx := btcutil.NewTx(sweepTx) + if err := blockchain.CheckTransactionSanity(btx); err != nil { + return nil, err + } + + hashCache := txscript.NewTxSigHashes(sweepTx) + + // With all the inputs in place, use each output's unique witness + // function to generate the final witness required for spending. + addWitness := func(idx int, tso CsvSpendableOutput) error { + witness, err := tso.BuildWitness(u.cfg.Signer, sweepTx, hashCache, idx) + if err != nil { + return err + } + + sweepTx.TxIn[idx].Witness = witness + + return nil + } + + for i, input := range inputs { + if err := addWitness(i, input); err != nil { + return nil, err + } + } + + return sweepTx, nil +} + +// sweepGraduatingKinders generates and broadcasts the transaction that +// transfers control of funds from a channel commitment transaction to the +// user's wallet. +func (u *utxoNursery) sweepGraduatingKinders(classHeight uint32, + finalTx *wire.MsgTx, kgtnOutputs []kidOutput) error { + + utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx "+ + "(txid=%v): %v", len(kgtnOutputs), finalTx.TxHash(), + newLogClosure(func() string { + return spew.Sdump(finalTx) + }), + ) + + // With the sweep transaction fully signed, broadcast the transaction + // to the network. Additionally, we can stop tracking these outputs as + // they've just been swept. + // TODO(conner): handle concrete error types returned from publication + if err := u.cfg.PublishTransaction(finalTx); err != nil && + !strings.Contains(err.Error(), "TX rejected:") { + utxnLog.Errorf("unable to broadcast sweep tx: %v, %v", + err, spew.Sdump(finalTx)) + return err + } + + return u.registerSweepConf(finalTx, kgtnOutputs, classHeight) +} + +// registerSweepConf is responsible for registering a finalized kindergarten +// sweep transaction for confirmation notifications. If the confirmation was +// successfully registered, a goroutine will be spawned that waits for the +// confirmation, and graduates the provided kindergarten class within the +// nursery store. +func (u *utxoNursery) registerSweepConf(finalTx *wire.MsgTx, + kgtnOutputs []kidOutput, heightHint uint32) error { + + finalTxID := finalTx.TxHash() + + confChan, err := u.cfg.Notifier.RegisterConfirmationsNtfn( + &finalTxID, u.cfg.ConfDepth, heightHint) + if err != nil { + utxnLog.Errorf("unable to register notification for "+ + "sweep confirmation: %v", finalTxID) + return err + } + + utxnLog.Infof("Registering sweep tx %v for confs at height=%d", + finalTxID, heightHint) + + u.wg.Add(1) + go u.waitForSweepConf(heightHint, kgtnOutputs, confChan) + + return nil +} + +// waitForSweepConf watches for the confirmation of a sweep transaction +// containing a batch of kindergarten outputs. Once confirmation has been +// received, the nursery will mark those outputs as fully graduated, and proceed +// to mark any mature channels as fully closed in channeldb. +// NOTE(conner): this method MUST be called as a go routine. +func (u *utxoNursery) waitForSweepConf(classHeight uint32, + kgtnOutputs []kidOutput, confChan *chainntnfs.ConfirmationEvent) { + + defer u.wg.Done() + + select { + case _, ok := <-confChan.Confirmed: + if !ok { + utxnLog.Errorf("Notification chan closed, can't"+ + " advance %v graduating outputs", + len(kgtnOutputs)) + return + } + + case <-u.quit: + return + } + + u.mu.Lock() + defer u.mu.Unlock() + + // TODO(conner): add retry logic? + + // Mark the confirmed kindergarten outputs as graduated. + if err := u.cfg.Store.GraduateKinder(classHeight); err != nil { + utxnLog.Errorf("Unable to graduate %v kingdergarten outputs: "+ + "%v", len(kgtnOutputs), err) + return + } + + utxnLog.Infof("Graduated %d kindergarten outputs from height=%d", + len(kgtnOutputs), classHeight) + + // Iterate over the kid outputs and construct a set of all channel + // points to which they belong. + var possibleCloses = make(map[wire.OutPoint]struct{}) + for _, kid := range kgtnOutputs { + possibleCloses[*kid.OriginChanPoint()] = struct{}{} + + } + + // Attempt to close each channel, only doing so if all of the channel's + // outputs have been graduated. + for chanPoint := range possibleCloses { + if err := u.closeAndRemoveIfMature(&chanPoint); err != nil { + utxnLog.Errorf("Failed to close and remove channel %v", + chanPoint) + return + } + } +} + +// sweepCribOutput broadcasts the crib output's htlc timeout txn, and sets up a +// notification that will advance it to the kindergarten bucket upon +// confirmation. +func (u *utxoNursery) sweepCribOutput(classHeight uint32, baby *babyOutput) error { + utxnLog.Infof("Publishing CTLV-delayed HTLC output using timeout tx "+ + "(txid=%v): %v", baby.timeoutTx.TxHash(), + newLogClosure(func() string { + return spew.Sdump(baby.timeoutTx) + }), + ) + + // Broadcast HTLC transaction + // TODO(conner): handle concrete error types returned from publication + err := u.cfg.PublishTransaction(baby.timeoutTx) + if err != nil && + !strings.Contains(err.Error(), "TX rejected:") { + utxnLog.Errorf("Unable to broadcast baby tx: "+ + "%v, %v", err, + spew.Sdump(baby.timeoutTx)) + return err + } + + return u.registerTimeoutConf(baby, classHeight) +} + +// registerTimeoutConf is responsible for subscribing to confirmation +// notification for an htlc timeout transaction. If successful, a goroutine will +// be spawned that will transition the provided baby output into the +// kindergarten state within the nursery store. +func (u *utxoNursery) registerTimeoutConf(baby *babyOutput, heightHint uint32) error { + + birthTxID := baby.timeoutTx.TxHash() + + // Register for the confirmation of presigned htlc txn. + confChan, err := u.cfg.Notifier.RegisterConfirmationsNtfn( + &birthTxID, u.cfg.ConfDepth, heightHint) + if err != nil { + return err + } + + utxnLog.Infof("Htlc output %v registered for promotion "+ + "notification.", baby.OutPoint()) + + u.wg.Add(1) + go u.waitForTimeoutConf(baby, confChan) + + return nil +} + +// waitForTimeoutConf watches for the confirmation of an htlc timeout +// transaction, and attempts to move the htlc output from the crib bucket to the +// kindergarten bucket upon success. +func (u *utxoNursery) waitForTimeoutConf(baby *babyOutput, + confChan *chainntnfs.ConfirmationEvent) { + + defer u.wg.Done() + + select { + case txConfirmation, ok := <-confChan.Confirmed: + if !ok { + utxnLog.Errorf("Notification chan "+ + "closed, can't advance baby output %v", + baby.OutPoint()) + return + } + + baby.SetConfHeight(txConfirmation.BlockHeight) + + case <-u.quit: + return + } + + u.mu.Lock() + defer u.mu.Unlock() + + // TODO(conner): add retry logic? + + err := u.cfg.Store.CribToKinder(baby) + if err != nil { + utxnLog.Errorf("Unable to move htlc output from "+ + "crib to kindergarten bucket: %v", err) + return + } + + utxnLog.Infof("Htlc output %v promoted to "+ + "kindergarten", baby.OutPoint()) +} + +// registerCommitConf is responsible for subscribing to the confirmation of a +// commitment transaction. If successful, the provided preschool output will be +// moved persistently into the kindergarten state within the nursery store. +func (u *utxoNursery) registerCommitConf(kid *kidOutput, heightHint uint32) error { + txID := kid.OutPoint().Hash + + confChan, err := u.cfg.Notifier.RegisterConfirmationsNtfn(&txID, + u.cfg.ConfDepth, heightHint) + if err != nil { + return err + } + + utxnLog.Infof("Commitment outpoint %v registered for "+ + "confirmation notification.", kid.OutPoint()) + + u.wg.Add(1) + go u.waitForCommitConf(kid, confChan) + + return nil +} + +// waitForCommitConf is intended to be run as a goroutine that will wait until a +// channel force close commitment transaction has been included in a confirmed +// block. Once the transaction has been confirmed (as reported by the Chain +// Notifier), waitForCommitConf will delete the output from the "preschool" +// database bucket and atomically add it to the "kindergarten" database bucket. +// This is the second step in the output incubation process. +func (u *utxoNursery) waitForCommitConf(kid *kidOutput, + confChan *chainntnfs.ConfirmationEvent) { + + defer u.wg.Done() + + select { + case txConfirmation, ok := <-confChan.Confirmed: + if !ok { + utxnLog.Errorf("Notification chan "+ + "closed, can't advance output %v", + kid.OutPoint()) + return + } + + kid.SetConfHeight(txConfirmation.BlockHeight) + + case <-u.quit: + return + } + + u.mu.Lock() + defer u.mu.Unlock() + + // TODO(conner): add retry logic? + + err := u.cfg.Store.PreschoolToKinder(kid) + if err != nil { + utxnLog.Errorf("Unable to move commitment output "+ + "from preschool to kindergarten bucket: %v", + err) + return + } + + utxnLog.Infof("Commitment output %v promoted to "+ + "kindergarten", kid.OutPoint()) +} + // contractMaturityReport is a report that details the maturity progress of a // particular force closed contract. type contractMaturityReport struct { @@ -385,9 +1209,15 @@ type contractMaturityReport struct { // contract. limboBalance btcutil.Amount - // confirmationHeight is the block height that this output originally - // confirmed at. - confirmationHeight uint32 + // recoveredBalance is the total value that has been successfully swept + // back to the user's wallet. + recoveredBalance btcutil.Amount + + // localAmount is the local value of the commitment output. + localAmount btcutil.Amount + + // confHeight is the block height that this output originally confirmed. + confHeight uint32 // maturityRequirement is the input age required for this output to // reach maturity. @@ -396,535 +1226,167 @@ type contractMaturityReport struct { // maturityHeight is the absolute block height that this output will // mature at. maturityHeight uint32 + + // htlcs records a maturity report for each htlc output in this channel. + htlcs []htlcMaturityReport } -// NurseryReport attempts to return a nursery report stored for the target -// outpoint. A nursery report details the maturity/sweeping progress for a -// contract that was previously force closed. If a report entry for the target -// chanPoint is unable to be constructed, then an error will be returned. -func (u *utxoNursery) NurseryReport(chanPoint *wire.OutPoint) (*contractMaturityReport, error) { - var report *contractMaturityReport - if err := u.db.View(func(tx *bolt.Tx) error { - // First we'll examine the preschool bucket as the target - // contract may not yet have been confirmed. - psclBucket := tx.Bucket(preschoolBucket) - if psclBucket == nil { - return nil - } - psclIndex := tx.Bucket(preschoolIndex) - if psclIndex == nil { - return nil - } +// htlcMaturityReport provides a summary of a single htlc output, and is +// embedded as party of the overarching contractMaturityReport +type htlcMaturityReport struct { + // outpoint is the final output that will be swept back to the wallet. + outpoint wire.OutPoint - var b bytes.Buffer - if err := writeOutpoint(&b, chanPoint); err != nil { - return err - } - chanPointBytes := b.Bytes() + // amount is the final value that will be swept in back to the wallet. + amount btcutil.Amount - var outputReader *bytes.Reader + // confHeight is the block height that this output originally confirmed. + confHeight uint32 - // If the target contract hasn't been confirmed yet, then we - // can just construct the report from this information. - if outPoint := psclIndex.Get(chanPointBytes); outPoint != nil { - // The channel entry hasn't yet been fully confirmed - // yet, so we'll dig into the preschool bucket to fetch - // the channel information. - outputBytes := psclBucket.Get(outPoint) - if outputBytes == nil { - return nil - } + // maturityRequirement is the input age required for this output to + // reach maturity. + maturityRequirement uint32 - outputReader = bytes.NewReader(outputBytes) - } else { - // Otherwise, we'll have to consult out contract index, - // so fetch that bucket as well as the kindergarten - // bucket. - indexBucket := tx.Bucket(contractIndex) - if indexBucket == nil { - return fmt.Errorf("contract not found, " + - "contract index not populated") - } - kgtnBucket := tx.Bucket(kindergartenBucket) - if kgtnBucket == nil { - return fmt.Errorf("contract not found, " + - "kindergarten bucket not populated") - } + // maturityHeight is the absolute block height that this output will + // mature at. + maturityHeight uint32 - // Attempt to query the index to see if we have an - // entry for this particular contract. - indexInfo := indexBucket.Get(chanPointBytes) - if indexInfo == nil { - return ErrContractNotFound - } + // stage indicates whether the htlc is in the CLTV-timeout stage (1) or + // the CSV-delay stage (2). A stage 1 htlc's maturity height will be set + // to it's expiry height, while a stage 2 htlc's maturity height will be + // set to it's confirmation height plus the maturity requirement. + stage uint32 +} - // If an entry is found, then using the height store in - // the first 4 bytes, we'll fetch the height that this - // entry matures at. - height := indexInfo[:4] - heightRow := kgtnBucket.Get(height) - if heightRow == nil { - return ErrContractNotFound - } +// AddLimboCommitment adds an incubating commitment output to maturity +// report's htlcs, and contributes its amount to the limbo balance. +func (c *contractMaturityReport) AddLimboCommitment(kid *kidOutput) { + c.limboBalance += kid.Amount() - // Once we have the entry itself, we'll slice of the - // last for bytes so we can seek into this row to fetch - // the contract's information. - offset := byteOrder.Uint32(indexInfo[4:]) - outputReader = bytes.NewReader(heightRow[offset:]) - } + c.localAmount += kid.Amount() + c.confHeight = kid.ConfHeight() + c.maturityRequirement = kid.BlocksToMaturity() - // With the proper set of bytes received, we'll deserialize the - // information for this immature output. - var immatureOutput kidOutput - if err := immatureOutput.Decode(outputReader); err != nil { - return err - } - - // TODO(roasbeef): should actually be list of outputs - report = &contractMaturityReport{ - chanPoint: *chanPoint, - limboBalance: immatureOutput.Amount(), - maturityRequirement: immatureOutput.BlocksToMaturity(), - } - - // If the confirmation height is set, then this means the - // contract has been confirmed, and we know the final maturity - // height. - if immatureOutput.ConfHeight() != 0 { - report.confirmationHeight = immatureOutput.ConfHeight() - report.maturityHeight = (immatureOutput.BlocksToMaturity() + - immatureOutput.ConfHeight()) - } - - return nil - }); err != nil { - return nil, err + // If the confirmation height is set, then this means the contract has + // been confirmed, and we know the final maturity height. + if kid.ConfHeight() != 0 { + c.maturityHeight = kid.BlocksToMaturity() + kid.ConfHeight() } - - return report, nil } -// enterPreschool is the first stage in the process of transferring funds from -// a force closed channel into the user's wallet. When an output is in the -// "preschool" stage, the daemon is waiting for the initial confirmation of the -// commitment transaction. -func (k *kidOutput) enterPreschool(db *channeldb.DB) error { - return db.Update(func(tx *bolt.Tx) error { - psclBucket, err := tx.CreateBucketIfNotExists(preschoolBucket) - if err != nil { - return err - } - psclIndex, err := tx.CreateBucketIfNotExists(preschoolIndex) - if err != nil { - return err - } +// AddRecoveredCommitment adds a graduated commitment output to maturity +// report's htlcs, and contributes its amount to the recovered balance. +func (c *contractMaturityReport) AddRecoveredCommitment(kid *kidOutput) { + c.recoveredBalance += kid.Amount() - // Once we have the buckets we can insert the raw bytes of the - // immature outpoint into the preschool bucket. - var outpointBytes bytes.Buffer - if err := writeOutpoint(&outpointBytes, k.OutPoint()); err != nil { - return err - } - var kidBytes bytes.Buffer - if err := k.Encode(&kidBytes); err != nil { - return err - } - err = psclBucket.Put(outpointBytes.Bytes(), kidBytes.Bytes()) - if err != nil { - return err - } + c.localAmount += kid.Amount() + c.confHeight = kid.ConfHeight() + c.maturityRequirement = kid.BlocksToMaturity() + c.maturityHeight = kid.BlocksToMaturity() + kid.ConfHeight() +} - // Additionally, we'll populate the preschool index so we can - // track all the immature outpoints for a particular channel's - // chanPoint. - var b bytes.Buffer - err = writeOutpoint(&b, k.OriginChanPoint()) - if err != nil { - return err - } - err = psclIndex.Put(b.Bytes(), outpointBytes.Bytes()) - if err != nil { - return err - } +// AddLimboStage1Htlc adds an htlc crib output to the maturity report's +// htlcs, and contributes its amount to the limbo balance. +func (c *contractMaturityReport) AddLimboStage1Htlc(baby *babyOutput) { + c.limboBalance += baby.Amount() - utxnLog.Infof("Outpoint %v now in preschool, waiting for "+ - "initial confirmation", k.OutPoint()) - - return nil + c.htlcs = append(c.htlcs, htlcMaturityReport{ + outpoint: *baby.OutPoint(), + amount: baby.Amount(), + confHeight: baby.ConfHeight(), + maturityHeight: baby.expiry, + stage: 1, }) } -// waitForPromotion is intended to be run as a goroutine that will wait until a -// channel force close commitment transaction has been included in a confirmed -// block. Once the transaction has been confirmed (as reported by the Chain -// Notifier), waitForPromotion will delete the output from the "preschool" -// database bucket and atomically add it to the "kindergarten" database bucket. -// This is the second step in the output incubation process. -func (k *kidOutput) waitForPromotion(db *channeldb.DB, confChan *chainntnfs.ConfirmationEvent) { - txConfirmation, ok := <-confChan.Confirmed - if !ok { - utxnLog.Errorf("notification chan "+ - "closed, can't advance output %v", k.OutPoint()) - return +// AddLimboStage2Htlc adds an htlc kindergarten output to the maturity report's +// htlcs, and contributes its amount to the limbo balance. +func (c *contractMaturityReport) AddLimboStage2Htlc(kid *kidOutput) { + c.limboBalance += kid.Amount() + + htlcReport := htlcMaturityReport{ + outpoint: *kid.OutPoint(), + amount: kid.Amount(), + confHeight: kid.ConfHeight(), + maturityRequirement: kid.BlocksToMaturity(), + stage: 2, } - utxnLog.Infof("Outpoint %v confirmed in block %v moving to kindergarten", - k.OutPoint(), txConfirmation.BlockHeight) + // If the confirmation height is set, then this means the first stage + // has been confirmed, and we know the final maturity height of the CSV + // delay. + if kid.ConfHeight() != 0 { + htlcReport.maturityHeight = kid.ConfHeight() + kid.BlocksToMaturity() + } - k.SetConfHeight(txConfirmation.BlockHeight) + c.htlcs = append(c.htlcs, htlcReport) +} - // The following block deletes a kidOutput from the preschool database - // bucket and adds it to the kindergarten database bucket which is - // keyed by block height. Keys and values are serialized into byte - // array form prior to database insertion. - err := db.Update(func(tx *bolt.Tx) error { - var originPoint bytes.Buffer - if err := writeOutpoint(&originPoint, k.OriginChanPoint()); err != nil { - return err - } +// AddRecoveredHtlc adds an graduate output to the maturity report's htlcs, and +// contributes its amount to the recovered balance. +func (c *contractMaturityReport) AddRecoveredHtlc(kid *kidOutput) { + c.recoveredBalance += kid.Amount() - psclBucket := tx.Bucket(preschoolBucket) - if psclBucket == nil { - return errors.New("unable to open preschool bucket") - } - psclIndex := tx.Bucket(preschoolIndex) - if psclIndex == nil { - return errors.New("unable to open preschool index") - } - - // Now that the entry has been confirmed, in order to move it - // along in the maturity pipeline we first delete the entry - // from the preschool bucket, as well as the secondary index. - var outpointBytes bytes.Buffer - if err := writeOutpoint(&outpointBytes, k.OutPoint()); err != nil { - return err - } - if err := psclBucket.Delete(outpointBytes.Bytes()); err != nil { - utxnLog.Errorf("unable to delete kindergarten output from "+ - "preschool bucket: %v", k.OutPoint()) - return err - } - if err := psclIndex.Delete(originPoint.Bytes()); err != nil { - utxnLog.Errorf("unable to delete kindergarten output from "+ - "preschool index: %v", k.OutPoint()) - return err - } - - // Next, fetch the kindergarten bucket. This output will remain - // in this bucket until it's fully mature. - kgtnBucket, err := tx.CreateBucketIfNotExists(kindergartenBucket) - if err != nil { - return err - } - - maturityHeight := k.ConfHeight() + k.BlocksToMaturity() - - heightBytes := make([]byte, 4) - byteOrder.PutUint32(heightBytes, maturityHeight) - - // If there're any existing outputs for this particular block - // height target, then we'll append this new output to the - // serialized list of outputs. - var existingOutputs []byte - if results := kgtnBucket.Get(heightBytes); results != nil { - existingOutputs = results - } - - // We'll grab the output's offset in the value for its maturity - // height so we can add this to the contract index. - outputOffset := len(existingOutputs) - - b := bytes.NewBuffer(existingOutputs) - if err := k.Encode(b); err != nil { - return err - } - if err := kgtnBucket.Put(heightBytes, b.Bytes()); err != nil { - return err - } - - // Finally, we'll insert a new entry into the contract index. - // The entry itself consists of 4 bytes for the height, and 4 - // bytes for the offset within the value for the height. - var indexEntry [4 + 4]byte - copy(indexEntry[:4], heightBytes) - byteOrder.PutUint32(indexEntry[4:], uint32(outputOffset)) - - indexBucket, err := tx.CreateBucketIfNotExists(contractIndex) - if err != nil { - return err - } - err = indexBucket.Put(originPoint.Bytes(), indexEntry[:]) - if err != nil { - return err - } - - utxnLog.Infof("Outpoint %v now in kindergarten, will mature "+ - "at height %v (delay of %v)", k.OutPoint(), - maturityHeight, k.BlocksToMaturity()) - return nil + c.htlcs = append(c.htlcs, htlcMaturityReport{ + outpoint: *kid.OutPoint(), + amount: kid.Amount(), + confHeight: kid.ConfHeight(), + maturityRequirement: kid.BlocksToMaturity(), + maturityHeight: kid.ConfHeight() + kid.BlocksToMaturity(), }) - if err != nil { - utxnLog.Errorf("unable to move kid output from preschool bucket "+ - "to kindergarten bucket: %v", err) - } + } -// graduateKindergarten handles the steps invoked with moving funds from a -// force close commitment transaction into a user's wallet after the output -// from the commitment transaction has become spendable. graduateKindergarten -// is called both when a new block notification has been received and also at -// startup in order to process graduations from blocks missed while the UTXO -// nursery was offline. -// TODO(roasbeef): single db transaction for the below -func (u *utxoNursery) graduateKindergarten(blockHeight uint32) error { - // First fetch the set of outputs that we can "graduate" at this - // particular block height. We can graduate an output once we've - // reached its height maturity. - kgtnOutputs, err := fetchGraduatingOutputs(u.db, u.wallet, blockHeight) - if err != nil { - return err - } - - // If we're able to graduate any outputs, then create a single - // transaction which sweeps them all into the wallet. - if len(kgtnOutputs) > 0 { - err := sweepGraduatingOutputs(u.wallet, kgtnOutputs) - if err != nil { - return err - } - - // Now that the sweeping transaction has been broadcast, for - // each of the immature outputs, we'll mark them as being fully - // closed within the database. - for _, closedChan := range kgtnOutputs { - err := u.db.MarkChanFullyClosed(closedChan.OriginChanPoint()) - if err != nil { - return err - } - } - } - - // Using a re-org safety margin of 6-blocks, delete any outputs which - // have graduated 6 blocks ago. - deleteHeight := blockHeight - 6 - if err := deleteGraduatedOutputs(u.db, deleteHeight); err != nil { - return err - } - - // Finally, record the last height at which we graduated outputs so we - // can reconcile our state with that of the main-chain during restarts. - return putLastHeightGraduated(u.db, blockHeight) -} - -// fetchGraduatingOutputs checks the "kindergarten" database bucket whenever a -// new block is received in order to determine if commitment transaction -// outputs have become newly spendable. If fetchGraduatingOutputs finds outputs -// that are ready for "graduation," it passes them on to be swept. This is the -// third step in the output incubation process. -func fetchGraduatingOutputs(db *channeldb.DB, wallet *lnwallet.LightningWallet, - blockHeight uint32) ([]*kidOutput, error) { - - var results []byte - if err := db.View(func(tx *bolt.Tx) error { - // A new block has just been connected, check to see if we have - // any new outputs that can be swept into the wallet. - kgtnBucket := tx.Bucket(kindergartenBucket) - if kgtnBucket == nil { - return nil - } - - heightBytes := make([]byte, 4) - byteOrder.PutUint32(heightBytes, blockHeight) - - results = kgtnBucket.Get(heightBytes) +// closeAndRemoveIfMature removes a particular channel from the channel index +// if and only if all of its outputs have been marked graduated. If the channel +// still has ungraduated outputs, the method will succeed without altering the +// database state. +func (u *utxoNursery) closeAndRemoveIfMature(chanPoint *wire.OutPoint) error { + isMature, err := u.cfg.Store.IsMatureChannel(chanPoint) + if err == ErrContractNotFound { return nil - }); err != nil { - return nil, err - } - - // If no time-locked outputs can be swept at this point, then we can - // exit early. - if len(results) == 0 { - return nil, nil - } - - // Otherwise, we deserialize the list of kid outputs into their full - // forms. - kgtnOutputs, err := deserializeKidList(bytes.NewReader(results)) - if err != nil { - utxnLog.Errorf("error while deserializing list of kidOutputs: %v", err) - } - - // For each of the outputs, we also generate its proper witness - // function based on its witness type. This varies if the output is on - // our commitment transaction or theirs, and also if it's an HTLC - // output or not. - for _, kgtnOutput := range kgtnOutputs { - kgtnOutput.witnessFunc = kgtnOutput.witnessType.GenWitnessFunc( - wallet.Cfg.Signer, kgtnOutput.SignDesc()) - } - - utxnLog.Infof("New block: height=%v, sweeping %v mature outputs", - blockHeight, len(kgtnOutputs)) - - return kgtnOutputs, nil -} - -// sweepGraduatingOutputs generates and broadcasts the transaction that -// transfers control of funds from a channel commitment transaction to the -// user's wallet. -func sweepGraduatingOutputs(wallet *lnwallet.LightningWallet, kgtnOutputs []*kidOutput) error { - // Create a transaction which sweeps all the newly mature outputs into - // a output controlled by the wallet. - // TODO(roasbeef): can be more intelligent about buffering outputs to - // be more efficient on-chain. - sweepTx, err := createSweepTx(wallet, kgtnOutputs) - if err != nil { - // TODO(roasbeef): retry logic? - utxnLog.Errorf("unable to create sweep tx: %v", err) + } else if err != nil { + utxnLog.Errorf("Unable to determine maturity of "+ + "channel=%s", chanPoint) return err } - utxnLog.Infof("Sweeping %v time-locked outputs "+ - "with sweep tx (txid=%v): %v", len(kgtnOutputs), - sweepTx.TxHash(), - newLogClosure(func() string { - return spew.Sdump(sweepTx) - })) + // Nothing to do if we are still incubating. + if !isMature { + return nil + } - // With the sweep transaction fully signed, broadcast the transaction - // to the network. Additionally, we can stop tracking these outputs as - // they've just been swept. - if err := wallet.PublishTransaction(sweepTx); err != nil { - utxnLog.Errorf("unable to broadcast sweep tx: %v, %v", - err, spew.Sdump(sweepTx)) + // Now that the sweeping transaction has been broadcast, for + // each of the immature outputs, we'll mark them as being fully + // closed within the database. + err = u.cfg.DB.MarkChanFullyClosed(chanPoint) + if err != nil { + utxnLog.Errorf("Unable to mark channel=%v as fully "+ + "closed: %v", chanPoint, err) return err } + utxnLog.Infof("Marked Channel(%s) as fully closed", chanPoint) + + // Now that the channel is fully closed, we remove the channel from the + // nursery store here. This preserves the invariant that we never remove + // a channel unless it is mature, as this is the only place the utxo + // nursery removes a channel. + if err := u.cfg.Store.RemoveChannel(chanPoint); err != nil { + utxnLog.Errorf("Unable to remove channel=%s from "+ + "nursery store: %v", chanPoint, err) + return err + } + + utxnLog.Infof("Removed channel %v from nursery store", chanPoint) + return nil } -// createSweepTx creates a final sweeping transaction with all witnesses in -// place for all inputs. The created transaction has a single output sending -// all the funds back to the source wallet. -func createSweepTx(wallet *lnwallet.LightningWallet, - matureOutputs []*kidOutput) (*wire.MsgTx, error) { - - pkScript, err := newSweepPkScript(wallet) - if err != nil { - return nil, err - } - - var totalSum btcutil.Amount - for _, o := range matureOutputs { - totalSum += o.Amount() - } - - sweepTx := wire.NewMsgTx(2) - sweepTx.AddTxOut(&wire.TxOut{ - PkScript: pkScript, - Value: int64(totalSum - 5000), - }) - for _, utxo := range matureOutputs { - sweepTx.AddTxIn(&wire.TxIn{ - PreviousOutPoint: *utxo.OutPoint(), - // TODO(roasbeef): assumes pure block delays - Sequence: utxo.BlocksToMaturity(), - }) - } - - // TODO(roasbeef): insert fee calculation - // * remove hardcoded fee above - - // With all the inputs in place, use each output's unique witness - // function to generate the final witness required for spending. - hashCache := txscript.NewTxSigHashes(sweepTx) - for i, txIn := range sweepTx.TxIn { - witness, err := matureOutputs[i].witnessFunc(sweepTx, hashCache, i) - if err != nil { - return nil, err - } - - txIn.Witness = witness - } - - return sweepTx, nil -} - -// deleteGraduatedOutputs removes outputs from the kindergarten database bucket -// when six blockchain confirmations have passed since the outputs were swept. -// We wait for six confirmations to ensure that the outputs will be swept if a -// chain reorganization occurs. This is the final step in the output incubation -// process. -func deleteGraduatedOutputs(db *channeldb.DB, deleteHeight uint32) error { - return db.Update(func(tx *bolt.Tx) error { - kgtnBucket := tx.Bucket(kindergartenBucket) - if kgtnBucket == nil { - return nil - } - - heightBytes := make([]byte, 4) - byteOrder.PutUint32(heightBytes, deleteHeight) - results := kgtnBucket.Get(heightBytes) - if results == nil { - return nil - } - - // Delete the row for this height within the kindergarten bucket.k - if err := kgtnBucket.Delete(heightBytes); err != nil { - return err - } - - sweptOutputs, err := deserializeKidList(bytes.NewBuffer(results)) - if err != nil { - return err - } - utxnLog.Infof("Deleting %v swept outputs from kindergarten bucket "+ - "at block height: %v", len(sweptOutputs), deleteHeight) - - // Additionally, for each output that has now been fully swept, - // we'll also remove the index entry for that output. - indexBucket := tx.Bucket(contractIndex) - if indexBucket == nil { - return nil - } - for _, sweptOutput := range sweptOutputs { - var chanPoint bytes.Buffer - err := writeOutpoint(&chanPoint, sweptOutput.OriginChanPoint()) - if err != nil { - return err - } - - if err := indexBucket.Delete(chanPoint.Bytes()); err != nil { - return err - } - } - - return nil - }) -} - -// putLastHeightGraduated persists the most recently processed blockheight -// to the database. This blockheight is used during restarts to determine if -// blocks were missed while the UTXO Nursery was offline. -func putLastHeightGraduated(db *channeldb.DB, blockheight uint32) error { - return db.Update(func(tx *bolt.Tx) error { - kgtnBucket, err := tx.CreateBucketIfNotExists(kindergartenBucket) - if err != nil { - return nil - } - - heightBytes := make([]byte, 4) - byteOrder.PutUint32(heightBytes, blockheight) - return kgtnBucket.Put(lastGraduatedHeightKey, heightBytes) - }) -} - // newSweepPkScript creates a new public key script which should be used to // sweep any time-locked, or contested channel funds into the wallet. -// Specifically, the script generated is a version 0, -// pay-to-witness-pubkey-hash (p2wkh) output. +// Specifically, the script generated is a version 0, pay-to-witness-pubkey-hash +// (p2wkh) output. func newSweepPkScript(wallet lnwallet.WalletController) ([]byte, error) { sweepAddr, err := wallet.NewAddress(lnwallet.WitnessPubKey, false) if err != nil { @@ -934,26 +1396,6 @@ func newSweepPkScript(wallet lnwallet.WalletController) ([]byte, error) { return txscript.PayToAddrScript(sweepAddr) } -// deserializedKidList takes a sequence of serialized kid outputs and returns a -// slice of kidOutput structs. -func deserializeKidList(r io.Reader) ([]*kidOutput, error) { - var kidOutputs []*kidOutput - - for { - var kid = &kidOutput{} - if err := kid.Decode(r); err != nil { - if err == io.EOF { - break - } else { - return nil, err - } - } - kidOutputs = append(kidOutputs, kid) - } - - return kidOutputs, nil -} - // CsvSpendableOutput is a SpendableOutput that contains all of the information // necessary to construct, sign, and sweep an output locked with a CSV delay. type CsvSpendableOutput interface { @@ -977,12 +1419,13 @@ type CsvSpendableOutput interface { OriginChanPoint() *wire.OutPoint } -// babyOutput is an HTLC output that is in the earliest stage of upbringing. -// Each babyOutput carries a presigned timeout transction, which should be -// broadcast at the appropriate CLTV expiry, and its future kidOutput self. If -// all goes well, and the timeout transaction is successfully confirmed, the -// the now-mature kidOutput will be unwrapped and continue its journey through -// the nursery. +// babyOutput represents a two-stage CSV locked output, and is used to track +// htlc outputs through incubation. The first stage requires broadcasting a +// presigned timeout txn that spends from the CLTV locked output on the +// commitment txn. A babyOutput is treated as a subset of CsvSpendableOutputs, +// with the additional constraint that a transaction must be broadcast before it +// can be spent. Each baby transaction embeds the kidOutput that can later be +// used to spend the CSV output contained in the timeout txn. type babyOutput struct { // expiry is the absolute block height at which the timeoutTx should be // broadcast to the network. @@ -992,8 +1435,8 @@ type babyOutput struct { // transitions the htlc into the delay+claim stage. timeoutTx *wire.MsgTx - // kidOutput represents the CSV output to be swept after the timeoutTx has - // been broadcast and confirmed. + // kidOutput represents the CSV output to be swept from the timeoutTx + // after it has been broadcast and confirmed. kidOutput }