utxonursery: connect to time-based sweeper

Previously, nursery generated and published its own sweep txes. It
stored the sweep tx in nursery_store to prevent a new tx with a new
sweep address from being generated on restart.

In this commit, sweep generation and publication is removed from nursery
and delegated to the sweeper. Also the confirmation notification is
received from the sweeper.
This commit is contained in:
Joost Jager 2018-10-23 12:08:03 +02:00
parent eec2efdd6b
commit 6389a97708
No known key found for this signature in database
GPG Key ID: AE6B0D042C8E38D9
8 changed files with 365 additions and 954 deletions

View File

@ -250,7 +250,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
signer := &mockSigner{
key: alicePrivKey,
}
bio := &mockChainIO{}
bio := &mockChainIO{
bestHeight: fundingBroadcastHeight,
}
dbDir := filepath.Join(tempTestDir, "cdb")
cdb, err := channeldb.Open(dbDir)

View File

@ -168,10 +168,12 @@ func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
}
}
type mockChainIO struct{}
type mockChainIO struct {
bestHeight int32
}
func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
return activeNetParams.GenesisHash, fundingBroadcastHeight, nil
func (m *mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) {
return activeNetParams.GenesisHash, m.bestHeight, nil
}
func (*mockChainIO) GetUtxo(op *wire.OutPoint, _ []byte,

View File

@ -23,21 +23,6 @@ import (
//
// utxn<chain-hash>/
// |
// | LAST PURGED + FINALIZED HEIGHTS
// |
// | Each nursery store tracks a "last graduated height", which records the
// | most recent block height for which the nursery store has successfully
// | graduated all outputs. It also tracks a "last finalized height", which
// | records the last block height that the nursery attempted to graduate
// | If a finalized height has kindergarten outputs, the sweep txn for these
// | outputs will be stored in the height bucket. This ensure that the same
// | txid will be used after restarts. Otherwise, the nursery will be unable
// | to recover the txid of kindergarten sweep transaction it has already
// | broadcast.
// |
// ├── last-finalized-height-key: <last-finalized-height>
// ├── last-graduated-height-key: <last-graduated-height>
// |
// | CHANNEL INDEX
// |
// | The channel index contains a directory for each channel that has a
@ -72,10 +57,7 @@ import (
// | relative file path:
// | e.g. <chan-point-3>/<prefix><outpoint-2>/
// | that can be queried in the channel index to retrieve the serialized
// | output. If a height bucket is less than or equal to the current last
// | finalized height and has a non-zero number of kindergarten outputs, a
// | height bucket will also contain the finalized kindergarten sweep txn
// | under the "finalized-kndr-txn" key.
// | output.
// |
// └── height-index-key/
//    ├── <height-1>/ <- HEIGHT BUCKET
@ -84,12 +66,15 @@ import (
// | |    └── <state-prefix><outpoint-5>: ""
// |   ├── <chan-point-2>/
// | |    └── <state-prefix><outpoint-3>: ""
//    | └── finalized-kndr-txn: "" | <kndr-sweep-tnx>
//    └── <height-2>/
//    └── <chan-point-1>/
//    └── <state-prefix><outpoint-1>: ""
//    └── <state-prefix><outpoint-2>: ""
// TODO(joostjager): Add database migration to clean up now unused last
// graduated height and finalized txes. This also prevents people downgrading
// and surprising the downgraded nursery with missing data.
// NurseryStore abstracts the persistent storage layer for the utxo nursery.
// Concretely, it stores commitment and htlc outputs until any time-bounded
// constraints have fully matured. The store exposes methods for enumerating its
@ -110,47 +95,30 @@ type NurseryStore interface {
CribToKinder(*babyOutput) error
// PreschoolToKinder atomically moves a kidOutput from the preschool
// bucket to the kindergarten bucket. This transition should be
// executed after receiving confirmation of the preschool output.
// Incoming HTLC's we need to go to the second-layer to claim, and also
// our commitment outputs fall into this class.
PreschoolToKinder(*kidOutput) error
// bucket to the kindergarten bucket. This transition should be executed
// after receiving confirmation of the preschool output. Incoming HTLC's
// we need to go to the second-layer to claim, and also our commitment
// outputs fall into this class.
//
// An additional parameter specifies the last graduated height. This is
// used in case of late registration. It schedules the output for sweep
// at the next epoch even though it has already expired earlier.
PreschoolToKinder(kid *kidOutput, lastGradHeight uint32) error
// GraduateKinder atomically moves the kindergarten class at the
// provided height into the graduated status. This involves removing the
// kindergarten entries from both the height and channel indexes, and
// cleaning up the finalized kindergarten sweep txn. The height bucket
// will be opportunistically pruned from the height index as outputs are
// GraduateKinder atomically moves an output at the provided height into
// the graduated status. This involves removing the kindergarten entries
// from both the height and channel indexes. The height bucket will be
// opportunistically pruned from the height index as outputs are
// removed.
GraduateKinder(height uint32) error
GraduateKinder(height uint32, output *kidOutput) error
// FetchPreschools returns a list of all outputs currently stored in
// the preschool bucket.
FetchPreschools() ([]kidOutput, error)
// FetchClass returns a list of kindergarten and crib outputs whose
// timelocks expire at the given height. If the kindergarten class at
// this height hash been finalized previously, via FinalizeKinder, it
// will also returns the finalized kindergarten sweep txn.
FetchClass(height uint32) (*wire.MsgTx, []kidOutput, []babyOutput, error)
// FinalizeKinder accepts a block height and the kindergarten sweep txn
// computed for this height. Upon startup, we will rebroadcast any
// finalized kindergarten txns instead of signing a new txn, as this
// result in a different txid from a preceding broadcast.
FinalizeKinder(height uint32, tx *wire.MsgTx) error
// LastFinalizedHeight returns the last block height for which the
// nursery store finalized a kindergarten class.
LastFinalizedHeight() (uint32, error)
// GraduateHeight records the provided height as the last height for
// which the nursery store successfully graduated all outputs.
GraduateHeight(height uint32) error
// LastGraduatedHeight returns the last block height for which the
// nursery store successfully graduated all outputs.
LastGraduatedHeight() (uint32, error)
// timelocks expire at the given height.
FetchClass(height uint32) ([]kidOutput, []babyOutput, error)
// HeightsBelowOrEqual returns the lowest non-empty heights in the
// height index, that exist at or below the provided upper bound.
@ -181,14 +149,6 @@ var (
// the root-level, chain-segmented bucket for each nursery store.
utxnChainPrefix = []byte("utxn")
// lastFinalizedHeightKey is a static key used to locate nursery store's
// last finalized height.
lastFinalizedHeightKey = []byte("last-finalized-height")
// lastGraduatedHeightKey is a static key used to retrieve the height of
// the last bucket that successfully graduated all outputs.
lastGraduatedHeightKey = []byte("last-graduated-height")
// channelIndexKey is a static key used to lookup the bucket containing
// all of the nursery's active channels.
channelIndexKey = []byte("channel-index")
@ -197,10 +157,6 @@ var (
// containing all heights for which the nursery will need to take
// action.
heightIndexKey = []byte("height-index")
// finalizedKndrTxnKey is a static key that can be used to locate a
// finalized kindergarten sweep txn.
finalizedKndrTxnKey = []byte("finalized-kndr-txn")
)
// Defines the state prefixes that will be used to persistently track an
@ -415,7 +371,9 @@ func (ns *nurseryStore) CribToKinder(bby *babyOutput) error {
// PreschoolToKinder atomically moves a kidOutput from the preschool bucket to
// the kindergarten bucket. This transition should be executed after receiving
// confirmation of the preschool output's commitment transaction.
func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error {
func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput,
lastGradHeight uint32) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
// Create or retrieve the channel bucket corresponding to the
// kid output's origin channel point.
@ -478,13 +436,6 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error {
maturityHeight = kid.ConfHeight() + kid.BlocksToMaturity()
}
// In the case of a Late Registration, we've already graduated
// the class that this kid is destined for. So we'll bump its
// height by one to ensure we don't forget to graduate it.
lastGradHeight, err := ns.getLastGraduatedHeight(tx)
if err != nil {
return err
}
if maturityHeight <= lastGradHeight {
utxnLog.Debugf("Late Registration for kid output=%v "+
"detected: class_height=%v, "+
@ -515,108 +466,66 @@ func (ns *nurseryStore) PreschoolToKinder(kid *kidOutput) error {
})
}
// GraduateKinder atomically moves the kindergarten class at the provided height
// into the graduated status. This involves removing the kindergarten entries
// from both the height and channel indexes, and cleaning up the finalized
// kindergarten sweep txn. The height bucket will be opportunistically pruned
// from the height index as outputs are removed.
func (ns *nurseryStore) GraduateKinder(height uint32) error {
// GraduateKinder atomically moves an output at the provided height into the
// graduated status. This involves removing the kindergarten entries from both
// the height and channel indexes. The height bucket will be opportunistically
// pruned from the height index as outputs are removed.
func (ns *nurseryStore) GraduateKinder(height uint32, kid *kidOutput) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
// Since all kindergarten outputs at a particular height are
// swept in a single txn, we can now safely delete the finalized
// txn, since it has already been broadcast and confirmed.
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
// Nothing to delete, bucket has already been removed.
return nil
}
// Remove the finalized kindergarten txn, we do this before
// removing the outputs so that the extra entry doesn't prevent
// the height bucket from being opportunistically pruned below.
if err := hghtBucket.Delete(finalizedKndrTxnKey); err != nil {
// For the kindergarten output, delete its entry from the
// height and channel index, and create a new grad output in the
// channel index.
outpoint := kid.OutPoint()
chanPoint := kid.OriginChanPoint()
// Construct the key under which the output is
// currently stored height and channel indexes.
pfxOutputKey, err := prefixOutputKey(kndrPrefix,
outpoint)
if err != nil {
return err
}
// For each kindergarten found output, delete its entry from the
// height and channel index, and create a new grad output in the
// channel index.
return ns.forEachHeightPrefix(tx, kndrPrefix, height,
func(v []byte) error {
var kid kidOutput
err := kid.Decode(bytes.NewReader(v))
if err != nil {
return err
}
// Remove the grad output's entry in the height
// index.
err = ns.removeOutputFromHeight(tx, height,
chanPoint, pfxOutputKey)
if err != nil {
return err
}
outpoint := kid.OutPoint()
chanPoint := kid.OriginChanPoint()
chanBucket := ns.getChannelBucket(tx,
chanPoint)
if chanBucket == nil {
return ErrContractNotFound
}
// Construct the key under which the output is
// currently stored height and channel indexes.
pfxOutputKey, err := prefixOutputKey(kndrPrefix,
outpoint)
if err != nil {
return err
}
// Remove previous output with kindergarten
// prefix.
err = chanBucket.Delete(pfxOutputKey)
if err != nil {
return err
}
// Remove the grad output's entry in the height
// index.
err = ns.removeOutputFromHeight(tx, height,
chanPoint, pfxOutputKey)
if err != nil {
return err
}
// Convert kindergarten key to graduate key.
copy(pfxOutputKey, gradPrefix)
chanBucket := ns.getChannelBucket(tx,
chanPoint)
if chanBucket == nil {
return ErrContractNotFound
}
var gradBuffer bytes.Buffer
if err := kid.Encode(&gradBuffer); err != nil {
return err
}
// Remove previous output with kindergarten
// prefix.
err = chanBucket.Delete(pfxOutputKey)
if err != nil {
return err
}
// Convert kindergarten key to graduate key.
copy(pfxOutputKey, gradPrefix)
var gradBuffer bytes.Buffer
if err := kid.Encode(&gradBuffer); err != nil {
return err
}
// Insert serialized output into channel bucket
// using graduate-prefixed key.
return chanBucket.Put(pfxOutputKey,
gradBuffer.Bytes())
},
)
})
}
// FinalizeKinder accepts a block height and a finalized kindergarten sweep
// transaction, persisting the transaction at the appropriate height bucket. The
// nursery store's last finalized height is also updated with the provided
// height.
func (ns *nurseryStore) FinalizeKinder(height uint32,
finalTx *wire.MsgTx) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return ns.finalizeKinder(tx, height, finalTx)
})
}
// GraduateHeight persists the provided height as the nursery store's last
// graduated height.
func (ns *nurseryStore) GraduateHeight(height uint32) error {
return ns.db.Update(func(tx *bbolt.Tx) error {
return ns.putLastGraduatedHeight(tx, height)
// Insert serialized output into channel bucket
// using graduate-prefixed key.
return chanBucket.Put(pfxOutputKey,
gradBuffer.Bytes())
})
}
@ -625,23 +534,15 @@ func (ns *nurseryStore) GraduateHeight(height uint32) error {
// FetchClass returns a list of the kindergarten and crib outputs whose timeouts
// are expiring
func (ns *nurseryStore) FetchClass(
height uint32) (*wire.MsgTx, []kidOutput, []babyOutput, error) {
height uint32) ([]kidOutput, []babyOutput, error) {
// Construct list of all crib and kindergarten outputs that need to be
// processed at the provided block height.
var finalTx *wire.MsgTx
var kids []kidOutput
var babies []babyOutput
if err := ns.db.View(func(tx *bbolt.Tx) error {
var err error
finalTx, err = ns.getFinalizedTxn(tx, height)
if err != nil {
return err
}
// Append each crib output to our list of babyOutputs.
if err = ns.forEachHeightPrefix(tx, cribPrefix, height,
if err := ns.forEachHeightPrefix(tx, cribPrefix, height,
func(buf []byte) error {
// We will attempt to deserialize all outputs
@ -683,10 +584,10 @@ func (ns *nurseryStore) FetchClass(
})
}); err != nil {
return nil, nil, nil, err
return nil, nil, err
}
return finalTx, kids, babies, nil
return kids, babies, nil
}
// FetchPreschools returns a list of all outputs currently stored in the
@ -938,32 +839,6 @@ func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
})
}
// LastFinalizedHeight returns the last block height for which the nursery
// store has finalized a kindergarten class.
func (ns *nurseryStore) LastFinalizedHeight() (uint32, error) {
var lastFinalizedHeight uint32
err := ns.db.View(func(tx *bbolt.Tx) error {
var err error
lastFinalizedHeight, err = ns.getLastFinalizedHeight(tx)
return err
})
return lastFinalizedHeight, err
}
// LastGraduatedHeight returns the last block height for which the nursery
// store has successfully graduated all outputs.
func (ns *nurseryStore) LastGraduatedHeight() (uint32, error) {
var lastGraduatedHeight uint32
err := ns.db.View(func(tx *bbolt.Tx) error {
var err error
lastGraduatedHeight, err = ns.getLastGraduatedHeight(tx)
return err
})
return lastGraduatedHeight, err
}
// Helper Methods
// enterCrib accepts a new htlc output that the nursery will incubate through
@ -994,6 +869,8 @@ func (ns *nurseryStore) enterCrib(tx *bbolt.Tx, baby *babyOutput) error {
// Next, retrieve or create the height-channel bucket located in the
// height bucket corresponding to the baby output's CLTV expiry height.
// TODO: Handle late registration.
hghtChanBucket, err := ns.createHeightChanBucket(tx,
baby.expiry, chanPoint)
if err != nil {
@ -1332,145 +1209,6 @@ func (ns *nurseryStore) forChanOutputs(tx *bbolt.Tx, chanPoint *wire.OutPoint,
return chanBucket.ForEach(callback)
}
// getLastFinalizedHeight is a helper method that retrieves the last height for
// which the database finalized its persistent state.
func (ns *nurseryStore) getLastFinalizedHeight(tx *bbolt.Tx) (uint32, error) {
// Retrieve the chain bucket associated with the given nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
return 0, nil
}
// Lookup the last finalized height in the top-level chain bucket.
heightBytes := chainBucket.Get(lastFinalizedHeightKey)
if heightBytes == nil {
// We have never finalized, return height 0.
return 0, nil
}
// If the resulting bytes are not sized like a uint32, then we have
// never finalized, so we return 0.
// Otherwise, parse the bytes and return the last finalized height.
return byteOrder.Uint32(heightBytes), nil
}
// finalizeKinder records a finalized kindergarten sweep txn to the given height
// bucket. It also updates the nursery store's last finalized height, so that we
// do not finalize the same height twice. If the finalized txn is nil, i.e. if
// the height has no kindergarten outputs, the height will be marked as
// finalized, and we skip the process of writing the txn. When the class is
// loaded, a nil value will be returned if no txn has been written to a
// finalized height bucket.
func (ns *nurseryStore) finalizeKinder(tx *bbolt.Tx, height uint32,
finalTx *wire.MsgTx) error {
// TODO(conner) ensure height is greater that current finalized height.
// 1. Write the last finalized height to the chain bucket.
// Ensure that the chain bucket for this nursery store exists.
chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey)
if err != nil {
return err
}
// Serialize the provided last-finalized height, and store it in the
// top-level chain bucket for this nursery store.
var lastHeightBytes [4]byte
byteOrder.PutUint32(lastHeightBytes[:], height)
err = chainBucket.Put(lastFinalizedHeightKey, lastHeightBytes[:])
if err != nil {
return err
}
// 2. Write the finalized txn in the appropriate height bucket.
// If there is no finalized txn, we have nothing to do.
if finalTx == nil {
return nil
}
// Otherwise serialize the finalized txn and write it to the height
// bucket.
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
return nil
}
var finalTxnBuf bytes.Buffer
if err := finalTx.Serialize(&finalTxnBuf); err != nil {
return err
}
return hghtBucket.Put(finalizedKndrTxnKey, finalTxnBuf.Bytes())
}
// getFinalizedTxn retrieves the finalized kindergarten sweep txn at the given
// height, returning nil if one was not found.
func (ns *nurseryStore) getFinalizedTxn(tx *bbolt.Tx,
height uint32) (*wire.MsgTx, error) {
hghtBucket := ns.getHeightBucket(tx, height)
if hghtBucket == nil {
// No class to finalize.
return nil, nil
}
finalTxBytes := hghtBucket.Get(finalizedKndrTxnKey)
if finalTxBytes == nil {
// No finalized txn for this height.
return nil, nil
}
// Otherwise, deserialize and return the finalized transaction.
txn := &wire.MsgTx{}
if err := txn.Deserialize(bytes.NewReader(finalTxBytes)); err != nil {
return nil, err
}
return txn, nil
}
// getLastGraduatedHeight is a helper method that retrieves the last height for
// which the database graduated all outputs successfully.
func (ns *nurseryStore) getLastGraduatedHeight(tx *bbolt.Tx) (uint32, error) {
// Retrieve the chain bucket associated with the given nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
return 0, nil
}
// Lookup the last graduated height in the top-level chain bucket.
heightBytes := chainBucket.Get(lastGraduatedHeightKey)
if heightBytes == nil {
// We have never graduated before, return height 0.
return 0, nil
}
// Otherwise, parse the bytes and return the last graduated height.
return byteOrder.Uint32(heightBytes), nil
}
// pubLastGraduatedHeight is a helper method that writes the provided height under
// the last graduated height key.
func (ns *nurseryStore) putLastGraduatedHeight(tx *bbolt.Tx, height uint32) error {
// Ensure that the chain bucket for this nursery store exists.
chainBucket, err := tx.CreateBucketIfNotExists(ns.pfxChainKey)
if err != nil {
return err
}
// Serialize the provided last-graduated height, and store it in the
// top-level chain bucket for this nursery store.
var lastHeightBytes [4]byte
byteOrder.PutUint32(lastHeightBytes[:], height)
return chainBucket.Put(lastGraduatedHeightKey, lastHeightBytes[:])
}
// errBucketNotEmpty signals that an attempt to prune a particular
// bucket failed because it still has active outputs.
var errBucketNotEmpty = errors.New("bucket is not empty, cannot be pruned")
@ -1541,7 +1279,8 @@ func (ns *nurseryStore) pruneHeight(tx *bbolt.Tx, height uint32) (bool, error) {
// attempt to remove each one if they are empty, keeping track of the
// number of height-channel buckets that still have active outputs.
if err := hghtBucket.ForEach(func(chanBytes, v []byte) error {
// Skip the finalized txn key.
// Skip the finalized txn key if it still exists from a previous
// db version.
if v != nil {
return nil
}

View File

@ -88,8 +88,6 @@ func TestNurseryStoreInit(t *testing.T) {
assertNumChannels(t, ns, 0)
assertNumPreschools(t, ns, 0)
assertLastFinalizedHeight(t, ns, 0)
assertLastGraduatedHeight(t, ns, 0)
}
// TestNurseryStoreIncubate tests the primary state transitions taken by outputs
@ -172,7 +170,7 @@ func TestNurseryStoreIncubate(t *testing.T) {
// Now, move the commitment output to the kindergarten
// bucket.
err = ns.PreschoolToKinder(test.commOutput)
err = ns.PreschoolToKinder(test.commOutput, 0)
if err != test.err {
t.Fatalf("unable to move commitment output from "+
"pscl to kndr: %v", err)
@ -212,7 +210,7 @@ func TestNurseryStoreIncubate(t *testing.T) {
maturityHeight := test.commOutput.ConfHeight() +
test.commOutput.BlocksToMaturity()
err = ns.GraduateKinder(maturityHeight)
err = ns.GraduateKinder(maturityHeight, test.commOutput)
if err != nil {
t.Fatalf("unable to graduate kindergarten class at "+
"height %d: %v", maturityHeight, err)
@ -286,7 +284,8 @@ func TestNurseryStoreIncubate(t *testing.T) {
maturityHeight := htlcOutput.ConfHeight() +
htlcOutput.BlocksToMaturity()
err = ns.GraduateKinder(maturityHeight)
err = ns.GraduateKinder(maturityHeight,
&htlcOutput.kidOutput)
if err != nil {
t.Fatalf("unable to graduate htlc output "+
"from kndr to grad: %v", err)
@ -333,93 +332,6 @@ func TestNurseryStoreIncubate(t *testing.T) {
}
}
// TestNurseryStoreFinalize tests that kindergarten sweep transactions are
// properly persisted, and that the last finalized height is being set
// accordingly.
func TestNurseryStoreFinalize(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to open channel db: %v", err)
}
defer cleanUp()
ns, err := newNurseryStore(&bitcoinTestnetGenesis, cdb)
if err != nil {
t.Fatalf("unable to open nursery store: %v", err)
}
kid := &kidOutputs[3]
// Compute the maturity height at which to enter the commitment output.
maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity()
// Since we haven't finalized before, we should see a last finalized
// height of 0.
assertLastFinalizedHeight(t, ns, 0)
// Begin incubating the commitment output, which will be placed in the
// preschool bucket.
err = ns.Incubate([]kidOutput{*kid}, nil)
if err != nil {
t.Fatalf("unable to incubate commitment output: %v", err)
}
// Then move the commitment output to the kindergarten bucket, so that
// the output is registered in the height index.
err = ns.PreschoolToKinder(kid)
if err != nil {
t.Fatalf("unable to move pscl output to kndr: %v", err)
}
// We should still see a last finalized height of 0, since no classes
// have been graduated.
assertLastFinalizedHeight(t, ns, 0)
// Now, iteratively finalize all heights below the maturity height,
// ensuring that the last finalized height is properly persisted, and
// that the finalized transactions are all nil.
for i := 0; i < int(maturityHeight); i++ {
err = ns.FinalizeKinder(uint32(i), nil)
if err != nil {
t.Fatalf("unable to finalize kndr at height=%d: %v",
i, err)
}
assertLastFinalizedHeight(t, ns, uint32(i))
assertFinalizedTxn(t, ns, uint32(i), nil)
}
// As we have now finalized all heights below the maturity height, we
// should still see the commitment output in the kindergarten bucket at
// its maturity height.
assertKndrAtMaturityHeight(t, ns, kid)
// Now, finalize the kindergarten sweep transaction at the maturity
// height.
err = ns.FinalizeKinder(maturityHeight, timeoutTx)
if err != nil {
t.Fatalf("unable to finalize kndr at height=%d: %v",
maturityHeight, err)
}
// The nursery store should now see the maturity height finalized, and
// the finalized kindergarten sweep txn should be returned at this
// height.
assertLastFinalizedHeight(t, ns, maturityHeight)
assertFinalizedTxn(t, ns, maturityHeight, timeoutTx)
// Lastly, continue to finalize heights above the maturity height. Each
// should report having a nil finalized kindergarten sweep txn.
for i := maturityHeight + 1; i < maturityHeight+10; i++ {
err = ns.FinalizeKinder(uint32(i), nil)
if err != nil {
t.Fatalf("unable to finalize kndr at height=%d: %v",
i, err)
}
assertLastFinalizedHeight(t, ns, uint32(i))
assertFinalizedTxn(t, ns, uint32(i), nil)
}
}
// TestNurseryStoreGraduate verifies that the nursery store properly removes
// populated entries from the height index as it is purged, and that the last
// purged height is set appropriately.
@ -441,9 +353,6 @@ func TestNurseryStoreGraduate(t *testing.T) {
// height index.
maturityHeight := kid.ConfHeight() + kid.BlocksToMaturity()
// Since we have never purged, the last purged height should be 0.
assertLastGraduatedHeight(t, ns, 0)
// First, add a commitment output to the nursery store, which is
// initially inserted in the preschool bucket.
err = ns.Incubate([]kidOutput{*kid}, nil)
@ -453,7 +362,7 @@ func TestNurseryStoreGraduate(t *testing.T) {
// Then, move the commitment output to the kindergarten bucket, such
// that it resides in the height index at its maturity height.
err = ns.PreschoolToKinder(kid)
err = ns.PreschoolToKinder(kid, 0)
if err != nil {
t.Fatalf("unable to move pscl output to kndr: %v", err)
}
@ -462,12 +371,6 @@ func TestNurseryStoreGraduate(t *testing.T) {
// checking that each class is now empty, and that the last purged
// height is set correctly.
for i := 0; i < int(maturityHeight); i++ {
err = ns.GraduateHeight(uint32(i))
if err != nil {
t.Fatalf("unable to purge height=%d: %v", i, err)
}
assertLastGraduatedHeight(t, ns, uint32(i))
assertHeightIsPurged(t, ns, uint32(i))
}
@ -475,27 +378,7 @@ func TestNurseryStoreGraduate(t *testing.T) {
// height.
assertKndrAtMaturityHeight(t, ns, kid)
// Finalize the kindergarten transaction, ensuring that it is a non-nil
// value.
err = ns.FinalizeKinder(maturityHeight, timeoutTx)
if err != nil {
t.Fatalf("unable to finalize kndr at height=%d: %v",
maturityHeight, err)
}
// Verify that the maturity height has now been finalized.
assertLastFinalizedHeight(t, ns, maturityHeight)
assertFinalizedTxn(t, ns, maturityHeight, timeoutTx)
// Finally, purge the non-empty maturity height, and check that returned
// class is empty.
err = ns.GraduateHeight(maturityHeight)
if err != nil {
t.Fatalf("unable to set graduated height=%d: %v", maturityHeight,
err)
}
err = ns.GraduateKinder(maturityHeight)
err = ns.GraduateKinder(maturityHeight, kid)
if err != nil {
t.Fatalf("unable to graduate kindergarten outputs at height=%d: "+
"%v", maturityHeight, err)
@ -528,36 +411,6 @@ func assertNumChanOutputs(t *testing.T, ns NurseryStore,
}
}
// assertLastFinalizedHeight checks that the nursery stores last finalized
// height matches the expected height.
func assertLastFinalizedHeight(t *testing.T, ns NurseryStore,
expected uint32) {
lfh, err := ns.LastFinalizedHeight()
if err != nil {
t.Fatalf("unable to get last finalized height: %v", err)
}
if lfh != expected {
t.Fatalf("expected last finalized height to be %d, got %d",
expected, lfh)
}
}
// assertLastGraduatedHeight checks that the nursery stores last purged height
// matches the expected height.
func assertLastGraduatedHeight(t *testing.T, ns NurseryStore, expected uint32) {
lgh, err := ns.LastGraduatedHeight()
if err != nil {
t.Fatalf("unable to get last graduated height: %v", err)
}
if lgh != expected {
t.Fatalf("expected last graduated height to be %d, got %d",
expected, lgh)
}
}
// assertNumPreschools loads all preschool outputs and verifies their count
// matches the expected number.
func assertNumPreschools(t *testing.T, ns NurseryStore, expected int) {
@ -592,16 +445,12 @@ func assertNumChannels(t *testing.T, ns NurseryStore, expected int) {
func assertHeightIsPurged(t *testing.T, ns NurseryStore,
height uint32) {
finalTx, kndrOutputs, cribOutputs, err := ns.FetchClass(height)
kndrOutputs, cribOutputs, err := ns.FetchClass(height)
if err != nil {
t.Fatalf("unable to retrieve class at height=%d: %v",
height, err)
}
if finalTx != nil {
t.Fatalf("height=%d not purged, final txn should be nil", height)
}
if kndrOutputs != nil {
t.Fatalf("height=%d not purged, kndr outputs should be nil", height)
}
@ -617,7 +466,7 @@ func assertCribAtExpiryHeight(t *testing.T, ns NurseryStore,
htlcOutput *babyOutput) {
expiryHeight := htlcOutput.expiry
_, _, cribOutputs, err := ns.FetchClass(expiryHeight)
_, cribOutputs, err := ns.FetchClass(expiryHeight)
if err != nil {
t.Fatalf("unable to retrieve class at height=%d: %v",
expiryHeight, err)
@ -639,7 +488,7 @@ func assertCribNotAtExpiryHeight(t *testing.T, ns NurseryStore,
htlcOutput *babyOutput) {
expiryHeight := htlcOutput.expiry
_, _, cribOutputs, err := ns.FetchClass(expiryHeight)
_, cribOutputs, err := ns.FetchClass(expiryHeight)
if err != nil {
t.Fatalf("unable to retrieve class at height %d: %v",
expiryHeight, err)
@ -653,25 +502,6 @@ func assertCribNotAtExpiryHeight(t *testing.T, ns NurseryStore,
}
}
// assertFinalizedTxn loads the class at the given height and compares the
// returned finalized txn to that in the class. It is safe to presented a nil
// expected transaction.
func assertFinalizedTxn(t *testing.T, ns NurseryStore, height uint32,
exFinalTx *wire.MsgTx) {
finalTx, _, _, err := ns.FetchClass(height)
if err != nil {
t.Fatalf("unable to fetch class at height=%d: %v", height,
err)
}
if !reflect.DeepEqual(finalTx, exFinalTx) {
t.Fatalf("expected finalized txn at height=%d "+
"to be %v, got %v", height, finalTx.TxHash(),
exFinalTx.TxHash())
}
}
// assertKndrAtMaturityHeight loads the class at the provided height and
// verifies that the provided kid output is one of the kindergarten outputs
// returned.
@ -680,7 +510,7 @@ func assertKndrAtMaturityHeight(t *testing.T, ns NurseryStore,
maturityHeight := kndrOutput.ConfHeight() +
kndrOutput.BlocksToMaturity()
_, kndrOutputs, _, err := ns.FetchClass(maturityHeight)
kndrOutputs, _, err := ns.FetchClass(maturityHeight)
if err != nil {
t.Fatalf("unable to retrieve class at height %d: %v",
maturityHeight, err)
@ -705,7 +535,7 @@ func assertKndrNotAtMaturityHeight(t *testing.T, ns NurseryStore,
maturityHeight := kndrOutput.ConfHeight() +
kndrOutput.BlocksToMaturity()
_, kndrOutputs, _, err := ns.FetchClass(maturityHeight)
kndrOutputs, _, err := ns.FetchClass(maturityHeight)
if err != nil {
t.Fatalf("unable to retrieve class at height %d: %v",
maturityHeight, err)

View File

@ -17,8 +17,6 @@ import (
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/sweep"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
@ -40,6 +38,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/nat"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/sweep"
"github.com/lightningnetwork/lnd/ticker"
"github.com/lightningnetwork/lnd/tor"
)
@ -156,6 +155,8 @@ type server struct {
utxoNursery *utxoNursery
sweeper *sweep.UtxoSweeper
chainArb *contractcourt.ChainArbitrator
sphinx *htlcswitch.OnionProcessor
@ -597,24 +598,45 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
return nil, err
}
sweeper := sweep.New(&sweep.UtxoSweeperConfig{
srvrLog.Tracef("Sweeper batch window duration: %v",
sweep.DefaultBatchWindowDuration)
sweeperStore, err := sweep.NewSweeperStore(
chanDB, activeNetParams.GenesisHash,
)
if err != nil {
srvrLog.Errorf("unable to create sweeper store: %v", err)
return nil, err
}
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
Estimator: cc.feeEstimator,
GenSweepScript: func() ([]byte, error) {
return newSweepPkScript(cc.wallet)
},
Signer: cc.wallet.Cfg.Signer,
Signer: cc.wallet.Cfg.Signer,
PublishTransaction: cc.wallet.PublishTransaction,
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(sweep.DefaultBatchWindowDuration).C
},
SweepTxConfTarget: 6,
Notifier: cc.chainNotifier,
ChainIO: cc.chainIO,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc,
})
s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.chainIO,
ConfDepth: 1,
SweepTxConfTarget: 6,
FetchClosedChannels: chanDB.FetchClosedChannels,
FetchClosedChannel: chanDB.FetchClosedChannel,
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
Store: utxnStore,
Sweeper: sweeper,
Sweeper: s.sweeper,
})
// Construct a closure that wraps the htlcswitch's CloseLink method.
@ -706,7 +728,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
DisableChannel: func(op wire.OutPoint) error {
return s.announceChanStatus(op, true)
},
Sweeper: sweeper,
Sweeper: s.sweeper,
}, chanDB)
s.breachArbiter = newBreachArbiter(&BreachConfig{
@ -963,6 +985,9 @@ func (s *server) Start() error {
if err := s.htlcSwitch.Start(); err != nil {
return err
}
if err := s.sweeper.Start(); err != nil {
return err
}
if err := s.utxoNursery.Start(); err != nil {
return err
}
@ -1050,6 +1075,7 @@ func (s *server) Stop() error {
s.breachArbiter.Stop()
s.authGossiper.Stop()
s.chainArb.Stop()
s.sweeper.Stop()
s.cc.wallet.Shutdown()
s.cc.chainView.Stop()
s.connMgr.Stop()

View File

@ -316,7 +316,9 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
}
bobPool.Start()
chainIO := &mockChainIO{}
chainIO := &mockChainIO{
bestHeight: fundingBroadcastHeight,
}
wallet := &lnwallet.LightningWallet{
WalletController: &mockWalletController{
rootKey: aliceKeyPriv,

View File

@ -72,17 +72,6 @@ import (
// the utxo nursery will sweep all KNDR outputs scheduled for that height
// using a single txn.
//
// NOTE: Due to the fact that KNDR outputs can be dynamically aggregated and
// swept, we make precautions to finalize the KNDR outputs at a particular
// height on our first attempt to sweep it. Finalizing involves signing the
// sweep transaction and persisting it in the nursery store, and recording
// the last finalized height. Any attempts to replay an already finalized
// height will result in broadcasting the already finalized txn, ensuring the
// nursery does not broadcast different txids for the same batch of KNDR
// outputs. The reason txids may change is due to the probabilistic nature of
// generating the pkscript in the sweep txn's output, even if the set of
// inputs remains static across attempts.
//
// - GRAD (kidOutput) outputs are KNDR outputs that have successfully been
// swept into the user's wallet. A channel is considered mature once all of
// its outputs, including two-stage htlcs, have entered the GRAD state,
@ -183,10 +172,6 @@ type NurseryConfig struct {
// determining outputs in the chain as confirmed.
ConfDepth uint32
// SweepTxConfTarget assigns a confirmation target for sweep txes on
// which the fee calculation will be based.
SweepTxConfTarget uint32
// FetchClosedChannels provides access to a user's channels, such that
// they can be marked fully closed after incubation has concluded.
FetchClosedChannels func(pendingOnly bool) (
@ -254,25 +239,22 @@ func (u *utxoNursery) Start() error {
utxnLog.Tracef("Starting UTXO nursery")
// 1. Start watching for new blocks, as this will drive the nursery
// store's state machine.
// Register with the notifier to receive notifications for each newly
// connected block. We register immediately on startup to ensure that
// no blocks are missed while we are handling blocks that were missed
// during the time the UTXO nursery was unavailable.
newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(nil)
// Retrieve the currently best known block. This is needed to have the
// state machine catch up with the blocks we missed when we were down.
bestHash, bestHeight, err := u.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
// Set best known height to schedule late registrations properly.
atomic.StoreUint32(&u.bestHeight, uint32(bestHeight))
// 2. Flush all fully-graduated channels from the pipeline.
// Load any pending close channels, which represents the super set of
// all channels that may still be incubating.
pendingCloseChans, err := u.cfg.FetchClosedChannels(true)
if err != nil {
newBlockChan.Cancel()
return err
}
@ -281,7 +263,6 @@ func (u *utxoNursery) Start() error {
for _, pendingClose := range pendingCloseChans {
err := u.closeAndRemoveIfMature(&pendingClose.ChanPoint)
if err != nil {
newBlockChan.Cancel()
return err
}
}
@ -289,15 +270,6 @@ func (u *utxoNursery) Start() error {
// TODO(conner): check if any fully closed channels can be removed from
// utxn.
// Query the nursery store for the lowest block height we could be
// incubating, which is taken to be the last height for which the
// database was purged.
lastGraduatedHeight, err := u.cfg.Store.LastGraduatedHeight()
if err != nil {
newBlockChan.Cancel()
return err
}
// 2. Restart spend ntfns for any preschool outputs, which are waiting
// for the force closed commitment txn to confirm, or any second-layer
// HTLC success transactions.
@ -306,15 +278,24 @@ func (u *utxoNursery) Start() error {
// point forward, we must close the nursery's quit channel if we detect
// any failures during startup to ensure they terminate.
if err := u.reloadPreschool(); err != nil {
newBlockChan.Cancel()
close(u.quit)
return err
}
// 3. Replay all crib and kindergarten outputs from last pruned to
// current best height.
if err := u.reloadClasses(lastGraduatedHeight); err != nil {
newBlockChan.Cancel()
// 3. Replay all crib and kindergarten outputs up to the current best
// height.
if err := u.reloadClasses(uint32(bestHeight)); err != nil {
close(u.quit)
return err
}
// Start watching for new blocks, as this will drive the nursery store's
// state machine.
newBlockChan, err := u.cfg.Notifier.RegisterBlockEpochNtfn(&chainntnfs.BlockEpoch{
Height: bestHeight,
Hash: bestHash,
})
if err != nil {
close(u.quit)
return err
}
@ -672,123 +653,44 @@ func (u *utxoNursery) reloadPreschool() error {
// reloadClasses reinitializes any height-dependent state transitions for which
// the utxonursery has not received confirmation, and replays the graduation of
// all kindergarten and crib outputs for heights that have not been finalized.
// all kindergarten and crib outputs for all heights up to the current block.
// This allows the nursery to reinitialize all state to continue sweeping
// outputs, even in the event that we missed blocks while offline.
// reloadClasses is called during the startup of the UTXO Nursery.
func (u *utxoNursery) reloadClasses(lastGradHeight uint32) error {
// Begin by loading all of the still-active heights up to and including
// the last height we successfully graduated.
activeHeights, err := u.cfg.Store.HeightsBelowOrEqual(lastGradHeight)
// outputs, even in the event that we missed blocks while offline. reloadClasses
// is called during the startup of the UTXO Nursery.
func (u *utxoNursery) reloadClasses(bestHeight uint32) error {
// Loading all active heights up to and including the current block.
activeHeights, err := u.cfg.Store.HeightsBelowOrEqual(
uint32(bestHeight))
if err != nil {
return err
}
if len(activeHeights) > 0 {
utxnLog.Infof("Re-registering confirmations for %d already "+
"graduated heights below height=%d", len(activeHeights),
lastGradHeight)
// Return early if nothing to sweep.
if len(activeHeights) == 0 {
return nil
}
utxnLog.Infof("(Re)-sweeping %d heights below height=%d",
len(activeHeights), bestHeight)
// Attempt to re-register notifications for any outputs still at these
// heights.
for _, classHeight := range activeHeights {
utxnLog.Debugf("Attempting to regraduate outputs at height=%v",
utxnLog.Debugf("Attempting to sweep outputs at height=%v",
classHeight)
if err = u.regraduateClass(classHeight); err != nil {
utxnLog.Errorf("Failed to regraduate outputs at "+
if err = u.graduateClass(classHeight); err != nil {
utxnLog.Errorf("Failed to sweep outputs at "+
"height=%v: %v", classHeight, err)
return err
}
}
// Get the most recently mined block.
_, bestHeight, err := u.cfg.ChainIO.GetBestBlock()
if err != nil {
return err
}
// If we haven't yet seen any registered force closes, or we're already
// caught up with the current best chain, then we can exit early.
if lastGradHeight == 0 || uint32(bestHeight) == lastGradHeight {
return nil
}
utxnLog.Infof("Processing outputs from missed blocks. Starting with "+
"blockHeight=%v, to current blockHeight=%v", lastGradHeight,
bestHeight)
// Loop through and check for graduating outputs at each of the missed
// block heights.
for curHeight := lastGradHeight + 1; curHeight <= uint32(bestHeight); curHeight++ {
utxnLog.Debugf("Attempting to graduate outputs at height=%v",
curHeight)
if err := u.graduateClass(curHeight); err != nil {
utxnLog.Errorf("Failed to graduate outputs at "+
"height=%v: %v", curHeight, err)
return err
}
}
utxnLog.Infof("UTXO Nursery is now fully synced")
return nil
}
// regraduateClass handles the steps involved in re-registering for
// confirmations for all still-active outputs at a particular height. This is
// used during restarts to ensure that any still-pending state transitions are
// properly registered, so they can be driven by the chain notifier. No
// transactions or signing are done as a result of this step.
func (u *utxoNursery) regraduateClass(classHeight uint32) error {
// Fetch all information about the crib and kindergarten outputs at
// this height. In addition to the outputs, we also retrieve the
// finalized kindergarten sweep txn, which will be nil if we have not
// attempted this height before, or if no kindergarten outputs exist at
// this height.
finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass(
classHeight)
if err != nil {
return err
}
if finalTx != nil {
utxnLog.Infof("Re-registering confirmation for kindergarten "+
"sweep transaction at height=%d ", classHeight)
err = u.sweepMatureOutputs(classHeight, finalTx, kgtnOutputs)
if err != nil {
utxnLog.Errorf("Failed to re-register for kindergarten "+
"sweep transaction at height=%d: %v",
classHeight, err)
return err
}
}
if len(cribOutputs) == 0 {
return nil
}
utxnLog.Infof("Re-registering confirmation for first-stage HTLC "+
"outputs at height=%d ", classHeight)
// Now, we broadcast all pre-signed htlc txns from the crib outputs at
// this height. There is no need to finalize these txns, since the txid
// is predetermined when signed in the wallet.
for i := range cribOutputs {
err := u.sweepCribOutput(classHeight, &cribOutputs[i])
if err != nil {
utxnLog.Errorf("Failed to re-register first-stage "+
"HTLC output %v", cribOutputs[i].OutPoint())
return err
}
}
return nil
}
// incubator is tasked with driving all state transitions that are dependent on
// the current height of the blockchain. As new blocks arrive, the incubator
// will attempt spend outputs at the latest height. The asynchronous
@ -821,6 +723,11 @@ func (u *utxoNursery) incubator(newBlockChan *chainntnfs.BlockEpochEvent) {
// as signing and broadcasting a sweep txn that spends
// from all kindergarten outputs at this height.
height := uint32(epoch.Height)
// Update best known block height for late registrations
// to be scheduled properly.
atomic.StoreUint32(&u.bestHeight, height)
if err := u.graduateClass(height); err != nil {
utxnLog.Errorf("error while graduating "+
"class at height=%d: %v", height, err)
@ -843,14 +750,9 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error {
u.mu.Lock()
defer u.mu.Unlock()
u.bestHeight = classHeight
// Fetch all information about the crib and kindergarten outputs at
// this height. In addition to the outputs, we also retrieve the
// finalized kindergarten sweep txn, which will be nil if we have not
// attempted this height before, or if no kindergarten outputs exist at
// this height.
finalTx, kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass(
kgtnOutputs, cribOutputs, err := u.cfg.Store.FetchClass(
classHeight)
if err != nil {
return err
@ -859,64 +761,11 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error {
utxnLog.Infof("Attempting to graduate height=%v: num_kids=%v, "+
"num_babies=%v", classHeight, len(kgtnOutputs), len(cribOutputs))
// Load the last finalized height, so we can determine if the
// kindergarten sweep txn should be crafted.
lastFinalizedHeight, err := u.cfg.Store.LastFinalizedHeight()
if err != nil {
return err
}
// If we haven't processed this height before, we finalize the
// graduating kindergarten outputs, by signing a sweep transaction that
// spends from them. This txn is persisted such that we never broadcast
// a different txn for the same height. This allows us to recover from
// failures, and watch for the correct txid.
if classHeight > lastFinalizedHeight {
// If this height has never been finalized, we have never
// generated a sweep txn for this height. Generate one if there
// are kindergarten outputs or cltv crib outputs to be spent.
if len(kgtnOutputs) > 0 {
sweepInputs := make([]sweep.Input, len(kgtnOutputs))
for i := range kgtnOutputs {
sweepInputs[i] = &kgtnOutputs[i]
}
finalTx, err = u.cfg.Sweeper.CreateSweepTx(
sweepInputs, u.cfg.SweepTxConfTarget,
classHeight,
)
if err != nil {
utxnLog.Errorf("Failed to create sweep txn at "+
"height=%d", classHeight)
return err
}
}
// Persist the kindergarten sweep txn to the nursery store. It
// is safe to store a nil finalTx, which happens if there are
// no graduating kindergarten outputs.
err = u.cfg.Store.FinalizeKinder(classHeight, finalTx)
if err != nil {
utxnLog.Errorf("Failed to finalize kindergarten at "+
"height=%d", classHeight)
return err
}
// Log if the finalized transaction is non-trivial.
if finalTx != nil {
utxnLog.Infof("Finalized kindergarten at height=%d ",
classHeight)
}
}
// Now that the kindergarten sweep txn has either been finalized or
// restored, broadcast the txn, and set up notifications that will
// transition the swept kindergarten outputs and cltvCrib into
// graduated outputs.
if finalTx != nil {
err := u.sweepMatureOutputs(classHeight, finalTx, kgtnOutputs)
if err != nil {
// Offer the outputs to the sweeper and set up notifications that will
// transition the swept kindergarten outputs and cltvCrib into graduated
// outputs.
if len(kgtnOutputs) > 0 {
if err := u.sweepMatureOutputs(classHeight, kgtnOutputs); err != nil {
utxnLog.Errorf("Failed to sweep %d kindergarten "+
"outputs at height=%d: %v",
len(kgtnOutputs), classHeight, err)
@ -925,8 +774,7 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error {
}
// Now, we broadcast all pre-signed htlc txns from the csv crib outputs
// at this height. There is no need to finalize these txns, since the
// txid is predetermined when signed in the wallet.
// at this height.
for i := range cribOutputs {
err := u.sweepCribOutput(classHeight, &cribOutputs[i])
if err != nil {
@ -937,62 +785,32 @@ func (u *utxoNursery) graduateClass(classHeight uint32) error {
}
}
return u.cfg.Store.GraduateHeight(classHeight)
return nil
}
// sweepMatureOutputs generates and broadcasts the transaction that transfers
// control of funds from a prior channel commitment transaction to the user's
// wallet. The outputs swept were previously time locked (either absolute or
// relative), but are not mature enough to sweep into the wallet.
func (u *utxoNursery) sweepMatureOutputs(classHeight uint32, finalTx *wire.MsgTx,
func (u *utxoNursery) sweepMatureOutputs(classHeight uint32,
kgtnOutputs []kidOutput) error {
utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx "+
"(txid=%v): %v", len(kgtnOutputs),
finalTx.TxHash(), newLogClosure(func() string {
return spew.Sdump(finalTx)
}),
)
utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx for "+
"height %v", len(kgtnOutputs), classHeight)
// With the sweep transaction fully signed, broadcast the transaction
// to the network. Additionally, we can stop tracking these outputs as
// they've just been swept.
err := u.cfg.PublishTransaction(finalTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
utxnLog.Errorf("unable to broadcast sweep tx: %v, %v",
err, spew.Sdump(finalTx))
return err
for _, output := range kgtnOutputs {
// Create local copy to prevent pointer to loop variable to be
// passed in with disastruous consequences.
local := output
resultChan, err := u.cfg.Sweeper.SweepInput(&local)
if err != nil {
return err
}
u.wg.Add(1)
go u.waitForSweepConf(classHeight, &output, resultChan)
}
return u.registerSweepConf(finalTx, kgtnOutputs, classHeight)
}
// registerSweepConf is responsible for registering a finalized kindergarten
// sweep transaction for confirmation notifications. If the confirmation was
// successfully registered, a goroutine will be spawned that waits for the
// confirmation, and graduates the provided kindergarten class within the
// nursery store.
func (u *utxoNursery) registerSweepConf(finalTx *wire.MsgTx,
kgtnOutputs []kidOutput, heightHint uint32) error {
finalTxID := finalTx.TxHash()
confChan, err := u.cfg.Notifier.RegisterConfirmationsNtfn(
&finalTxID, finalTx.TxOut[0].PkScript, u.cfg.ConfDepth,
heightHint,
)
if err != nil {
utxnLog.Errorf("unable to register notification for "+
"sweep confirmation: %v", finalTxID)
return err
}
utxnLog.Infof("Registering sweep tx %v for confs at height=%d",
finalTxID, heightHint)
u.wg.Add(1)
go u.waitForSweepConf(heightHint, kgtnOutputs, confChan)
return nil
}
@ -1002,16 +820,30 @@ func (u *utxoNursery) registerSweepConf(finalTx *wire.MsgTx,
// to mark any mature channels as fully closed in channeldb.
// NOTE(conner): this method MUST be called as a go routine.
func (u *utxoNursery) waitForSweepConf(classHeight uint32,
kgtnOutputs []kidOutput, confChan *chainntnfs.ConfirmationEvent) {
output *kidOutput, resultChan chan sweep.Result) {
defer u.wg.Done()
select {
case _, ok := <-confChan.Confirmed:
case result, ok := <-resultChan:
if !ok {
utxnLog.Errorf("Notification chan closed, can't"+
" advance %v graduating outputs",
len(kgtnOutputs))
utxnLog.Errorf("Notification chan closed, can't" +
" advance graduating output")
return
}
// In case of a remote spend, still graduate the output. There
// is no way to sweep it anymore.
if result.Err == sweep.ErrRemoteSpend {
utxnLog.Infof("Output %v was spend by remote party",
output.OutPoint())
break
}
if result.Err != nil {
utxnLog.Errorf("Failed to sweep %v at "+
"height=%d", output.OutPoint(),
classHeight)
return
}
@ -1024,32 +856,23 @@ func (u *utxoNursery) waitForSweepConf(classHeight uint32,
// TODO(conner): add retry logic?
// Mark the confirmed kindergarten outputs as graduated.
if err := u.cfg.Store.GraduateKinder(classHeight); err != nil {
utxnLog.Errorf("Unable to graduate %v kindergarten outputs: "+
"%v", len(kgtnOutputs), err)
// Mark the confirmed kindergarten output as graduated.
if err := u.cfg.Store.GraduateKinder(classHeight, output); err != nil {
utxnLog.Errorf("Unable to graduate kindergarten output %v: %v",
output.OutPoint(), err)
return
}
utxnLog.Infof("Graduated %d kindergarten outputs from height=%d",
len(kgtnOutputs), classHeight)
utxnLog.Infof("Graduated kindergarten output from height=%d",
classHeight)
// Iterate over the kid outputs and construct a set of all channel
// points to which they belong.
var possibleCloses = make(map[wire.OutPoint]struct{})
for _, kid := range kgtnOutputs {
possibleCloses[*kid.OriginChanPoint()] = struct{}{}
}
// Attempt to close each channel, only doing so if all of the channel's
// Attempt to close the channel, only doing so if all of the channel's
// outputs have been graduated.
for chanPoint := range possibleCloses {
if err := u.closeAndRemoveIfMature(&chanPoint); err != nil {
utxnLog.Errorf("Failed to close and remove channel %v",
chanPoint)
return
}
chanPoint := output.OriginChanPoint()
if err := u.closeAndRemoveIfMature(chanPoint); err != nil {
utxnLog.Errorf("Failed to close and remove channel %v",
*chanPoint)
return
}
}
@ -1216,7 +1039,8 @@ func (u *utxoNursery) waitForPreschoolConf(kid *kidOutput,
outputType = "Commitment"
}
err := u.cfg.Store.PreschoolToKinder(kid)
bestHeight := atomic.LoadUint32(&u.bestHeight)
err := u.cfg.Store.PreschoolToKinder(kid, bestHeight)
if err != nil {
utxnLog.Errorf("Unable to move %v output "+
"from preschool to kindergarten bucket: %v",

View File

@ -9,8 +9,9 @@ import (
"github.com/lightningnetwork/lnd/sweep"
"io/ioutil"
"math"
"os"
"reflect"
"sync"
"runtime/pprof"
"testing"
"time"
@ -19,7 +20,6 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/lnwallet"
)
@ -396,11 +396,14 @@ func TestBabyOutputSerialization(t *testing.T) {
type nurseryTestContext struct {
nursery *utxoNursery
notifier *nurseryMockNotifier
notifier *sweep.MockNotifier
chainIO *mockChainIO
publishChan chan wire.MsgTx
store *nurseryStoreInterceptor
restart func() bool
receiveTx func() wire.MsgTx
sweeper *sweep.UtxoSweeper
timeoutChan chan chan time.Time
t *testing.T
}
@ -430,17 +433,50 @@ func createNurseryTestContext(t *testing.T,
// test.
storeIntercepter := newNurseryStoreInterceptor(store)
notifier := newNurseryMockNotifier(t)
notifier := sweep.NewMockNotifier(t)
sweeper := sweep.New(&sweep.UtxoSweeperConfig{
publishChan := make(chan wire.MsgTx, 1)
publishFunc := func(tx *wire.MsgTx, source string) error {
utxnLog.Tracef("Publishing tx %v by %v", tx.TxHash(), source)
publishChan <- *tx
return nil
}
timeoutChan := make(chan chan time.Time)
chainIO := &mockChainIO{
bestHeight: 0,
}
sweeperStore := sweep.NewMockSweeperStore()
sweeperCfg := &sweep.UtxoSweeperConfig{
GenSweepScript: func() ([]byte, error) {
return []byte{}, nil
},
Estimator: &lnwallet.StaticFeeEstimator{},
Signer: &nurseryMockSigner{},
})
Notifier: notifier,
PublishTransaction: func(tx *wire.MsgTx) error {
return publishFunc(tx, "sweeper")
},
NewBatchTimer: func() <-chan time.Time {
c := make(chan time.Time, 1)
timeoutChan <- c
return c
},
ChainIO: chainIO,
Store: sweeperStore,
MaxInputsPerTx: 10,
MaxSweepAttempts: 5,
NextAttemptDeltaFunc: func(int) int32 { return 1 },
}
cfg := NurseryConfig{
sweeper := sweep.New(sweeperCfg)
sweeper.Start()
nurseryCfg := NurseryConfig{
Notifier: notifier,
FetchClosedChannels: func(pendingOnly bool) (
[]*channeldb.ChannelCloseSummary, error) {
@ -453,54 +489,91 @@ func createNurseryTestContext(t *testing.T,
}, nil
},
Store: storeIntercepter,
ChainIO: &mockChainIO{},
ChainIO: chainIO,
Sweeper: sweeper,
PublishTransaction: func(tx *wire.MsgTx) error {
return publishFunc(tx, "nursery")
},
}
publishChan := make(chan wire.MsgTx, 1)
cfg.PublishTransaction = func(tx *wire.MsgTx) error {
t.Logf("Publishing tx %v", tx.TxHash())
publishChan <- *tx
return nil
}
nursery := newUtxoNursery(&cfg)
nursery := newUtxoNursery(&nurseryCfg)
nursery.Start()
ctx := &nurseryTestContext{
nursery: nursery,
notifier: notifier,
chainIO: chainIO,
store: storeIntercepter,
publishChan: publishChan,
sweeper: sweeper,
timeoutChan: timeoutChan,
t: t,
}
ctx.restart = func() bool {
return checkStartStop(func() {
ctx.nursery.Stop()
// Simulate lnd restart.
ctx.nursery = newUtxoNursery(ctx.nursery.cfg)
ctx.nursery.Start()
})
}
ctx.receiveTx = func() wire.MsgTx {
var tx wire.MsgTx
select {
case tx = <-ctx.publishChan:
utxnLog.Debugf("Published tx %v", tx.TxHash())
return tx
case <-time.After(5 * time.Second):
case <-time.After(defaultTestTimeout):
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatalf("tx not published")
}
return tx
}
ctx.restart = func() bool {
return checkStartStop(func() {
utxnLog.Tracef("Restart sweeper and nursery")
// Simulate lnd restart.
ctx.nursery.Stop()
// Also restart sweeper to test behaviour as one unit.
//
// TODO(joostjager): Mock sweeper to test nursery in
// isolation.
ctx.sweeper.Stop()
// Find out if there is a last tx stored. If so, we
// expect it to be republished on startup.
hasLastTx, err := sweeperCfg.Store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
// Restart sweeper.
ctx.sweeper = sweep.New(sweeperCfg)
ctx.sweeper.Start()
// Receive last tx if expected.
if hasLastTx != nil {
utxnLog.Debugf("Expecting republish")
ctx.receiveTx()
} else {
utxnLog.Debugf("Expecting no republish")
}
/// Restart nursery.
nurseryCfg.Sweeper = ctx.sweeper
ctx.nursery = newUtxoNursery(&nurseryCfg)
ctx.nursery.Start()
})
}
// Start with testing an immediate restart.
ctx.restart()
return ctx
}
func (ctx *nurseryTestContext) notifyEpoch(height int32) {
ctx.chainIO.bestHeight = height
ctx.notifier.NotifyEpoch(height)
}
func (ctx *nurseryTestContext) finish() {
// Add a final restart point in this state
ctx.restart()
@ -556,6 +629,8 @@ func (ctx *nurseryTestContext) finish() {
if len(activeHeights) > 0 {
ctx.t.Fatalf("Expected height index to be empty")
}
ctx.sweeper.Stop()
}
func createOutgoingRes(onLocalCommitment bool) *lnwallet.OutgoingHtlcResolution {
@ -703,6 +778,8 @@ func testRestartLoop(t *testing.T, test func(*testing.T,
return true
}
utxnLog.Debugf("Skipping restart point %v",
currentStartStopIdx)
return false
}
@ -739,7 +816,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T,
ctx.restart()
// Notify arrival of block where HTLC CLTV expires.
ctx.notifier.notifyEpoch(125)
ctx.notifyEpoch(125)
// This should trigger nursery to publish the timeout tx.
ctx.receiveTx()
@ -751,7 +828,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T,
// Confirm the timeout tx. This should promote the HTLC to KNDR state.
timeoutTxHash := outgoingRes.SignedTimeoutTx.TxHash()
if err := ctx.notifier.confirmTx(&timeoutTxHash, 126); err != nil {
if err := ctx.notifier.ConfirmTx(&timeoutTxHash, 126); err != nil {
t.Fatal(err)
}
@ -765,7 +842,7 @@ func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T,
ctx.restart()
// Notify arrival of block where second level HTLC unlocks.
ctx.notifier.notifyEpoch(128)
ctx.notifyEpoch(128)
// Check final sweep into wallet.
testSweepHtlc(t, ctx)
@ -790,7 +867,7 @@ func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T,
// resolving remote commitment tx.
//
// TODO(joostjager): This is probably not correct?
err := ctx.notifier.confirmTx(&outgoingRes.ClaimOutpoint.Hash, 124)
err := ctx.notifier.ConfirmTx(&outgoingRes.ClaimOutpoint.Hash, 124)
if err != nil {
t.Fatal(err)
}
@ -805,7 +882,7 @@ func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T,
ctx.restart()
// Notify arrival of block where HTLC CLTV expires.
ctx.notifier.notifyEpoch(125)
ctx.notifyEpoch(125)
// Check final sweep into wallet.
testSweepHtlc(t, ctx)
@ -840,7 +917,7 @@ func testNurseryCommitSuccessOnLocal(t *testing.T,
ctx.restart()
// Notify confirmation of the commitment tx.
err = ctx.notifier.confirmTx(&commitRes.SelfOutPoint.Hash, 124)
err = ctx.notifier.ConfirmTx(&commitRes.SelfOutPoint.Hash, 124)
if err != nil {
t.Fatal(err)
}
@ -855,7 +932,7 @@ func testNurseryCommitSuccessOnLocal(t *testing.T,
ctx.restart()
// Notify arrival of block where commit output CSV expires.
ctx.notifier.notifyEpoch(126)
ctx.notifyEpoch(126)
// Check final sweep into wallet.
testSweep(t, ctx, func() {
@ -876,27 +953,28 @@ func testSweepHtlc(t *testing.T, ctx *nurseryTestContext) {
func testSweep(t *testing.T, ctx *nurseryTestContext,
afterPublishAssert func()) {
// Wait for nursery to publish the sweep tx.
ctx.tick()
sweepTx := ctx.receiveTx()
if ctx.restart() {
// Restart will trigger rebroadcast of sweep tx.
sweepTx = ctx.receiveTx()
// Nursery reoffers its input. Sweeper needs a tick to create the sweep
// tx.
ctx.tick()
ctx.receiveTx()
}
afterPublishAssert()
// Confirm the sweep tx.
sweepTxHash := sweepTx.TxHash()
err := ctx.notifier.confirmTx(&sweepTxHash, 129)
if err != nil {
t.Fatal(err)
}
ctx.notifier.SpendOutpoint(sweepTx.TxIn[0].PreviousOutPoint, sweepTx)
// Wait for output to be promoted in store to GRAD.
select {
case <-ctx.store.graduateKinderChan:
case <-time.After(defaultTestTimeout):
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
t.Fatalf("output not graduated")
}
@ -907,6 +985,19 @@ func testSweep(t *testing.T, ctx *nurseryTestContext,
assertNurseryReportUnavailable(t, ctx.nursery)
}
func (ctx *nurseryTestContext) tick() {
select {
case c := <-ctx.timeoutChan:
select {
case c <- time.Time{}:
case <-time.After(defaultTestTimeout):
ctx.t.Fatal("tick timeout - tick not consumed")
}
case <-time.After(defaultTestTimeout):
ctx.t.Fatal("tick timeout - no new timer created")
}
}
type nurseryStoreInterceptor struct {
ns NurseryStore
@ -941,16 +1032,18 @@ func (i *nurseryStoreInterceptor) CribToKinder(babyOutput *babyOutput) error {
return err
}
func (i *nurseryStoreInterceptor) PreschoolToKinder(kidOutput *kidOutput) error {
err := i.ns.PreschoolToKinder(kidOutput)
func (i *nurseryStoreInterceptor) PreschoolToKinder(kidOutput *kidOutput,
lastGradHeight uint32) error {
err := i.ns.PreschoolToKinder(kidOutput, lastGradHeight)
i.preschoolToKinderChan <- struct{}{}
return err
}
func (i *nurseryStoreInterceptor) GraduateKinder(height uint32) error {
err := i.ns.GraduateKinder(height)
func (i *nurseryStoreInterceptor) GraduateKinder(height uint32, kid *kidOutput) error {
err := i.ns.GraduateKinder(height, kid)
i.graduateKinderChan <- struct{}{}
@ -961,30 +1054,12 @@ func (i *nurseryStoreInterceptor) FetchPreschools() ([]kidOutput, error) {
return i.ns.FetchPreschools()
}
func (i *nurseryStoreInterceptor) FetchClass(height uint32) (*wire.MsgTx,
func (i *nurseryStoreInterceptor) FetchClass(height uint32) (
[]kidOutput, []babyOutput, error) {
return i.ns.FetchClass(height)
}
func (i *nurseryStoreInterceptor) FinalizeKinder(height uint32,
tx *wire.MsgTx) error {
return i.ns.FinalizeKinder(height, tx)
}
func (i *nurseryStoreInterceptor) LastFinalizedHeight() (uint32, error) {
return i.ns.LastFinalizedHeight()
}
func (i *nurseryStoreInterceptor) GraduateHeight(height uint32) error {
return i.ns.GraduateHeight(height)
}
func (i *nurseryStoreInterceptor) LastGraduatedHeight() (uint32, error) {
return i.ns.LastGraduatedHeight()
}
func (i *nurseryStoreInterceptor) HeightsBelowOrEqual(height uint32) (
[]uint32, error) {
@ -1025,92 +1100,3 @@ func (m *nurseryMockSigner) ComputeInputScript(tx *wire.MsgTx,
return &lnwallet.InputScript{}, nil
}
type nurseryMockNotifier struct {
confChannel map[chainhash.Hash]chan *chainntnfs.TxConfirmation
epochChan chan *chainntnfs.BlockEpoch
spendChan chan *chainntnfs.SpendDetail
mutex sync.RWMutex
t *testing.T
}
func newNurseryMockNotifier(t *testing.T) *nurseryMockNotifier {
return &nurseryMockNotifier{
confChannel: make(map[chainhash.Hash]chan *chainntnfs.TxConfirmation),
epochChan: make(chan *chainntnfs.BlockEpoch),
spendChan: make(chan *chainntnfs.SpendDetail),
t: t,
}
}
func (m *nurseryMockNotifier) notifyEpoch(height int32) {
select {
case m.epochChan <- &chainntnfs.BlockEpoch{
Height: height,
}:
case <-time.After(defaultTestTimeout):
m.t.Fatal("epoch event not consumed")
}
}
func (m *nurseryMockNotifier) confirmTx(txid *chainhash.Hash, height uint32) error {
confirm := &chainntnfs.TxConfirmation{
BlockHeight: height,
}
select {
case m.getConfChannel(txid) <- confirm:
case <-time.After(defaultTestTimeout):
return fmt.Errorf("confirmation not consumed")
}
return nil
}
func (m *nurseryMockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
_ []byte, numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent,
error) {
return &chainntnfs.ConfirmationEvent{
Confirmed: m.getConfChannel(txid),
}, nil
}
func (m *nurseryMockNotifier) getConfChannel(
txid *chainhash.Hash) chan *chainntnfs.TxConfirmation {
m.mutex.Lock()
defer m.mutex.Unlock()
channel, ok := m.confChannel[*txid]
if ok {
return channel
}
channel = make(chan *chainntnfs.TxConfirmation)
m.confChannel[*txid] = channel
return channel
}
func (m *nurseryMockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
return &chainntnfs.BlockEpochEvent{
Epochs: m.epochChan,
Cancel: func() {},
}, nil
}
func (m *nurseryMockNotifier) Start() error {
return nil
}
func (m *nurseryMockNotifier) Stop() error {
return nil
}
func (m *nurseryMockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
_ []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) {
return &chainntnfs.SpendEvent{
Spend: m.spendChan,
Cancel: func() {},
}, nil
}