diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 2f9f9b19..03fcf6ca 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -22,7 +22,7 @@ const ( // DefaultMaxFeeRate is the default maximum fee rate allowed within the // UtxoSweeper. The current value is equivalent to a fee rate of 10,000 // sat/vbyte. - DefaultMaxFeeRate lnwallet.SatPerKWeight = 250 * 1e4 + DefaultMaxFeeRate = lnwallet.FeePerKwFloor * 1e4 // DefaultFeeRateBucketSize is the default size of fee rate buckets // we'll use when clustering inputs into buckets with similar fee rates @@ -137,6 +137,21 @@ type PendingInput struct { NextBroadcastHeight uint32 } +// bumpFeeReq is an internal message we'll use to represent an external caller's +// intent to bump the fee rate of a given input. +type bumpFeeReq struct { + input wire.OutPoint + feePreference FeePreference + responseChan chan *bumpFeeResp +} + +// bumpFeeResp is an internal message we'll use to hand off the response of a +// bumpFeeReq from the UtxoSweeper's main event loop back to the caller. +type bumpFeeResp struct { + resultChan chan Result + err error +} + // UtxoSweeper is responsible for sweeping outputs back into the wallet type UtxoSweeper struct { started uint32 // To be used atomically. @@ -152,6 +167,10 @@ type UtxoSweeper struct { // UtxoSweeper is attempting to sweep. pendingSweepsReqs chan *pendingSweepsReq + // bumpFeeReqs is a channel that will be sent requests by external + // callers who wish to bump the fee rate of a given input. + bumpFeeReqs chan *bumpFeeReq + // pendingInputs is the total set of inputs the UtxoSweeper has been // requested to sweep. pendingInputs pendingInputs @@ -261,6 +280,7 @@ func New(cfg *UtxoSweeperConfig) *UtxoSweeper { cfg: cfg, newInputs: make(chan *sweepInputMessage), spendChan: make(chan *chainntnfs.SpendDetail), + bumpFeeReqs: make(chan *bumpFeeReq), pendingSweepsReqs: make(chan *pendingSweepsReq), quit: make(chan struct{}), pendingInputs: make(pendingInputs), @@ -355,7 +375,10 @@ func (s *UtxoSweeper) Stop() error { // SweepInput sweeps inputs back into the wallet. The inputs will be batched and // swept after the batch time window ends. A custom fee preference can be -// provided, otherwise the UtxoSweeper's default will be used. +// provided to determine what fee rate should be used for the input. Note that +// the input may not always be swept with this exact value, as its possible for +// it to be batched under the same transaction with other similar fee rate +// inputs. // // NOTE: Extreme care needs to be taken that input isn't changed externally. // Because it is an interface and we don't know what is exactly behind it, we @@ -542,6 +565,15 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch, case req := <-s.pendingSweepsReqs: req.respChan <- s.handlePendingSweepsReq(req) + // A new external request has been received to bump the fee rate + // of a given input. + case req := <-s.bumpFeeReqs: + resultChan, err := s.handleBumpFeeReq(req, bestHeight) + req.responseChan <- &bumpFeeResp{ + resultChan: resultChan, + err: err, + } + // The timer expires and we are going to (re)sweep. case <-s.timer: log.Debugf("Sweep timer expired") @@ -989,6 +1021,92 @@ func (s *UtxoSweeper) handlePendingSweepsReq( return pendingInputs } +// BumpFee allows bumping the fee of an input being swept by the UtxoSweeper +// according to the provided fee preference. The new fee preference will be used +// for a new sweep transaction of the input that will act as a replacement +// transaction (RBF) of the original sweeping transaction, if any. +// +// NOTE: This currently doesn't do any fee rate validation to ensure that a bump +// is actually successful. The responsibility of doing so should be handled by +// the caller. +func (s *UtxoSweeper) BumpFee(input wire.OutPoint, + feePreference FeePreference) (chan Result, error) { + + // Ensure the client provided a sane fee preference. + if _, err := s.feeRateForPreference(feePreference); err != nil { + return nil, err + } + + responseChan := make(chan *bumpFeeResp, 1) + select { + case s.bumpFeeReqs <- &bumpFeeReq{ + input: input, + feePreference: feePreference, + responseChan: responseChan, + }: + case <-s.quit: + return nil, ErrSweeperShuttingDown + } + + select { + case response := <-responseChan: + return response.resultChan, response.err + case <-s.quit: + return nil, ErrSweeperShuttingDown + } +} + +// handleBumpFeeReq handles a bump fee request by simply updating the inputs fee +// preference. Currently, no validation is done on the new fee preference to +// ensure it will properly create a replacement transaction. +// +// TODO(wilmer): +// * Validate fee preference to ensure we'll create a valid replacement +// transaction to allow the new fee rate to propagate throughout the +// network. +// * Ensure we don't combine this input with any other unconfirmed inputs that +// did not exist in the original sweep transaction, resulting in an invalid +// replacement transaction. +func (s *UtxoSweeper) handleBumpFeeReq(req *bumpFeeReq, + bestHeight int32) (chan Result, error) { + + // If the UtxoSweeper is already trying to sweep this input, then we can + // simply just increase its fee rate. This will allow the input to be + // batched with others which also have a similar fee rate, creating a + // higher fee rate transaction that replaces the original input's + // sweeping transaction. + pendingInput, ok := s.pendingInputs[req.input] + if !ok { + return nil, lnwallet.ErrNotMine + } + + log.Debugf("Updating fee preference for %v from %v to %v", req.input, + pendingInput.feePreference, req.feePreference) + + pendingInput.feePreference = req.feePreference + + // We'll reset the input's publish height to the current so that a new + // transaction can be created that replaces the transaction currently + // spending the input. We only do this for inputs that have been + // broadcast at least once to ensure we don't spend an input before its + // maturity height. + // + // NOTE: The UtxoSweeper is not yet offered time-locked inputs, so the + // check for broadcast attempts is redundant at the moment. + if pendingInput.publishAttempts > 0 { + pendingInput.minPublishHeight = bestHeight + } + + if err := s.scheduleSweep(bestHeight); err != nil { + log.Errorf("Unable to schedule sweep: %v", err) + } + + resultChan := make(chan Result, 1) + pendingInput.listeners = append(pendingInput.listeners, resultChan) + + return resultChan, nil +} + // CreateSweepTx accepts a list of inputs and signs and generates a txn that // spends from them. This method also makes an accurate fee estimate before // generating the required witnesses. diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index 9926cd47..8f72a7ab 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -373,7 +373,7 @@ func TestSuccess(t *testing.T) { ctx := createSweeperTestContext(t) // Sweeping an input without a fee preference should result in an error. - _, err := ctx.sweeper.SweepInput( spendableInputs[0], FeePreference{}) + _, err := ctx.sweeper.SweepInput(spendableInputs[0], FeePreference{}) if err != ErrNoFeePreference { t.Fatalf("expected ErrNoFeePreference, got %v", err) } @@ -1128,3 +1128,64 @@ func TestPendingInputs(t *testing.T) { ctx.finish(1) } + +// TestBumpFeeRBF ensures that the UtxoSweeper can properly handle a fee bump +// request for an input it is currently attempting to sweep. When sweeping the +// input with the higher fee rate, a replacement transaction is created. +func TestBumpFeeRBF(t *testing.T) { + ctx := createSweeperTestContext(t) + + lowFeePref := FeePreference{ConfTarget: 144} + lowFeeRate := lnwallet.FeePerKwFloor + ctx.estimator.blocksToFee[lowFeePref.ConfTarget] = lowFeeRate + + // We'll first try to bump the fee of an output currently unknown to the + // UtxoSweeper. Doing so should result in a lnwallet.ErrNotMine error. + bumpResult, err := ctx.sweeper.BumpFee(wire.OutPoint{}, lowFeePref) + if err != lnwallet.ErrNotMine { + t.Fatalf("expected error lnwallet.ErrNotMine, got \"%v\"", err) + } + + // We'll then attempt to sweep an input, which we'll use to bump its fee + // later on. + input := createTestInput( + btcutil.SatoshiPerBitcoin, input.CommitmentTimeLock, + ) + sweepResult, err := ctx.sweeper.SweepInput(&input, lowFeePref) + if err != nil { + t.Fatal(err) + } + + // Ensure that a transaction is broadcast with the lower fee preference. + ctx.tick() + lowFeeTx := ctx.receiveTx() + assertTxFeeRate(t, &lowFeeTx, lowFeeRate, &input) + + // We'll then attempt to bump its fee rate. + highFeePref := FeePreference{ConfTarget: 6} + highFeeRate := DefaultMaxFeeRate + ctx.estimator.blocksToFee[highFeePref.ConfTarget] = highFeeRate + + // We should expect to see an error if a fee preference isn't provided. + _, err = ctx.sweeper.BumpFee(*input.OutPoint(), FeePreference{}) + if err != ErrNoFeePreference { + t.Fatalf("expected ErrNoFeePreference, got %v", err) + } + + bumpResult, err = ctx.sweeper.BumpFee(*input.OutPoint(), highFeePref) + if err != nil { + t.Fatalf("unable to bump input's fee: %v", err) + } + + // A higher fee rate transaction should be immediately broadcast. + ctx.tick() + highFeeTx := ctx.receiveTx() + assertTxFeeRate(t, &highFeeTx, highFeeRate, &input) + + // We'll finish our test by mining the sweep transaction. + ctx.backend.mine() + ctx.expectResult(sweepResult, nil) + ctx.expectResult(bumpResult, nil) + + ctx.finish(1) +}