Merge pull request #3026 from wpaulino/sweeper-fee-preference

sweep: add support for multiple fee preferences to UtxoSweeper
This commit is contained in:
Olaoluwa Osuntokun 2019-05-27 16:02:37 -07:00 committed by GitHub
commit 0343327994
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 497 additions and 180 deletions

View File

@ -5,10 +5,16 @@ import (
"fmt"
"io"
"github.com/lightningnetwork/lnd/input"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/sweep"
)
const (
// commitOutputConfTarget is the default confirmation target we'll use
// for sweeps of commit outputs that belong to us.
commitOutputConfTarget = 6
)
// commitSweepResolver is a resolver that will attempt to sweep the commitment
@ -98,7 +104,8 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
// sweeper.
log.Infof("%T(%v): sweeping commit output", c, c.chanPoint)
resultChan, err := c.Sweeper.SweepInput(&inp)
feePref := sweep.FeePreference{ConfTarget: commitOutputConfTarget}
resultChan, err := c.Sweeper.SweepInput(&inp, feePref)
if err != nil {
log.Errorf("%T(%v): unable to sweep input: %v",
c, c.chanPoint, err)

View File

@ -54,6 +54,11 @@ func (s SatPerKVByte) FeePerKWeight() SatPerKWeight {
return SatPerKWeight(s / blockchain.WitnessScaleFactor)
}
// String returns a human-readable string of the fee rate.
func (s SatPerKVByte) String() string {
return fmt.Sprintf("%v sat/kb", int64(s))
}
// SatPerKWeight represents a fee rate in sat/kw.
type SatPerKWeight btcutil.Amount
@ -69,6 +74,11 @@ func (s SatPerKWeight) FeePerKVByte() SatPerKVByte {
return SatPerKVByte(s * blockchain.WitnessScaleFactor)
}
// String returns a human-readable string of the fee rate.
func (s SatPerKWeight) String() string {
return fmt.Sprintf("%v sat/kw", int64(s))
}
// FeeEstimator provides the ability to estimate on-chain transaction fees for
// various combinations of transaction sizes and desired confirmation time
// (measured by number of blocks).

View File

@ -717,13 +717,14 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(sweep.DefaultBatchWindowDuration).C
},
SweepTxConfTarget: 6,
Notifier: cc.chainNotifier,
ChainIO: cc.chainIO,
Store: sweeperStore,
MaxInputsPerTx: sweep.DefaultMaxInputsPerTx,
MaxSweepAttempts: sweep.DefaultMaxSweepAttempts,
NextAttemptDeltaFunc: sweep.DefaultNextAttemptDeltaFunc,
MaxFeeRate: sweep.DefaultMaxFeeRate,
FeeRateBucketSize: sweep.DefaultFeeRateBucketSize,
})
s.utxoNursery = newUtxoNursery(&NurseryConfig{

View File

@ -16,6 +16,10 @@ type mockFeeEstimator struct {
blocksToFee map[uint32]lnwallet.SatPerKWeight
// A closure that when set is used instead of the
// mockFeeEstimator.EstimateFeePerKW method.
estimateFeePerKW func(numBlocks uint32) (lnwallet.SatPerKWeight, error)
lock sync.Mutex
}
@ -45,6 +49,10 @@ func (e *mockFeeEstimator) EstimateFeePerKW(numBlocks uint32) (
e.lock.Lock()
defer e.lock.Unlock()
if e.estimateFeePerKW != nil {
return e.estimateFeePerKW(numBlocks)
}
if fee, ok := e.blocksToFee[numBlocks]; ok {
return fee, nil
}

View File

@ -3,7 +3,9 @@ package sweep
import (
"errors"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
@ -16,6 +18,25 @@ import (
"github.com/lightningnetwork/lnd/lnwallet"
)
const (
// DefaultMaxFeeRate is the default maximum fee rate allowed within the
// UtxoSweeper. The current value is equivalent to a fee rate of 10,000
// sat/vbyte.
DefaultMaxFeeRate lnwallet.SatPerKWeight = 250 * 1e4
// DefaultFeeRateBucketSize is the default size of fee rate buckets
// we'll use when clustering inputs into buckets with similar fee rates
// within the UtxoSweeper.
//
// Given a minimum relay fee rate of 1 sat/vbyte, a multiplier of 10
// would result in the following fee rate buckets up to the maximum fee
// rate:
//
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
DefaultFeeRateBucketSize = 10
)
var (
// ErrRemoteSpend is returned in case an output that we try to sweep is
// confirmed in a tx of the remote party.
@ -53,6 +74,22 @@ type pendingInput struct {
// publishAttempts records the number of attempts that have already been
// made to sweep this tx.
publishAttempts int
// feePreference is the fee preference of the client who requested the
// input to be swept. If a confirmation target is specified, then we'll
// map it into a fee rate whenever we attempt to cluster inputs for a
// sweep.
feePreference FeePreference
}
// pendingInputs is a type alias for a set of pending inputs.
type pendingInputs = map[wire.OutPoint]*pendingInput
// inputCluster is a helper struct to gather a set of pending inputs that should
// be swept with the specified fee rate.
type inputCluster struct {
sweepFeeRate lnwallet.SatPerKWeight
inputs pendingInputs
}
// UtxoSweeper is responsible for sweeping outputs back into the wallet
@ -65,7 +102,9 @@ type UtxoSweeper struct {
newInputs chan *sweepInputMessage
spendChan chan *chainntnfs.SpendDetail
pendingInputs map[wire.OutPoint]*pendingInput
// pendingInputs is the total set of inputs the UtxoSweeper has been
// requested to sweep.
pendingInputs pendingInputs
// timer is the channel that signals expiry of the sweep batch timer.
timer <-chan time.Time
@ -74,7 +113,7 @@ type UtxoSweeper struct {
currentOutputScript []byte
relayFeePerKW lnwallet.SatPerKWeight
relayFeeRate lnwallet.SatPerKWeight
quit chan struct{}
wg sync.WaitGroup
@ -114,10 +153,6 @@ type UtxoSweeperConfig struct {
// time the incubated outputs need to be spent.
Signer input.Signer
// SweepTxConfTarget assigns a confirmation target for sweep txes on
// which the fee calculation will be based.
SweepTxConfTarget uint32
// MaxInputsPerTx specifies the default maximum number of inputs allowed
// in a single sweep tx. If more need to be swept, multiple txes are
// created and published.
@ -131,6 +166,22 @@ type UtxoSweeperConfig struct {
// NextAttemptDeltaFunc returns given the number of already attempted
// sweeps, how many blocks to wait before retrying to sweep.
NextAttemptDeltaFunc func(int) int32
// MaxFeeRate is the the maximum fee rate allowed within the
// UtxoSweeper.
MaxFeeRate lnwallet.SatPerKWeight
// FeeRateBucketSize is the default size of fee rate buckets we'll use
// when clustering inputs into buckets with similar fee rates within the
// UtxoSweeper.
//
// Given a minimum relay fee rate of 1 sat/vbyte, a fee rate bucket size
// of 10 would result in the following fee rate buckets up to the
// maximum fee rate:
//
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
FeeRateBucketSize int
}
// Result is the struct that is pushed through the result channel. Callers can
@ -149,19 +200,19 @@ type Result struct {
// sweepInputMessage structs are used in the internal channel between the
// SweepInput call and the sweeper main loop.
type sweepInputMessage struct {
input input.Input
resultChan chan Result
input input.Input
feePreference FeePreference
resultChan chan Result
}
// New returns a new Sweeper instance.
func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
return &UtxoSweeper{
cfg: cfg,
newInputs: make(chan *sweepInputMessage),
spendChan: make(chan *chainntnfs.SpendDetail),
quit: make(chan struct{}),
pendingInputs: make(map[wire.OutPoint]*pendingInput),
pendingInputs: make(pendingInputs),
}
}
@ -199,7 +250,7 @@ func (s *UtxoSweeper) Start() error {
// Retrieve relay fee for dust limit calculation. Assume that this will
// not change from here on.
s.relayFeePerKW = s.cfg.FeeEstimator.RelayFeePerKW()
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()
// Register for block epochs to retry sweeping every block.
bestHash, bestHeight, err := s.cfg.ChainIO.GetBestBlock()
@ -252,24 +303,33 @@ func (s *UtxoSweeper) Stop() error {
}
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
// swept after the batch time window ends.
// swept after the batch time window ends. A custom fee preference can be
// provided, otherwise the UtxoSweeper's default will be used.
//
// NOTE: Extreme care needs to be taken that input isn't changed externally.
// Because it is an interface and we don't know what is exactly behind it, we
// cannot make a local copy in sweeper.
func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) {
func (s *UtxoSweeper) SweepInput(input input.Input,
feePreference FeePreference) (chan Result, error) {
if input == nil || input.OutPoint() == nil || input.SignDesc() == nil {
return nil, errors.New("nil input received")
}
// Ensure the client provided a sane fee preference.
if _, err := s.feeRateForPreference(feePreference); err != nil {
return nil, err
}
log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
"time_lock=%v, size=%v", input.OutPoint(), input.WitnessType(),
input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value))
"time_lock=%v, amount=%v, fee_preference=%v", input.OutPoint(),
input.WitnessType(), input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value), feePreference)
sweeperInput := &sweepInputMessage{
input: input,
resultChan: make(chan Result, 1),
input: input,
feePreference: feePreference,
resultChan: make(chan Result, 1),
}
// Deliver input to main event loop.
@ -282,6 +342,27 @@ func (s *UtxoSweeper) SweepInput(input input.Input) (chan Result, error) {
return sweeperInput.resultChan, nil
}
// feeRateForPreference returns a fee rate for the given fee preference. It
// ensures that the fee rate respects the bounds of the UtxoSweeper.
func (s *UtxoSweeper) feeRateForPreference(
feePreference FeePreference) (lnwallet.SatPerKWeight, error) {
feeRate, err := DetermineFeePerKw(s.cfg.FeeEstimator, feePreference)
if err != nil {
return 0, err
}
if feeRate < s.relayFeeRate {
return 0, fmt.Errorf("fee preference resulted in invalid fee "+
"rate %v, mininum is %v", feeRate, s.relayFeeRate)
}
if feeRate > s.cfg.MaxFeeRate {
return 0, fmt.Errorf("fee preference resulted in invalid fee "+
"rate %v, maximum is %v", feeRate, s.cfg.MaxFeeRate)
}
return feeRate, nil
}
// collector is the sweeper main loop. It processes new inputs, spend
// notifications and counts down to publication of the sweep tx.
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
@ -315,6 +396,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
listeners: []chan Result{input.resultChan},
input: input.input,
minPublishHeight: bestHeight,
feePreference: input.feePreference,
}
s.pendingInputs[outpoint] = pendInput
@ -406,29 +488,39 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
// be started when new inputs arrive.
s.timer = nil
// Retrieve fee estimate for input filtering and final
// tx fee calculation.
satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW(
s.cfg.SweepTxConfTarget,
)
if err != nil {
log.Errorf("estimate fee: %v", err)
continue
}
// Examine pending inputs and try to construct lists of
// inputs.
inputLists, err := s.getInputLists(bestHeight, satPerKW)
if err != nil {
log.Errorf("get input lists: %v", err)
continue
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, satPerKW, bestHeight)
// We'll attempt to cluster all of our inputs with
// similar fee rates. Before attempting to sweep them,
// we'll sort them in descending fee rate order. We do
// this to ensure any inputs which have had their fee
// rate bumped are broadcast first in order enforce the
// RBF policy.
inputClusters := s.clusterBySweepFeeRate()
sort.Slice(inputClusters, func(i, j int) bool {
return inputClusters[i].sweepFeeRate >
inputClusters[j].sweepFeeRate
})
for _, cluster := range inputClusters {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(
cluster, bestHeight,
)
if err != nil {
log.Errorf("sweep: %v", err)
log.Errorf("Unable to examine pending "+
"inputs: %v", err)
continue
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(
inputs, cluster.sweepFeeRate,
bestHeight,
)
if err != nil {
log.Errorf("Unable to sweep "+
"inputs: %v", err)
}
}
}
@ -441,7 +533,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
bestHeight = epoch.Height
log.Debugf("New blocks: height=%v, sha=%v",
log.Debugf("New block: height=%v, sha=%v",
epoch.Height, epoch.Hash)
if err := s.scheduleSweep(bestHeight); err != nil {
@ -454,6 +546,62 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
}
}
// bucketForFeeReate determines the proper bucket for a fee rate. This is done
// in order to batch inputs with similar fee rates together.
func (s *UtxoSweeper) bucketForFeeRate(
feeRate lnwallet.SatPerKWeight) lnwallet.SatPerKWeight {
minBucket := s.relayFeeRate + lnwallet.SatPerKWeight(s.cfg.FeeRateBucketSize)
return lnwallet.SatPerKWeight(
math.Ceil(float64(feeRate) / float64(minBucket)),
)
}
// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper
// and clusters those together with similar fee rates. Each cluster contains a
// sweep fee rate, which is determined by calculating the average fee rate of
// all inputs within that cluster.
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
bucketInputs := make(map[lnwallet.SatPerKWeight]pendingInputs)
inputFeeRates := make(map[wire.OutPoint]lnwallet.SatPerKWeight)
// First, we'll group together all inputs with similar fee rates. This
// is done by determining the fee rate bucket they should belong in.
for op, input := range s.pendingInputs {
feeRate, err := s.feeRateForPreference(input.feePreference)
if err != nil {
log.Warnf("Skipping input %v: %v", op, err)
continue
}
bucket := s.bucketForFeeRate(feeRate)
inputs, ok := bucketInputs[bucket]
if !ok {
inputs = make(pendingInputs)
bucketInputs[bucket] = inputs
}
inputs[op] = input
inputFeeRates[op] = feeRate
}
// We'll then determine the sweep fee rate for each set of inputs by
// calculating the average fee rate of the inputs within each set.
inputClusters := make([]inputCluster, 0, len(bucketInputs))
for _, inputs := range bucketInputs {
var sweepFeeRate lnwallet.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
}
sweepFeeRate /= lnwallet.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}
return inputClusters
}
// scheduleSweep starts the sweep timer to create an opportunity for more inputs
// to be added.
func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
@ -464,27 +612,25 @@ func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
return nil
}
// Retrieve fee estimate for input filtering and final tx fee
// calculation.
satPerKW, err := s.cfg.FeeEstimator.EstimateFeePerKW(
s.cfg.SweepTxConfTarget,
)
if err != nil {
return fmt.Errorf("estimate fee: %v", err)
// We'll only start our timer once we have inputs we're able to sweep.
startTimer := false
for _, cluster := range s.clusterBySweepFeeRate() {
// Examine pending inputs and try to construct lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
}
log.Infof("Sweep candidates at height=%v with fee_rate=%v, "+
"yield %v distinct txns", currentHeight,
cluster.sweepFeeRate, len(inputLists))
if len(inputLists) != 0 {
startTimer = true
break
}
}
// Examine pending inputs and try to construct lists of inputs.
inputLists, err := s.getInputLists(currentHeight, satPerKW)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
}
log.Infof("Sweep candidates at height=%v, yield %v distinct txns",
currentHeight, len(inputLists))
// If there are no input sets, there is nothing sweepable and we can
// return without starting the timer.
if len(inputLists) == 0 {
if !startTimer {
return nil
}
@ -533,13 +679,13 @@ func (s *UtxoSweeper) signalAndRemove(outpoint *wire.OutPoint, result Result) {
delete(s.pendingInputs, *outpoint)
}
// getInputLists goes through all pending inputs and constructs sweep lists,
// each up to the configured maximum number of inputs. Negative yield inputs are
// skipped. Transactions with an output below the dust limit are not published.
// Those inputs remain pending and will be bundled with future inputs if
// possible.
func (s *UtxoSweeper) getInputLists(currentHeight int32,
satPerKW lnwallet.SatPerKWeight) ([]inputSet, error) {
// getInputLists goes through the given inputs and constructs multiple distinct
// sweep lists with the given fee rate, each up to the configured maximum number
// of inputs. Negative yield inputs are skipped. Transactions with an output
// below the dust limit are not published. Those inputs remain pending and will
// be bundled with future inputs if possible.
func (s *UtxoSweeper) getInputLists(cluster inputCluster,
currentHeight int32) ([]inputSet, error) {
// Filter for inputs that need to be swept. Create two lists: all
// sweepable inputs and a list containing only the new, never tried
@ -552,7 +698,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
var newInputs, retryInputs []input.Input
for _, input := range s.pendingInputs {
for _, input := range cluster.inputs {
// Skip inputs that have a minimum publish height that is not
// yet reached.
if input.minPublishHeight > currentHeight {
@ -573,9 +719,8 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
if len(retryInputs) > 0 {
var err error
allSets, err = generateInputPartitionings(
append(retryInputs, newInputs...),
s.relayFeePerKW, satPerKW,
s.cfg.MaxInputsPerTx,
append(retryInputs, newInputs...), s.relayFeeRate,
cluster.sweepFeeRate, s.cfg.MaxInputsPerTx,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
@ -584,8 +729,7 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// Create sets for just the new inputs.
newSets, err := generateInputPartitionings(
newInputs,
s.relayFeePerKW, satPerKW,
newInputs, s.relayFeeRate, cluster.sweepFeeRate,
s.cfg.MaxInputsPerTx,
)
if err != nil {
@ -602,23 +746,22 @@ func (s *UtxoSweeper) getInputLists(currentHeight int32,
// sweep takes a set of preselected inputs, creates a sweep tx and publishes the
// tx. The output address is only marked as used if the publish succeeds.
func (s *UtxoSweeper) sweep(inputs inputSet,
satPerKW lnwallet.SatPerKWeight, currentHeight int32) error {
func (s *UtxoSweeper) sweep(inputs inputSet, feeRate lnwallet.SatPerKWeight,
currentHeight int32) error {
var err error
// Generate output script if no unused script available.
// Generate an output script if there isn't an unused script available.
if s.currentOutputScript == nil {
s.currentOutputScript, err = s.cfg.GenSweepScript()
pkScript, err := s.cfg.GenSweepScript()
if err != nil {
return fmt.Errorf("gen sweep script: %v", err)
}
s.currentOutputScript = pkScript
}
// Create sweep tx.
tx, err := createSweepTx(
inputs, s.currentOutputScript,
uint32(currentHeight), satPerKW, s.cfg.Signer,
inputs, s.currentOutputScript, uint32(currentHeight), feeRate,
s.cfg.Signer,
)
if err != nil {
return fmt.Errorf("create sweep tx: %v", err)
@ -651,8 +794,8 @@ func (s *UtxoSweeper) sweep(inputs inputSet,
return fmt.Errorf("publish tx: %v", err)
}
// Keep outputScript in case of an error, so that it can be reused for
// the next tx and causes no address inflation.
// Keep the output script in case of an error, so that it can be reused
// for the next transaction and causes no address inflation.
if err == nil {
s.currentOutputScript = nil
}
@ -692,6 +835,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet,
})
}
}
return nil
}

View File

@ -10,10 +10,11 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
)
var (
@ -22,6 +23,8 @@ var (
testMaxSweepAttempts = 3
testMaxInputsPerTx = 3
defaultFeePref = FeePreference{ConfTarget: 1}
)
type sweeperTestContext struct {
@ -96,7 +99,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
backend := newMockBackend(notifier)
estimator := newMockFeeEstimator(10000, 1000)
estimator := newMockFeeEstimator(10000, lnwallet.FeePerKwFloor)
publishChan := make(chan wire.MsgTx, 2)
ctx := &sweeperTestContext{
@ -127,10 +130,9 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
ctx.timeoutChan <- c
return c
},
Store: store,
Signer: &mockSigner{},
SweepTxConfTarget: 1,
ChainIO: &mockChainIO{},
Store: store,
Signer: &mockSigner{},
ChainIO: &mockChainIO{},
GenSweepScript: func() ([]byte, error) {
script := []byte{outputScriptCount}
outputScriptCount++
@ -143,6 +145,8 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
// Use delta func without random factor.
return 1 << uint(attempts-1)
},
MaxFeeRate: DefaultMaxFeeRate,
FeeRateBucketSize: DefaultFeeRateBucketSize,
})
ctx.sweeper.Start()
@ -150,6 +154,14 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
return ctx
}
func (ctx *sweeperTestContext) restartSweeper() {
ctx.t.Helper()
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
}
func (ctx *sweeperTestContext) tick() {
testLog.Trace("Waiting for tick to be consumed")
select {
@ -251,11 +263,95 @@ func (ctx *sweeperTestContext) expectResult(c chan Result, expected error) {
}
}
// receiveSpendTx receives the transaction sent through the given resultChan.
func receiveSpendTx(t *testing.T, resultChan chan Result) *wire.MsgTx {
t.Helper()
var result Result
select {
case result = <-resultChan:
case <-time.After(5 * time.Second):
t.Fatal("no sweep result received")
}
if result.Err != nil {
t.Fatalf("expected successful spend, but received error "+
"\"%v\" instead", result.Err)
}
return result.Tx
}
// assertTxSweepsInputs ensures that the transaction returned within the value
// received from resultChan spends the given inputs.
func assertTxSweepsInputs(t *testing.T, sweepTx *wire.MsgTx,
inputs ...input.Input) {
t.Helper()
if len(sweepTx.TxIn) != len(inputs) {
t.Fatalf("expected sweep tx to contain %d inputs, got %d",
len(inputs), len(sweepTx.TxIn))
}
m := make(map[wire.OutPoint]struct{}, len(inputs))
for _, input := range inputs {
m[*input.OutPoint()] = struct{}{}
}
for _, txIn := range sweepTx.TxIn {
if _, ok := m[txIn.PreviousOutPoint]; !ok {
t.Fatalf("expected tx %v to spend input %v",
txIn.PreviousOutPoint, sweepTx.TxHash())
}
}
}
// assertTxFeeRate asserts that the transaction was created with the given
// inputs and fee rate.
//
// NOTE: This assumes that transactions only have one output, as this is the
// only type of transaction the UtxoSweeper can create at the moment.
func assertTxFeeRate(t *testing.T, tx *wire.MsgTx,
expectedFeeRate lnwallet.SatPerKWeight, inputs ...input.Input) {
t.Helper()
if len(tx.TxIn) != len(inputs) {
t.Fatalf("expected %d inputs, got %d", len(tx.TxIn), len(inputs))
}
m := make(map[wire.OutPoint]input.Input, len(inputs))
for _, input := range inputs {
m[*input.OutPoint()] = input
}
var inputAmt int64
for _, txIn := range tx.TxIn {
input, ok := m[txIn.PreviousOutPoint]
if !ok {
t.Fatalf("expected input %v to be provided",
txIn.PreviousOutPoint)
}
inputAmt += input.SignDesc().Output.Value
}
outputAmt := tx.TxOut[0].Value
fee := btcutil.Amount(inputAmt - outputAmt)
_, txWeight, _, _ := getWeightEstimate(inputs)
expectedFee := expectedFeeRate.FeeForWeight(txWeight)
if fee != expectedFee {
t.Fatalf("expected fee rate %v results in %v fee, got %v fee",
expectedFeeRate, expectedFee, fee)
}
}
// TestSuccess tests the sweeper happy flow.
func TestSuccess(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -305,7 +401,7 @@ func TestDust(t *testing.T) {
// sweep tx output script (P2WPKH).
dustInput := createTestInput(5260, input.CommitmentTimeLock)
_, err := ctx.sweeper.SweepInput(&dustInput)
_, err := ctx.sweeper.SweepInput(&dustInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -316,7 +412,7 @@ func TestDust(t *testing.T) {
// Sweep another input that brings the tx output above the dust limit.
largeInput := createTestInput(100000, input.CommitmentTimeLock)
_, err = ctx.sweeper.SweepInput(&largeInput)
_, err = ctx.sweeper.SweepInput(&largeInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -345,7 +441,9 @@ func TestNegativeInput(t *testing.T) {
// Sweep an input large enough to cover fees, so in any case the tx
// output will be above the dust limit.
largeInput := createTestInput(100000, input.CommitmentNoDelay)
largeInputResult, err := ctx.sweeper.SweepInput(&largeInput)
largeInputResult, err := ctx.sweeper.SweepInput(
&largeInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -354,7 +452,7 @@ func TestNegativeInput(t *testing.T) {
// the HtlcAcceptedRemoteSuccess input type adds more in fees than its
// value at the current fee level.
negInput := createTestInput(2900, input.HtlcOfferedRemoteTimeout)
negInputResult, err := ctx.sweeper.SweepInput(&negInput)
negInputResult, err := ctx.sweeper.SweepInput(&negInput, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -362,7 +460,9 @@ func TestNegativeInput(t *testing.T) {
// Sweep a third input that has a smaller output than the previous one,
// but yields positively because of its lower weight.
positiveInput := createTestInput(2800, input.CommitmentNoDelay)
positiveInputResult, err := ctx.sweeper.SweepInput(&positiveInput)
positiveInputResult, err := ctx.sweeper.SweepInput(
&positiveInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -373,13 +473,7 @@ func TestNegativeInput(t *testing.T) {
// contain the large input. The negative input should stay out of sweeps
// until fees come down to get a positive net yield.
sweepTx1 := ctx.receiveTx()
if !testTxIns(&sweepTx1, []*wire.OutPoint{
largeInput.OutPoint(), positiveInput.OutPoint(),
}) {
t.Fatalf("Tx does not contain expected inputs: %v",
spew.Sdump(sweepTx1))
}
assertTxSweepsInputs(t, &sweepTx1, &largeInput, &positiveInput)
ctx.backend.mine()
@ -389,9 +483,11 @@ func TestNegativeInput(t *testing.T) {
// Lower fee rate so that the negative input is no longer negative.
ctx.estimator.updateFees(1000, 1000)
// Create another large input
// Create another large input.
secondLargeInput := createTestInput(100000, input.CommitmentNoDelay)
secondLargeInputResult, err := ctx.sweeper.SweepInput(&secondLargeInput)
secondLargeInputResult, err := ctx.sweeper.SweepInput(
&secondLargeInput, defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -399,11 +495,7 @@ func TestNegativeInput(t *testing.T) {
ctx.tick()
sweepTx2 := ctx.receiveTx()
if !testTxIns(&sweepTx2, []*wire.OutPoint{
secondLargeInput.OutPoint(), negInput.OutPoint(),
}) {
t.Fatal("Tx does not contain expected inputs")
}
assertTxSweepsInputs(t, &sweepTx2, &secondLargeInput, &negInput)
ctx.backend.mine()
@ -413,32 +505,13 @@ func TestNegativeInput(t *testing.T) {
ctx.finish(1)
}
func testTxIns(tx *wire.MsgTx, inputs []*wire.OutPoint) bool {
if len(tx.TxIn) != len(inputs) {
return false
}
ins := make(map[wire.OutPoint]struct{})
for _, in := range tx.TxIn {
ins[in.PreviousOutPoint] = struct{}{}
}
for _, expectedIn := range inputs {
if _, ok := ins[*expectedIn]; !ok {
return false
}
}
return true
}
// TestChunks asserts that large sets of inputs are split into multiple txes.
func TestChunks(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep five inputs.
for _, input := range spendableInputs[:5] {
_, err := ctx.sweeper.SweepInput(input)
_, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -479,12 +552,16 @@ func TestRemoteSpend(t *testing.T) {
func testRemoteSpend(t *testing.T, postSweep bool) {
ctx := createSweeperTestContext(t)
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[1])
resultChan2, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -557,12 +634,13 @@ func testRemoteSpend(t *testing.T, postSweep bool) {
func TestIdempotency(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
input := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
resultChan2, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan2, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -571,7 +649,7 @@ func TestIdempotency(t *testing.T) {
ctx.receiveTx()
resultChan3, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan3, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -588,7 +666,7 @@ func TestIdempotency(t *testing.T) {
// immediately receive the spend notification with a spending tx hash.
// Because the sweeper kept track of all of its sweep txes, it will
// recognize the spend as its own.
resultChan4, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan4, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -615,8 +693,8 @@ func TestRestart(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input and expect sweep tx.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input1 := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
t.Fatal(err)
}
ctx.tick()
@ -624,21 +702,19 @@ func TestRestart(t *testing.T) {
ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
// Simulate other subsystem (eg contract resolver) re-offering inputs.
spendChan1, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan1, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
t.Fatal(err)
}
spendChan2, err := ctx.sweeper.SweepInput(spendableInputs[1])
input2 := spendableInputs[1]
spendChan2, err := ctx.sweeper.SweepInput(input2, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -676,9 +752,7 @@ func TestRestart(t *testing.T) {
}
// Restart sweeper again. No action is expected.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -693,14 +767,14 @@ func TestRestartRemoteSpend(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input1 := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input1, defaultFeePref); err != nil {
t.Fatal(err)
}
// Sweep another input.
_, err = ctx.sweeper.SweepInput(spendableInputs[1])
if err != nil {
input2 := spendableInputs[1]
if _, err := ctx.sweeper.SweepInput(input2, defaultFeePref); err != nil {
t.Fatal(err)
}
@ -709,10 +783,7 @@ func TestRestartRemoteSpend(t *testing.T) {
sweepTx := ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -723,12 +794,11 @@ func TestRestartRemoteSpend(t *testing.T) {
remoteTx := &wire.MsgTx{
TxIn: []*wire.TxIn{
{
PreviousOutPoint: *(spendableInputs[1].OutPoint()),
PreviousOutPoint: *(input2.OutPoint()),
},
},
}
err = ctx.backend.publishTransaction(remoteTx)
if err != nil {
if err := ctx.backend.publishTransaction(remoteTx); err != nil {
t.Fatal(err)
}
@ -736,7 +806,7 @@ func TestRestartRemoteSpend(t *testing.T) {
ctx.backend.mine()
// Simulate other subsystem (eg contract resolver) re-offering input 0.
spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan, err := ctx.sweeper.SweepInput(input1, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -760,8 +830,8 @@ func TestRestartConfirmed(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep input.
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
if err != nil {
input := spendableInputs[0]
if _, err := ctx.sweeper.SweepInput(input, defaultFeePref); err != nil {
t.Fatal(err)
}
@ -770,10 +840,7 @@ func TestRestartConfirmed(t *testing.T) {
ctx.receiveTx()
// Restart sweeper.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
// Expect last tx to be republished.
ctx.receiveTx()
@ -782,7 +849,7 @@ func TestRestartConfirmed(t *testing.T) {
ctx.backend.mine()
// Simulate other subsystem (eg contract resolver) re-offering input 0.
spendChan, err := ctx.sweeper.SweepInput(spendableInputs[0])
spendChan, err := ctx.sweeper.SweepInput(input, defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -801,7 +868,7 @@ func TestRestartConfirmed(t *testing.T) {
func TestRestartRepublish(t *testing.T) {
ctx := createSweeperTestContext(t)
_, err := ctx.sweeper.SweepInput(spendableInputs[0])
_, err := ctx.sweeper.SweepInput(spendableInputs[0], defaultFeePref)
if err != nil {
t.Fatal(err)
}
@ -811,9 +878,7 @@ func TestRestartRepublish(t *testing.T) {
sweepTx := ctx.receiveTx()
// Restart sweeper again. No action is expected.
ctx.sweeper.Stop()
ctx.sweeper = New(ctx.sweeper.cfg)
ctx.sweeper.Start()
ctx.restartSweeper()
republishedTx := ctx.receiveTx()
@ -831,7 +896,9 @@ func TestRestartRepublish(t *testing.T) {
func TestRetry(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan0, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -846,7 +913,9 @@ func TestRetry(t *testing.T) {
ctx.notifier.NotifyEpoch(1000)
// Offer a fresh input.
resultChan1, err := ctx.sweeper.SweepInput(spendableInputs[1])
resultChan1, err := ctx.sweeper.SweepInput(
spendableInputs[1], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -871,7 +940,9 @@ func TestRetry(t *testing.T) {
func TestGiveUp(t *testing.T) {
ctx := createSweeperTestContext(t)
resultChan0, err := ctx.sweeper.SweepInput(spendableInputs[0])
resultChan0, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
if err != nil {
t.Fatal(err)
}
@ -902,3 +973,62 @@ func TestGiveUp(t *testing.T) {
ctx.finish(1)
}
// TestDifferentFeePreferences ensures that the sweeper can have different
// transactions for different fee preferences. These transactions should be
// broadcast from highest to lowest fee rate.
func TestDifferentFeePreferences(t *testing.T) {
ctx := createSweeperTestContext(t)
// Throughout this test, we'll be attempting to sweep three inputs, two
// with the higher fee preference, and the last with the lower. We do
// this to ensure the sweeper can broadcast distinct transactions for
// each sweep with a different fee preference.
lowFeePref := FeePreference{
ConfTarget: 12,
}
ctx.estimator.blocksToFee[lowFeePref.ConfTarget] = 5000
highFeePref := FeePreference{
ConfTarget: 6,
}
ctx.estimator.blocksToFee[highFeePref.ConfTarget] = 10000
input1 := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input1, highFeePref)
if err != nil {
t.Fatal(err)
}
input2 := spendableInputs[1]
resultChan2, err := ctx.sweeper.SweepInput(input2, highFeePref)
if err != nil {
t.Fatal(err)
}
input3 := spendableInputs[2]
resultChan3, err := ctx.sweeper.SweepInput(input3, lowFeePref)
if err != nil {
t.Fatal(err)
}
// Start the sweeper's batch ticker, which should cause the sweep
// transactions to be broadcast in order of high to low fee preference.
ctx.tick()
// The first transaction broadcast should be the one spending the higher
// fee rate inputs.
sweepTx1 := ctx.receiveTx()
assertTxSweepsInputs(t, &sweepTx1, input1, input2)
// The second should be the one spending the lower fee rate inputs.
sweepTx2 := ctx.receiveTx()
assertTxSweepsInputs(t, &sweepTx2, input3)
// With the transactions broadcast, we'll mine a block to so that the
// result is delivered to each respective client.
ctx.backend.mine()
resultChans := []chan Result{resultChan1, resultChan2, resultChan3}
for _, resultChan := range resultChans {
ctx.expectResult(resultChan, nil)
}
ctx.finish(1)
}

View File

@ -30,6 +30,14 @@ type FeePreference struct {
FeeRate lnwallet.SatPerKWeight
}
// String returns a human-readable string of the fee preference.
func (p FeePreference) String() string {
if p.ConfTarget != 0 {
return fmt.Sprintf("%v blocks", p.ConfTarget)
}
return p.FeeRate.String()
}
// DetermineFeePerKw will determine the fee in sat/kw that should be paid given
// an estimator, a confirmation target, and a manual value for sat/byte. A
// value is chosen based on the two free parameters as one, or both of them can

View File

@ -155,6 +155,12 @@ import (
var byteOrder = binary.BigEndian
const (
// kgtnOutputConfTarget is the default confirmation target we'll use for
// sweeps of CSV delayed outputs.
kgtnOutputConfTarget = 6
)
var (
// ErrContractNotFound is returned when the nursery is unable to
// retrieve information about a queried contract.
@ -196,7 +202,7 @@ type NurseryConfig struct {
Store NurseryStore
// Sweep sweeps an input back to the wallet.
SweepInput func(input input.Input) (chan sweep.Result, error)
SweepInput func(input.Input, sweep.FeePreference) (chan sweep.Result, error)
}
// utxoNursery is a system dedicated to incubating time-locked outputs created
@ -804,12 +810,13 @@ func (u *utxoNursery) sweepMatureOutputs(classHeight uint32,
utxnLog.Infof("Sweeping %v CSV-delayed outputs with sweep tx for "+
"height %v", len(kgtnOutputs), classHeight)
feePref := sweep.FeePreference{ConfTarget: kgtnOutputConfTarget}
for _, output := range kgtnOutputs {
// Create local copy to prevent pointer to loop variable to be
// passed in with disastrous consequences.
local := output
resultChan, err := u.cfg.SweepInput(&local)
resultChan, err := u.cfg.SweepInput(&local, feePref)
if err != nil {
return err
}

View File

@ -1051,7 +1051,9 @@ func newMockSweeper(t *testing.T) *mockSweeper {
}
}
func (s *mockSweeper) sweepInput(input input.Input) (chan sweep.Result, error) {
func (s *mockSweeper) sweepInput(input input.Input,
_ sweep.FeePreference) (chan sweep.Result, error) {
utxnLog.Debugf("mockSweeper sweepInput called for %v", *input.OutPoint())
select {