Merge pull request #3814 from joostjager/sweeper-add-utxo

sweep: add wallet inputs to reach dust limit
This commit is contained in:
Joost Jager 2019-12-18 09:53:04 +01:00 committed by GitHub
commit 7ecbe22531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 653 additions and 195 deletions

@ -74,6 +74,10 @@ const (
// expressed in sat/kw.
defaultBitcoinStaticFeePerKW = chainfee.SatPerKWeight(12500)
// defaultBitcoinStaticMinRelayFeeRate is the min relay fee used for
// static estimators.
defaultBitcoinStaticMinRelayFeeRate = chainfee.FeePerKwFloor
// defaultLitecoinStaticFeePerKW is the fee rate of 200 sat/vbyte
// expressed in sat/kw.
defaultLitecoinStaticFeePerKW = chainfee.SatPerKWeight(50000)
@ -182,7 +186,8 @@ func newChainControlFromConfig(cfg *config, chanDB *channeldb.DB,
}
cc.minHtlcIn = cfg.Bitcoin.MinHTLCIn
cc.feeEstimator = chainfee.NewStaticEstimator(
defaultBitcoinStaticFeePerKW, 0,
defaultBitcoinStaticFeePerKW,
defaultBitcoinStaticMinRelayFeeRate,
)
case litecoinChain:
cc.routingPolicy = htlcswitch.ForwardingPolicy{

@ -206,7 +206,7 @@ func (c *commitSweepResolver) Resolve() (ContractResolver, error) {
c.log.Infof("sweeping commit output")
feePref := sweep.FeePreference{ConfTarget: commitOutputConfTarget}
resultChan, err := c.Sweeper.SweepInput(inp, feePref)
resultChan, err := c.Sweeper.SweepInput(inp, sweep.Params{Fee: feePref})
if err != nil {
c.log.Errorf("unable to sweep input: %v", err)

@ -102,8 +102,8 @@ func newMockSweeper() *mockSweeper {
}
}
func (s *mockSweeper) SweepInput(input input.Input,
feePreference sweep.FeePreference) (chan sweep.Result, error) {
func (s *mockSweeper) SweepInput(input input.Input, params sweep.Params) (
chan sweep.Result, error) {
s.sweptInputs <- input

@ -43,8 +43,8 @@ type OnionProcessor interface {
// UtxoSweeper defines the sweep functions that contract court requires.
type UtxoSweeper interface {
// SweepInput sweeps inputs back into the wallet.
SweepInput(input input.Input,
feePreference sweep.FeePreference) (chan sweep.Result, error)
SweepInput(input input.Input, params sweep.Params) (chan sweep.Result,
error)
// CreateSweepTx accepts a list of inputs and signs and generates a txn
// that spends from them. This method also makes an accurate fee

@ -536,7 +536,7 @@ func (w *WalletKit) BumpFee(ctx context.Context,
}
input := input.NewBaseInput(op, witnessType, signDesc, uint32(currentHeight))
if _, err = w.cfg.Sweeper.SweepInput(input, feePreference); err != nil {
if _, err = w.cfg.Sweeper.SweepInput(input, sweep.Params{Fee: feePreference}); err != nil {
return nil, err
}

@ -800,10 +800,10 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
}
s.sweeper = sweep.New(&sweep.UtxoSweeperConfig{
FeeEstimator: cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Signer: cc.wallet.Cfg.Signer,
PublishTransaction: cc.wallet.PublishTransaction,
FeeEstimator: cc.feeEstimator,
GenSweepScript: newSweepPkScriptGen(cc.wallet),
Signer: cc.wallet.Cfg.Signer,
Wallet: cc.wallet,
NewBatchTimer: func() <-chan time.Time {
return time.NewTimer(sweep.DefaultBatchWindowDuration).C
},

@ -2,6 +2,8 @@ package sweep
import (
"sync"
"testing"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -11,6 +13,8 @@ import (
// mockBackend simulates a chain backend for realistic behaviour in unit tests
// around double spends.
type mockBackend struct {
t *testing.T
lock sync.Mutex
notifier *MockNotifier
@ -19,14 +23,20 @@ type mockBackend struct {
unconfirmedTxes map[chainhash.Hash]*wire.MsgTx
unconfirmedSpendInputs map[wire.OutPoint]struct{}
publishChan chan wire.MsgTx
walletUtxos []*lnwallet.Utxo
}
func newMockBackend(notifier *MockNotifier) *mockBackend {
func newMockBackend(t *testing.T, notifier *MockNotifier) *mockBackend {
return &mockBackend{
t: t,
notifier: notifier,
unconfirmedTxes: make(map[chainhash.Hash]*wire.MsgTx),
confirmedSpendInputs: make(map[wire.OutPoint]struct{}),
unconfirmedSpendInputs: make(map[wire.OutPoint]struct{}),
publishChan: make(chan wire.MsgTx, 2),
}
}
@ -65,6 +75,27 @@ func (b *mockBackend) publishTransaction(tx *wire.MsgTx) error {
return nil
}
func (b *mockBackend) PublishTransaction(tx *wire.MsgTx) error {
log.Tracef("Publishing tx %v", tx.TxHash())
err := b.publishTransaction(tx)
select {
case b.publishChan <- *tx:
case <-time.After(defaultTestTimeout):
b.t.Fatalf("unexpected tx published")
}
return err
}
func (b *mockBackend) ListUnspentWitness(minconfirms, maxconfirms int32) (
[]*lnwallet.Utxo, error) {
return b.walletUtxos, nil
}
func (b *mockBackend) WithCoinSelectLock(f func() error) error {
return f()
}
func (b *mockBackend) deleteUnconfirmed(txHash chainhash.Hash) {
b.lock.Lock()
defer b.lock.Unlock()

27
sweep/interface.go Normal file

@ -0,0 +1,27 @@
package sweep
import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/lnwallet"
)
// Wallet contains all wallet related functionality required by sweeper.
type Wallet interface {
// PublishTransaction performs cursory validation (dust checks, etc) and
// broadcasts the passed transaction to the Bitcoin network.
PublishTransaction(tx *wire.MsgTx) error
// ListUnspentWitness returns all unspent outputs which are version 0
// witness programs. The 'minconfirms' and 'maxconfirms' parameters
// indicate the minimum and maximum number of confirmations an output
// needs in order to be returned by this method.
ListUnspentWitness(minconfirms, maxconfirms int32) ([]*lnwallet.Utxo,
error)
// WithCoinSelectLock will execute the passed function closure in a
// synchronized manner preventing any coin selection operations from
// proceeding while the closure if executing. This can be seen as the
// ability to execute a function closure under an exclusive coin
// selection lock.
WithCoinSelectLock(f func() error) error
}

@ -3,7 +3,6 @@ package sweep
import (
"errors"
"fmt"
"math"
"math/rand"
"sort"
"sync"
@ -62,17 +61,24 @@ var (
DefaultMaxSweepAttempts = 10
)
// Params contains the parameters that control the sweeping process.
type Params struct {
// Fee is the fee preference of the client who requested the input to be
// swept. If a confirmation target is specified, then we'll map it into
// a fee rate whenever we attempt to cluster inputs for a sweep.
Fee FeePreference
}
// pendingInput is created when an input reaches the main loop for the first
// time. It tracks all relevant state that is needed for sweeping.
// time. It wraps the input and tracks all relevant state that is needed for
// sweeping.
type pendingInput struct {
input.Input
// listeners is a list of channels over which the final outcome of the
// sweep needs to be broadcasted.
listeners []chan Result
// input is the original struct that contains the input and sign
// descriptor.
input input.Input
// ntfnRegCancel is populated with a function that cancels the chain
// notifier spend registration.
ntfnRegCancel func()
@ -85,17 +91,21 @@ type pendingInput struct {
// made to sweep this tx.
publishAttempts int
// feePreference is the fee preference of the client who requested the
// input to be swept. If a confirmation target is specified, then we'll
// map it into a fee rate whenever we attempt to cluster inputs for a
// sweep.
feePreference FeePreference
// params contains the parameters that control the sweeping process.
params Params
// lastFeeRate is the most recent fee rate used for this input within a
// transaction broadcast to the network.
lastFeeRate chainfee.SatPerKWeight
}
// parameters returns the sweep parameters for this input.
//
// NOTE: Part of the txInput interface.
func (p *pendingInput) parameters() Params {
return p.params
}
// pendingInputs is a type alias for a set of pending inputs.
type pendingInputs = map[wire.OutPoint]*pendingInput
@ -200,9 +210,8 @@ type UtxoSweeperConfig struct {
// transaction.
FeeEstimator chainfee.Estimator
// PublishTransaction facilitates the process of broadcasting a signed
// transaction to the appropriate network.
PublishTransaction func(*wire.MsgTx) error
// Wallet contains the wallet functions that sweeper requires.
Wallet Wallet
// NewBatchTimer creates a channel that will be sent on when a certain
// time window has passed. During this time window, new inputs can still
@ -246,8 +255,8 @@ type UtxoSweeperConfig struct {
// of 10 would result in the following fee rate buckets up to the
// maximum fee rate:
//
// #1: min = 1 sat/vbyte, max = 10 sat/vbyte
// #2: min = 11 sat/vbyte, max = 20 sat/vbyte...
// #1: min = 1 sat/vbyte, max (exclusive) = 11 sat/vbyte
// #2: min = 11 sat/vbyte, max (exclusive) = 21 sat/vbyte...
FeeRateBucketSize int
}
@ -267,9 +276,9 @@ type Result struct {
// sweepInputMessage structs are used in the internal channel between the
// SweepInput call and the sweeper main loop.
type sweepInputMessage struct {
input input.Input
feePreference FeePreference
resultChan chan Result
input input.Input
params Params
resultChan chan Result
}
// New returns a new Sweeper instance.
@ -311,7 +320,7 @@ func (s *UtxoSweeper) Start() error {
// Error can be ignored. Because we are starting up, there are
// no pending inputs to update based on the publish result.
err := s.cfg.PublishTransaction(lastTx)
err := s.cfg.Wallet.PublishTransaction(lastTx)
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Errorf("last tx publish: %v", err)
}
@ -369,26 +378,27 @@ func (s *UtxoSweeper) Stop() error {
// Because it is an interface and we don't know what is exactly behind it, we
// cannot make a local copy in sweeper.
func (s *UtxoSweeper) SweepInput(input input.Input,
feePreference FeePreference) (chan Result, error) {
params Params) (chan Result, error) {
if input == nil || input.OutPoint() == nil || input.SignDesc() == nil {
return nil, errors.New("nil input received")
}
// Ensure the client provided a sane fee preference.
if _, err := s.feeRateForPreference(feePreference); err != nil {
if _, err := s.feeRateForPreference(params.Fee); err != nil {
return nil, err
}
log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
"time_lock=%v, amount=%v, fee_preference=%v", input.OutPoint(),
input.WitnessType(), input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value), feePreference)
btcutil.Amount(input.SignDesc().Output.Value),
params.Fee)
sweeperInput := &sweepInputMessage{
input: input,
feePreference: feePreference,
resultChan: make(chan Result, 1),
input: input,
params: params,
resultChan: make(chan Result, 1),
}
// Deliver input to main event loop.
@ -469,9 +479,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// channel will be appended to this slice.
pendInput = &pendingInput{
listeners: []chan Result{input.resultChan},
input: input.input,
Input: input.input,
minPublishHeight: bestHeight,
feePreference: input.feePreference,
params: input.params,
}
s.pendingInputs[outpoint] = pendInput
@ -589,27 +599,10 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
inputClusters[j].sweepFeeRate
})
for _, cluster := range inputClusters {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(
cluster, bestHeight,
)
err := s.sweepCluster(cluster, bestHeight)
if err != nil {
log.Errorf("Unable to examine pending "+
"inputs: %v", err)
continue
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(
inputs, cluster.sweepFeeRate,
bestHeight,
)
if err != nil {
log.Errorf("Unable to sweep "+
"inputs: %v", err)
}
log.Errorf("input cluster sweep: %v",
err)
}
}
@ -635,15 +628,39 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
}
}
// sweepCluster tries to sweep the given input cluster.
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {
// Execute the sweep within a coin select lock. Otherwise the coins that
// we are going to spend may be selected for other transactions like
// funding of a channel.
return s.cfg.Wallet.WithCoinSelectLock(func() error {
// Examine pending inputs and try to construct
// lists of inputs.
inputLists, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("unable to examine pending inputs: %v", err)
}
// Sweep selected inputs.
for _, inputs := range inputLists {
err := s.sweep(inputs, cluster.sweepFeeRate, currentHeight)
if err != nil {
return fmt.Errorf("unable to sweep inputs: %v", err)
}
}
return nil
})
}
// bucketForFeeReate determines the proper bucket for a fee rate. This is done
// in order to batch inputs with similar fee rates together.
func (s *UtxoSweeper) bucketForFeeRate(
feeRate chainfee.SatPerKWeight) chainfee.SatPerKWeight {
feeRate chainfee.SatPerKWeight) int {
minBucket := s.relayFeeRate + chainfee.SatPerKWeight(s.cfg.FeeRateBucketSize)
return chainfee.SatPerKWeight(
math.Ceil(float64(feeRate) / float64(minBucket)),
)
return int(feeRate-s.relayFeeRate) / s.cfg.FeeRateBucketSize
}
// clusterBySweepFeeRate takes the set of pending inputs within the UtxoSweeper
@ -651,23 +668,23 @@ func (s *UtxoSweeper) bucketForFeeRate(
// sweep fee rate, which is determined by calculating the average fee rate of
// all inputs within that cluster.
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
bucketInputs := make(map[chainfee.SatPerKWeight]pendingInputs)
bucketInputs := make(map[int]pendingInputs)
inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight)
// First, we'll group together all inputs with similar fee rates. This
// is done by determining the fee rate bucket they should belong in.
for op, input := range s.pendingInputs {
feeRate, err := s.feeRateForPreference(input.feePreference)
feeRate, err := s.feeRateForPreference(input.params.Fee)
if err != nil {
log.Warnf("Skipping input %v: %v", op, err)
continue
}
bucket := s.bucketForFeeRate(feeRate)
feeGroup := s.bucketForFeeRate(feeRate)
inputs, ok := bucketInputs[bucket]
inputs, ok := bucketInputs[feeGroup]
if !ok {
inputs = make(pendingInputs)
bucketInputs[bucket] = inputs
bucketInputs[feeGroup] = inputs
}
input.lastFeeRate = feeRate
@ -707,6 +724,10 @@ func (s *UtxoSweeper) scheduleSweep(currentHeight int32) error {
startTimer := false
for _, cluster := range s.clusterBySweepFeeRate() {
// Examine pending inputs and try to construct lists of inputs.
// We don't need to obtain the coin selection lock, because we
// just need an indication as to whether we can sweep. More
// inputs may be added until we publish the transaction and
// coins that we select now may be used in other transactions.
inputLists, err := s.getInputLists(cluster, currentHeight)
if err != nil {
return fmt.Errorf("get input lists: %v", err)
@ -788,7 +809,7 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
// contain inputs that failed before. Therefore we also add sets
// consisting of only new inputs to the list, to make sure that new
// inputs are given a good, isolated chance of being published.
var newInputs, retryInputs []input.Input
var newInputs, retryInputs []txInput
for _, input := range cluster.inputs {
// Skip inputs that have a minimum publish height that is not
// yet reached.
@ -798,9 +819,9 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
// Add input to the either one of the lists.
if input.publishAttempts == 0 {
newInputs = append(newInputs, input.input)
newInputs = append(newInputs, input)
} else {
retryInputs = append(retryInputs, input.input)
retryInputs = append(retryInputs, input)
}
}
@ -812,6 +833,7 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
allSets, err = generateInputPartitionings(
append(retryInputs, newInputs...), s.relayFeeRate,
cluster.sweepFeeRate, s.cfg.MaxInputsPerTx,
s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
@ -821,7 +843,7 @@ func (s *UtxoSweeper) getInputLists(cluster inputCluster,
// Create sets for just the new inputs.
newSets, err := generateInputPartitionings(
newInputs, s.relayFeeRate, cluster.sweepFeeRate,
s.cfg.MaxInputsPerTx,
s.cfg.MaxInputsPerTx, s.cfg.Wallet,
)
if err != nil {
return nil, fmt.Errorf("input partitionings: %v", err)
@ -878,7 +900,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
}),
)
err = s.cfg.PublishTransaction(tx)
err = s.cfg.Wallet.PublishTransaction(tx)
// In case of an unexpected error, don't try to recover.
if err != nil && err != lnwallet.ErrDoubleSpend {
@ -897,7 +919,9 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
if !ok {
// It can be that the input has been removed because it
// exceed the maximum number of attempts in a previous
// input set.
// input set. It could also be that this input is an
// additional wallet input that was attached. In that
// case there also isn't a pending input to update.
continue
}
@ -1000,12 +1024,12 @@ func (s *UtxoSweeper) handlePendingSweepsReq(
for _, pendingInput := range s.pendingInputs {
// Only the exported fields are set, as we expect the response
// to only be consumed externally.
op := *pendingInput.input.OutPoint()
op := *pendingInput.OutPoint()
pendingInputs[op] = &PendingInput{
OutPoint: op,
WitnessType: pendingInput.input.WitnessType(),
WitnessType: pendingInput.WitnessType(),
Amount: btcutil.Amount(
pendingInput.input.SignDesc().Output.Value,
pendingInput.SignDesc().Output.Value,
),
LastFeeRate: pendingInput.lastFeeRate,
BroadcastAttempts: pendingInput.publishAttempts,
@ -1076,9 +1100,9 @@ func (s *UtxoSweeper) handleBumpFeeReq(req *bumpFeeReq,
}
log.Debugf("Updating fee preference for %v from %v to %v", req.input,
pendingInput.feePreference, req.feePreference)
pendingInput.params.Fee, req.feePreference)
pendingInput.feePreference = req.feePreference
pendingInput.params.Fee = req.feePreference
// We'll reset the input's publish height to the current so that a new
// transaction can be created that replaces the transaction currently

@ -25,7 +25,7 @@ var (
testMaxInputsPerTx = 3
defaultFeePref = FeePreference{ConfTarget: 1}
defaultFeePref = Params{Fee: FeePreference{ConfTarget: 1}}
)
type sweeperTestContext struct {
@ -98,14 +98,19 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
store := NewMockSweeperStore()
backend := newMockBackend(notifier)
backend := newMockBackend(t, notifier)
backend.walletUtxos = []*lnwallet.Utxo{
{
Value: btcutil.Amount(10000),
AddressType: lnwallet.WitnessPubKey,
},
}
estimator := newMockFeeEstimator(10000, chainfee.FeePerKwFloor)
publishChan := make(chan wire.MsgTx, 2)
ctx := &sweeperTestContext{
notifier: notifier,
publishChan: publishChan,
publishChan: backend.publishChan,
t: t,
estimator: estimator,
backend: backend,
@ -116,16 +121,7 @@ func createSweeperTestContext(t *testing.T) *sweeperTestContext {
var outputScriptCount byte
ctx.sweeper = New(&UtxoSweeperConfig{
Notifier: notifier,
PublishTransaction: func(tx *wire.MsgTx) error {
log.Tracef("Publishing tx %v", tx.TxHash())
err := backend.publishTransaction(tx)
select {
case publishChan <- *tx:
case <-time.After(defaultTestTimeout):
t.Fatalf("unexpected tx published")
}
return err
},
Wallet: backend,
NewBatchTimer: func() <-chan time.Time {
c := make(chan time.Time, 1)
ctx.timeoutChan <- c
@ -354,7 +350,7 @@ func TestSuccess(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweeping an input without a fee preference should result in an error.
_, err := ctx.sweeper.SweepInput(spendableInputs[0], FeePreference{})
_, err := ctx.sweeper.SweepInput(spendableInputs[0], Params{})
if err != ErrNoFeePreference {
t.Fatalf("expected ErrNoFeePreference, got %v", err)
}
@ -417,7 +413,10 @@ func TestDust(t *testing.T) {
}
// No sweep transaction is expected now. The sweeper should recognize
// that the sweep output will not be relayed and not generate the tx.
// that the sweep output will not be relayed and not generate the tx. It
// isn't possible to attach a wallet utxo either, because the added
// weight would create a negatively yielding transaction at this fee
// rate.
// Sweep another input that brings the tx output above the dust limit.
largeInput := createTestInput(100000, input.CommitmentTimeLock)
@ -443,6 +442,50 @@ func TestDust(t *testing.T) {
ctx.finish(1)
}
// TestWalletUtxo asserts that inputs that are not big enough to raise above the
// dust limit are accompanied by a wallet utxo to make them sweepable.
func TestWalletUtxo(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweeping a single output produces a tx of 439 weight units. At the
// fee floor, the sweep tx will pay 439*253/1000 = 111 sat in fees.
//
// Create an input so that the output after paying fees is still
// positive (183 sat), but less than the dust limit (537 sat) for the
// sweep tx output script (P2WPKH).
//
// What we now expect is that the sweeper will attach a utxo from the
// wallet. This increases the tx weight to 712 units with a fee of 180
// sats. The tx yield becomes then 294-180 = 114 sats.
dustInput := createTestInput(294, input.WitnessKeyHash)
_, err := ctx.sweeper.SweepInput(
&dustInput,
Params{Fee: FeePreference{FeeRate: chainfee.FeePerKwFloor}},
)
if err != nil {
t.Fatal(err)
}
ctx.tick()
sweepTx := ctx.receiveTx()
if len(sweepTx.TxIn) != 2 {
t.Fatalf("Expected tx to sweep 2 inputs, but contains %v "+
"inputs instead", len(sweepTx.TxIn))
}
// Calculate expected output value based on wallet utxo of 10000 sats.
expectedOutputValue := int64(294 + 10000 - 180)
if sweepTx.TxOut[0].Value != expectedOutputValue {
t.Fatalf("Expected output value of %v, but got %v",
expectedOutputValue, sweepTx.TxOut[0].Value)
}
ctx.backend.mine()
ctx.finish(1)
}
// TestNegativeInput asserts that no inputs with a negative yield are swept.
// Negative yield means that the value minus the added fee is negative.
func TestNegativeInput(t *testing.T) {
@ -1003,17 +1046,23 @@ func TestDifferentFeePreferences(t *testing.T) {
ctx.estimator.blocksToFee[highFeePref.ConfTarget] = highFeeRate
input1 := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input1, highFeePref)
resultChan1, err := ctx.sweeper.SweepInput(
input1, Params{Fee: highFeePref},
)
if err != nil {
t.Fatal(err)
}
input2 := spendableInputs[1]
resultChan2, err := ctx.sweeper.SweepInput(input2, highFeePref)
resultChan2, err := ctx.sweeper.SweepInput(
input2, Params{Fee: highFeePref},
)
if err != nil {
t.Fatal(err)
}
input3 := spendableInputs[2]
resultChan3, err := ctx.sweeper.SweepInput(input3, lowFeePref)
resultChan3, err := ctx.sweeper.SweepInput(
input3, Params{Fee: lowFeePref},
)
if err != nil {
t.Fatal(err)
}
@ -1067,16 +1116,23 @@ func TestPendingInputs(t *testing.T) {
ctx.estimator.blocksToFee[highFeePref.ConfTarget] = highFeeRate
input1 := spendableInputs[0]
resultChan1, err := ctx.sweeper.SweepInput(input1, highFeePref)
resultChan1, err := ctx.sweeper.SweepInput(
input1, Params{Fee: highFeePref},
)
if err != nil {
t.Fatal(err)
}
input2 := spendableInputs[1]
if _, err := ctx.sweeper.SweepInput(input2, highFeePref); err != nil {
_, err = ctx.sweeper.SweepInput(
input2, Params{Fee: highFeePref},
)
if err != nil {
t.Fatal(err)
}
input3 := spendableInputs[2]
resultChan3, err := ctx.sweeper.SweepInput(input3, lowFeePref)
resultChan3, err := ctx.sweeper.SweepInput(
input3, Params{Fee: lowFeePref},
)
if err != nil {
t.Fatal(err)
}
@ -1132,7 +1188,9 @@ func TestBumpFeeRBF(t *testing.T) {
input := createTestInput(
btcutil.SatoshiPerBitcoin, input.CommitmentTimeLock,
)
sweepResult, err := ctx.sweeper.SweepInput(&input, lowFeePref)
sweepResult, err := ctx.sweeper.SweepInput(
&input, Params{Fee: lowFeePref},
)
if err != nil {
t.Fatal(err)
}

248
sweep/tx_input_set.go Normal file

@ -0,0 +1,248 @@
package sweep
import (
"fmt"
"math"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/wallet/txrules"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
// txInputSet is an object that accumulates tx inputs and keeps running counters
// on various properties of the tx.
type txInputSet struct {
// weightEstimate is the (worst case) tx weight with the current set of
// inputs.
weightEstimate input.TxWeightEstimator
// inputTotal is the total value of all inputs.
inputTotal btcutil.Amount
// outputValue is the value of the tx output.
outputValue btcutil.Amount
// feePerKW is the fee rate used to calculate the tx fee.
feePerKW chainfee.SatPerKWeight
// inputs is the set of tx inputs.
inputs []input.Input
// dustLimit is the minimum output value of the tx.
dustLimit btcutil.Amount
// maxInputs is the maximum number of inputs that will be accepted in
// the set.
maxInputs int
// walletInputTotal is the total value of inputs coming from the wallet.
walletInputTotal btcutil.Amount
// wallet contains wallet functionality required by the input set to
// retrieve utxos.
wallet Wallet
}
// newTxInputSet constructs a new, empty input set.
func newTxInputSet(wallet Wallet, feePerKW,
relayFee chainfee.SatPerKWeight, maxInputs int) *txInputSet {
dustLimit := txrules.GetDustThreshold(
input.P2WPKHSize,
btcutil.Amount(relayFee.FeePerKVByte()),
)
b := txInputSet{
feePerKW: feePerKW,
dustLimit: dustLimit,
maxInputs: maxInputs,
wallet: wallet,
}
// Add the sweep tx output to the weight estimate.
b.weightEstimate.AddP2WKHOutput()
return &b
}
// dustLimitReached returns true if we've accumulated enough inputs to meet the
// dust limit.
func (t *txInputSet) dustLimitReached() bool {
return t.outputValue >= t.dustLimit
}
// add adds a new input to the set. It returns a bool indicating whether the
// input was added to the set. An input is rejected if it decreases the tx
// output value after paying fees.
func (t *txInputSet) add(input input.Input, fromWallet bool) bool {
// Stop if max inputs is reached. Do not count additional wallet inputs,
// because we don't know in advance how many we may need.
if !fromWallet && len(t.inputs) >= t.maxInputs {
return false
}
// Can ignore error, because it has already been checked when
// calculating the yields.
size, isNestedP2SH, _ := input.WitnessType().SizeUpperBound()
// Add weight of this new candidate input to a copy of the weight
// estimator.
newWeightEstimate := t.weightEstimate
if isNestedP2SH {
newWeightEstimate.AddNestedP2WSHInput(size)
} else {
newWeightEstimate.AddWitnessInput(size)
}
value := btcutil.Amount(input.SignDesc().Output.Value)
newInputTotal := t.inputTotal + value
weight := newWeightEstimate.Weight()
fee := t.feePerKW.FeeForWeight(int64(weight))
// Calculate the output value if the current input would be
// added to the set.
newOutputValue := newInputTotal - fee
// If adding this input makes the total output value of the set
// decrease, this is a negative yield input. We don't add the input to
// the set and return the outcome.
if newOutputValue <= t.outputValue {
return false
}
// If this input comes from the wallet, verify that we still gain
// something with this transaction.
if fromWallet {
// Calculate the total value that we spend in this tx from the
// wallet if we'd add this wallet input.
newWalletTotal := t.walletInputTotal + value
// In any case, we don't want to lose money by sweeping. If we
// don't get more out of the tx then we put in ourselves, do not
// add this wallet input.
//
// We should only add wallet inputs to get the tx output value
// above the dust limit, otherwise we'd only burn into fees.
// This is guarded by tryAddWalletInputsIfNeeded.
//
// TODO(joostjager): Possibly require a max ratio between the
// value of the wallet input and what we get out of this
// transaction. To prevent attaching and locking a big utxo for
// very little benefit.
if newWalletTotal >= newOutputValue {
log.Debugf("Rejecting wallet input of %v, because it "+
"would make a negative yielding transaction "+
"(%v)",
value, newOutputValue-newWalletTotal)
return false
}
// We've decided to add the wallet input. Increment the total
// wallet funds that go into this tx.
t.walletInputTotal = newWalletTotal
}
// Update running values.
t.inputTotal = newInputTotal
t.outputValue = newOutputValue
t.inputs = append(t.inputs, input)
t.weightEstimate = newWeightEstimate
return true
}
// addPositiveYieldInputs adds sweepableInputs that have a positive yield to the
// input set. This function assumes that the list of inputs is sorted descending
// by yield.
//
// TODO(roasbeef): Consider including some negative yield inputs too to clean
// up the utxo set even if it costs us some fees up front. In the spirit of
// minimizing any negative externalities we cause for the Bitcoin system as a
// whole.
func (t *txInputSet) addPositiveYieldInputs(sweepableInputs []txInput) {
for _, input := range sweepableInputs {
// Try to add the input to the transaction. If that doesn't
// succeed because it wouldn't increase the output value,
// return. Assuming inputs are sorted by yield, any further
// inputs wouldn't increase the output value either.
if !t.add(input, false) {
return
}
}
// We managed to add all inputs to the set.
}
// tryAddWalletInputsIfNeeded retrieves utxos from the wallet and tries adding as
// many as required to bring the tx output value above the given minimum.
func (t *txInputSet) tryAddWalletInputsIfNeeded() error {
// If we've already reached the dust limit, no action is needed.
if t.dustLimitReached() {
return nil
}
// Retrieve wallet utxos. Only consider confirmed utxos to prevent
// problems around RBF rules for unconfirmed inputs.
utxos, err := t.wallet.ListUnspentWitness(1, math.MaxInt32)
if err != nil {
return err
}
for _, utxo := range utxos {
input, err := createWalletTxInput(utxo)
if err != nil {
return err
}
// If the wallet input isn't positively-yielding at this fee
// rate, skip it.
if !t.add(input, true) {
continue
}
// Return if we've reached the minimum output amount.
if t.dustLimitReached() {
return nil
}
}
// We were not able to reach the minimum output amount.
return nil
}
// createWalletTxInput converts a wallet utxo into an object that can be added
// to the other inputs to sweep.
func createWalletTxInput(utxo *lnwallet.Utxo) (input.Input, error) {
var witnessType input.WitnessType
switch utxo.AddressType {
case lnwallet.WitnessPubKey:
witnessType = input.WitnessKeyHash
case lnwallet.NestedWitnessPubKey:
witnessType = input.NestedWitnessKeyHash
default:
return nil, fmt.Errorf("unknown address type %v",
utxo.AddressType)
}
signDesc := &input.SignDescriptor{
Output: &wire.TxOut{
PkScript: utxo.PkScript,
Value: int64(utxo.Value),
},
HashType: txscript.SigHashAll,
}
// A height hint doesn't need to be set, because we don't monitor these
// inputs for spend.
heightHint := uint32(0)
return input.NewBaseInput(
&utxo.OutPoint, witnessType, signDesc, heightHint,
), nil
}

109
sweep/tx_input_set_test.go Normal file

@ -0,0 +1,109 @@
package sweep
import (
"testing"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
)
// TestTxInputSet tests adding various sized inputs to the set.
func TestTxInputSet(t *testing.T) {
const (
feeRate = 1000
relayFee = 300
maxInputs = 10
)
set := newTxInputSet(nil, feeRate, relayFee, maxInputs)
if set.dustLimit != 537 {
t.Fatalf("incorrect dust limit")
}
// Create a 300 sat input. The fee to sweep this input to a P2WKH output
// is 439 sats. That means that this input yields -139 sats and we
// expect it not to be added.
if set.add(createP2WKHInput(300), false) {
t.Fatal("expected add of negatively yielding input to fail")
}
// A 700 sat input should be accepted into the set, because it yields
// positively.
if !set.add(createP2WKHInput(700), false) {
t.Fatal("expected add of positively yielding input to succeed")
}
// The tx output should now be 700-439 = 261 sats. The dust limit isn't
// reached yet.
if set.outputValue != 261 {
t.Fatal("unexpected output value")
}
if set.dustLimitReached() {
t.Fatal("expected dust limit not yet to be reached")
}
// Add a 1000 sat input. This increases the tx fee to 712 sats. The tx
// output should now be 1000+700 - 712 = 988 sats.
if !set.add(createP2WKHInput(1000), false) {
t.Fatal("expected add of positively yielding input to succeed")
}
if set.outputValue != 988 {
t.Fatal("unexpected output value")
}
if !set.dustLimitReached() {
t.Fatal("expected dust limit to be reached")
}
}
// TestTxInputSetFromWallet tests adding a wallet input to a TxInputSet to reach
// the dust limit.
func TestTxInputSetFromWallet(t *testing.T) {
const (
feeRate = 500
relayFee = 300
maxInputs = 10
)
wallet := &mockWallet{}
set := newTxInputSet(wallet, feeRate, relayFee, maxInputs)
// Add a 700 sat input to the set. It yields positively, but doesn't
// reach the output dust limit.
if !set.add(createP2WKHInput(700), false) {
t.Fatal("expected add of positively yielding input to succeed")
}
if set.dustLimitReached() {
t.Fatal("expected dust limit not yet to be reached")
}
err := set.tryAddWalletInputsIfNeeded()
if err != nil {
t.Fatal(err)
}
if !set.dustLimitReached() {
t.Fatal("expected dust limit to be reached")
}
}
// createP2WKHInput returns a P2WKH test input with the specified amount.
func createP2WKHInput(amt btcutil.Amount) input.Input {
input := createTestInput(int64(amt), input.WitnessKeyHash)
return &input
}
type mockWallet struct {
Wallet
}
func (m *mockWallet) ListUnspentWitness(minconfirms, maxconfirms int32) (
[]*lnwallet.Utxo, error) {
return []*lnwallet.Utxo{
{
AddressType: lnwallet.WitnessPubKey,
Value: 10000,
},
}, nil
}

@ -9,7 +9,6 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwallet/wallet/txrules"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
@ -21,6 +20,13 @@ var (
DefaultMaxInputsPerTx = 100
)
// txInput is an interface that provides the input data required for tx
// generation.
type txInput interface {
input.Input
parameters() Params
}
// inputSet is a set of inputs that can be used as the basis to generate a tx
// on.
type inputSet []input.Input
@ -30,16 +36,9 @@ type inputSet []input.Input
// contains up to the configured maximum number of inputs. Negative yield
// inputs are skipped. No input sets with a total value after fees below the
// dust limit are returned.
func generateInputPartitionings(sweepableInputs []input.Input,
func generateInputPartitionings(sweepableInputs []txInput,
relayFeePerKW, feePerKW chainfee.SatPerKWeight,
maxInputsPerTx int) ([]inputSet, error) {
// Calculate dust limit based on the P2WPKH output script of the sweep
// txes.
dustLimit := txrules.GetDustThreshold(
input.P2WPKHSize,
btcutil.Amount(relayFeePerKW.FeePerKVByte()),
)
maxInputsPerTx int, wallet Wallet) ([]inputSet, error) {
// Sort input by yield. We will start constructing input sets starting
// with the highest yield inputs. This is to prevent the construction
@ -75,98 +74,53 @@ func generateInputPartitionings(sweepableInputs []input.Input,
// Select blocks of inputs up to the configured maximum number.
var sets []inputSet
for len(sweepableInputs) > 0 {
// Get the maximum number of inputs from sweepableInputs that
// we can use to create a positive yielding set from.
count, outputValue := getPositiveYieldInputs(
sweepableInputs, maxInputsPerTx, feePerKW,
// Start building a set of positive-yield tx inputs under the
// condition that the tx will be published with the specified
// fee rate.
txInputs := newTxInputSet(
wallet, feePerKW, relayFeePerKW, maxInputsPerTx,
)
// If there are no positive yield inputs left, we can stop
// here.
if count == 0 {
// From the set of sweepable inputs, keep adding inputs to the
// input set until the tx output value no longer goes up or the
// maximum number of inputs is reached.
txInputs.addPositiveYieldInputs(sweepableInputs)
// If there are no positive yield inputs, we can stop here.
inputCount := len(txInputs.inputs)
if inputCount == 0 {
return sets, nil
}
// Check the current output value and add wallet utxos if
// needed to push the output value to the lower limit.
if err := txInputs.tryAddWalletInputsIfNeeded(); err != nil {
return nil, err
}
// If the output value of this block of inputs does not reach
// the dust limit, stop sweeping. Because of the sorting,
// continuing with the remaining inputs will only lead to sets
// with a even lower output value.
if outputValue < dustLimit {
// with an even lower output value.
if !txInputs.dustLimitReached() {
log.Debugf("Set value %v below dust limit of %v",
outputValue, dustLimit)
txInputs.outputValue, txInputs.dustLimit)
return sets, nil
}
log.Infof("Candidate sweep set of size=%v, has yield=%v",
count, outputValue)
log.Infof("Candidate sweep set of size=%v (+%v wallet inputs), "+
"has yield=%v, weight=%v",
inputCount, len(txInputs.inputs)-inputCount,
txInputs.outputValue-txInputs.walletInputTotal,
txInputs.weightEstimate.Weight())
sets = append(sets, sweepableInputs[:count])
sweepableInputs = sweepableInputs[count:]
sets = append(sets, txInputs.inputs)
sweepableInputs = sweepableInputs[inputCount:]
}
return sets, nil
}
// getPositiveYieldInputs returns the maximum of a number n for which holds
// that the inputs [0,n) of sweepableInputs have a positive yield.
// Additionally, the total values of these inputs minus the fee is returned.
//
// TODO(roasbeef): Consider including some negative yield inputs too to clean
// up the utxo set even if it costs us some fees up front. In the spirit of
// minimizing any negative externalities we cause for the Bitcoin system as a
// whole.
func getPositiveYieldInputs(sweepableInputs []input.Input, maxInputs int,
feePerKW chainfee.SatPerKWeight) (int, btcutil.Amount) {
var weightEstimate input.TxWeightEstimator
// Add the sweep tx output to the weight estimate.
weightEstimate.AddP2WKHOutput()
var total, outputValue btcutil.Amount
for idx, input := range sweepableInputs {
// Can ignore error, because it has already been checked when
// calculating the yields.
size, isNestedP2SH, _ := input.WitnessType().SizeUpperBound()
// Keep a running weight estimate of the input set.
if isNestedP2SH {
weightEstimate.AddNestedP2WSHInput(size)
} else {
weightEstimate.AddWitnessInput(size)
}
newTotal := total + btcutil.Amount(input.SignDesc().Output.Value)
weight := weightEstimate.Weight()
fee := feePerKW.FeeForWeight(int64(weight))
// Calculate the output value if the current input would be
// added to the set.
newOutputValue := newTotal - fee
// If adding this input makes the total output value of the set
// decrease, this is a negative yield input. It shouldn't be
// added to the set. We return the current index as the number
// of inputs, so the current input is being excluded.
if newOutputValue <= outputValue {
return idx, outputValue
}
// Update running values.
total = newTotal
outputValue = newOutputValue
// Stop if max inputs is reached.
if idx == maxInputs-1 {
return maxInputs, outputValue
}
}
// We could add all inputs to the set, so return them all.
return len(sweepableInputs), outputValue
}
// createSweepTx builds a signed tx spending the inputs to a the output script.
func createSweepTx(inputs []input.Input, outputPkScript []byte,
currentBlockHeight uint32, feePerKw chainfee.SatPerKWeight,
@ -174,12 +128,12 @@ func createSweepTx(inputs []input.Input, outputPkScript []byte,
inputs, txWeight := getWeightEstimate(inputs)
log.Infof("Creating sweep transaction for %v inputs (%s) "+
"using %v sat/kw", len(inputs), inputTypeSummary(inputs),
int64(feePerKw))
txFee := feePerKw.FeeForWeight(txWeight)
log.Infof("Creating sweep transaction for %v inputs (%s) "+
"using %v sat/kw, tx_fee=%v", len(inputs),
inputTypeSummary(inputs), int64(feePerKw), txFee)
// Sum up the total value contained in the inputs.
var totalSum btcutil.Amount
for _, o := range inputs {

@ -201,7 +201,7 @@ type NurseryConfig struct {
Store NurseryStore
// Sweep sweeps an input back to the wallet.
SweepInput func(input.Input, sweep.FeePreference) (chan sweep.Result, error)
SweepInput func(input.Input, sweep.Params) (chan sweep.Result, error)
}
// utxoNursery is a system dedicated to incubating time-locked outputs created
@ -778,7 +778,9 @@ func (u *utxoNursery) sweepMatureOutputs(classHeight uint32,
// passed in with disastrous consequences.
local := output
resultChan, err := u.cfg.SweepInput(&local, feePref)
resultChan, err := u.cfg.SweepInput(
&local, sweep.Params{Fee: feePref},
)
if err != nil {
return err
}

@ -983,7 +983,7 @@ func newMockSweeper(t *testing.T) *mockSweeper {
}
func (s *mockSweeper) sweepInput(input input.Input,
_ sweep.FeePreference) (chan sweep.Result, error) {
_ sweep.Params) (chan sweep.Result, error) {
utxnLog.Debugf("mockSweeper sweepInput called for %v", *input.OutPoint())