diff --git a/sweep/bucket_list.go b/sweep/bucket_list.go new file mode 100644 index 00000000..4b3c67cd --- /dev/null +++ b/sweep/bucket_list.go @@ -0,0 +1,45 @@ +package sweep + +// bucket contains a set of inputs that are not mutually exclusive. +type bucket pendingInputs + +// tryAdd tries to add a new input to this bucket. +func (b bucket) tryAdd(input *pendingInput) bool { + exclusiveGroup := input.params.ExclusiveGroup + if exclusiveGroup != nil { + for _, input := range b { + existingGroup := input.params.ExclusiveGroup + if existingGroup != nil && + *existingGroup == *exclusiveGroup { + + return false + } + } + } + + b[*input.OutPoint()] = input + + return true +} + +// bucketList is a list of buckets that contain non-mutually exclusive inputs. +type bucketList struct { + buckets []bucket +} + +// add adds a new input. If the input is not accepted by any of the existing +// buckets, a new bucket will be created. +func (b *bucketList) add(input *pendingInput) { + for _, existingBucket := range b.buckets { + if existingBucket.tryAdd(input) { + return + } + } + + // Create a new bucket and add the input. It is not necessary to check + // the return value of tryAdd because it will always succeed on an empty + // bucket. + newBucket := make(bucket) + newBucket.tryAdd(input) + b.buckets = append(b.buckets, newBucket) +} diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 8a9a38da..59971493 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -50,6 +50,11 @@ var ( // request from a client whom did not specify a fee preference. ErrNoFeePreference = errors.New("no fee preference specified") + // ErrExclusiveGroupSpend is returned in case a different input of the + // same exclusive group was spent. + ErrExclusiveGroupSpend = errors.New("other member of exclusive group " + + "was spent") + // ErrSweeperShuttingDown is an error returned when a client attempts to // make a request to the UtxoSweeper, but it is unable to handle it as // it is/has already been stoppepd. @@ -71,11 +76,16 @@ type Params struct { // Force indicates whether the input should be swept regardless of // whether it is economical to do so. Force bool + + // ExclusiveGroup is an identifier that, if set, prevents other inputs + // with the same identifier from being batched together. + ExclusiveGroup *uint64 } // String returns a human readable interpretation of the sweep parameters. func (p Params) String() string { - return fmt.Sprintf("fee=%v, force=%v", p.Fee, p.Force) + return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v", + p.Fee, p.Force, p.ExclusiveGroup) } // pendingInput is created when an input reaches the main loop for the first @@ -552,7 +562,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // registration, deleted from pendingInputs but // the ntfn was in-flight already. Or this could // be not one of our inputs. - _, ok := s.pendingInputs[outpoint] + input, ok := s.pendingInputs[outpoint] if !ok { continue } @@ -568,6 +578,14 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { Tx: spend.SpendingTx, Err: err, }) + + // Remove all other inputs in this exclusive + // group. + if input.params.ExclusiveGroup != nil { + s.removeExclusiveGroup( + *input.params.ExclusiveGroup, + ) + } } // Now that an input of ours is spent, we can try to @@ -639,6 +657,31 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { } } +// removeExclusiveGroup removes all inputs in the given exclusive group. This +// function is called when one of the exclusive group inputs has been spent. The +// other inputs won't ever be spendable and can be removed. This also prevents +// them from being part of future sweep transactions that would fail. +func (s *UtxoSweeper) removeExclusiveGroup(group uint64) { + for outpoint, input := range s.pendingInputs { + outpoint := outpoint + + // Skip inputs that aren't exclusive. + if input.params.ExclusiveGroup == nil { + continue + } + + // Skip inputs from other exclusive groups. + if *input.params.ExclusiveGroup != group { + continue + } + + // Signal result channels. + s.signalAndRemove(&outpoint, Result{ + Err: ErrExclusiveGroupSpend, + }) + } +} + // sweepCluster tries to sweep the given input cluster. func (s *UtxoSweeper) sweepCluster(cluster inputCluster, currentHeight int32) error { @@ -679,7 +722,7 @@ func (s *UtxoSweeper) bucketForFeeRate( // sweep fee rate, which is determined by calculating the average fee rate of // all inputs within that cluster. func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster { - bucketInputs := make(map[int]pendingInputs) + bucketInputs := make(map[int]*bucketList) inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight) // First, we'll group together all inputs with similar fee rates. This @@ -692,30 +735,37 @@ func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster { } feeGroup := s.bucketForFeeRate(feeRate) - inputs, ok := bucketInputs[feeGroup] + // Create a bucket list for this fee rate if there isn't one + // yet. + buckets, ok := bucketInputs[feeGroup] if !ok { - inputs = make(pendingInputs) - bucketInputs[feeGroup] = inputs + buckets = &bucketList{} + bucketInputs[feeGroup] = buckets } + // Request the bucket list to add this input. The bucket list + // will take into account exclusive group constraints. + buckets.add(input) + input.lastFeeRate = feeRate - inputs[op] = input inputFeeRates[op] = feeRate } // We'll then determine the sweep fee rate for each set of inputs by // calculating the average fee rate of the inputs within each set. inputClusters := make([]inputCluster, 0, len(bucketInputs)) - for _, inputs := range bucketInputs { - var sweepFeeRate chainfee.SatPerKWeight - for op := range inputs { - sweepFeeRate += inputFeeRates[op] + for _, buckets := range bucketInputs { + for _, inputs := range buckets.buckets { + var sweepFeeRate chainfee.SatPerKWeight + for op := range inputs { + sweepFeeRate += inputFeeRates[op] + } + sweepFeeRate /= chainfee.SatPerKWeight(len(inputs)) + inputClusters = append(inputClusters, inputCluster{ + sweepFeeRate: sweepFeeRate, + inputs: inputs, + }) } - sweepFeeRate /= chainfee.SatPerKWeight(len(inputs)) - inputClusters = append(inputClusters, inputCluster{ - sweepFeeRate: sweepFeeRate, - inputs: inputs, - }) } return inputClusters diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index a663c3ab..85908f7e 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -1232,3 +1232,63 @@ func TestBumpFeeRBF(t *testing.T) { ctx.finish(1) } + +// TestExclusiveGroup tests the sweeper exclusive group functionality. +func TestExclusiveGroup(t *testing.T) { + ctx := createSweeperTestContext(t) + + // Sweep three inputs in the same exclusive group. + var results []chan Result + for i := 0; i < 3; i++ { + exclusiveGroup := uint64(1) + result, err := ctx.sweeper.SweepInput( + spendableInputs[i], Params{ + Fee: FeePreference{ConfTarget: 6}, + ExclusiveGroup: &exclusiveGroup, + }, + ) + if err != nil { + t.Fatal(err) + } + results = append(results, result) + } + + // We expect all inputs to be published in separate transactions, even + // though they share the same fee preference. + ctx.tick() + for i := 0; i < 3; i++ { + sweepTx := ctx.receiveTx() + if len(sweepTx.TxOut) != 1 { + t.Fatal("expected a single tx out in the sweep tx") + } + + // Remove all txes except for the one that sweeps the first + // input. This simulates the sweeps being conflicting. + if sweepTx.TxIn[0].PreviousOutPoint != + *spendableInputs[0].OutPoint() { + + ctx.backend.deleteUnconfirmed(sweepTx.TxHash()) + } + } + + // Mine the first sweep tx. + ctx.backend.mine() + + // Expect the first input to be swept by the confirmed sweep tx. + result0 := <-results[0] + if result0.Err != nil { + t.Fatal("expected first input to be swept") + } + + // Expect the other two inputs to return an error. They have no chance + // of confirming. + result1 := <-results[1] + if result1.Err != ErrExclusiveGroupSpend { + t.Fatal("expected second input to be canceled") + } + + result2 := <-results[2] + if result2.Err != ErrExclusiveGroupSpend { + t.Fatal("expected third input to be canceled") + } +}