diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 87e2767b..84886ea9 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -250,7 +250,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, signer := &mockSigner{ key: alicePrivKey, } - bio := &mockChainIO{} + bio := &mockChainIO{ + bestHeight: fundingBroadcastHeight, + } dbDir := filepath.Join(tempTestDir, "cdb") cdb, err := channeldb.Open(dbDir) diff --git a/mock.go b/mock.go index 34276bee..b323dcc7 100644 --- a/mock.go +++ b/mock.go @@ -168,10 +168,12 @@ func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, } } -type mockChainIO struct{} +type mockChainIO struct { + bestHeight int32 +} -func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { - return activeNetParams.GenesisHash, fundingBroadcastHeight, nil +func (m *mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { + return activeNetParams.GenesisHash, m.bestHeight, nil } func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte, diff --git a/nursery_store.go b/nursery_store.go index 95ce5847..eec8e74a 100644 --- a/nursery_store.go +++ b/nursery_store.go @@ -23,21 +23,6 @@ import ( // // utxn/ // | -// | LAST PURGED + FINALIZED HEIGHTS -// | -// | Each nursery store tracks a "last graduated height", which records the -// | most recent block height for which the nursery store has successfully -// | graduated all outputs. It also tracks a "last finalized height", which -// | records the last block height that the nursery attempted to graduate -// | If a finalized height has kindergarten outputs, the sweep txn for these -// | outputs will be stored in the height bucket. This ensure that the same -// | txid will be used after restarts. Otherwise, the nursery will be unable -// | to recover the txid of kindergarten sweep transaction it has already -// | broadcast. -// | -// ├── last-finalized-height-key: -// ├── last-graduated-height-key: -// | // | CHANNEL INDEX // | // | The channel index contains a directory for each channel that has a @@ -72,10 +57,7 @@ import ( // | relative file path: // | e.g. // // | that can be queried in the channel index to retrieve the serialized -// | output. If a height bucket is less than or equal to the current last -// | finalized height and has a non-zero number of kindergarten outputs, a -// | height bucket will also contain the finalized kindergarten sweep txn -// | under the "finalized-kndr-txn" key. +// | output. // | // └── height-index-key/ //    ├── / <- HEIGHT BUCKET @@ -84,12 +66,15 @@ import ( // | |    └── : "" // |   ├── / // | |    └── : "" -//    | └── finalized-kndr-txn: "" | //    └── / //    └── / //    └── : "" //    └── : "" +// TODO(joostjager): Add database migration to clean up now unused last +// graduated height and finalized txes. This also prevents people downgrading +// and surprising the downgraded nursery with missing data. + // NurseryStore abstracts the persistent storage layer for the utxo nursery. // Concretely, it stores commitment and htlc outputs until any time-bounded // constraints have fully matured. The store exposes methods for enumerating its @@ -110,47 +95,30 @@ type NurseryStore interface { CribToKinder(*babyOutput) error // PreschoolToKinder atomically moves a kidOutput from the preschool - // bucket to the kindergarten bucket. This transition should be - // executed after receiving confirmation of the preschool output. - // Incoming HTLC's we need to go to the second-layer to claim, and also - // our commitment outputs fall into this class. - PreschoolToKinder(*kidOutput) error + // bucket to the kindergarten bucket. This transition should be executed + // after receiving confirmation of the preschool output. Incoming HTLC's + // we need to go to the second-layer to claim, and also our commitment + // outputs fall into this class. + // + // An additional parameter specifies the last graduated height. This is + // used in case of late registration. It schedules the output for sweep + // at the next epoch even though it has already expired earlier. + PreschoolToKinder(kid *kidOutput, lastGradHeight uint32) error - // GraduateKinder atomically moves the kindergarten class at the - // provided height into the graduated status. This involves removing the - // kindergarten entries from both the height and channel indexes, and - // cleaning up the finalized kindergarten sweep txn. The height bucket - // will be opportunistically pruned from the height index as outputs are + // GraduateKinder atomically moves an output at the provided height into + // the graduated status. This involves removing the kindergarten entries + // from both the height and channel indexes. The height bucket will be + // opportunistically pruned from the height index as outputs are // removed. - GraduateKinder(height uint32) error + GraduateKinder(height uint32, output *kidOutput) error // FetchPreschools returns a list of all outputs currently stored in // the preschool bucket. FetchPreschools() ([]kidOutput, error) // FetchClass returns a list of kindergarten and crib outputs whose - // timelocks expire at the given height. If the kindergarten class at - // this height hash been finalized previously, via FinalizeKinder, it - // will also returns the finalized kindergarten sweep txn. - FetchClass(height uint32) (*wire.MsgTx, []kidOutput, []babyOutput, error) - - // FinalizeKinder accepts a block height and the kindergarten sweep txn - // computed for this height. Upon startup, we will rebroadcast any - // finalized kindergarten txns instead of signing a new txn, as this - // result in a different txid from a preceding broadcast. - FinalizeKinder(height uint32, tx *wire.MsgTx) error - - // LastFinalizedHeight returns the last block height for which the - // nursery store finalized a kindergarten class. - LastFinalizedHeight() (uint32, error) - - // GraduateHeight records the provided height as the last height for - // which the nursery store successfully graduated all outputs. - GraduateHeight(height uint32) error - - // LastGraduatedHeight returns the last block height for which the - // nursery store successfully graduated all outputs. - LastGraduatedHeight() (uint32, error) + // timelocks expire at the given height. + FetchClass(height uint32) ([]kidOutput, []babyOutput, error) // HeightsBelowOrEqual returns the lowest non-empty heights in the // height index, that exist at or below the provided upper bound. @@ -181,14 +149,6 @@ var ( // the root-level, chain-segmented bucket for each nursery store. utxnChainPrefix = []byte("utxn") - // lastFinalizedHeightKey is a static key used to locate nursery store's - // last finalized height. - lastFinalizedHeightKey = []byte("last-finalized-height") - - // lastGraduatedHeightKey is a static key used to retrieve the height of - // the last bucket that successfully graduated all outputs. - lastGraduatedHeightKey = []byte("last-graduated-height") - // channelIndexKey is a static key used to lookup the bucket containing // all of the nursery's active channels. channelIndexKey = []byte("channel-index") @@ -197,10 +157,6 @@ var ( // containing all heights for which the nursery will need to take // action. heightIndexKey = []byte("height-index") - - // finalizedKndrTxnKey is a static key that can be used to locate a - // finalized kindergarten sweep txn. - finalizedKndrTxnKey = []byte("finalized-kndr-txn") ) // Defines the state prefixes that will be used to persistently track an @@ -415,7 +371,9 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error { // PreschoolToKinder atomically moves a kidOutput from the preschool bucket to // the kindergarten bucket. This transition should be executed after receiving // confirmation of the preschool output's commitment transaction. -func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { +func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput, + lastGradHeight uint32) error { + return ns.db.Update(func(tx *bbolt.Tx) error { // Create or retrieve the channel bucket corresponding to the // kid output's origin channel point. @@ -478,13 +436,6 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { maturityHeight = kid.ConfHeight() + kid.BlocksToMaturity() } - // In the case of a Late Registration, we've already graduated - // the class that this kid is destined for. So we'll bump its - // height by one to ensure we don't forget to graduate it. - lastGradHeight, err := ns.getLastGraduatedHeight(tx) - if err != nil { - return err - } if maturityHeight <= lastGradHeight { utxnLog.Debugf("Late Registration for kid output=%v "+ "detected: class_height=%v, "+ @@ -515,108 +466,66 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error { }) } -// GraduateKinder atomically moves the kindergarten class at the provided height -// into the graduated status. This involves removing the kindergarten entries -// from both the height and channel indexes, and cleaning up the finalized -// kindergarten sweep txn. The height bucket will be opportunistically pruned -// from the height index as outputs are removed. -func (ns *nurseryStore) GraduateKinder(height uint32) error { +// GraduateKinder atomically moves an output at the provided height into the +// graduated status. This involves removing the kindergarten entries from both +// the height and channel indexes. The height bucket will be opportunistically +// pruned from the height index as outputs are removed. +func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error { return ns.db.Update(func(tx *bbolt.Tx) error { - // Since all kindergarten outputs at a particular height are - // swept in a single txn, we can now safely delete the finalized - // txn, since it has already been broadcast and confirmed. hghtBucket := ns.getHeightBucket(tx, height) if hghtBucket == nil { // Nothing to delete, bucket has already been removed. return nil } - // Remove the finalized kindergarten txn, we do this before - // removing the outputs so that the extra entry doesn't prevent - // the height bucket from being opportunistically pruned below. - if err := hghtBucket.Delete(finalizedKndrTxnKey); err != nil { + // For the kindergarten output, delete its entry from the + // height and channel index, and create a new grad output in the + // channel index. + outpoint := kid.OutPoint() + chanPoint := kid.OriginChanPoint() + + // Construct the key under which the output is + // currently stored height and channel indexes. + pfxOutputKey, err := prefixOutputKey(kndrPrefix, + outpoint) + if err != nil { return err } - // For each kindergarten found output, delete its entry from the - // height and channel index, and create a new grad output in the - // channel index. - return ns.forEachHeightPrefix(tx, kndrPrefix, height, - func(v []byte) error { - var kid kidOutput - err := kid.Decode(bytes.NewReader(v)) - if err != nil { - return err - } + // Remove the grad output's entry in the height + // index. + err = ns.removeOutputFromHeight(tx, height, + chanPoint, pfxOutputKey) + if err != nil { + return err + } - outpoint := kid.OutPoint() - chanPoint := kid.OriginChanPoint() + chanBucket := ns.getChannelBucket(tx, + chanPoint) + if chanBucket == nil { + return ErrContractNotFound + } - // Construct the key under which the output is - // currently stored height and channel indexes. - pfxOutputKey, err := prefixOutputKey(kndrPrefix, - outpoint) - if err != nil { - return err - } + // Remove previous output with kindergarten + // prefix. + err = chanBucket.Delete(pfxOutputKey) + if err != nil { + return err + } - // Remove the grad output's entry in the height - // index. - err = ns.removeOutputFromHeight(tx, height, - chanPoint, pfxOutputKey) - if err != nil { - return err - } + // Convert kindergarten key to graduate key. + copy(pfxOutputKey, gradPrefix) - chanBucket := ns.getChannelBucket(tx, - chanPoint) - if chanBucket == nil { - return ErrContractNotFound - } + var gradBuffer bytes.Buffer + if err := kid.Encode(&gradBuffer); err != nil { + return err + } - // Remove previous output with kindergarten - // prefix. - err = chanBucket.Delete(pfxOutputKey) - if err != nil { - return err - } - - // Convert kindergarten key to graduate key. - copy(pfxOutputKey, gradPrefix) - - var gradBuffer bytes.Buffer - if err := kid.Encode(&gradBuffer); err != nil { - return err - } - - // Insert serialized output into channel bucket - // using graduate-prefixed key. - return chanBucket.Put(pfxOutputKey, - gradBuffer.Bytes()) - }, - ) - }) -} - -// FinalizeKinder accepts a block height and a finalized kindergarten sweep -// transaction, persisting the transaction at the appropriate height bucket. The -// nursery store's last finalized height is also updated with the provided -// height. -func (ns *nurseryStore) FinalizeKinder(height uint32, - finalTx *wire.MsgTx) error { - - return ns.db.Update(func(tx *bbolt.Tx) error { - return ns.finalizeKinder(tx, height, finalTx) - }) -} - -// GraduateHeight persists the provided height as the nursery store's last -// graduated height. -func (ns *nurseryStore) GraduateHeight(height uint32) error { - - return ns.db.Update(func(tx *bbolt.Tx) error { - return ns.putLastGraduatedHeight(tx, height) + // Insert serialized output into channel bucket + // using graduate-prefixed key. + return chanBucket.Put(pfxOutputKey, + gradBuffer.Bytes()) }) } @@ -625,23 +534,15 @@ func (ns *nurseryStore) GraduateHeight(height uint32) error { // FetchClass returns a list of the kindergarten and crib outputs whose timeouts // are expiring func (ns *nurseryStore) FetchClass( - height uint32) (*wire.MsgTx, []kidOutput, []babyOutput, error) { + height uint32) ([]kidOutput, []babyOutput, error) { // Construct list of all crib and kindergarten outputs that need to be // processed at the provided block height. - var finalTx *wire.MsgTx var kids []kidOutput var babies []babyOutput if err := ns.db.View(func(tx *bbolt.Tx) error { - - var err error - finalTx, err = ns.getFinalizedTxn(tx, height) - if err != nil { - return err - } - // Append each crib output to our list of babyOutputs. - if err = ns.forEachHeightPrefix(tx, cribPrefix, height, + if err := ns.forEachHeightPrefix(tx, cribPrefix, height, func(buf []byte) error { // We will attempt to deserialize all outputs @@ -683,10 +584,10 @@ func (ns *nurseryStore) FetchClass( }) }); err != nil { - return nil, nil, nil, err + return nil, nil, err } - return finalTx, kids, babies, nil + return kids, babies, nil } // FetchPreschools returns a list of all outputs currently stored in the @@ -938,32 +839,6 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error { }) } -// LastFinalizedHeight returns the last block height for which the nursery -// store has finalized a kindergarten class. -func (ns *nurseryStore) LastFinalizedHeight() (uint32, error) { - var lastFinalizedHeight uint32 - err := ns.db.View(func(tx *bbolt.Tx) error { - var err error - lastFinalizedHeight, err = ns.getLastFinalizedHeight(tx) - return err - }) - - return lastFinalizedHeight, err -} - -// LastGraduatedHeight returns the last block height for which the nursery -// store has successfully graduated all outputs. -func (ns *nurseryStore) LastGraduatedHeight() (uint32, error) { - var lastGraduatedHeight uint32 - err := ns.db.View(func(tx *bbolt.Tx) error { - var err error - lastGraduatedHeight, err = ns.getLastGraduatedHeight(tx) - return err - }) - - return lastGraduatedHeight, err -} - // Helper Methods // enterCrib accepts a new htlc output that the nursery will incubate through @@ -994,6 +869,8 @@ func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error { // Next, retrieve or create the height-channel bucket located in the // height bucket corresponding to the baby output's CLTV expiry height. + + // TODO: Handle late registration. hghtChanBucket, err := ns.createHeightChanBucket(tx, baby.expiry, chanPoint) if err != nil { @@ -1332,145 +1209,6 @@ func (ns *nurseryStore) forChanOutputs(tx *bbolt.Tx, chanPoint *wire.OutPoint, return chanBucket.ForEach(callback) } -// getLastFinalizedHeight is a helper method that retrieves the last height for -// which the database finalized its persistent state. -func (ns *nurseryStore) getLastFinalizedHeight(tx *bbolt.Tx) (uint32, error) { - // Retrieve the chain bucket associated with the given nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return 0, nil - } - - // Lookup the last finalized height in the top-level chain bucket. - heightBytes := chainBucket.Get(lastFinalizedHeightKey) - if heightBytes == nil { - // We have never finalized, return height 0. - return 0, nil - } - - // If the resulting bytes are not sized like a uint32, then we have - // never finalized, so we return 0. - - // Otherwise, parse the bytes and return the last finalized height. - return byteOrder.Uint32(heightBytes), nil -} - -// finalizeKinder records a finalized kindergarten sweep txn to the given height -// bucket. It also updates the nursery store's last finalized height, so that we -// do not finalize the same height twice. If the finalized txn is nil, i.e. if -// the height has no kindergarten outputs, the height will be marked as -// finalized, and we skip the process of writing the txn. When the class is -// loaded, a nil value will be returned if no txn has been written to a -// finalized height bucket. -func (ns *nurseryStore) finalizeKinder(tx *bbolt.Tx, height uint32, - finalTx *wire.MsgTx) error { - - // TODO(conner) ensure height is greater that current finalized height. - - // 1. Write the last finalized height to the chain bucket. - - // Ensure that the chain bucket for this nursery store exists. - chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) - if err != nil { - return err - } - - // Serialize the provided last-finalized height, and store it in the - // top-level chain bucket for this nursery store. - var lastHeightBytes [4]byte - byteOrder.PutUint32(lastHeightBytes[:], height) - - err = chainBucket.Put(lastFinalizedHeightKey, lastHeightBytes[:]) - if err != nil { - return err - } - - // 2. Write the finalized txn in the appropriate height bucket. - - // If there is no finalized txn, we have nothing to do. - if finalTx == nil { - return nil - } - - // Otherwise serialize the finalized txn and write it to the height - // bucket. - hghtBucket := ns.getHeightBucket(tx, height) - if hghtBucket == nil { - return nil - } - - var finalTxnBuf bytes.Buffer - if err := finalTx.Serialize(&finalTxnBuf); err != nil { - return err - } - - return hghtBucket.Put(finalizedKndrTxnKey, finalTxnBuf.Bytes()) -} - -// getFinalizedTxn retrieves the finalized kindergarten sweep txn at the given -// height, returning nil if one was not found. -func (ns *nurseryStore) getFinalizedTxn(tx *bbolt.Tx, - height uint32) (*wire.MsgTx, error) { - - hghtBucket := ns.getHeightBucket(tx, height) - if hghtBucket == nil { - // No class to finalize. - return nil, nil - } - - finalTxBytes := hghtBucket.Get(finalizedKndrTxnKey) - if finalTxBytes == nil { - // No finalized txn for this height. - return nil, nil - } - - // Otherwise, deserialize and return the finalized transaction. - txn := &wire.MsgTx{} - if err := txn.Deserialize(bytes.NewReader(finalTxBytes)); err != nil { - return nil, err - } - - return txn, nil -} - -// getLastGraduatedHeight is a helper method that retrieves the last height for -// which the database graduated all outputs successfully. -func (ns *nurseryStore) getLastGraduatedHeight(tx *bbolt.Tx) (uint32, error) { - // Retrieve the chain bucket associated with the given nursery store. - chainBucket := tx.Bucket(ns.pfxChainKey) - if chainBucket == nil { - return 0, nil - } - - // Lookup the last graduated height in the top-level chain bucket. - heightBytes := chainBucket.Get(lastGraduatedHeightKey) - if heightBytes == nil { - // We have never graduated before, return height 0. - return 0, nil - } - - // Otherwise, parse the bytes and return the last graduated height. - return byteOrder.Uint32(heightBytes), nil -} - -// pubLastGraduatedHeight is a helper method that writes the provided height under -// the last graduated height key. -func (ns *nurseryStore) putLastGraduatedHeight(tx *bbolt.Tx, height uint32) error { - - // Ensure that the chain bucket for this nursery store exists. - chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey) - if err != nil { - return err - } - - // Serialize the provided last-graduated height, and store it in the - // top-level chain bucket for this nursery store. - var lastHeightBytes [4]byte - byteOrder.PutUint32(lastHeightBytes[:], height) - - return chainBucket.Put(lastGraduatedHeightKey, lastHeightBytes[:]) -} - // errBucketNotEmpty signals that an attempt to prune a particular // bucket failed because it still has active outputs. var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned") @@ -1541,7 +1279,8 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) { // attempt to remove each one if they are empty, keeping track of the // number of height-channel buckets that still have active outputs. if err := hghtBucket.ForEach(func(chanBytes, v []byte) error { - // Skip the finalized txn key. + // Skip the finalized txn key if it still exists from a previous + // db version. if v != nil { return nil } diff --git a/nursery_store_test.go b/nursery_store_test.go index 4617e5e0..0c205958 100644 --- a/nursery_store_test.go +++ b/nursery_store_test.go @@ -88,8 +88,6 @@ func TestNurseryStoreInit(t *testing.T) { assertNumChannels(t, ns, 0) assertNumPreschools(t, ns, 0) - assertLastFinalizedHeight(t, ns, 0) - assertLastGraduatedHeight(t, ns, 0) } // TestNurseryStoreIncubate tests the primary state transitions taken by outputs @@ -172,7 +170,7 @@ func TestNurseryStoreIncubate(t *testing.T) { // Now, move the commitment output to the kindergarten // bucket. - err = ns.PreschoolToKinder(test.commOutput) + err = ns.PreschoolToKinder(test.commOutput, 0) if err != test.err { t.Fatalf("unable to move commitment output from "+ "pscl to kndr: %v", err) @@ -212,7 +210,7 @@ func TestNurseryStoreIncubate(t *testing.T) { maturityHeight := test.commOutput.ConfHeight() + test.commOutput.BlocksToMaturity() - err = ns.GraduateKinder(maturityHeight) + err = ns.GraduateKinder(maturityHeight, test.commOutput) if err != nil { t.Fatalf("unable to graduate kindergarten class at "+ "height %d: %v", maturityHeight, err) @@ -286,7 +284,8 @@ func TestNurseryStoreIncubate(t *testing.T) { maturityHeight := htlcOutput.ConfHeight() + htlcOutput.BlocksToMaturity() - err = ns.GraduateKinder(maturityHeight) + err = ns.GraduateKinder(maturityHeight, + &htlcOutput.kidOutput) if err != nil { t.Fatalf("unable to graduate htlc output "+ "from kndr to grad: %v", err) @@ -333,93 +332,6 @@ func TestNurseryStoreIncubate(t *testing.T) { } } -// TestNurseryStoreFinalize tests that kindergarten sweep transactions are -// properly persisted, and that the last finalized height is being set -// accordingly. -func TestNurseryStoreFinalize(t *testing.T) { - cdb, cleanUp, err := makeTestDB() - if err != nil { - t.Fatalf("unable to open channel db: %v", err) - } - defer cleanUp() - - ns, err := newNurseryStore(&bitcoinTestnetGenesis, cdb) - if err != nil { - t.Fatalf("unable to open nursery store: %v", err) - } - - kid := &kidOutputs[3] - - // Compute the maturity height at which to enter the commitment output. - maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity() - - // Since we haven't finalized before, we should see a last finalized - // height of 0. - assertLastFinalizedHeight(t, ns, 0) - - // Begin incubating the commitment output, which will be placed in the - // preschool bucket. - err = ns.Incubate([]kidOutput{*kid}, nil) - if err != nil { - t.Fatalf("unable to incubate commitment output: %v", err) - } - - // Then move the commitment output to the kindergarten bucket, so that - // the output is registered in the height index. - err = ns.PreschoolToKinder(kid) - if err != nil { - t.Fatalf("unable to move pscl output to kndr: %v", err) - } - - // We should still see a last finalized height of 0, since no classes - // have been graduated. - assertLastFinalizedHeight(t, ns, 0) - - // Now, iteratively finalize all heights below the maturity height, - // ensuring that the last finalized height is properly persisted, and - // that the finalized transactions are all nil. - for i := 0; i < int(maturityHeight); i++ { - err = ns.FinalizeKinder(uint32(i), nil) - if err != nil { - t.Fatalf("unable to finalize kndr at height=%d: %v", - i, err) - } - assertLastFinalizedHeight(t, ns, uint32(i)) - assertFinalizedTxn(t, ns, uint32(i), nil) - } - - // As we have now finalized all heights below the maturity height, we - // should still see the commitment output in the kindergarten bucket at - // its maturity height. - assertKndrAtMaturityHeight(t, ns, kid) - - // Now, finalize the kindergarten sweep transaction at the maturity - // height. - err = ns.FinalizeKinder(maturityHeight, timeoutTx) - if err != nil { - t.Fatalf("unable to finalize kndr at height=%d: %v", - maturityHeight, err) - } - - // The nursery store should now see the maturity height finalized, and - // the finalized kindergarten sweep txn should be returned at this - // height. - assertLastFinalizedHeight(t, ns, maturityHeight) - assertFinalizedTxn(t, ns, maturityHeight, timeoutTx) - - // Lastly, continue to finalize heights above the maturity height. Each - // should report having a nil finalized kindergarten sweep txn. - for i := maturityHeight + 1; i < maturityHeight+10; i++ { - err = ns.FinalizeKinder(uint32(i), nil) - if err != nil { - t.Fatalf("unable to finalize kndr at height=%d: %v", - i, err) - } - assertLastFinalizedHeight(t, ns, uint32(i)) - assertFinalizedTxn(t, ns, uint32(i), nil) - } -} - // TestNurseryStoreGraduate verifies that the nursery store properly removes // populated entries from the height index as it is purged, and that the last // purged height is set appropriately. @@ -441,9 +353,6 @@ func TestNurseryStoreGraduate(t *testing.T) { // height index. maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity() - // Since we have never purged, the last purged height should be 0. - assertLastGraduatedHeight(t, ns, 0) - // First, add a commitment output to the nursery store, which is // initially inserted in the preschool bucket. err = ns.Incubate([]kidOutput{*kid}, nil) @@ -453,7 +362,7 @@ func TestNurseryStoreGraduate(t *testing.T) { // Then, move the commitment output to the kindergarten bucket, such // that it resides in the height index at its maturity height. - err = ns.PreschoolToKinder(kid) + err = ns.PreschoolToKinder(kid, 0) if err != nil { t.Fatalf("unable to move pscl output to kndr: %v", err) } @@ -462,12 +371,6 @@ func TestNurseryStoreGraduate(t *testing.T) { // checking that each class is now empty, and that the last purged // height is set correctly. for i := 0; i < int(maturityHeight); i++ { - err = ns.GraduateHeight(uint32(i)) - if err != nil { - t.Fatalf("unable to purge height=%d: %v", i, err) - } - - assertLastGraduatedHeight(t, ns, uint32(i)) assertHeightIsPurged(t, ns, uint32(i)) } @@ -475,27 +378,7 @@ func TestNurseryStoreGraduate(t *testing.T) { // height. assertKndrAtMaturityHeight(t, ns, kid) - // Finalize the kindergarten transaction, ensuring that it is a non-nil - // value. - err = ns.FinalizeKinder(maturityHeight, timeoutTx) - if err != nil { - t.Fatalf("unable to finalize kndr at height=%d: %v", - maturityHeight, err) - } - - // Verify that the maturity height has now been finalized. - assertLastFinalizedHeight(t, ns, maturityHeight) - assertFinalizedTxn(t, ns, maturityHeight, timeoutTx) - - // Finally, purge the non-empty maturity height, and check that returned - // class is empty. - err = ns.GraduateHeight(maturityHeight) - if err != nil { - t.Fatalf("unable to set graduated height=%d: %v", maturityHeight, - err) - } - - err = ns.GraduateKinder(maturityHeight) + err = ns.GraduateKinder(maturityHeight, kid) if err != nil { t.Fatalf("unable to graduate kindergarten outputs at height=%d: "+ "%v", maturityHeight, err) @@ -528,36 +411,6 @@ func assertNumChanOutputs(t *testing.T, ns NurseryStore, } } -// assertLastFinalizedHeight checks that the nursery stores last finalized -// height matches the expected height. -func assertLastFinalizedHeight(t *testing.T, ns NurseryStore, - expected uint32) { - - lfh, err := ns.LastFinalizedHeight() - if err != nil { - t.Fatalf("unable to get last finalized height: %v", err) - } - - if lfh != expected { - t.Fatalf("expected last finalized height to be %d, got %d", - expected, lfh) - } -} - -// assertLastGraduatedHeight checks that the nursery stores last purged height -// matches the expected height. -func assertLastGraduatedHeight(t *testing.T, ns NurseryStore, expected uint32) { - lgh, err := ns.LastGraduatedHeight() - if err != nil { - t.Fatalf("unable to get last graduated height: %v", err) - } - - if lgh != expected { - t.Fatalf("expected last graduated height to be %d, got %d", - expected, lgh) - } -} - // assertNumPreschools loads all preschool outputs and verifies their count // matches the expected number. func assertNumPreschools(t *testing.T, ns NurseryStore, expected int) { @@ -592,16 +445,12 @@ func assertNumChannels(t *testing.T, ns NurseryStore, expected int) { func assertHeightIsPurged(t *testing.T, ns NurseryStore, height uint32) { - finalTx, kndrOutputs, cribOutputs, err := ns.FetchClass(height) + kndrOutputs, cribOutputs, err := ns.FetchClass(height) if err != nil { t.Fatalf("unable to retrieve class at height=%d: %v", height, err) } - if finalTx != nil { - t.Fatalf("height=%d not purged, final txn should be nil", height) - } - if kndrOutputs != nil { t.Fatalf("height=%d not purged, kndr outputs should be nil", height) } @@ -617,7 +466,7 @@ func assertCribAtExpiryHeight(t *testing.T, ns NurseryStore, htlcOutput *babyOutput) { expiryHeight := htlcOutput.expiry - _, _, cribOutputs, err := ns.FetchClass(expiryHeight) + _, cribOutputs, err := ns.FetchClass(expiryHeight) if err != nil { t.Fatalf("unable to retrieve class at height=%d: %v", expiryHeight, err) @@ -639,7 +488,7 @@ func assertCribNotAtExpiryHeight(t *testing.T, ns NurseryStore, htlcOutput *babyOutput) { expiryHeight := htlcOutput.expiry - _, _, cribOutputs, err := ns.FetchClass(expiryHeight) + _, cribOutputs, err := ns.FetchClass(expiryHeight) if err != nil { t.Fatalf("unable to retrieve class at height %d: %v", expiryHeight, err) @@ -653,25 +502,6 @@ func assertCribNotAtExpiryHeight(t *testing.T, ns NurseryStore, } } -// assertFinalizedTxn loads the class at the given height and compares the -// returned finalized txn to that in the class. It is safe to presented a nil -// expected transaction. -func assertFinalizedTxn(t *testing.T, ns NurseryStore, height uint32, - exFinalTx *wire.MsgTx) { - - finalTx, _, _, err := ns.FetchClass(height) - if err != nil { - t.Fatalf("unable to fetch class at height=%d: %v", height, - err) - } - - if !reflect.DeepEqual(finalTx, exFinalTx) { - t.Fatalf("expected finalized txn at height=%d "+ - "to be %v, got %v", height, finalTx.TxHash(), - exFinalTx.TxHash()) - } -} - // assertKndrAtMaturityHeight loads the class at the provided height and // verifies that the provided kid output is one of the kindergarten outputs // returned. @@ -680,7 +510,7 @@ func assertKndrAtMaturityHeight(t *testing.T, ns NurseryStore, maturityHeight := kndrOutput.ConfHeight() + kndrOutput.BlocksToMaturity() - _, kndrOutputs, _, err := ns.FetchClass(maturityHeight) + kndrOutputs, _, err := ns.FetchClass(maturityHeight) if err != nil { t.Fatalf("unable to retrieve class at height %d: %v", maturityHeight, err) @@ -705,7 +535,7 @@ func assertKndrNotAtMaturityHeight(t *testing.T, ns NurseryStore, maturityHeight := kndrOutput.ConfHeight() + kndrOutput.BlocksToMaturity() - _, kndrOutputs, _, err := ns.FetchClass(maturityHeight) + kndrOutputs, _, err := ns.FetchClass(maturityHeight) if err != nil { t.Fatalf("unable to retrieve class at height %d: %v", maturityHeight, err) diff --git a/server.go b/server.go index c792122b..c6ae09b6 100644 --- a/server.go +++ b/server.go @@ -17,8 +17,6 @@ import ( "sync/atomic" "time" - "github.com/lightningnetwork/lnd/sweep" - "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/connmgr" @@ -40,6 +38,7 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/nat" "github.com/lightningnetwork/lnd/routing" + "github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/ticker" "github.com/lightningnetwork/lnd/tor" ) @@ -156,6 +155,8 @@ type server struct { utxoNursery *utxoNursery + sweeper *sweep.UtxoSweeper + chainArb *contractcourt.ChainArbitrator sphinx *htlcswitch.OnionProcessor @@ -597,24 +598,45 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, return nil, err } - sweeper := sweep.New(&sweep.UtxoSweeperConfig{ + srvrLog.Tracef("Sweeper batch window duration: %v", + sweep.DefaultBatchWindowDuration) + + sweeperStore, err := sweep.NewSweeperStore( + chanDB, activeNetParams.GenesisHash, + ) + if err != nil { + srvrLog.Errorf("unable to create sweeper store: %v", err) + return nil, err + } + + s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{ Estimator: cc.feeEstimator, GenSweepScript: func() ([]byte, error) { return newSweepPkScript(cc.wallet) }, - Signer: cc.wallet.Cfg.Signer, + Signer: cc.wallet.Cfg.Signer, + PublishTransaction: cc.wallet.PublishTransaction, + NewBatchTimer: func() <-chan time.Time { + return time.NewTimer(sweep.DefaultBatchWindowDuration).C + }, + SweepTxConfTarget: 6, + Notifier: cc.chainNotifier, + ChainIO: cc.chainIO, + Store: sweeperStore, + MaxInputsPerTx: sweep.DefaultMaxInputsPerTx, + MaxSweepAttempts: sweep.DefaultMaxSweepAttempts, + NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc, }) s.utxoNursery = newUtxoNursery(&NurseryConfig{ ChainIO: cc.chainIO, ConfDepth: 1, - SweepTxConfTarget: 6, FetchClosedChannels: chanDB.FetchClosedChannels, FetchClosedChannel: chanDB.FetchClosedChannel, Notifier: cc.chainNotifier, PublishTransaction: cc.wallet.PublishTransaction, Store: utxnStore, - Sweeper: sweeper, + Sweeper: s.sweeper, }) // Construct a closure that wraps the htlcswitch's CloseLink method. @@ -706,7 +728,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, DisableChannel: func(op wire.OutPoint) error { return s.announceChanStatus(op, true) }, - Sweeper: sweeper, + Sweeper: s.sweeper, }, chanDB) s.breachArbiter = newBreachArbiter(&BreachConfig{ @@ -963,6 +985,9 @@ func (s *server) Start() error { if err := s.htlcSwitch.Start(); err != nil { return err } + if err := s.sweeper.Start(); err != nil { + return err + } if err := s.utxoNursery.Start(); err != nil { return err } @@ -1050,6 +1075,7 @@ func (s *server) Stop() error { s.breachArbiter.Stop() s.authGossiper.Stop() s.chainArb.Stop() + s.sweeper.Stop() s.cc.wallet.Shutdown() s.cc.chainView.Stop() s.connMgr.Stop() diff --git a/test_utils.go b/test_utils.go index b3b1d4c7..eb6b3dc2 100644 --- a/test_utils.go +++ b/test_utils.go @@ -316,7 +316,9 @@ func createTestPeer(notifier chainntnfs.ChainNotifier, } bobPool.Start() - chainIO := &mockChainIO{} + chainIO := &mockChainIO{ + bestHeight: fundingBroadcastHeight, + } wallet := &lnwallet.LightningWallet{ WalletController: &mockWalletController{ rootKey: aliceKeyPriv, diff --git a/utxonursery.go b/utxonursery.go index 0849c309..5285127e 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -72,17 +72,6 @@ import ( // the utxo nursery will sweep all KNDR outputs scheduled for that height // using a single txn. // -// NOTE: Due to the fact that KNDR outputs can be dynamically aggregated and -// swept, we make precautions to finalize the KNDR outputs at a particular -// height on our first attempt to sweep it. Finalizing involves signing the -// sweep transaction and persisting it in the nursery store, and recording -// the last finalized height. Any attempts to replay an already finalized -// height will result in broadcasting the already finalized txn, ensuring the -// nursery does not broadcast different txids for the same batch of KNDR -// outputs. The reason txids may change is due to the probabilistic nature of -// generating the pkscript in the sweep txn's output, even if the set of -// inputs remains static across attempts. -// // - GRAD (kidOutput) outputs are KNDR outputs that have successfully been // swept into the user's wallet. A channel is considered mature once all of // its outputs, including two-stage htlcs, have entered the GRAD state, @@ -183,10 +172,6 @@ type NurseryConfig struct { // determining outputs in the chain as confirmed. ConfDepth uint32 - // SweepTxConfTarget assigns a confirmation target for sweep txes on - // which the fee calculation will be based. - SweepTxConfTarget uint32 - // FetchClosedChannels provides access to a user's channels, such that // they can be marked fully closed after incubation has concluded. FetchClosedChannels func(pendingOnly bool) ( @@ -254,25 +239,22 @@ func (u *utxoNursery) Start() error { utxnLog.Tracef("Starting UTXO nursery") - // 1. Start watching for new blocks, as this will drive the nursery - // store's state machine. - - // Register with the notifier to receive notifications for each newly - // connected block. We register immediately on startup to ensure that - // no blocks are missed while we are handling blocks that were missed - // during the time the UTXO nursery was unavailable. - newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(nil) + // Retrieve the currently best known block. This is needed to have the + // state machine catch up with the blocks we missed when we were down. + bestHash, bestHeight, err := u.cfg.ChainIO.GetBestBlock() if err != nil { return err } + // Set best known height to schedule late registrations properly. + atomic.StoreUint32(&u.bestHeight, uint32(bestHeight)) + // 2. Flush all fully-graduated channels from the pipeline. // Load any pending close channels, which represents the super set of // all channels that may still be incubating. pendingCloseChans, err := u.cfg.FetchClosedChannels(true) if err != nil { - newBlockChan.Cancel() return err } @@ -281,7 +263,6 @@ func (u *utxoNursery) Start() error { for _, pendingClose := range pendingCloseChans { err := u.closeAndRemoveIfMature(&pendingClose.ChanPoint) if err != nil { - newBlockChan.Cancel() return err } } @@ -289,15 +270,6 @@ func (u *utxoNursery) Start() error { // TODO(conner): check if any fully closed channels can be removed from // utxn. - // Query the nursery store for the lowest block height we could be - // incubating, which is taken to be the last height for which the - // database was purged. - lastGraduatedHeight, err := u.cfg.Store.LastGraduatedHeight() - if err != nil { - newBlockChan.Cancel() - return err - } - // 2. Restart spend ntfns for any preschool outputs, which are waiting // for the force closed commitment txn to confirm, or any second-layer // HTLC success transactions. @@ -306,15 +278,24 @@ func (u *utxoNursery) Start() error { // point forward, we must close the nursery's quit channel if we detect // any failures during startup to ensure they terminate. if err := u.reloadPreschool(); err != nil { - newBlockChan.Cancel() close(u.quit) return err } - // 3. Replay all crib and kindergarten outputs from last pruned to - // current best height. - if err := u.reloadClasses(lastGraduatedHeight); err != nil { - newBlockChan.Cancel() + // 3. Replay all crib and kindergarten outputs up to the current best + // height. + if err := u.reloadClasses(uint32(bestHeight)); err != nil { + close(u.quit) + return err + } + + // Start watching for new blocks, as this will drive the nursery store's + // state machine. + newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(&chainntnfs.BlockEpoch{ + Height: bestHeight, + Hash: bestHash, + }) + if err != nil { close(u.quit) return err } @@ -672,123 +653,44 @@ func (u *utxoNursery) reloadPreschool() error { // reloadClasses reinitializes any height-dependent state transitions for which // the utxonursery has not received confirmation, and replays the graduation of -// all kindergarten and crib outputs for heights that have not been finalized. +// all kindergarten and crib outputs for all heights up to the current block. // This allows the nursery to reinitialize all state to continue sweeping -// outputs, even in the event that we missed blocks while offline. -// reloadClasses is called during the startup of the UTXO Nursery. -func (u *utxoNursery) reloadClasses(lastGradHeight uint32) error { - // Begin by loading all of the still-active heights up to and including - // the last height we successfully graduated. - activeHeights, err := u.cfg.Store.HeightsBelowOrEqual(lastGradHeight) +// outputs, even in the event that we missed blocks while offline. reloadClasses +// is called during the startup of the UTXO Nursery. +func (u *utxoNursery) reloadClasses(bestHeight uint32) error { + // Loading all active heights up to and including the current block. + activeHeights, err := u.cfg.Store.HeightsBelowOrEqual( + uint32(bestHeight)) if err != nil { return err } - if len(activeHeights) > 0 { - utxnLog.Infof("Re-registering confirmations for %d already "+ - "graduated heights below height=%d", len(activeHeights), - lastGradHeight) + // Return early if nothing to sweep. + if len(activeHeights) == 0 { + return nil } + utxnLog.Infof("(Re)-sweeping %d heights below height=%d", + len(activeHeights), bestHeight) + // Attempt to re-register notifications for any outputs still at these // heights. for _, classHeight := range activeHeights { - utxnLog.Debugf("Attempting to regraduate outputs at height=%v", + utxnLog.Debugf("Attempting to sweep outputs at height=%v", classHeight) - if err = u.regraduateClass(classHeight); err != nil { - utxnLog.Errorf("Failed to regraduate outputs at "+ + if err = u.graduateClass(classHeight); err != nil { + utxnLog.Errorf("Failed to sweep outputs at "+ "height=%v: %v", classHeight, err) return err } } - // Get the most recently mined block. - _, bestHeight, err := u.cfg.ChainIO.GetBestBlock() - if err != nil { - return err - } - - // If we haven't yet seen any registered force closes, or we're already - // caught up with the current best chain, then we can exit early. - if lastGradHeight == 0 || uint32(bestHeight) == lastGradHeight { - return nil - } - - utxnLog.Infof("Processing outputs from missed blocks. Starting with "+ - "blockHeight=%v, to current blockHeight=%v", lastGradHeight, - bestHeight) - - // Loop through and check for graduating outputs at each of the missed - // block heights. - for curHeight := lastGradHeight + 1; curHeight <= uint32(bestHeight); curHeight++ { - utxnLog.Debugf("Attempting to graduate outputs at height=%v", - curHeight) - - if err := u.graduateClass(curHeight); err != nil { - utxnLog.Errorf("Failed to graduate outputs at "+ - "height=%v: %v", curHeight, err) - return err - } - } - utxnLog.Infof("UTXO Nursery is now fully synced") return nil } -// regraduateClass handles the steps involved in re-registering for -// confirmations for all still-active outputs at a particular height. This is -// used during restarts to ensure that any still-pending state transitions are -// properly registered, so they can be driven by the chain notifier. No -// transactions or signing are done as a result of this step. -func (u *utxoNursery) regraduateClass(classHeight uint32) error { - // Fetch all information about the crib and kindergarten outputs at - // this height. In addition to the outputs, we also retrieve the - // finalized kindergarten sweep txn, which will be nil if we have not - // attempted this height before, or if no kindergarten outputs exist at - // this height. - finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass( - classHeight) - if err != nil { - return err - } - - if finalTx != nil { - utxnLog.Infof("Re-registering confirmation for kindergarten "+ - "sweep transaction at height=%d ", classHeight) - - err = u.sweepMatureOutputs(classHeight, finalTx, kgtnOutputs) - if err != nil { - utxnLog.Errorf("Failed to re-register for kindergarten "+ - "sweep transaction at height=%d: %v", - classHeight, err) - return err - } - } - - if len(cribOutputs) == 0 { - return nil - } - - utxnLog.Infof("Re-registering confirmation for first-stage HTLC "+ - "outputs at height=%d ", classHeight) - - // Now, we broadcast all pre-signed htlc txns from the crib outputs at - // this height. There is no need to finalize these txns, since the txid - // is predetermined when signed in the wallet. - for i := range cribOutputs { - err := u.sweepCribOutput(classHeight, &cribOutputs[i]) - if err != nil { - utxnLog.Errorf("Failed to re-register first-stage "+ - "HTLC output %v", cribOutputs[i].OutPoint()) - return err - } - } - - return nil -} - // incubator is tasked with driving all state transitions that are dependent on // the current height of the blockchain. As new blocks arrive, the incubator // will attempt spend outputs at the latest height. The asynchronous @@ -821,6 +723,11 @@ func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) { // as signing and broadcasting a sweep txn that spends // from all kindergarten outputs at this height. height := uint32(epoch.Height) + + // Update best known block height for late registrations + // to be scheduled properly. + atomic.StoreUint32(&u.bestHeight, height) + if err := u.graduateClass(height); err != nil { utxnLog.Errorf("error while graduating "+ "class at height=%d: %v", height, err) @@ -843,14 +750,9 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error { u.mu.Lock() defer u.mu.Unlock() - u.bestHeight = classHeight - // Fetch all information about the crib and kindergarten outputs at - // this height. In addition to the outputs, we also retrieve the - // finalized kindergarten sweep txn, which will be nil if we have not - // attempted this height before, or if no kindergarten outputs exist at // this height. - finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass( + kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass( classHeight) if err != nil { return err @@ -859,64 +761,11 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error { utxnLog.Infof("Attempting to graduate height=%v: num_kids=%v, "+ "num_babies=%v", classHeight, len(kgtnOutputs), len(cribOutputs)) - // Load the last finalized height, so we can determine if the - // kindergarten sweep txn should be crafted. - lastFinalizedHeight, err := u.cfg.Store.LastFinalizedHeight() - if err != nil { - return err - } - - // If we haven't processed this height before, we finalize the - // graduating kindergarten outputs, by signing a sweep transaction that - // spends from them. This txn is persisted such that we never broadcast - // a different txn for the same height. This allows us to recover from - // failures, and watch for the correct txid. - if classHeight > lastFinalizedHeight { - // If this height has never been finalized, we have never - // generated a sweep txn for this height. Generate one if there - // are kindergarten outputs or cltv crib outputs to be spent. - if len(kgtnOutputs) > 0 { - sweepInputs := make([]sweep.Input, len(kgtnOutputs)) - for i := range kgtnOutputs { - sweepInputs[i] = &kgtnOutputs[i] - } - - finalTx, err = u.cfg.Sweeper.CreateSweepTx( - sweepInputs, u.cfg.SweepTxConfTarget, - classHeight, - ) - if err != nil { - utxnLog.Errorf("Failed to create sweep txn at "+ - "height=%d", classHeight) - return err - } - } - - // Persist the kindergarten sweep txn to the nursery store. It - // is safe to store a nil finalTx, which happens if there are - // no graduating kindergarten outputs. - err = u.cfg.Store.FinalizeKinder(classHeight, finalTx) - if err != nil { - utxnLog.Errorf("Failed to finalize kindergarten at "+ - "height=%d", classHeight) - - return err - } - - // Log if the finalized transaction is non-trivial. - if finalTx != nil { - utxnLog.Infof("Finalized kindergarten at height=%d ", - classHeight) - } - } - - // Now that the kindergarten sweep txn has either been finalized or - // restored, broadcast the txn, and set up notifications that will - // transition the swept kindergarten outputs and cltvCrib into - // graduated outputs. - if finalTx != nil { - err := u.sweepMatureOutputs(classHeight, finalTx, kgtnOutputs) - if err != nil { + // Offer the outputs to the sweeper and set up notifications that will + // transition the swept kindergarten outputs and cltvCrib into graduated + // outputs. + if len(kgtnOutputs) > 0 { + if err := u.sweepMatureOutputs(classHeight, kgtnOutputs); err != nil { utxnLog.Errorf("Failed to sweep %d kindergarten "+ "outputs at height=%d: %v", len(kgtnOutputs), classHeight, err) @@ -925,8 +774,7 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error { } // Now, we broadcast all pre-signed htlc txns from the csv crib outputs - // at this height. There is no need to finalize these txns, since the - // txid is predetermined when signed in the wallet. + // at this height. for i := range cribOutputs { err := u.sweepCribOutput(classHeight, &cribOutputs[i]) if err != nil { @@ -937,62 +785,32 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error { } } - return u.cfg.Store.GraduateHeight(classHeight) + return nil } // sweepMatureOutputs generates and broadcasts the transaction that transfers // control of funds from a prior channel commitment transaction to the user's // wallet. The outputs swept were previously time locked (either absolute or // relative), but are not mature enough to sweep into the wallet. -func (u *utxoNursery) sweepMatureOutputs(classHeight uint32, finalTx *wire.MsgTx, +func (u *utxoNursery) sweepMatureOutputs(classHeight uint32, kgtnOutputs []kidOutput) error { - utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx "+ - "(txid=%v): %v", len(kgtnOutputs), - finalTx.TxHash(), newLogClosure(func() string { - return spew.Sdump(finalTx) - }), - ) + utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx for "+ + "height %v", len(kgtnOutputs), classHeight) - // With the sweep transaction fully signed, broadcast the transaction - // to the network. Additionally, we can stop tracking these outputs as - // they've just been swept. - err := u.cfg.PublishTransaction(finalTx) - if err != nil && err != lnwallet.ErrDoubleSpend { - utxnLog.Errorf("unable to broadcast sweep tx: %v, %v", - err, spew.Sdump(finalTx)) - return err + for _, output := range kgtnOutputs { + // Create local copy to prevent pointer to loop variable to be + // passed in with disastruous consequences. + local := output + + resultChan, err := u.cfg.Sweeper.SweepInput(&local) + if err != nil { + return err + } + u.wg.Add(1) + go u.waitForSweepConf(classHeight, &output, resultChan) } - return u.registerSweepConf(finalTx, kgtnOutputs, classHeight) -} - -// registerSweepConf is responsible for registering a finalized kindergarten -// sweep transaction for confirmation notifications. If the confirmation was -// successfully registered, a goroutine will be spawned that waits for the -// confirmation, and graduates the provided kindergarten class within the -// nursery store. -func (u *utxoNursery) registerSweepConf(finalTx *wire.MsgTx, - kgtnOutputs []kidOutput, heightHint uint32) error { - - finalTxID := finalTx.TxHash() - - confChan, err := u.cfg.Notifier.RegisterConfirmationsNtfn( - &finalTxID, finalTx.TxOut[0].PkScript, u.cfg.ConfDepth, - heightHint, - ) - if err != nil { - utxnLog.Errorf("unable to register notification for "+ - "sweep confirmation: %v", finalTxID) - return err - } - - utxnLog.Infof("Registering sweep tx %v for confs at height=%d", - finalTxID, heightHint) - - u.wg.Add(1) - go u.waitForSweepConf(heightHint, kgtnOutputs, confChan) - return nil } @@ -1002,16 +820,30 @@ func (u *utxoNursery) registerSweepConf(finalTx *wire.MsgTx, // to mark any mature channels as fully closed in channeldb. // NOTE(conner): this method MUST be called as a go routine. func (u *utxoNursery) waitForSweepConf(classHeight uint32, - kgtnOutputs []kidOutput, confChan *chainntnfs.ConfirmationEvent) { + output *kidOutput, resultChan chan sweep.Result) { defer u.wg.Done() select { - case _, ok := <-confChan.Confirmed: + case result, ok := <-resultChan: if !ok { - utxnLog.Errorf("Notification chan closed, can't"+ - " advance %v graduating outputs", - len(kgtnOutputs)) + utxnLog.Errorf("Notification chan closed, can't" + + " advance graduating output") + return + } + + // In case of a remote spend, still graduate the output. There + // is no way to sweep it anymore. + if result.Err == sweep.ErrRemoteSpend { + utxnLog.Infof("Output %v was spend by remote party", + output.OutPoint()) + break + } + + if result.Err != nil { + utxnLog.Errorf("Failed to sweep %v at "+ + "height=%d", output.OutPoint(), + classHeight) return } @@ -1024,32 +856,23 @@ func (u *utxoNursery) waitForSweepConf(classHeight uint32, // TODO(conner): add retry logic? - // Mark the confirmed kindergarten outputs as graduated. - if err := u.cfg.Store.GraduateKinder(classHeight); err != nil { - utxnLog.Errorf("Unable to graduate %v kindergarten outputs: "+ - "%v", len(kgtnOutputs), err) + // Mark the confirmed kindergarten output as graduated. + if err := u.cfg.Store.GraduateKinder(classHeight, output); err != nil { + utxnLog.Errorf("Unable to graduate kindergarten output %v: %v", + output.OutPoint(), err) return } - utxnLog.Infof("Graduated %d kindergarten outputs from height=%d", - len(kgtnOutputs), classHeight) + utxnLog.Infof("Graduated kindergarten output from height=%d", + classHeight) - // Iterate over the kid outputs and construct a set of all channel - // points to which they belong. - var possibleCloses = make(map[wire.OutPoint]struct{}) - for _, kid := range kgtnOutputs { - possibleCloses[*kid.OriginChanPoint()] = struct{}{} - - } - - // Attempt to close each channel, only doing so if all of the channel's + // Attempt to close the channel, only doing so if all of the channel's // outputs have been graduated. - for chanPoint := range possibleCloses { - if err := u.closeAndRemoveIfMature(&chanPoint); err != nil { - utxnLog.Errorf("Failed to close and remove channel %v", - chanPoint) - return - } + chanPoint := output.OriginChanPoint() + if err := u.closeAndRemoveIfMature(chanPoint); err != nil { + utxnLog.Errorf("Failed to close and remove channel %v", + *chanPoint) + return } } @@ -1216,7 +1039,8 @@ func (u *utxoNursery) waitForPreschoolConf(kid *kidOutput, outputType = "Commitment" } - err := u.cfg.Store.PreschoolToKinder(kid) + bestHeight := atomic.LoadUint32(&u.bestHeight) + err := u.cfg.Store.PreschoolToKinder(kid, bestHeight) if err != nil { utxnLog.Errorf("Unable to move %v output "+ "from preschool to kindergarten bucket: %v", diff --git a/utxonursery_test.go b/utxonursery_test.go index 32cde4cf..1584edb4 100644 --- a/utxonursery_test.go +++ b/utxonursery_test.go @@ -9,8 +9,9 @@ import ( "github.com/lightningnetwork/lnd/sweep" "io/ioutil" "math" + "os" "reflect" - "sync" + "runtime/pprof" "testing" "time" @@ -19,7 +20,6 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -396,11 +396,14 @@ func TestBabyOutputSerialization(t *testing.T) { type nurseryTestContext struct { nursery *utxoNursery - notifier *nurseryMockNotifier + notifier *sweep.MockNotifier + chainIO *mockChainIO publishChan chan wire.MsgTx store *nurseryStoreInterceptor restart func() bool receiveTx func() wire.MsgTx + sweeper *sweep.UtxoSweeper + timeoutChan chan chan time.Time t *testing.T } @@ -430,17 +433,50 @@ func createNurseryTestContext(t *testing.T, // test. storeIntercepter := newNurseryStoreInterceptor(store) - notifier := newNurseryMockNotifier(t) + notifier := sweep.NewMockNotifier(t) - sweeper := sweep.New(&sweep.UtxoSweeperConfig{ + publishChan := make(chan wire.MsgTx, 1) + publishFunc := func(tx *wire.MsgTx, source string) error { + utxnLog.Tracef("Publishing tx %v by %v", tx.TxHash(), source) + publishChan <- *tx + return nil + } + + timeoutChan := make(chan chan time.Time) + + chainIO := &mockChainIO{ + bestHeight: 0, + } + + sweeperStore := sweep.NewMockSweeperStore() + + sweeperCfg := &sweep.UtxoSweeperConfig{ GenSweepScript: func() ([]byte, error) { return []byte{}, nil }, Estimator: &lnwallet.StaticFeeEstimator{}, Signer: &nurseryMockSigner{}, - }) + Notifier: notifier, + PublishTransaction: func(tx *wire.MsgTx) error { + return publishFunc(tx, "sweeper") + }, + NewBatchTimer: func() <-chan time.Time { + c := make(chan time.Time, 1) + timeoutChan <- c + return c + }, + ChainIO: chainIO, + Store: sweeperStore, + MaxInputsPerTx: 10, + MaxSweepAttempts: 5, + NextAttemptDeltaFunc: func(int) int32 { return 1 }, + } - cfg := NurseryConfig{ + sweeper := sweep.New(sweeperCfg) + + sweeper.Start() + + nurseryCfg := NurseryConfig{ Notifier: notifier, FetchClosedChannels: func(pendingOnly bool) ( []*channeldb.ChannelCloseSummary, error) { @@ -453,54 +489,91 @@ func createNurseryTestContext(t *testing.T, }, nil }, Store: storeIntercepter, - ChainIO: &mockChainIO{}, + ChainIO: chainIO, Sweeper: sweeper, + PublishTransaction: func(tx *wire.MsgTx) error { + return publishFunc(tx, "nursery") + }, } - publishChan := make(chan wire.MsgTx, 1) - cfg.PublishTransaction = func(tx *wire.MsgTx) error { - t.Logf("Publishing tx %v", tx.TxHash()) - publishChan <- *tx - return nil - } - - nursery := newUtxoNursery(&cfg) + nursery := newUtxoNursery(&nurseryCfg) nursery.Start() ctx := &nurseryTestContext{ nursery: nursery, notifier: notifier, + chainIO: chainIO, store: storeIntercepter, publishChan: publishChan, + sweeper: sweeper, + timeoutChan: timeoutChan, t: t, } - ctx.restart = func() bool { - return checkStartStop(func() { - ctx.nursery.Stop() - // Simulate lnd restart. - ctx.nursery = newUtxoNursery(ctx.nursery.cfg) - ctx.nursery.Start() - }) - } - ctx.receiveTx = func() wire.MsgTx { var tx wire.MsgTx select { case tx = <-ctx.publishChan: + utxnLog.Debugf("Published tx %v", tx.TxHash()) return tx - case <-time.After(5 * time.Second): + case <-time.After(defaultTestTimeout): + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + t.Fatalf("tx not published") } return tx } + ctx.restart = func() bool { + return checkStartStop(func() { + utxnLog.Tracef("Restart sweeper and nursery") + // Simulate lnd restart. + ctx.nursery.Stop() + + // Also restart sweeper to test behaviour as one unit. + // + // TODO(joostjager): Mock sweeper to test nursery in + // isolation. + ctx.sweeper.Stop() + + // Find out if there is a last tx stored. If so, we + // expect it to be republished on startup. + hasLastTx, err := sweeperCfg.Store.GetLastPublishedTx() + if err != nil { + t.Fatal(err) + } + + // Restart sweeper. + ctx.sweeper = sweep.New(sweeperCfg) + ctx.sweeper.Start() + + // Receive last tx if expected. + if hasLastTx != nil { + utxnLog.Debugf("Expecting republish") + ctx.receiveTx() + } else { + utxnLog.Debugf("Expecting no republish") + } + + /// Restart nursery. + nurseryCfg.Sweeper = ctx.sweeper + ctx.nursery = newUtxoNursery(&nurseryCfg) + ctx.nursery.Start() + + }) + } + // Start with testing an immediate restart. ctx.restart() return ctx } +func (ctx *nurseryTestContext) notifyEpoch(height int32) { + ctx.chainIO.bestHeight = height + ctx.notifier.NotifyEpoch(height) +} + func (ctx *nurseryTestContext) finish() { // Add a final restart point in this state ctx.restart() @@ -556,6 +629,8 @@ func (ctx *nurseryTestContext) finish() { if len(activeHeights) > 0 { ctx.t.Fatalf("Expected height index to be empty") } + + ctx.sweeper.Stop() } func createOutgoingRes(onLocalCommitment bool) *lnwallet.OutgoingHtlcResolution { @@ -703,6 +778,8 @@ func testRestartLoop(t *testing.T, test func(*testing.T, return true } + utxnLog.Debugf("Skipping restart point %v", + currentStartStopIdx) return false } @@ -739,7 +816,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T, ctx.restart() // Notify arrival of block where HTLC CLTV expires. - ctx.notifier.notifyEpoch(125) + ctx.notifyEpoch(125) // This should trigger nursery to publish the timeout tx. ctx.receiveTx() @@ -751,7 +828,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T, // Confirm the timeout tx. This should promote the HTLC to KNDR state. timeoutTxHash := outgoingRes.SignedTimeoutTx.TxHash() - if err := ctx.notifier.confirmTx(&timeoutTxHash, 126); err != nil { + if err := ctx.notifier.ConfirmTx(&timeoutTxHash, 126); err != nil { t.Fatal(err) } @@ -765,7 +842,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T, ctx.restart() // Notify arrival of block where second level HTLC unlocks. - ctx.notifier.notifyEpoch(128) + ctx.notifyEpoch(128) // Check final sweep into wallet. testSweepHtlc(t, ctx) @@ -790,7 +867,7 @@ func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T, // resolving remote commitment tx. // // TODO(joostjager): This is probably not correct? - err := ctx.notifier.confirmTx(&outgoingRes.ClaimOutpoint.Hash, 124) + err := ctx.notifier.ConfirmTx(&outgoingRes.ClaimOutpoint.Hash, 124) if err != nil { t.Fatal(err) } @@ -805,7 +882,7 @@ func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T, ctx.restart() // Notify arrival of block where HTLC CLTV expires. - ctx.notifier.notifyEpoch(125) + ctx.notifyEpoch(125) // Check final sweep into wallet. testSweepHtlc(t, ctx) @@ -840,7 +917,7 @@ func testNurseryCommitSuccessOnLocal(t *testing.T, ctx.restart() // Notify confirmation of the commitment tx. - err = ctx.notifier.confirmTx(&commitRes.SelfOutPoint.Hash, 124) + err = ctx.notifier.ConfirmTx(&commitRes.SelfOutPoint.Hash, 124) if err != nil { t.Fatal(err) } @@ -855,7 +932,7 @@ func testNurseryCommitSuccessOnLocal(t *testing.T, ctx.restart() // Notify arrival of block where commit output CSV expires. - ctx.notifier.notifyEpoch(126) + ctx.notifyEpoch(126) // Check final sweep into wallet. testSweep(t, ctx, func() { @@ -876,27 +953,28 @@ func testSweepHtlc(t *testing.T, ctx *nurseryTestContext) { func testSweep(t *testing.T, ctx *nurseryTestContext, afterPublishAssert func()) { + // Wait for nursery to publish the sweep tx. + ctx.tick() sweepTx := ctx.receiveTx() if ctx.restart() { - // Restart will trigger rebroadcast of sweep tx. - sweepTx = ctx.receiveTx() + // Nursery reoffers its input. Sweeper needs a tick to create the sweep + // tx. + ctx.tick() + ctx.receiveTx() } afterPublishAssert() // Confirm the sweep tx. - sweepTxHash := sweepTx.TxHash() - err := ctx.notifier.confirmTx(&sweepTxHash, 129) - if err != nil { - t.Fatal(err) - } + ctx.notifier.SpendOutpoint(sweepTx.TxIn[0].PreviousOutPoint, sweepTx) // Wait for output to be promoted in store to GRAD. select { case <-ctx.store.graduateKinderChan: case <-time.After(defaultTestTimeout): + pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) t.Fatalf("output not graduated") } @@ -907,6 +985,19 @@ func testSweep(t *testing.T, ctx *nurseryTestContext, assertNurseryReportUnavailable(t, ctx.nursery) } +func (ctx *nurseryTestContext) tick() { + select { + case c := <-ctx.timeoutChan: + select { + case c <- time.Time{}: + case <-time.After(defaultTestTimeout): + ctx.t.Fatal("tick timeout - tick not consumed") + } + case <-time.After(defaultTestTimeout): + ctx.t.Fatal("tick timeout - no new timer created") + } +} + type nurseryStoreInterceptor struct { ns NurseryStore @@ -941,16 +1032,18 @@ func (i *nurseryStoreInterceptor) CribToKinder(babyOutput *babyOutput) error { return err } -func (i *nurseryStoreInterceptor) PreschoolToKinder(kidOutput *kidOutput) error { - err := i.ns.PreschoolToKinder(kidOutput) +func (i *nurseryStoreInterceptor) PreschoolToKinder(kidOutput *kidOutput, + lastGradHeight uint32) error { + + err := i.ns.PreschoolToKinder(kidOutput, lastGradHeight) i.preschoolToKinderChan <- struct{}{} return err } -func (i *nurseryStoreInterceptor) GraduateKinder(height uint32) error { - err := i.ns.GraduateKinder(height) +func (i *nurseryStoreInterceptor) GraduateKinder(height uint32, kid *kidOutput) error { + err := i.ns.GraduateKinder(height, kid) i.graduateKinderChan <- struct{}{} @@ -961,30 +1054,12 @@ func (i *nurseryStoreInterceptor) FetchPreschools() ([]kidOutput, error) { return i.ns.FetchPreschools() } -func (i *nurseryStoreInterceptor) FetchClass(height uint32) (*wire.MsgTx, +func (i *nurseryStoreInterceptor) FetchClass(height uint32) ( []kidOutput, []babyOutput, error) { return i.ns.FetchClass(height) } -func (i *nurseryStoreInterceptor) FinalizeKinder(height uint32, - tx *wire.MsgTx) error { - - return i.ns.FinalizeKinder(height, tx) -} - -func (i *nurseryStoreInterceptor) LastFinalizedHeight() (uint32, error) { - return i.ns.LastFinalizedHeight() -} - -func (i *nurseryStoreInterceptor) GraduateHeight(height uint32) error { - return i.ns.GraduateHeight(height) -} - -func (i *nurseryStoreInterceptor) LastGraduatedHeight() (uint32, error) { - return i.ns.LastGraduatedHeight() -} - func (i *nurseryStoreInterceptor) HeightsBelowOrEqual(height uint32) ( []uint32, error) { @@ -1025,92 +1100,3 @@ func (m *nurseryMockSigner) ComputeInputScript(tx *wire.MsgTx, return &lnwallet.InputScript{}, nil } - -type nurseryMockNotifier struct { - confChannel map[chainhash.Hash]chan *chainntnfs.TxConfirmation - epochChan chan *chainntnfs.BlockEpoch - spendChan chan *chainntnfs.SpendDetail - mutex sync.RWMutex - t *testing.T -} - -func newNurseryMockNotifier(t *testing.T) *nurseryMockNotifier { - return &nurseryMockNotifier{ - confChannel: make(map[chainhash.Hash]chan *chainntnfs.TxConfirmation), - epochChan: make(chan *chainntnfs.BlockEpoch), - spendChan: make(chan *chainntnfs.SpendDetail), - t: t, - } -} - -func (m *nurseryMockNotifier) notifyEpoch(height int32) { - select { - case m.epochChan <- &chainntnfs.BlockEpoch{ - Height: height, - }: - case <-time.After(defaultTestTimeout): - m.t.Fatal("epoch event not consumed") - } -} - -func (m *nurseryMockNotifier) confirmTx(txid *chainhash.Hash, height uint32) error { - confirm := &chainntnfs.TxConfirmation{ - BlockHeight: height, - } - select { - case m.getConfChannel(txid) <- confirm: - case <-time.After(defaultTestTimeout): - return fmt.Errorf("confirmation not consumed") - } - return nil -} - -func (m *nurseryMockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, - _ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, - error) { - - return &chainntnfs.ConfirmationEvent{ - Confirmed: m.getConfChannel(txid), - }, nil -} - -func (m *nurseryMockNotifier) getConfChannel( - txid *chainhash.Hash) chan *chainntnfs.TxConfirmation { - - m.mutex.Lock() - defer m.mutex.Unlock() - - channel, ok := m.confChannel[*txid] - if ok { - return channel - } - channel = make(chan *chainntnfs.TxConfirmation) - m.confChannel[*txid] = channel - - return channel -} - -func (m *nurseryMockNotifier) RegisterBlockEpochNtfn( - bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) { - return &chainntnfs.BlockEpochEvent{ - Epochs: m.epochChan, - Cancel: func() {}, - }, nil -} - -func (m *nurseryMockNotifier) Start() error { - return nil -} - -func (m *nurseryMockNotifier) Stop() error { - return nil -} - -func (m *nurseryMockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, - _ []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { - - return &chainntnfs.SpendEvent{ - Spend: m.spendChan, - Cancel: func() {}, - }, nil -}