sweep: allow fee bumps of inputs/transactions within UtxoSweeper

In this commit, we introduce the ability to bump the fee of an input
within the UtxoSweeper. Once its fee rate is bumped, a replacement
transaction (RBF) will be broadcast with the newer fee rate (assuming
the newer fee rate is high enough to be valid), replacing any
conflicting lower fee rate transactions.

Note that this currently doesn't validate the fee preference of the
bump. This responsibility is delegated to the caller, so care must be
taken to ensure the new fee preference is sufficient.
This commit is contained in:
Wilmer Paulino 2019-05-29 14:00:14 -07:00
parent f206444e96
commit e69d93949c
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
2 changed files with 182 additions and 3 deletions

@ -22,7 +22,7 @@ const (
// DefaultMaxFeeRate is the default maximum fee rate allowed within the
// UtxoSweeper. The current value is equivalent to a fee rate of 10,000
// sat/vbyte.
DefaultMaxFeeRate lnwallet.SatPerKWeight = 250 * 1e4
DefaultMaxFeeRate = lnwallet.FeePerKwFloor * 1e4
// DefaultFeeRateBucketSize is the default size of fee rate buckets
// we'll use when clustering inputs into buckets with similar fee rates
@ -137,6 +137,21 @@ type PendingInput struct {
NextBroadcastHeight uint32
}
// bumpFeeReq is an internal message we'll use to represent an external caller's
// intent to bump the fee rate of a given input.
type bumpFeeReq struct {
input wire.OutPoint
feePreference FeePreference
responseChan chan *bumpFeeResp
}
// bumpFeeResp is an internal message we'll use to hand off the response of a
// bumpFeeReq from the UtxoSweeper's main event loop back to the caller.
type bumpFeeResp struct {
resultChan chan Result
err error
}
// UtxoSweeper is responsible for sweeping outputs back into the wallet
type UtxoSweeper struct {
started uint32 // To be used atomically.
@ -152,6 +167,10 @@ type UtxoSweeper struct {
// UtxoSweeper is attempting to sweep.
pendingSweepsReqs chan *pendingSweepsReq
// bumpFeeReqs is a channel that will be sent requests by external
// callers who wish to bump the fee rate of a given input.
bumpFeeReqs chan *bumpFeeReq
// pendingInputs is the total set of inputs the UtxoSweeper has been
// requested to sweep.
pendingInputs pendingInputs
@ -261,6 +280,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper {
cfg: cfg,
newInputs: make(chan *sweepInputMessage),
spendChan: make(chan *chainntnfs.SpendDetail),
bumpFeeReqs: make(chan *bumpFeeReq),
pendingSweepsReqs: make(chan *pendingSweepsReq),
quit: make(chan struct{}),
pendingInputs: make(pendingInputs),
@ -355,7 +375,10 @@ func (s *UtxoSweeper) Stop() error {
// SweepInput sweeps inputs back into the wallet. The inputs will be batched and
// swept after the batch time window ends. A custom fee preference can be
// provided, otherwise the UtxoSweeper's default will be used.
// provided to determine what fee rate should be used for the input. Note that
// the input may not always be swept with this exact value, as its possible for
// it to be batched under the same transaction with other similar fee rate
// inputs.
//
// NOTE: Extreme care needs to be taken that input isn't changed externally.
// Because it is an interface and we don't know what is exactly behind it, we
@ -542,6 +565,15 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch,
case req := <-s.pendingSweepsReqs:
req.respChan <- s.handlePendingSweepsReq(req)
// A new external request has been received to bump the fee rate
// of a given input.
case req := <-s.bumpFeeReqs:
resultChan, err := s.handleBumpFeeReq(req, bestHeight)
req.responseChan <- &bumpFeeResp{
resultChan: resultChan,
err: err,
}
// The timer expires and we are going to (re)sweep.
case <-s.timer:
log.Debugf("Sweep timer expired")
@ -989,6 +1021,92 @@ func (s *UtxoSweeper) handlePendingSweepsReq(
return pendingInputs
}
// BumpFee allows bumping the fee of an input being swept by the UtxoSweeper
// according to the provided fee preference. The new fee preference will be used
// for a new sweep transaction of the input that will act as a replacement
// transaction (RBF) of the original sweeping transaction, if any.
//
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
// is actually successful. The responsibility of doing so should be handled by
// the caller.
func (s *UtxoSweeper) BumpFee(input wire.OutPoint,
feePreference FeePreference) (chan Result, error) {
// Ensure the client provided a sane fee preference.
if _, err := s.feeRateForPreference(feePreference); err != nil {
return nil, err
}
responseChan := make(chan *bumpFeeResp, 1)
select {
case s.bumpFeeReqs <- &bumpFeeReq{
input: input,
feePreference: feePreference,
responseChan: responseChan,
}:
case <-s.quit:
return nil, ErrSweeperShuttingDown
}
select {
case response := <-responseChan:
return response.resultChan, response.err
case <-s.quit:
return nil, ErrSweeperShuttingDown
}
}
// handleBumpFeeReq handles a bump fee request by simply updating the inputs fee
// preference. Currently, no validation is done on the new fee preference to
// ensure it will properly create a replacement transaction.
//
// TODO(wilmer):
// * Validate fee preference to ensure we'll create a valid replacement
// transaction to allow the new fee rate to propagate throughout the
// network.
// * Ensure we don't combine this input with any other unconfirmed inputs that
// did not exist in the original sweep transaction, resulting in an invalid
// replacement transaction.
func (s *UtxoSweeper) handleBumpFeeReq(req *bumpFeeReq,
bestHeight int32) (chan Result, error) {
// If the UtxoSweeper is already trying to sweep this input, then we can
// simply just increase its fee rate. This will allow the input to be
// batched with others which also have a similar fee rate, creating a
// higher fee rate transaction that replaces the original input's
// sweeping transaction.
pendingInput, ok := s.pendingInputs[req.input]
if !ok {
return nil, lnwallet.ErrNotMine
}
log.Debugf("Updating fee preference for %v from %v to %v", req.input,
pendingInput.feePreference, req.feePreference)
pendingInput.feePreference = req.feePreference
// We'll reset the input's publish height to the current so that a new
// transaction can be created that replaces the transaction currently
// spending the input. We only do this for inputs that have been
// broadcast at least once to ensure we don't spend an input before its
// maturity height.
//
// NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the
// check for broadcast attempts is redundant at the moment.
if pendingInput.publishAttempts > 0 {
pendingInput.minPublishHeight = bestHeight
}
if err := s.scheduleSweep(bestHeight); err != nil {
log.Errorf("Unable to schedule sweep: %v", err)
}
resultChan := make(chan Result, 1)
pendingInput.listeners = append(pendingInput.listeners, resultChan)
return resultChan, nil
}
// CreateSweepTx accepts a list of inputs and signs and generates a txn that
// spends from them. This method also makes an accurate fee estimate before
// generating the required witnesses.

@ -373,7 +373,7 @@ func TestSuccess(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweeping an input without a fee preference should result in an error.
_, err := ctx.sweeper.SweepInput( spendableInputs[0], FeePreference{})
_, err := ctx.sweeper.SweepInput(spendableInputs[0], FeePreference{})
if err != ErrNoFeePreference {
t.Fatalf("expected ErrNoFeePreference, got %v", err)
}
@ -1128,3 +1128,64 @@ func TestPendingInputs(t *testing.T) {
ctx.finish(1)
}
// TestBumpFeeRBF ensures that the UtxoSweeper can properly handle a fee bump
// request for an input it is currently attempting to sweep. When sweeping the
// input with the higher fee rate, a replacement transaction is created.
func TestBumpFeeRBF(t *testing.T) {
ctx := createSweeperTestContext(t)
lowFeePref := FeePreference{ConfTarget: 144}
lowFeeRate := lnwallet.FeePerKwFloor
ctx.estimator.blocksToFee[lowFeePref.ConfTarget] = lowFeeRate
// We'll first try to bump the fee of an output currently unknown to the
// UtxoSweeper. Doing so should result in a lnwallet.ErrNotMine error.
bumpResult, err := ctx.sweeper.BumpFee(wire.OutPoint{}, lowFeePref)
if err != lnwallet.ErrNotMine {
t.Fatalf("expected error lnwallet.ErrNotMine, got \"%v\"", err)
}
// We'll then attempt to sweep an input, which we'll use to bump its fee
// later on.
input := createTestInput(
btcutil.SatoshiPerBitcoin, input.CommitmentTimeLock,
)
sweepResult, err := ctx.sweeper.SweepInput(&input, lowFeePref)
if err != nil {
t.Fatal(err)
}
// Ensure that a transaction is broadcast with the lower fee preference.
ctx.tick()
lowFeeTx := ctx.receiveTx()
assertTxFeeRate(t, &lowFeeTx, lowFeeRate, &input)
// We'll then attempt to bump its fee rate.
highFeePref := FeePreference{ConfTarget: 6}
highFeeRate := DefaultMaxFeeRate
ctx.estimator.blocksToFee[highFeePref.ConfTarget] = highFeeRate
// We should expect to see an error if a fee preference isn't provided.
_, err = ctx.sweeper.BumpFee(*input.OutPoint(), FeePreference{})
if err != ErrNoFeePreference {
t.Fatalf("expected ErrNoFeePreference, got %v", err)
}
bumpResult, err = ctx.sweeper.BumpFee(*input.OutPoint(), highFeePref)
if err != nil {
t.Fatalf("unable to bump input's fee: %v", err)
}
// A higher fee rate transaction should be immediately broadcast.
ctx.tick()
highFeeTx := ctx.receiveTx()
assertTxFeeRate(t, &highFeeTx, highFeeRate, &input)
// We'll finish our test by mining the sweep transaction.
ctx.backend.mine()
ctx.expectResult(sweepResult, nil)
ctx.expectResult(bumpResult, nil)
ctx.finish(1)
}