From 9dc349488be2d6007268c8b28628372f4a8580e6 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Dec 2019 11:51:13 +0100 Subject: [PATCH] sweep: add exclusive groups Allows certain sweep inputs to be kept in separate transactions at all times. This is a preparation for anchor outputs. Before the commitment tx confirms, there are three potential anchors that can be cpfp'ed. We want to cpfp them all, but if done in the same transaction, the transaction would guaranteed to be invalid. Exponential backoff would eventually get the txes published, but having exclusive groups makes the process faster. --- sweep/bucket_list.go | 45 ++++++++++++++++++++++++ sweep/sweeper.go | 82 ++++++++++++++++++++++++++++++++++--------- sweep/sweeper_test.go | 60 +++++++++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 16 deletions(-) create mode 100644 sweep/bucket_list.go diff --git a/sweep/bucket_list.go b/sweep/bucket_list.go new file mode 100644 index 00000000..4b3c67cd --- /dev/null +++ b/sweep/bucket_list.go @@ -0,0 +1,45 @@ +package sweep + +// bucket contains a set of inputs that are not mutually exclusive. +type bucket pendingInputs + +// tryAdd tries to add a new input to this bucket. +func (b bucket) tryAdd(input *pendingInput) bool { + exclusiveGroup := input.params.ExclusiveGroup + if exclusiveGroup != nil { + for _, input := range b { + existingGroup := input.params.ExclusiveGroup + if existingGroup != nil && + *existingGroup == *exclusiveGroup { + + return false + } + } + } + + b[*input.OutPoint()] = input + + return true +} + +// bucketList is a list of buckets that contain non-mutually exclusive inputs. +type bucketList struct { + buckets []bucket +} + +// add adds a new input. If the input is not accepted by any of the existing +// buckets, a new bucket will be created. +func (b *bucketList) add(input *pendingInput) { + for _, existingBucket := range b.buckets { + if existingBucket.tryAdd(input) { + return + } + } + + // Create a new bucket and add the input. It is not necessary to check + // the return value of tryAdd because it will always succeed on an empty + // bucket. + newBucket := make(bucket) + newBucket.tryAdd(input) + b.buckets = append(b.buckets, newBucket) +} diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8a9a38da..59971493 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -50,6 +50,11 @@ var ( // request from a client whom did not specify a fee preference. ErrNoFeePreference = errors.New("no fee preference specified") + // ErrExclusiveGroupSpend is returned in case a different input of the + // same exclusive group was spent. + ErrExclusiveGroupSpend = errors.New("other member of exclusive group " + + "was spent") + // ErrSweeperShuttingDown is an error returned when a client attempts to // make a request to the UtxoSweeper, but it is unable to handle it as // it is/has already been stoppepd. @@ -71,11 +76,16 @@ type Params struct { // Force indicates whether the input should be swept regardless of // whether it is economical to do so. Force bool + + // ExclusiveGroup is an identifier that, if set, prevents other inputs + // with the same identifier from being batched together. + ExclusiveGroup *uint64 } // String returns a human readable interpretation of the sweep parameters. func (p Params) String() string { - return fmt.Sprintf("fee=%v, force=%v", p.Fee, p.Force) + return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v", + p.Fee, p.Force, p.ExclusiveGroup) } // pendingInput is created when an input reaches the main loop for the first @@ -552,7 +562,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // registration, deleted from pendingInputs but // the ntfn was in-flight already. Or this could // be not one of our inputs. - _, ok := s.pendingInputs[outpoint] + input, ok := s.pendingInputs[outpoint] if !ok { continue } @@ -568,6 +578,14 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { Tx: spend.SpendingTx, Err: err, }) + + // Remove all other inputs in this exclusive + // group. + if input.params.ExclusiveGroup != nil { + s.removeExclusiveGroup( + *input.params.ExclusiveGroup, + ) + } } // Now that an input of ours is spent, we can try to @@ -639,6 +657,31 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { } } +// removeExclusiveGroup removes all inputs in the given exclusive group. This +// function is called when one of the exclusive group inputs has been spent. The +// other inputs won't ever be spendable and can be removed. This also prevents +// them from being part of future sweep transactions that would fail. +func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { + for outpoint, input := range s.pendingInputs { + outpoint := outpoint + + // Skip inputs that aren't exclusive. + if input.params.ExclusiveGroup == nil { + continue + } + + // Skip inputs from other exclusive groups. + if *input.params.ExclusiveGroup != group { + continue + } + + // Signal result channels. + s.signalAndRemove(&outpoint, Result{ + Err: ErrExclusiveGroupSpend, + }) + } +} + // sweepCluster tries to sweep the given input cluster. func (s *UtxoSweeper) sweepCluster(cluster inputCluster, currentHeight int32) error { @@ -679,7 +722,7 @@ func (s *UtxoSweeper) bucketForFeeRate( // sweep fee rate, which is determined by calculating the average fee rate of // all inputs within that cluster. func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster { - bucketInputs := make(map[int]pendingInputs) + bucketInputs := make(map[int]*bucketList) inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight) // First, we'll group together all inputs with similar fee rates. This @@ -692,30 +735,37 @@ func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster { } feeGroup := s.bucketForFeeRate(feeRate) - inputs, ok := bucketInputs[feeGroup] + // Create a bucket list for this fee rate if there isn't one + // yet. + buckets, ok := bucketInputs[feeGroup] if !ok { - inputs = make(pendingInputs) - bucketInputs[feeGroup] = inputs + buckets = &bucketList{} + bucketInputs[feeGroup] = buckets } + // Request the bucket list to add this input. The bucket list + // will take into account exclusive group constraints. + buckets.add(input) + input.lastFeeRate = feeRate - inputs[op] = input inputFeeRates[op] = feeRate } // We'll then determine the sweep fee rate for each set of inputs by // calculating the average fee rate of the inputs within each set. inputClusters := make([]inputCluster, 0, len(bucketInputs)) - for _, inputs := range bucketInputs { - var sweepFeeRate chainfee.SatPerKWeight - for op := range inputs { - sweepFeeRate += inputFeeRates[op] + for _, buckets := range bucketInputs { + for _, inputs := range buckets.buckets { + var sweepFeeRate chainfee.SatPerKWeight + for op := range inputs { + sweepFeeRate += inputFeeRates[op] + } + sweepFeeRate /= chainfee.SatPerKWeight(len(inputs)) + inputClusters = append(inputClusters, inputCluster{ + sweepFeeRate: sweepFeeRate, + inputs: inputs, + }) } - sweepFeeRate /= chainfee.SatPerKWeight(len(inputs)) - inputClusters = append(inputClusters, inputCluster{ - sweepFeeRate: sweepFeeRate, - inputs: inputs, - }) } return inputClusters diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index a663c3ab..85908f7e 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1232,3 +1232,63 @@ func TestBumpFeeRBF(t *testing.T) { ctx.finish(1) } + +// TestExclusiveGroup tests the sweeper exclusive group functionality. +func TestExclusiveGroup(t *testing.T) { + ctx := createSweeperTestContext(t) + + // Sweep three inputs in the same exclusive group. + var results []chan Result + for i := 0; i < 3; i++ { + exclusiveGroup := uint64(1) + result, err := ctx.sweeper.SweepInput( + spendableInputs[i], Params{ + Fee: FeePreference{ConfTarget: 6}, + ExclusiveGroup: &exclusiveGroup, + }, + ) + if err != nil { + t.Fatal(err) + } + results = append(results, result) + } + + // We expect all inputs to be published in separate transactions, even + // though they share the same fee preference. + ctx.tick() + for i := 0; i < 3; i++ { + sweepTx := ctx.receiveTx() + if len(sweepTx.TxOut) != 1 { + t.Fatal("expected a single tx out in the sweep tx") + } + + // Remove all txes except for the one that sweeps the first + // input. This simulates the sweeps being conflicting. + if sweepTx.TxIn[0].PreviousOutPoint != + *spendableInputs[0].OutPoint() { + + ctx.backend.deleteUnconfirmed(sweepTx.TxHash()) + } + } + + // Mine the first sweep tx. + ctx.backend.mine() + + // Expect the first input to be swept by the confirmed sweep tx. + result0 := <-results[0] + if result0.Err != nil { + t.Fatal("expected first input to be swept") + } + + // Expect the other two inputs to return an error. They have no chance + // of confirming. + result1 := <-results[1] + if result1.Err != ErrExclusiveGroupSpend { + t.Fatal("expected second input to be canceled") + } + + result2 := <-results[2] + if result2.Err != ErrExclusiveGroupSpend { + t.Fatal("expected third input to be canceled") + } +}