sweep: add exclusive groups

Allows certain sweep inputs to be kept in separate transactions at all
times. This is a preparation for anchor outputs. Before the commitment
tx confirms, there are three potential anchors that can be cpfp'ed. We
want to cpfp them all, but if done in the same transaction, the
transaction would guaranteed to be invalid. Exponential backoff would
eventually get the txes published, but having exclusive groups makes the
process faster.
This commit is contained in:
Joost Jager 2019-12-09 11:51:13 +01:00
parent 69a6107d06
commit 9dc349488b
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7
3 changed files with 171 additions and 16 deletions

45
sweep/bucket_list.go Normal file

@ -0,0 +1,45 @@
package sweep
// bucket contains a set of inputs that are not mutually exclusive.
type bucket pendingInputs
// tryAdd tries to add a new input to this bucket.
func (b bucket) tryAdd(input *pendingInput) bool {
exclusiveGroup := input.params.ExclusiveGroup
if exclusiveGroup != nil {
for _, input := range b {
existingGroup := input.params.ExclusiveGroup
if existingGroup != nil &&
*existingGroup == *exclusiveGroup {
return false
}
}
}
b[*input.OutPoint()] = input
return true
}
// bucketList is a list of buckets that contain non-mutually exclusive inputs.
type bucketList struct {
buckets []bucket
}
// add adds a new input. If the input is not accepted by any of the existing
// buckets, a new bucket will be created.
func (b *bucketList) add(input *pendingInput) {
for _, existingBucket := range b.buckets {
if existingBucket.tryAdd(input) {
return
}
}
// Create a new bucket and add the input. It is not necessary to check
// the return value of tryAdd because it will always succeed on an empty
// bucket.
newBucket := make(bucket)
newBucket.tryAdd(input)
b.buckets = append(b.buckets, newBucket)
}

@ -50,6 +50,11 @@ var (
// request from a client whom did not specify a fee preference.
ErrNoFeePreference = errors.New("no fee preference specified")
// ErrExclusiveGroupSpend is returned in case a different input of the
// same exclusive group was spent.
ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
"was spent")
// ErrSweeperShuttingDown is an error returned when a client attempts to
// make a request to the UtxoSweeper, but it is unable to handle it as
// it is/has already been stoppepd.
@ -71,11 +76,16 @@ type Params struct {
// Force indicates whether the input should be swept regardless of
// whether it is economical to do so.
Force bool
// ExclusiveGroup is an identifier that, if set, prevents other inputs
// with the same identifier from being batched together.
ExclusiveGroup *uint64
}
// String returns a human readable interpretation of the sweep parameters.
func (p Params) String() string {
return fmt.Sprintf("fee=%v, force=%v", p.Fee, p.Force)
return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v",
p.Fee, p.Force, p.ExclusiveGroup)
}
// pendingInput is created when an input reaches the main loop for the first
@ -552,7 +562,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// registration, deleted from pendingInputs but
// the ntfn was in-flight already. Or this could
// be not one of our inputs.
_, ok := s.pendingInputs[outpoint]
input, ok := s.pendingInputs[outpoint]
if !ok {
continue
}
@ -568,6 +578,14 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
Tx: spend.SpendingTx,
Err: err,
})
// Remove all other inputs in this exclusive
// group.
if input.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(
*input.params.ExclusiveGroup,
)
}
}
// Now that an input of ours is spent, we can try to
@ -639,6 +657,31 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
}
}
// removeExclusiveGroup removes all inputs in the given exclusive group. This
// function is called when one of the exclusive group inputs has been spent. The
// other inputs won't ever be spendable and can be removed. This also prevents
// them from being part of future sweep transactions that would fail.
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
for outpoint, input := range s.pendingInputs {
outpoint := outpoint
// Skip inputs that aren't exclusive.
if input.params.ExclusiveGroup == nil {
continue
}
// Skip inputs from other exclusive groups.
if *input.params.ExclusiveGroup != group {
continue
}
// Signal result channels.
s.signalAndRemove(&outpoint, Result{
Err: ErrExclusiveGroupSpend,
})
}
}
// sweepCluster tries to sweep the given input cluster.
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {
@ -679,7 +722,7 @@ func (s *UtxoSweeper) bucketForFeeRate(
// sweep fee rate, which is determined by calculating the average fee rate of
// all inputs within that cluster.
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
bucketInputs := make(map[int]pendingInputs)
bucketInputs := make(map[int]*bucketList)
inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight)
// First, we'll group together all inputs with similar fee rates. This
@ -692,30 +735,37 @@ func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
}
feeGroup := s.bucketForFeeRate(feeRate)
inputs, ok := bucketInputs[feeGroup]
// Create a bucket list for this fee rate if there isn't one
// yet.
buckets, ok := bucketInputs[feeGroup]
if !ok {
inputs = make(pendingInputs)
bucketInputs[feeGroup] = inputs
buckets = &bucketList{}
bucketInputs[feeGroup] = buckets
}
// Request the bucket list to add this input. The bucket list
// will take into account exclusive group constraints.
buckets.add(input)
input.lastFeeRate = feeRate
inputs[op] = input
inputFeeRates[op] = feeRate
}
// We'll then determine the sweep fee rate for each set of inputs by
// calculating the average fee rate of the inputs within each set.
inputClusters := make([]inputCluster, 0, len(bucketInputs))
for _, inputs := range bucketInputs {
var sweepFeeRate chainfee.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
for _, buckets := range bucketInputs {
for _, inputs := range buckets.buckets {
var sweepFeeRate chainfee.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
}
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}
return inputClusters

@ -1232,3 +1232,63 @@ func TestBumpFeeRBF(t *testing.T) {
ctx.finish(1)
}
// TestExclusiveGroup tests the sweeper exclusive group functionality.
func TestExclusiveGroup(t *testing.T) {
ctx := createSweeperTestContext(t)
// Sweep three inputs in the same exclusive group.
var results []chan Result
for i := 0; i < 3; i++ {
exclusiveGroup := uint64(1)
result, err := ctx.sweeper.SweepInput(
spendableInputs[i], Params{
Fee: FeePreference{ConfTarget: 6},
ExclusiveGroup: &exclusiveGroup,
},
)
if err != nil {
t.Fatal(err)
}
results = append(results, result)
}
// We expect all inputs to be published in separate transactions, even
// though they share the same fee preference.
ctx.tick()
for i := 0; i < 3; i++ {
sweepTx := ctx.receiveTx()
if len(sweepTx.TxOut) != 1 {
t.Fatal("expected a single tx out in the sweep tx")
}
// Remove all txes except for the one that sweeps the first
// input. This simulates the sweeps being conflicting.
if sweepTx.TxIn[0].PreviousOutPoint !=
*spendableInputs[0].OutPoint() {
ctx.backend.deleteUnconfirmed(sweepTx.TxHash())
}
}
// Mine the first sweep tx.
ctx.backend.mine()
// Expect the first input to be swept by the confirmed sweep tx.
result0 := <-results[0]
if result0.Err != nil {
t.Fatal("expected first input to be swept")
}
// Expect the other two inputs to return an error. They have no chance
// of confirming.
result1 := <-results[1]
if result1.Err != ErrExclusiveGroupSpend {
t.Fatal("expected second input to be canceled")
}
result2 := <-results[2]
if result2.Err != ErrExclusiveGroupSpend {
t.Fatal("expected third input to be canceled")
}
}