diff --git a/contractcourt/commit_sweep_resolver.go b/contractcourt/commit_sweep_resolver.go index ef49f8dc..c1e29dc4 100644 --- a/contractcourt/commit_sweep_resolver.go +++ b/contractcourt/commit_sweep_resolver.go @@ -5,10 +5,16 @@ import ( "fmt" "io" - "github.com/lightningnetwork/lnd/input" - "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/sweep" +) + +const ( + // commitOutputConfTarget is the default confirmation target we'll use + // for sweeps of commit outputs that belong to us. + commitOutputConfTarget = 6 ) // commitSweepResolver is a resolver that will attempt to sweep the commitment @@ -98,7 +104,8 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) { // sweeper. log.Infof("%T(%v): sweeping commit output", c, c.chanPoint) - resultChan, err := c.Sweeper.SweepInput(&inp) + feePref := sweep.FeePreference{ConfTarget: commitOutputConfTarget} + resultChan, err := c.Sweeper.SweepInput(&inp, feePref) if err != nil { log.Errorf("%T(%v): unable to sweep input: %v", c, c.chanPoint, err) diff --git a/server.go b/server.go index cc2033ad..06a1b150 100644 --- a/server.go +++ b/server.go @@ -717,13 +717,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, NewBatchTimer: func() <-chan time.Time { return time.NewTimer(sweep.DefaultBatchWindowDuration).C }, - SweepTxConfTarget: 6, Notifier: cc.chainNotifier, ChainIO: cc.chainIO, Store: sweeperStore, MaxInputsPerTx: sweep.DefaultMaxInputsPerTx, MaxSweepAttempts: sweep.DefaultMaxSweepAttempts, NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc, + MaxFeeRate: sweep.DefaultMaxFeeRate, + FeeRateBucketSize: sweep.DefaultFeeRateBucketSize, }) s.utxoNursery = newUtxoNursery(&NurseryConfig{ diff --git a/sweep/fee_estimator_mock_test.go b/sweep/fee_estimator_mock_test.go index c926f159..b0510f36 100644 --- a/sweep/fee_estimator_mock_test.go +++ b/sweep/fee_estimator_mock_test.go @@ -16,6 +16,10 @@ type mockFeeEstimator struct { blocksToFee map[uint32]lnwallet.SatPerKWeight + // A closure that when set is used instead of the + // mockFeeEstimator.EstimateFeePerKW method. + estimateFeePerKW func(numBlocks uint32) (lnwallet.SatPerKWeight, error) + lock sync.Mutex } @@ -45,6 +49,10 @@ func (e *mockFeeEstimator) EstimateFeePerKW(numBlocks uint32) ( e.lock.Lock() defer e.lock.Unlock() + if e.estimateFeePerKW != nil { + return e.estimateFeePerKW(numBlocks) + } + if fee, ok := e.blocksToFee[numBlocks]; ok { return fee, nil } diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8ac4aafb..3737ecb0 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -3,6 +3,7 @@ package sweep import ( "errors" "fmt" + "math" "math/rand" "sync" "sync/atomic" @@ -16,6 +17,25 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" ) +const ( + // DefaultMaxFeeRate is the default maximum fee rate allowed within the + // UtxoSweeper. The current value is equivalent to a fee rate of 10,000 + // sat/vbyte. + DefaultMaxFeeRate lnwallet.SatPerKWeight = 250 * 1e4 + + // DefaultFeeRateBucketSize is the default size of fee rate buckets + // we'll use when clustering inputs into buckets with similar fee rates + // within the UtxoSweeper. + // + // Given a minimum relay fee rate of 1 sat/vbyte, a multiplier of 10 + // would result in the following fee rate buckets up to the maximum fee + // rate: + // + // #1: min = 1 sat/vbyte, max = 10 sat/vbyte + // #2: min = 11 sat/vbyte, max = 20 sat/vbyte... + DefaultFeeRateBucketSize = 10 +) + var ( // ErrRemoteSpend is returned in case an output that we try to sweep is // confirmed in a tx of the remote party. @@ -53,6 +73,22 @@ type pendingInput struct { // publishAttempts records the number of attempts that have already been // made to sweep this tx. publishAttempts int + + // feePreference is the fee preference of the client who requested the + // input to be swept. If a confirmation target is specified, then we'll + // map it into a fee rate whenever we attempt to cluster inputs for a + // sweep. + feePreference FeePreference +} + +// pendingInputs is a type alias for a set of pending inputs. +type pendingInputs = map[wire.OutPoint]*pendingInput + +// inputCluster is a helper struct to gather a set of pending inputs that should +// be swept with the specified fee rate. +type inputCluster struct { + sweepFeeRate lnwallet.SatPerKWeight + inputs pendingInputs } // UtxoSweeper is responsible for sweeping outputs back into the wallet @@ -65,7 +101,9 @@ type UtxoSweeper struct { newInputs chan *sweepInputMessage spendChan chan *chainntnfs.SpendDetail - pendingInputs map[wire.OutPoint]*pendingInput + // pendingInputs is the total set of inputs the UtxoSweeper has been + // requested to sweep. + pendingInputs pendingInputs // timer is the channel that signals expiry of the sweep batch timer. timer <-chan time.Time @@ -74,7 +112,7 @@ type UtxoSweeper struct { currentOutputScript []byte - relayFeePerKW lnwallet.SatPerKWeight + relayFeeRate lnwallet.SatPerKWeight quit chan struct{} wg sync.WaitGroup @@ -114,10 +152,6 @@ type UtxoSweeperConfig struct { // time the incubated outputs need to be spent. Signer input.Signer - // SweepTxConfTarget assigns a confirmation target for sweep txes on - // which the fee calculation will be based. - SweepTxConfTarget uint32 - // MaxInputsPerTx specifies the default maximum number of inputs allowed // in a single sweep tx. If more need to be swept, multiple txes are // created and published. @@ -131,6 +165,22 @@ type UtxoSweeperConfig struct { // NextAttemptDeltaFunc returns given the number of already attempted // sweeps, how many blocks to wait before retrying to sweep. NextAttemptDeltaFunc func(int) int32 + + // MaxFeeRate is the the maximum fee rate allowed within the + // UtxoSweeper. + MaxFeeRate lnwallet.SatPerKWeight + + // FeeRateBucketSize is the default size of fee rate buckets we'll use + // when clustering inputs into buckets with similar fee rates within the + // UtxoSweeper. + // + // Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size + // of 10 would result in the following fee rate buckets up to the + // maximum fee rate: + // + // #1: min = 1 sat/vbyte, max = 10 sat/vbyte + // #2: min = 11 sat/vbyte, max = 20 sat/vbyte... + FeeRateBucketSize int } // Result is the struct that is pushed through the result channel. Callers can @@ -149,19 +199,19 @@ type Result struct { // sweepInputMessage structs are used in the internal channel between the // SweepInput call and the sweeper main loop. type sweepInputMessage struct { - input input.Input - resultChan chan Result + input input.Input + feePreference FeePreference + resultChan chan Result } // New returns a new Sweeper instance. func New(cfg *UtxoSweeperConfig) *UtxoSweeper { - return &UtxoSweeper{ cfg: cfg, newInputs: make(chan *sweepInputMessage), spendChan: make(chan *chainntnfs.SpendDetail), quit: make(chan struct{}), - pendingInputs: make(map[wire.OutPoint]*pendingInput), + pendingInputs: make(pendingInputs), } } @@ -199,7 +249,7 @@ func (s *UtxoSweeper) Start() error { // Retrieve relay fee for dust limit calculation. Assume that this will // not change from here on. - s.relayFeePerKW = s.cfg.FeeEstimator.RelayFeePerKW() + s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW() // Register for block epochs to retry sweeping every block. bestHash, bestHeight, err := s.cfg.ChainIO.GetBestBlock() @@ -252,24 +302,33 @@ func (s *UtxoSweeper) Stop() error { } // SweepInput sweeps inputs back into the wallet. The inputs will be batched and -// swept after the batch time window ends. +// swept after the batch time window ends. A custom fee preference can be +// provided, otherwise the UtxoSweeper's default will be used. // // NOTE: Extreme care needs to be taken that input isn't changed externally. // Because it is an interface and we don't know what is exactly behind it, we // cannot make a local copy in sweeper. -func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) { +func (s *UtxoSweeper) SweepInput(input input.Input, + feePreference FeePreference) (chan Result, error) { + if input == nil || input.OutPoint() == nil || input.SignDesc() == nil { return nil, errors.New("nil input received") } + // Ensure the client provided a sane fee preference. + if _, err := s.feeRateForPreference(feePreference); err != nil { + return nil, err + } + log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+ - "time_lock=%v, size=%v", input.OutPoint(), input.WitnessType(), - input.BlocksToMaturity(), - btcutil.Amount(input.SignDesc().Output.Value)) + "time_lock=%v, amount=%v, fee_preference=%v", input.OutPoint(), + input.WitnessType(), input.BlocksToMaturity(), + btcutil.Amount(input.SignDesc().Output.Value), feePreference) sweeperInput := &sweepInputMessage{ - input: input, - resultChan: make(chan Result, 1), + input: input, + feePreference: feePreference, + resultChan: make(chan Result, 1), } // Deliver input to main event loop. @@ -282,6 +341,27 @@ func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) { return sweeperInput.resultChan, nil } +// feeRateForPreference returns a fee rate for the given fee preference. It +// ensures that the fee rate respects the bounds of the UtxoSweeper. +func (s *UtxoSweeper) feeRateForPreference( + feePreference FeePreference) (lnwallet.SatPerKWeight, error) { + + feeRate, err := DetermineFeePerKw(s.cfg.FeeEstimator, feePreference) + if err != nil { + return 0, err + } + if feeRate < s.relayFeeRate { + return 0, fmt.Errorf("fee preference resulted in invalid fee "+ + "rate %v, mininum is %v", feeRate, s.relayFeeRate) + } + if feeRate > s.cfg.MaxFeeRate { + return 0, fmt.Errorf("fee preference resulted in invalid fee "+ + "rate %v, maximum is %v", feeRate, s.cfg.MaxFeeRate) + } + + return feeRate, nil +} + // collector is the sweeper main loop. It processes new inputs, spend // notifications and counts down to publication of the sweep tx. func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, @@ -315,6 +395,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, listeners: []chan Result{input.resultChan}, input: input.input, minPublishHeight: bestHeight, + feePreference: input.feePreference, } s.pendingInputs[outpoint] = pendInput @@ -406,29 +487,30 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, // be started when new inputs arrive. s.timer = nil - // Retrieve fee estimate for input filtering and final - // tx fee calculation. - satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW( - s.cfg.SweepTxConfTarget, - ) - if err != nil { - log.Errorf("estimate fee: %v", err) - continue - } - - // Examine pending inputs and try to construct lists of - // inputs. - inputLists, err := s.getInputLists(bestHeight, satPerKW) - if err != nil { - log.Errorf("get input lists: %v", err) - continue - } - - // Sweep selected inputs. - for _, inputs := range inputLists { - err := s.sweep(inputs, satPerKW, bestHeight) + // We'll attempt to cluster all of our inputs with + // similar fee rates. + for _, cluster := range s.clusterBySweepFeeRate() { + // Examine pending inputs and try to construct + // lists of inputs. + inputLists, err := s.getInputLists( + cluster, bestHeight, + ) if err != nil { - log.Errorf("sweep: %v", err) + log.Errorf("Unable to examine pending "+ + "inputs: %v", err) + continue + } + + // Sweep selected inputs. + for _, inputs := range inputLists { + err := s.sweep( + inputs, cluster.sweepFeeRate, + bestHeight, + ) + if err != nil { + log.Errorf("Unable to sweep "+ + "inputs: %v", err) + } } } @@ -441,7 +523,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, bestHeight = epoch.Height - log.Debugf("New blocks: height=%v, sha=%v", + log.Debugf("New block: height=%v, sha=%v", epoch.Height, epoch.Hash) if err := s.scheduleSweep(bestHeight); err != nil { @@ -454,6 +536,62 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, } } +// bucketForFeeReate determines the proper bucket for a fee rate. This is done +// in order to batch inputs with similar fee rates together. +func (s *UtxoSweeper) bucketForFeeRate( + feeRate lnwallet.SatPerKWeight) lnwallet.SatPerKWeight { + + minBucket := s.relayFeeRate + lnwallet.SatPerKWeight(s.cfg.FeeRateBucketSize) + return lnwallet.SatPerKWeight( + math.Ceil(float64(feeRate) / float64(minBucket)), + ) +} + +// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper +// and clusters those together with similar fee rates. Each cluster contains a +// sweep fee rate, which is determined by calculating the average fee rate of +// all inputs within that cluster. +func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster { + bucketInputs := make(map[lnwallet.SatPerKWeight]pendingInputs) + inputFeeRates := make(map[wire.OutPoint]lnwallet.SatPerKWeight) + + // First, we'll group together all inputs with similar fee rates. This + // is done by determining the fee rate bucket they should belong in. + for op, input := range s.pendingInputs { + feeRate, err := s.feeRateForPreference(input.feePreference) + if err != nil { + log.Warnf("Skipping input %v: %v", op, err) + continue + } + bucket := s.bucketForFeeRate(feeRate) + + inputs, ok := bucketInputs[bucket] + if !ok { + inputs = make(pendingInputs) + bucketInputs[bucket] = inputs + } + inputs[op] = input + inputFeeRates[op] = feeRate + } + + // We'll then determine the sweep fee rate for each set of inputs by + // calculating the average fee rate of the inputs within each set. + inputClusters := make([]inputCluster, 0, len(bucketInputs)) + for _, inputs := range bucketInputs { + var sweepFeeRate lnwallet.SatPerKWeight + for op := range inputs { + sweepFeeRate += inputFeeRates[op] + } + sweepFeeRate /= lnwallet.SatPerKWeight(len(inputs)) + inputClusters = append(inputClusters, inputCluster{ + sweepFeeRate: sweepFeeRate, + inputs: inputs, + }) + } + + return inputClusters +} + // scheduleSweep starts the sweep timer to create an opportunity for more inputs // to be added. func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error { @@ -464,27 +602,25 @@ func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error { return nil } - // Retrieve fee estimate for input filtering and final tx fee - // calculation. - satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW( - s.cfg.SweepTxConfTarget, - ) - if err != nil { - return fmt.Errorf("estimate fee: %v", err) + // We'll only start our timer once we have inputs we're able to sweep. + startTimer := false + for _, cluster := range s.clusterBySweepFeeRate() { + // Examine pending inputs and try to construct lists of inputs. + inputLists, err := s.getInputLists(cluster, currentHeight) + if err != nil { + return fmt.Errorf("get input lists: %v", err) + } + + log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+ + "yield %v distinct txns", currentHeight, + cluster.sweepFeeRate, len(inputLists)) + + if len(inputLists) != 0 { + startTimer = true + break + } } - - // Examine pending inputs and try to construct lists of inputs. - inputLists, err := s.getInputLists(currentHeight, satPerKW) - if err != nil { - return fmt.Errorf("get input lists: %v", err) - } - - log.Infof("Sweep candidates at height=%v, yield %v distinct txns", - currentHeight, len(inputLists)) - - // If there are no input sets, there is nothing sweepable and we can - // return without starting the timer. - if len(inputLists) == 0 { + if !startTimer { return nil } @@ -533,13 +669,13 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) { delete(s.pendingInputs, *outpoint) } -// getInputLists goes through all pending inputs and constructs sweep lists, -// each up to the configured maximum number of inputs. Negative yield inputs are -// skipped. Transactions with an output below the dust limit are not published. -// Those inputs remain pending and will be bundled with future inputs if -// possible. -func (s *UtxoSweeper) getInputLists(currentHeight int32, - satPerKW lnwallet.SatPerKWeight) ([]inputSet, error) { +// getInputLists goes through the given inputs and constructs multiple distinct +// sweep lists with the given fee rate, each up to the configured maximum number +// of inputs. Negative yield inputs are skipped. Transactions with an output +// below the dust limit are not published. Those inputs remain pending and will +// be bundled with future inputs if possible. +func (s *UtxoSweeper) getInputLists(cluster inputCluster, + currentHeight int32) ([]inputSet, error) { // Filter for inputs that need to be swept. Create two lists: all // sweepable inputs and a list containing only the new, never tried @@ -552,7 +688,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32, // consisting of only new inputs to the list, to make sure that new // inputs are given a good, isolated chance of being published. var newInputs, retryInputs []input.Input - for _, input := range s.pendingInputs { + for _, input := range cluster.inputs { // Skip inputs that have a minimum publish height that is not // yet reached. if input.minPublishHeight > currentHeight { @@ -573,9 +709,8 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32, if len(retryInputs) > 0 { var err error allSets, err = generateInputPartitionings( - append(retryInputs, newInputs...), - s.relayFeePerKW, satPerKW, - s.cfg.MaxInputsPerTx, + append(retryInputs, newInputs...), s.relayFeeRate, + cluster.sweepFeeRate, s.cfg.MaxInputsPerTx, ) if err != nil { return nil, fmt.Errorf("input partitionings: %v", err) @@ -584,8 +719,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32, // Create sets for just the new inputs. newSets, err := generateInputPartitionings( - newInputs, - s.relayFeePerKW, satPerKW, + newInputs, s.relayFeeRate, cluster.sweepFeeRate, s.cfg.MaxInputsPerTx, ) if err != nil { @@ -602,23 +736,22 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32, // sweep takes a set of preselected inputs, creates a sweep tx and publishes the // tx. The output address is only marked as used if the publish succeeds. -func (s *UtxoSweeper) sweep(inputs inputSet, - satPerKW lnwallet.SatPerKWeight, currentHeight int32) error { +func (s *UtxoSweeper) sweep(inputs inputSet, feeRate lnwallet.SatPerKWeight, + currentHeight int32) error { - var err error - - // Generate output script if no unused script available. + // Generate an output script if there isn't an unused script available. if s.currentOutputScript == nil { - s.currentOutputScript, err = s.cfg.GenSweepScript() + pkScript, err := s.cfg.GenSweepScript() if err != nil { return fmt.Errorf("gen sweep script: %v", err) } + s.currentOutputScript = pkScript } // Create sweep tx. tx, err := createSweepTx( - inputs, s.currentOutputScript, - uint32(currentHeight), satPerKW, s.cfg.Signer, + inputs, s.currentOutputScript, uint32(currentHeight), feeRate, + s.cfg.Signer, ) if err != nil { return fmt.Errorf("create sweep tx: %v", err) @@ -651,8 +784,8 @@ func (s *UtxoSweeper) sweep(inputs inputSet, return fmt.Errorf("publish tx: %v", err) } - // Keep outputScript in case of an error, so that it can be reused for - // the next tx and causes no address inflation. + // Keep the output script in case of an error, so that it can be reused + // for the next transaction and causes no address inflation. if err == nil { s.currentOutputScript = nil } @@ -692,6 +825,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, }) } } + return nil } diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 2023a684..08fd1034 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -10,10 +10,11 @@ import ( "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" + "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/build" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnwallet" ) var ( @@ -22,6 +23,8 @@ var ( testMaxSweepAttempts = 3 testMaxInputsPerTx = 3 + + defaultFeePref = FeePreference{ConfTarget: 1} ) type sweeperTestContext struct { @@ -96,7 +99,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { backend := newMockBackend(notifier) - estimator := newMockFeeEstimator(10000, 1000) + estimator := newMockFeeEstimator(10000, lnwallet.FeePerKwFloor) publishChan := make(chan wire.MsgTx, 2) ctx := &sweeperTestContext{ @@ -127,10 +130,9 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { ctx.timeoutChan <- c return c }, - Store: store, - Signer: &mockSigner{}, - SweepTxConfTarget: 1, - ChainIO: &mockChainIO{}, + Store: store, + Signer: &mockSigner{}, + ChainIO: &mockChainIO{}, GenSweepScript: func() ([]byte, error) { script := []byte{outputScriptCount} outputScriptCount++ @@ -143,6 +145,8 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { // Use delta func without random factor. return 1 << uint(attempts-1) }, + MaxFeeRate: DefaultMaxFeeRate, + FeeRateBucketSize: DefaultFeeRateBucketSize, }) ctx.sweeper.Start() @@ -150,6 +154,14 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext { return ctx } +func (ctx *sweeperTestContext) restartSweeper() { + ctx.t.Helper() + + ctx.sweeper.Stop() + ctx.sweeper = New(ctx.sweeper.cfg) + ctx.sweeper.Start() +} + func (ctx *sweeperTestContext) tick() { testLog.Trace("Waiting for tick to be consumed") select { @@ -251,11 +263,95 @@ func (ctx *sweeperTestContext) expectResult(c chan Result, expected error) { } } +// receiveSpendTx receives the transaction sent through the given resultChan. +func receiveSpendTx(t *testing.T, resultChan chan Result) *wire.MsgTx { + t.Helper() + + var result Result + select { + case result = <-resultChan: + case <-time.After(5 * time.Second): + t.Fatal("no sweep result received") + } + + if result.Err != nil { + t.Fatalf("expected successful spend, but received error "+ + "\"%v\" instead", result.Err) + } + + return result.Tx +} + +// assertTxSweepsInputs ensures that the transaction returned within the value +// received from resultChan spends the given inputs. +func assertTxSweepsInputs(t *testing.T, sweepTx *wire.MsgTx, + inputs ...input.Input) { + + t.Helper() + + if len(sweepTx.TxIn) != len(inputs) { + t.Fatalf("expected sweep tx to contain %d inputs, got %d", + len(inputs), len(sweepTx.TxIn)) + } + m := make(map[wire.OutPoint]struct{}, len(inputs)) + for _, input := range inputs { + m[*input.OutPoint()] = struct{}{} + } + for _, txIn := range sweepTx.TxIn { + if _, ok := m[txIn.PreviousOutPoint]; !ok { + t.Fatalf("expected tx %v to spend input %v", + txIn.PreviousOutPoint, sweepTx.TxHash()) + } + } +} + +// assertTxFeeRate asserts that the transaction was created with the given +// inputs and fee rate. +// +// NOTE: This assumes that transactions only have one output, as this is the +// only type of transaction the UtxoSweeper can create at the moment. +func assertTxFeeRate(t *testing.T, tx *wire.MsgTx, + expectedFeeRate lnwallet.SatPerKWeight, inputs ...input.Input) { + + t.Helper() + + if len(tx.TxIn) != len(inputs) { + t.Fatalf("expected %d inputs, got %d", len(tx.TxIn), len(inputs)) + } + + m := make(map[wire.OutPoint]input.Input, len(inputs)) + for _, input := range inputs { + m[*input.OutPoint()] = input + } + + var inputAmt int64 + for _, txIn := range tx.TxIn { + input, ok := m[txIn.PreviousOutPoint] + if !ok { + t.Fatalf("expected input %v to be provided", + txIn.PreviousOutPoint) + } + inputAmt += input.SignDesc().Output.Value + } + outputAmt := tx.TxOut[0].Value + + fee := btcutil.Amount(inputAmt - outputAmt) + _, txWeight, _, _ := getWeightEstimate(inputs) + + expectedFee := expectedFeeRate.FeeForWeight(txWeight) + if fee != expectedFee { + t.Fatalf("expected fee rate %v results in %v fee, got %v fee", + expectedFeeRate, expectedFee, fee) + } +} + // TestSuccess tests the sweeper happy flow. func TestSuccess(t *testing.T) { ctx := createSweeperTestContext(t) - resultChan, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -305,7 +401,7 @@ func TestDust(t *testing.T) { // sweep tx output script (P2WPKH). dustInput := createTestInput(5260, input.CommitmentTimeLock) - _, err := ctx.sweeper.SweepInput(&dustInput) + _, err := ctx.sweeper.SweepInput(&dustInput, defaultFeePref) if err != nil { t.Fatal(err) } @@ -316,7 +412,7 @@ func TestDust(t *testing.T) { // Sweep another input that brings the tx output above the dust limit. largeInput := createTestInput(100000, input.CommitmentTimeLock) - _, err = ctx.sweeper.SweepInput(&largeInput) + _, err = ctx.sweeper.SweepInput(&largeInput, defaultFeePref) if err != nil { t.Fatal(err) } @@ -345,7 +441,9 @@ func TestNegativeInput(t *testing.T) { // Sweep an input large enough to cover fees, so in any case the tx // output will be above the dust limit. largeInput := createTestInput(100000, input.CommitmentNoDelay) - largeInputResult, err := ctx.sweeper.SweepInput(&largeInput) + largeInputResult, err := ctx.sweeper.SweepInput( + &largeInput, defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -354,7 +452,7 @@ func TestNegativeInput(t *testing.T) { // the HtlcAcceptedRemoteSuccess input type adds more in fees than its // value at the current fee level. negInput := createTestInput(2900, input.HtlcOfferedRemoteTimeout) - negInputResult, err := ctx.sweeper.SweepInput(&negInput) + negInputResult, err := ctx.sweeper.SweepInput(&negInput, defaultFeePref) if err != nil { t.Fatal(err) } @@ -362,7 +460,9 @@ func TestNegativeInput(t *testing.T) { // Sweep a third input that has a smaller output than the previous one, // but yields positively because of its lower weight. positiveInput := createTestInput(2800, input.CommitmentNoDelay) - positiveInputResult, err := ctx.sweeper.SweepInput(&positiveInput) + positiveInputResult, err := ctx.sweeper.SweepInput( + &positiveInput, defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -373,13 +473,7 @@ func TestNegativeInput(t *testing.T) { // contain the large input. The negative input should stay out of sweeps // until fees come down to get a positive net yield. sweepTx1 := ctx.receiveTx() - - if !testTxIns(&sweepTx1, []*wire.OutPoint{ - largeInput.OutPoint(), positiveInput.OutPoint(), - }) { - t.Fatalf("Tx does not contain expected inputs: %v", - spew.Sdump(sweepTx1)) - } + assertTxSweepsInputs(t, &sweepTx1, &largeInput, &positiveInput) ctx.backend.mine() @@ -389,9 +483,11 @@ func TestNegativeInput(t *testing.T) { // Lower fee rate so that the negative input is no longer negative. ctx.estimator.updateFees(1000, 1000) - // Create another large input + // Create another large input. secondLargeInput := createTestInput(100000, input.CommitmentNoDelay) - secondLargeInputResult, err := ctx.sweeper.SweepInput(&secondLargeInput) + secondLargeInputResult, err := ctx.sweeper.SweepInput( + &secondLargeInput, defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -399,11 +495,7 @@ func TestNegativeInput(t *testing.T) { ctx.tick() sweepTx2 := ctx.receiveTx() - if !testTxIns(&sweepTx2, []*wire.OutPoint{ - secondLargeInput.OutPoint(), negInput.OutPoint(), - }) { - t.Fatal("Tx does not contain expected inputs") - } + assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput) ctx.backend.mine() @@ -413,32 +505,13 @@ func TestNegativeInput(t *testing.T) { ctx.finish(1) } -func testTxIns(tx *wire.MsgTx, inputs []*wire.OutPoint) bool { - if len(tx.TxIn) != len(inputs) { - return false - } - - ins := make(map[wire.OutPoint]struct{}) - for _, in := range tx.TxIn { - ins[in.PreviousOutPoint] = struct{}{} - } - - for _, expectedIn := range inputs { - if _, ok := ins[*expectedIn]; !ok { - return false - } - } - - return true -} - // TestChunks asserts that large sets of inputs are split into multiple txes. func TestChunks(t *testing.T) { ctx := createSweeperTestContext(t) // Sweep five inputs. for _, input := range spendableInputs[:5] { - _, err := ctx.sweeper.SweepInput(input) + _, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } @@ -479,12 +552,16 @@ func TestRemoteSpend(t *testing.T) { func testRemoteSpend(t *testing.T, postSweep bool) { ctx := createSweeperTestContext(t) - resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan1, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) if err != nil { t.Fatal(err) } - resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[1]) + resultChan2, err := ctx.sweeper.SweepInput( + spendableInputs[1], defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -557,12 +634,13 @@ func testRemoteSpend(t *testing.T, postSweep bool) { func TestIdempotency(t *testing.T) { ctx := createSweeperTestContext(t) - resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0]) + input := spendableInputs[0] + resultChan1, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } - resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan2, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } @@ -571,7 +649,7 @@ func TestIdempotency(t *testing.T) { ctx.receiveTx() - resultChan3, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } @@ -588,7 +666,7 @@ func TestIdempotency(t *testing.T) { // immediately receive the spend notification with a spending tx hash. // Because the sweeper kept track of all of its sweep txes, it will // recognize the spend as its own. - resultChan4, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan4, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } @@ -615,8 +693,8 @@ func TestRestart(t *testing.T) { ctx := createSweeperTestContext(t) // Sweep input and expect sweep tx. - _, err := ctx.sweeper.SweepInput(spendableInputs[0]) - if err != nil { + input1 := spendableInputs[0] + if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { t.Fatal(err) } ctx.tick() @@ -624,21 +702,19 @@ func TestRestart(t *testing.T) { ctx.receiveTx() // Restart sweeper. - ctx.sweeper.Stop() - - ctx.sweeper = New(ctx.sweeper.cfg) - ctx.sweeper.Start() + ctx.restartSweeper() // Expect last tx to be republished. ctx.receiveTx() // Simulate other subsystem (eg contract resolver) re-offering inputs. - spendChan1, err := ctx.sweeper.SweepInput(spendableInputs[0]) + spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref) if err != nil { t.Fatal(err) } - spendChan2, err := ctx.sweeper.SweepInput(spendableInputs[1]) + input2 := spendableInputs[1] + spendChan2, err := ctx.sweeper.SweepInput(input2, defaultFeePref) if err != nil { t.Fatal(err) } @@ -676,9 +752,7 @@ func TestRestart(t *testing.T) { } // Restart sweeper again. No action is expected. - ctx.sweeper.Stop() - ctx.sweeper = New(ctx.sweeper.cfg) - ctx.sweeper.Start() + ctx.restartSweeper() // Expect last tx to be republished. ctx.receiveTx() @@ -693,14 +767,14 @@ func TestRestartRemoteSpend(t *testing.T) { ctx := createSweeperTestContext(t) // Sweep input. - _, err := ctx.sweeper.SweepInput(spendableInputs[0]) - if err != nil { + input1 := spendableInputs[0] + if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil { t.Fatal(err) } // Sweep another input. - _, err = ctx.sweeper.SweepInput(spendableInputs[1]) - if err != nil { + input2 := spendableInputs[1] + if _, err := ctx.sweeper.SweepInput(input2, defaultFeePref); err != nil { t.Fatal(err) } @@ -709,10 +783,7 @@ func TestRestartRemoteSpend(t *testing.T) { sweepTx := ctx.receiveTx() // Restart sweeper. - ctx.sweeper.Stop() - - ctx.sweeper = New(ctx.sweeper.cfg) - ctx.sweeper.Start() + ctx.restartSweeper() // Expect last tx to be republished. ctx.receiveTx() @@ -723,12 +794,11 @@ func TestRestartRemoteSpend(t *testing.T) { remoteTx := &wire.MsgTx{ TxIn: []*wire.TxIn{ { - PreviousOutPoint: *(spendableInputs[1].OutPoint()), + PreviousOutPoint: *(input2.OutPoint()), }, }, } - err = ctx.backend.publishTransaction(remoteTx) - if err != nil { + if err := ctx.backend.publishTransaction(remoteTx); err != nil { t.Fatal(err) } @@ -736,7 +806,7 @@ func TestRestartRemoteSpend(t *testing.T) { ctx.backend.mine() // Simulate other subsystem (eg contract resolver) re-offering input 0. - spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0]) + spendChan, err := ctx.sweeper.SweepInput(input1, defaultFeePref) if err != nil { t.Fatal(err) } @@ -760,8 +830,8 @@ func TestRestartConfirmed(t *testing.T) { ctx := createSweeperTestContext(t) // Sweep input. - _, err := ctx.sweeper.SweepInput(spendableInputs[0]) - if err != nil { + input := spendableInputs[0] + if _, err := ctx.sweeper.SweepInput(input, defaultFeePref); err != nil { t.Fatal(err) } @@ -770,10 +840,7 @@ func TestRestartConfirmed(t *testing.T) { ctx.receiveTx() // Restart sweeper. - ctx.sweeper.Stop() - - ctx.sweeper = New(ctx.sweeper.cfg) - ctx.sweeper.Start() + ctx.restartSweeper() // Expect last tx to be republished. ctx.receiveTx() @@ -782,7 +849,7 @@ func TestRestartConfirmed(t *testing.T) { ctx.backend.mine() // Simulate other subsystem (eg contract resolver) re-offering input 0. - spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0]) + spendChan, err := ctx.sweeper.SweepInput(input, defaultFeePref) if err != nil { t.Fatal(err) } @@ -801,7 +868,7 @@ func TestRestartConfirmed(t *testing.T) { func TestRestartRepublish(t *testing.T) { ctx := createSweeperTestContext(t) - _, err := ctx.sweeper.SweepInput(spendableInputs[0]) + _, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref) if err != nil { t.Fatal(err) } @@ -811,9 +878,7 @@ func TestRestartRepublish(t *testing.T) { sweepTx := ctx.receiveTx() // Restart sweeper again. No action is expected. - ctx.sweeper.Stop() - ctx.sweeper = New(ctx.sweeper.cfg) - ctx.sweeper.Start() + ctx.restartSweeper() republishedTx := ctx.receiveTx() @@ -831,7 +896,9 @@ func TestRestartRepublish(t *testing.T) { func TestRetry(t *testing.T) { ctx := createSweeperTestContext(t) - resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan0, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -846,7 +913,9 @@ func TestRetry(t *testing.T) { ctx.notifier.NotifyEpoch(1000) // Offer a fresh input. - resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[1]) + resultChan1, err := ctx.sweeper.SweepInput( + spendableInputs[1], defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -871,7 +940,9 @@ func TestRetry(t *testing.T) { func TestGiveUp(t *testing.T) { ctx := createSweeperTestContext(t) - resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0]) + resultChan0, err := ctx.sweeper.SweepInput( + spendableInputs[0], defaultFeePref, + ) if err != nil { t.Fatal(err) } @@ -902,3 +973,63 @@ func TestGiveUp(t *testing.T) { ctx.finish(1) } + +// TestDifferentFeePreferences ensures that the sweeper can have different +// transactions for different fee preferences. +func TestDifferentFeePreferences(t *testing.T) { + ctx := createSweeperTestContext(t) + + // Throughout this test, we'll be attempting to sweep three inputs, two + // with the higher fee preference, and the last with the lower. We do + // this to ensure the sweeper can broadcast distinct transactions for + // each sweep with a different fee preference. + lowFeePref := FeePreference{ + ConfTarget: 12, + } + ctx.estimator.blocksToFee[lowFeePref.ConfTarget] = 5000 + highFeePref := FeePreference{ + ConfTarget: 6, + } + ctx.estimator.blocksToFee[highFeePref.ConfTarget] = 10000 + + input1 := spendableInputs[0] + resultChan1, err := ctx.sweeper.SweepInput(input1, highFeePref) + if err != nil { + t.Fatal(err) + } + input2 := spendableInputs[1] + resultChan2, err := ctx.sweeper.SweepInput(input2, highFeePref) + if err != nil { + t.Fatal(err) + } + input3 := spendableInputs[2] + resultChan3, err := ctx.sweeper.SweepInput(input3, lowFeePref) + if err != nil { + t.Fatal(err) + } + + // Start the sweeper's batch ticker, which should cause the sweep + // transactions to be broadcast. + ctx.tick() + + ctx.receiveTx() + ctx.receiveTx() + + // With the transactions broadcast, we'll mine a block to so that the + // result is delivered to each respective client. + ctx.backend.mine() + + // We should expect to see a single transaction that sweeps the high fee + // preference inputs. + sweepTx1 := receiveSpendTx(t, resultChan1) + assertTxSweepsInputs(t, sweepTx1, input1, input2) + sweepTx2 := receiveSpendTx(t, resultChan2) + assertTxSweepsInputs(t, sweepTx2, input1, input2) + + // We should expect to see a distinct transaction that sweeps the low + // fee preference inputs. + sweepTx3 := receiveSpendTx(t, resultChan3) + assertTxSweepsInputs(t, sweepTx3, input3) + + ctx.finish(1) +} diff --git a/utxonursery.go b/utxonursery.go index c5397581..bfdc075a 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -155,6 +155,12 @@ import ( var byteOrder = binary.BigEndian +const ( + // kgtnOutputConfTarget is the default confirmation target we'll use for + // sweeps of CSV delayed outputs. + kgtnOutputConfTarget = 6 +) + var ( // ErrContractNotFound is returned when the nursery is unable to // retrieve information about a queried contract. @@ -196,7 +202,7 @@ type NurseryConfig struct { Store NurseryStore // Sweep sweeps an input back to the wallet. - SweepInput func(input input.Input) (chan sweep.Result, error) + SweepInput func(input.Input, sweep.FeePreference) (chan sweep.Result, error) } // utxoNursery is a system dedicated to incubating time-locked outputs created @@ -804,12 +810,13 @@ func (u *utxoNursery) sweepMatureOutputs(classHeight uint32, utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx for "+ "height %v", len(kgtnOutputs), classHeight) + feePref := sweep.FeePreference{ConfTarget: kgtnOutputConfTarget} for _, output := range kgtnOutputs { // Create local copy to prevent pointer to loop variable to be // passed in with disastrous consequences. local := output - resultChan, err := u.cfg.SweepInput(&local) + resultChan, err := u.cfg.SweepInput(&local, feePref) if err != nil { return err } diff --git a/utxonursery_test.go b/utxonursery_test.go index 05be1a0a..8ad1c31f 100644 --- a/utxonursery_test.go +++ b/utxonursery_test.go @@ -1051,7 +1051,9 @@ func newMockSweeper(t *testing.T) *mockSweeper { } } -func (s *mockSweeper) sweepInput(input input.Input) (chan sweep.Result, error) { +func (s *mockSweeper) sweepInput(input input.Input, + _ sweep.FeePreference) (chan sweep.Result, error) { + utxnLog.Debugf("mockSweeper sweepInput called for %v", *input.OutPoint()) select {