multi: support arbitrary client fee preferences to UtxoSweeper

In this commit, we introduce support for arbitrary client fee
preferences when accepting input sweep requests. This is possible with
the addition of fee rate buckets. Fee rate buckets are buckets that
contain inputs with similar fee rates within a specific range, e.g.,
1-10 sat/vbyte, 11-20 sat/vbyte, etc. Having these buckets allows us to
batch and sweep inputs from different clients with similar fee rates
within a single transaction, allowing us to save on chain fees.

With this addition, we can now get rid of the UtxoSweeper's default fee
preference. As of this commit, any clients using the it to sweep inputs
specify the same fee preference to not change their behavior. Each of
these can be fine-tuned later on given their use cases.
This commit is contained in:
Wilmer Paulino 2019-05-01 16:06:19 -07:00
parent 138d9b68f0
commit 5172a5e255
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
7 changed files with 470 additions and 180 deletions

View File

@ -5,10 +5,16 @@ import (
"fmt"
"io"
"github.com/lightningnetwork/lnd/input"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/sweep"
)
const (
// commitOutputConfTarget is the default confirmation target we'll use
// for sweeps of commit outputs that belong to us.
commitOutputConfTarget = 6
)
// commitSweepResolver is a resolver that will attempt to sweep the commitment
@ -98,7 +104,8 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
// sweeper.
log.Infof("%T(%v): sweeping commit output", c, c.chanPoint)
resultChan, err := c.Sweeper.SweepInput(&inp)
feePref := sweep.FeePreference{ConfTarget: commitOutputConfTarget}
resultChan, err := c.Sweeper.SweepInput(&inp, feePref)
if err != nil {
log.Errorf("%T(%v): unable to sweep input: %v",
c, c.chanPoint, err)

View File

@ -717,13 +717,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(sweep.DefaultBatchWindowDuration).C
},
SweepTxConfTarget: 6,
Notifier: cc.chainNotifier,
ChainIO: cc.chainIO,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc,
MaxFeeRate: sweep.DefaultMaxFeeRate,
FeeRateBucketSize: sweep.DefaultFeeRateBucketSize,
})
s.utxoNursery = newUtxoNursery(&NurseryConfig{

View File

@ -16,6 +16,10 @@ type mockFeeEstimator struct {
blocksToFee map[uint32]lnwallet.SatPerKWeight
// A closure that when set is used instead of the
// mockFeeEstimator.EstimateFeePerKW method.
estimateFeePerKW func(numBlocks uint32) (lnwallet.SatPerKWeight, error)
lock sync.Mutex
}
@ -45,6 +49,10 @@ func (e *mockFeeEstimator) EstimateFeePerKW(numBlocks uint32) (
e.lock.Lock()
defer e.lock.Unlock()
if e.estimateFeePerKW != nil {
return e.estimateFeePerKW(numBlocks)
}
if fee, ok := e.blocksToFee[numBlocks]; ok {
return fee, nil
}

View File

@ -3,6 +3,7 @@ package sweep
import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"sync/atomic"
@ -16,6 +17,25 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
)
const (
// DefaultMaxFeeRate is the default maximum fee rate allowed within the
// UtxoSweeper. The current value is equivalent to a fee rate of 10,000
// sat/vbyte.
DefaultMaxFeeRate lnwallet.SatPerKWeight = 250 * 1e4
// DefaultFeeRateBucketSize is the default size of fee rate buckets
// we'll use when clustering inputs into buckets with similar fee rates
// within the UtxoSweeper.
//
// Given a minimum relay fee rate of 1 sat/vbyte, a multiplier of 10
// would result in the following fee rate buckets up to the maximum fee
// rate:
//
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
DefaultFeeRateBucketSize = 10
)
var (
// ErrRemoteSpend is returned in case an output that we try to sweep is
// confirmed in a tx of the remote party.
@ -53,6 +73,22 @@ type pendingInput struct {
// publishAttempts records the number of attempts that have already been
// made to sweep this tx.
publishAttempts int
// feePreference is the fee preference of the client who requested the
// input to be swept. If a confirmation target is specified, then we'll
// map it into a fee rate whenever we attempt to cluster inputs for a
// sweep.
feePreference FeePreference
}
// pendingInputs is a type alias for a set of pending inputs.
type pendingInputs = map[wire.OutPoint]*pendingInput
// inputCluster is a helper struct to gather a set of pending inputs that should
// be swept with the specified fee rate.
type inputCluster struct {
sweepFeeRate lnwallet.SatPerKWeight
inputs pendingInputs
}
// UtxoSweeper is responsible for sweeping outputs back into the wallet
@ -65,7 +101,9 @@ type UtxoSweeper struct {
newInputs chan *sweepInputMessage
spendChan chan *chainntnfs.SpendDetail
pendingInputs map[wire.OutPoint]*pendingInput
// pendingInputs is the total set of inputs the UtxoSweeper has been
// requested to sweep.
pendingInputs pendingInputs
// timer is the channel that signals expiry of the sweep batch timer.
timer <-chan time.Time
@ -74,7 +112,7 @@ type UtxoSweeper struct {
currentOutputScript []byte
relayFeePerKW lnwallet.SatPerKWeight
relayFeeRate lnwallet.SatPerKWeight
quit chan struct{}
wg sync.WaitGroup
@ -114,10 +152,6 @@ type UtxoSweeperConfig struct {
// time the incubated outputs need to be spent.
Signer input.Signer
// SweepTxConfTarget assigns a confirmation target for sweep txes on
// which the fee calculation will be based.
SweepTxConfTarget uint32
// MaxInputsPerTx specifies the default maximum number of inputs allowed
// in a single sweep tx. If more need to be swept, multiple txes are
// created and published.
@ -131,6 +165,22 @@ type UtxoSweeperConfig struct {
// NextAttemptDeltaFunc returns given the number of already attempted
// sweeps, how many blocks to wait before retrying to sweep.
NextAttemptDeltaFunc func(int) int32
// MaxFeeRate is the the maximum fee rate allowed within the
// UtxoSweeper.
MaxFeeRate lnwallet.SatPerKWeight
// FeeRateBucketSize is the default size of fee rate buckets we'll use
// when clustering inputs into buckets with similar fee rates within the
// UtxoSweeper.
//
// Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size
// of 10 would result in the following fee rate buckets up to the
// maximum fee rate:
//
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
FeeRateBucketSize int
}
// Result is the struct that is pushed through the result channel. Callers can
@ -149,19 +199,19 @@ type Result struct {
// sweepInputMessage structs are used in the internal channel between the
// SweepInput call and the sweeper main loop.
type sweepInputMessage struct {
input input.Input
resultChan chan Result
input input.Input
feePreference FeePreference
resultChan chan Result
}
// New returns a new Sweeper instance.
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
return &UtxoSweeper{
cfg: cfg,
newInputs: make(chan *sweepInputMessage),
spendChan: make(chan *chainntnfs.SpendDetail),
quit: make(chan struct{}),
pendingInputs: make(map[wire.OutPoint]*pendingInput),
pendingInputs: make(pendingInputs),
}
}
@ -199,7 +249,7 @@ func (s *UtxoSweeper) Start() error {
// Retrieve relay fee for dust limit calculation. Assume that this will
// not change from here on.
s.relayFeePerKW = s.cfg.FeeEstimator.RelayFeePerKW()
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
// Register for block epochs to retry sweeping every block.
bestHash, bestHeight, err := s.cfg.ChainIO.GetBestBlock()
@ -252,24 +302,33 @@ func (s *UtxoSweeper) Stop() error {
}
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
// swept after the batch time window ends.
// swept after the batch time window ends. A custom fee preference can be
// provided, otherwise the UtxoSweeper's default will be used.
//
// NOTE: Extreme care needs to be taken that input isn't changed externally.
// Because it is an interface and we don't know what is exactly behind it, we
// cannot make a local copy in sweeper.
func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) {
func (s *UtxoSweeper) SweepInput(input input.Input,
feePreference FeePreference) (chan Result, error) {
if input == nil || input.OutPoint() == nil || input.SignDesc() == nil {
return nil, errors.New("nil input received")
}
// Ensure the client provided a sane fee preference.
if _, err := s.feeRateForPreference(feePreference); err != nil {
return nil, err
}
log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
"time_lock=%v, size=%v", input.OutPoint(), input.WitnessType(),
input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value))
"time_lock=%v, amount=%v, fee_preference=%v", input.OutPoint(),
input.WitnessType(), input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value), feePreference)
sweeperInput := &sweepInputMessage{
input: input,
resultChan: make(chan Result, 1),
input: input,
feePreference: feePreference,
resultChan: make(chan Result, 1),
}
// Deliver input to main event loop.
@ -282,6 +341,27 @@ func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) {
return sweeperInput.resultChan, nil
}
// feeRateForPreference returns a fee rate for the given fee preference. It
// ensures that the fee rate respects the bounds of the UtxoSweeper.
func (s *UtxoSweeper) feeRateForPreference(
feePreference FeePreference) (lnwallet.SatPerKWeight, error) {
feeRate, err := DetermineFeePerKw(s.cfg.FeeEstimator, feePreference)
if err != nil {
return 0, err
}
if feeRate < s.relayFeeRate {
return 0, fmt.Errorf("fee preference resulted in invalid fee "+
"rate %v, mininum is %v", feeRate, s.relayFeeRate)
}
if feeRate > s.cfg.MaxFeeRate {
return 0, fmt.Errorf("fee preference resulted in invalid fee "+
"rate %v, maximum is %v", feeRate, s.cfg.MaxFeeRate)
}
return feeRate, nil
}
// collector is the sweeper main loop. It processes new inputs, spend
// notifications and counts down to publication of the sweep tx.
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
@ -315,6 +395,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
listeners: []chan Result{input.resultChan},
input: input.input,
minPublishHeight: bestHeight,
feePreference: input.feePreference,
}
s.pendingInputs[outpoint] = pendInput
@ -406,29 +487,30 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
// be started when new inputs arrive.
s.timer = nil
// Retrieve fee estimate for input filtering and final
// tx fee calculation.
satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW(
s.cfg.SweepTxConfTarget,
)
if err != nil {
log.Errorf("estimate fee: %v", err)
continue
}
// Examine pending inputs and try to construct lists of
// inputs.
inputLists, err := s.getInputLists(bestHeight, satPerKW)
if err != nil {
log.Errorf("get input lists: %v", err)
continue
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, satPerKW, bestHeight)
// We'll attempt to cluster all of our inputs with
// similar fee rates.
for _, cluster := range s.clusterBySweepFeeRate() {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(
cluster, bestHeight,
)
if err != nil {
log.Errorf("sweep: %v", err)
log.Errorf("Unable to examine pending "+
"inputs: %v", err)
continue
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(
inputs, cluster.sweepFeeRate,
bestHeight,
)
if err != nil {
log.Errorf("Unable to sweep "+
"inputs: %v", err)
}
}
}
@ -441,7 +523,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
bestHeight = epoch.Height
log.Debugf("New blocks: height=%v, sha=%v",
log.Debugf("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
if err := s.scheduleSweep(bestHeight); err != nil {
@ -454,6 +536,62 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
}
}
// bucketForFeeReate determines the proper bucket for a fee rate. This is done
// in order to batch inputs with similar fee rates together.
func (s *UtxoSweeper) bucketForFeeRate(
feeRate lnwallet.SatPerKWeight) lnwallet.SatPerKWeight {
minBucket := s.relayFeeRate + lnwallet.SatPerKWeight(s.cfg.FeeRateBucketSize)
return lnwallet.SatPerKWeight(
math.Ceil(float64(feeRate) / float64(minBucket)),
)
}
// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper
// and clusters those together with similar fee rates. Each cluster contains a
// sweep fee rate, which is determined by calculating the average fee rate of
// all inputs within that cluster.
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
bucketInputs := make(map[lnwallet.SatPerKWeight]pendingInputs)
inputFeeRates := make(map[wire.OutPoint]lnwallet.SatPerKWeight)
// First, we'll group together all inputs with similar fee rates. This
// is done by determining the fee rate bucket they should belong in.
for op, input := range s.pendingInputs {
feeRate, err := s.feeRateForPreference(input.feePreference)
if err != nil {
log.Warnf("Skipping input %v: %v", op, err)
continue
}
bucket := s.bucketForFeeRate(feeRate)
inputs, ok := bucketInputs[bucket]
if !ok {
inputs = make(pendingInputs)
bucketInputs[bucket] = inputs
}
inputs[op] = input
inputFeeRates[op] = feeRate
}
// We'll then determine the sweep fee rate for each set of inputs by
// calculating the average fee rate of the inputs within each set.
inputClusters := make([]inputCluster, 0, len(bucketInputs))
for _, inputs := range bucketInputs {
var sweepFeeRate lnwallet.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
}
sweepFeeRate /= lnwallet.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}
return inputClusters
}
// scheduleSweep starts the sweep timer to create an opportunity for more inputs
// to be added.
func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
@ -464,27 +602,25 @@ func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
return nil
}
// Retrieve fee estimate for input filtering and final tx fee
// calculation.
satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW(
s.cfg.SweepTxConfTarget,
)
if err != nil {
return fmt.Errorf("estimate fee: %v", err)
// We'll only start our timer once we have inputs we're able to sweep.
startTimer := false
for _, cluster := range s.clusterBySweepFeeRate() {
// Examine pending inputs and try to construct lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
}
log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+
"yield %v distinct txns", currentHeight,
cluster.sweepFeeRate, len(inputLists))
if len(inputLists) != 0 {
startTimer = true
break
}
}
// Examine pending inputs and try to construct lists of inputs.
inputLists, err := s.getInputLists(currentHeight, satPerKW)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
}
log.Infof("Sweep candidates at height=%v, yield %v distinct txns",
currentHeight, len(inputLists))
// If there are no input sets, there is nothing sweepable and we can
// return without starting the timer.
if len(inputLists) == 0 {
if !startTimer {
return nil
}
@ -533,13 +669,13 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
delete(s.pendingInputs, *outpoint)
}
// getInputLists goes through all pending inputs and constructs sweep lists,
// each up to the configured maximum number of inputs. Negative yield inputs are
// skipped. Transactions with an output below the dust limit are not published.
// Those inputs remain pending and will be bundled with future inputs if
// possible.
func (s *UtxoSweeper) getInputLists(currentHeight int32,
satPerKW lnwallet.SatPerKWeight) ([]inputSet, error) {
// getInputLists goes through the given inputs and constructs multiple distinct
// sweep lists with the given fee rate, each up to the configured maximum number
// of inputs. Negative yield inputs are skipped. Transactions with an output
// below the dust limit are not published. Those inputs remain pending and will
// be bundled with future inputs if possible.
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
currentHeight int32) ([]inputSet, error) {
// Filter for inputs that need to be swept. Create two lists: all
// sweepable inputs and a list containing only the new, never tried
@ -552,7 +688,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
var newInputs, retryInputs []input.Input
for _, input := range s.pendingInputs {
for _, input := range cluster.inputs {
// Skip inputs that have a minimum publish height that is not
// yet reached.
if input.minPublishHeight > currentHeight {
@ -573,9 +709,8 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
if len(retryInputs) > 0 {
var err error
allSets, err = generateInputPartitionings(
append(retryInputs, newInputs...),
s.relayFeePerKW, satPerKW,
s.cfg.MaxInputsPerTx,
append(retryInputs, newInputs...), s.relayFeeRate,
cluster.sweepFeeRate, s.cfg.MaxInputsPerTx,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
@ -584,8 +719,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// Create sets for just the new inputs.
newSets, err := generateInputPartitionings(
newInputs,
s.relayFeePerKW, satPerKW,
newInputs, s.relayFeeRate, cluster.sweepFeeRate,
s.cfg.MaxInputsPerTx,
)
if err != nil {
@ -602,23 +736,22 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
// tx. The output address is only marked as used if the publish succeeds.
func (s *UtxoSweeper) sweep(inputs inputSet,
satPerKW lnwallet.SatPerKWeight, currentHeight int32) error {
func (s *UtxoSweeper) sweep(inputs inputSet, feeRate lnwallet.SatPerKWeight,
currentHeight int32) error {
var err error
// Generate output script if no unused script available.
// Generate an output script if there isn't an unused script available.
if s.currentOutputScript == nil {
s.currentOutputScript, err = s.cfg.GenSweepScript()
pkScript, err := s.cfg.GenSweepScript()
if err != nil {
return fmt.Errorf("gen sweep script: %v", err)
}
s.currentOutputScript = pkScript
}
// Create sweep tx.
tx, err := createSweepTx(
inputs, s.currentOutputScript,
uint32(currentHeight), satPerKW, s.cfg.Signer,
inputs, s.currentOutputScript, uint32(currentHeight), feeRate,
s.cfg.Signer,
)
if err != nil {
return fmt.Errorf("create sweep tx: %v", err)
@ -651,8 +784,8 @@ func (s *UtxoSweeper) sweep(inputs inputSet,
return fmt.Errorf("publish tx: %v", err)
}
// Keep outputScript in case of an error, so that it can be reused for
// the next tx and causes no address inflation.
// Keep the output script in case of an error, so that it can be reused
// for the next transaction and causes no address inflation.
if err == nil {
s.currentOutputScript = nil
}
@ -692,6 +825,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet,
})
}
}
return nil
}

View File

@ -10,10 +10,11 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
)
var (
@ -22,6 +23,8 @@ var (
testMaxSweepAttempts = 3
testMaxInputsPerTx = 3
defaultFeePref = FeePreference{ConfTarget: 1}
)
type sweeperTestContext struct {
@ -96,7 +99,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
backend := newMockBackend(notifier)
estimator := newMockFeeEstimator(10000, 1000)
estimator := newMockFeeEstimator(10000, lnwallet.FeePerKwFloor)
publishChan := make(chan wire.MsgTx, 2)
ctx := &sweeperTestContext{
@ -127,10 +130,9 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
ctx.timeoutChan <- c
return c
},
Store: store,
Signer: &mockSigner{},
SweepTxConfTarget: 1,
ChainIO: &mockChainIO{},
Store: store,
Signer: &mockSigner{},
ChainIO: &mockChainIO{},
GenSweepScript: func() ([]byte, error) {
script := []byte{outputScriptCount}
outputScriptCount++
@ -143,6 +145,8 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
// Use delta func without random factor.
return 1 << uint(attempts-1)
},
MaxFeeRate: DefaultMaxFeeRate,
FeeRateBucketSize: DefaultFeeRateBucketSize,
})
ctx.sweeper.Start()
@ -150,6 +154,14 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
return ctx
}
func (ctx *sweeperTestContext) restartSweeper() {
ctx.t.Helper()
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
}
func (ctx *sweeperTestContext) tick() {
testLog.Trace("Waiting for tick to be consumed")
select {
@ -251,11 +263,95 @@ func (ctx *sweeperTestContext) expectResult(c chan Result, expected error) {
}
}
// receiveSpendTx receives the transaction sent through the given resultChan.
func receiveSpendTx(t *testing.T, resultChan chan Result) *wire.MsgTx {
t.Helper()
var result Result
select {
case result = <-resultChan:
case <-time.After(5 * time.Second):
t.Fatal("no sweep result received")
}
if result.Err != nil {
t.Fatalf("expected successful spend, but received error "+
"\"%v\" instead", result.Err)
}
return result.Tx
}
// assertTxSweepsInputs ensures that the transaction returned within the value
// received from resultChan spends the given inputs.
func assertTxSweepsInputs(t *testing.T, sweepTx *wire.MsgTx,
inputs ...input.Input) {
t.Helper()
if len(sweepTx.TxIn) != len(inputs) {
t.Fatalf("expected sweep tx to contain %d inputs, got %d",
len(inputs), len(sweepTx.TxIn))
}
m := make(map[wire.OutPoint]struct{}, len(inputs))
for _, input := range inputs {
m[*input.OutPoint()] = struct{}{}
}
for _, txIn := range sweepTx.TxIn {
if _, ok := m[txIn.PreviousOutPoint]; !ok {
t.Fatalf("expected tx %v to spend input %v",
txIn.PreviousOutPoint, sweepTx.TxHash())
}
}
}
// assertTxFeeRate asserts that the transaction was created with the given
// inputs and fee rate.
//
// NOTE: This assumes that transactions only have one output, as this is the
// only type of transaction the UtxoSweeper can create at the moment.
func assertTxFeeRate(t *testing.T, tx *wire.MsgTx,
expectedFeeRate lnwallet.SatPerKWeight, inputs ...input.Input) {
t.Helper()
if len(tx.TxIn) != len(inputs) {
t.Fatalf("expected %d inputs, got %d", len(tx.TxIn), len(inputs))
}
m := make(map[wire.OutPoint]input.Input, len(inputs))
for _, input := range inputs {
m[*input.OutPoint()] = input
}
var inputAmt int64
for _, txIn := range tx.TxIn {
input, ok := m[txIn.PreviousOutPoint]
if !ok {
t.Fatalf("expected input %v to be provided",
txIn.PreviousOutPoint)
}
inputAmt += input.SignDesc().Output.Value
}
outputAmt := tx.TxOut[0].Value
fee := btcutil.Amount(inputAmt - outputAmt)
_, txWeight, _, _ := getWeightEstimate(inputs)
expectedFee := expectedFeeRate.FeeForWeight(txWeight)
if fee != expectedFee {
t.Fatalf("expected fee rate %v results in %v fee, got %v fee",
expectedFeeRate, expectedFee, fee)
}
}
// TestSuccess tests the sweeper happy flow.
func TestSuccess(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -305,7 +401,7 @@ func TestDust(t *testing.T) {
// sweep tx output script (P2WPKH).
dustInput := createTestInput(5260, input.CommitmentTimeLock)
_, err := ctx.sweeper.SweepInput(&dustInput)
_, err := ctx.sweeper.SweepInput(&dustInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -316,7 +412,7 @@ func TestDust(t *testing.T) {
// Sweep another input that brings the tx output above the dust limit.
largeInput := createTestInput(100000, input.CommitmentTimeLock)
_, err = ctx.sweeper.SweepInput(&largeInput)
_, err = ctx.sweeper.SweepInput(&largeInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -345,7 +441,9 @@ func TestNegativeInput(t *testing.T) {
// Sweep an input large enough to cover fees, so in any case the tx
// output will be above the dust limit.
largeInput := createTestInput(100000, input.CommitmentNoDelay)
largeInputResult, err := ctx.sweeper.SweepInput(&largeInput)
largeInputResult, err := ctx.sweeper.SweepInput(
&largeInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -354,7 +452,7 @@ func TestNegativeInput(t *testing.T) {
// the HtlcAcceptedRemoteSuccess input type adds more in fees than its
// value at the current fee level.
negInput := createTestInput(2900, input.HtlcOfferedRemoteTimeout)
negInputResult, err := ctx.sweeper.SweepInput(&negInput)
negInputResult, err := ctx.sweeper.SweepInput(&negInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -362,7 +460,9 @@ func TestNegativeInput(t *testing.T) {
// Sweep a third input that has a smaller output than the previous one,
// but yields positively because of its lower weight.
positiveInput := createTestInput(2800, input.CommitmentNoDelay)
positiveInputResult, err := ctx.sweeper.SweepInput(&positiveInput)
positiveInputResult, err := ctx.sweeper.SweepInput(
&positiveInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -373,13 +473,7 @@ func TestNegativeInput(t *testing.T) {
// contain the large input. The negative input should stay out of sweeps
// until fees come down to get a positive net yield.
sweepTx1 := ctx.receiveTx()
if !testTxIns(&sweepTx1, []*wire.OutPoint{
largeInput.OutPoint(), positiveInput.OutPoint(),
}) {
t.Fatalf("Tx does not contain expected inputs: %v",
spew.Sdump(sweepTx1))
}
assertTxSweepsInputs(t, &sweepTx1, &largeInput, &positiveInput)
ctx.backend.mine()
@ -389,9 +483,11 @@ func TestNegativeInput(t *testing.T) {
// Lower fee rate so that the negative input is no longer negative.
ctx.estimator.updateFees(1000, 1000)
// Create another large input
// Create another large input.
secondLargeInput := createTestInput(100000, input.CommitmentNoDelay)
secondLargeInputResult, err := ctx.sweeper.SweepInput(&secondLargeInput)
secondLargeInputResult, err := ctx.sweeper.SweepInput(
&secondLargeInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -399,11 +495,7 @@ func TestNegativeInput(t *testing.T) {
ctx.tick()
sweepTx2 := ctx.receiveTx()
if !testTxIns(&sweepTx2, []*wire.OutPoint{
secondLargeInput.OutPoint(), negInput.OutPoint(),
}) {
t.Fatal("Tx does not contain expected inputs")
}
assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput)
ctx.backend.mine()
@ -413,32 +505,13 @@ func TestNegativeInput(t *testing.T) {
ctx.finish(1)
}
func testTxIns(tx *wire.MsgTx, inputs []*wire.OutPoint) bool {
if len(tx.TxIn) != len(inputs) {
return false
}
ins := make(map[wire.OutPoint]struct{})
for _, in := range tx.TxIn {
ins[in.PreviousOutPoint] = struct{}{}
}
for _, expectedIn := range inputs {
if _, ok := ins[*expectedIn]; !ok {
return false
}
}
return true
}
// TestChunks asserts that large sets of inputs are split into multiple txes.
func TestChunks(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep five inputs.
for _, input := range spendableInputs[:5] {
_, err := ctx.sweeper.SweepInput(input)
_, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -479,12 +552,16 @@ func TestRemoteSpend(t *testing.T) {
func testRemoteSpend(t *testing.T, postSweep bool) {
ctx := createSweeperTestContext(t)
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[1])
resultChan2, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -557,12 +634,13 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
func TestIdempotency(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
input := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan2, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -571,7 +649,7 @@ func TestIdempotency(t *testing.T) {
ctx.receiveTx()
resultChan3, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -588,7 +666,7 @@ func TestIdempotency(t *testing.T) {
// immediately receive the spend notification with a spending tx hash.
// Because the sweeper kept track of all of its sweep txes, it will
// recognize the spend as its own.
resultChan4, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan4, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -615,8 +693,8 @@ func TestRestart(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input and expect sweep tx.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input1 := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
t.Fatal(err)
}
ctx.tick()
@ -624,21 +702,19 @@ func TestRestart(t *testing.T) {
ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
// Simulate other subsystem (eg contract resolver) re-offering inputs.
spendChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
t.Fatal(err)
}
spendChan2, err := ctx.sweeper.SweepInput(spendableInputs[1])
input2 := spendableInputs[1]
spendChan2, err := ctx.sweeper.SweepInput(input2, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -676,9 +752,7 @@ func TestRestart(t *testing.T) {
}
// Restart sweeper again. No action is expected.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -693,14 +767,14 @@ func TestRestartRemoteSpend(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input1 := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
t.Fatal(err)
}
// Sweep another input.
_, err = ctx.sweeper.SweepInput(spendableInputs[1])
if err != nil {
input2 := spendableInputs[1]
if _, err := ctx.sweeper.SweepInput(input2, defaultFeePref); err != nil {
t.Fatal(err)
}
@ -709,10 +783,7 @@ func TestRestartRemoteSpend(t *testing.T) {
sweepTx := ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -723,12 +794,11 @@ func TestRestartRemoteSpend(t *testing.T) {
remoteTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{
PreviousOutPoint: *(spendableInputs[1].OutPoint()),
PreviousOutPoint: *(input2.OutPoint()),
},
},
}
err = ctx.backend.publishTransaction(remoteTx)
if err != nil {
if err := ctx.backend.publishTransaction(remoteTx); err != nil {
t.Fatal(err)
}
@ -736,7 +806,7 @@ func TestRestartRemoteSpend(t *testing.T) {
ctx.backend.mine()
// Simulate other subsystem (eg contract resolver) re-offering input 0.
spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -760,8 +830,8 @@ func TestRestartConfirmed(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input, defaultFeePref); err != nil {
t.Fatal(err)
}
@ -770,10 +840,7 @@ func TestRestartConfirmed(t *testing.T) {
ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -782,7 +849,7 @@ func TestRestartConfirmed(t *testing.T) {
ctx.backend.mine()
// Simulate other subsystem (eg contract resolver) re-offering input 0.
spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -801,7 +868,7 @@ func TestRestartConfirmed(t *testing.T) {
func TestRestartRepublish(t *testing.T) {
ctx := createSweeperTestContext(t)
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
_, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -811,9 +878,7 @@ func TestRestartRepublish(t *testing.T) {
sweepTx := ctx.receiveTx()
// Restart sweeper again. No action is expected.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
republishedTx := ctx.receiveTx()
@ -831,7 +896,9 @@ func TestRestartRepublish(t *testing.T) {
func TestRetry(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan0, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -846,7 +913,9 @@ func TestRetry(t *testing.T) {
ctx.notifier.NotifyEpoch(1000)
// Offer a fresh input.
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[1])
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -871,7 +940,9 @@ func TestRetry(t *testing.T) {
func TestGiveUp(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan0, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -902,3 +973,63 @@ func TestGiveUp(t *testing.T) {
ctx.finish(1)
}
// TestDifferentFeePreferences ensures that the sweeper can have different
// transactions for different fee preferences.
func TestDifferentFeePreferences(t *testing.T) {
ctx := createSweeperTestContext(t)
// Throughout this test, we'll be attempting to sweep three inputs, two
// with the higher fee preference, and the last with the lower. We do
// this to ensure the sweeper can broadcast distinct transactions for
// each sweep with a different fee preference.
lowFeePref := FeePreference{
ConfTarget: 12,
}
ctx.estimator.blocksToFee[lowFeePref.ConfTarget] = 5000
highFeePref := FeePreference{
ConfTarget: 6,
}
ctx.estimator.blocksToFee[highFeePref.ConfTarget] = 10000
input1 := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input1, highFeePref)
if err != nil {
t.Fatal(err)
}
input2 := spendableInputs[1]
resultChan2, err := ctx.sweeper.SweepInput(input2, highFeePref)
if err != nil {
t.Fatal(err)
}
input3 := spendableInputs[2]
resultChan3, err := ctx.sweeper.SweepInput(input3, lowFeePref)
if err != nil {
t.Fatal(err)
}
// Start the sweeper's batch ticker, which should cause the sweep
// transactions to be broadcast.
ctx.tick()
ctx.receiveTx()
ctx.receiveTx()
// With the transactions broadcast, we'll mine a block to so that the
// result is delivered to each respective client.
ctx.backend.mine()
// We should expect to see a single transaction that sweeps the high fee
// preference inputs.
sweepTx1 := receiveSpendTx(t, resultChan1)
assertTxSweepsInputs(t, sweepTx1, input1, input2)
sweepTx2 := receiveSpendTx(t, resultChan2)
assertTxSweepsInputs(t, sweepTx2, input1, input2)
// We should expect to see a distinct transaction that sweeps the low
// fee preference inputs.
sweepTx3 := receiveSpendTx(t, resultChan3)
assertTxSweepsInputs(t, sweepTx3, input3)
ctx.finish(1)
}

View File

@ -155,6 +155,12 @@ import (
var byteOrder = binary.BigEndian
const (
// kgtnOutputConfTarget is the default confirmation target we'll use for
// sweeps of CSV delayed outputs.
kgtnOutputConfTarget = 6
)
var (
// ErrContractNotFound is returned when the nursery is unable to
// retrieve information about a queried contract.
@ -196,7 +202,7 @@ type NurseryConfig struct {
Store NurseryStore
// Sweep sweeps an input back to the wallet.
SweepInput func(input input.Input) (chan sweep.Result, error)
SweepInput func(input.Input, sweep.FeePreference) (chan sweep.Result, error)
}
// utxoNursery is a system dedicated to incubating time-locked outputs created
@ -804,12 +810,13 @@ func (u *utxoNursery) sweepMatureOutputs(classHeight uint32,
utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx for "+
"height %v", len(kgtnOutputs), classHeight)
feePref := sweep.FeePreference{ConfTarget: kgtnOutputConfTarget}
for _, output := range kgtnOutputs {
// Create local copy to prevent pointer to loop variable to be
// passed in with disastrous consequences.
local := output
resultChan, err := u.cfg.SweepInput(&local)
resultChan, err := u.cfg.SweepInput(&local, feePref)
if err != nil {
return err
}

View File

@ -1051,7 +1051,9 @@ func newMockSweeper(t *testing.T) *mockSweeper {
}
}
func (s *mockSweeper) sweepInput(input input.Input) (chan sweep.Result, error) {
func (s *mockSweeper) sweepInput(input input.Input,
_ sweep.FeePreference) (chan sweep.Result, error) {
utxnLog.Debugf("mockSweeper sweepInput called for %v", *input.OutPoint())
select {