Merge pull request #4606 from joostjager/cpfp-sweep-prep

cnct+sweep+itest: preparations for cpfp-aware sweeper
This commit is contained in:
Joost Jager 2020-09-16 11:48:39 +02:00 committed by GitHub
commit 2ebfb64b9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 331 additions and 113 deletions

@ -2,6 +2,7 @@ package lnd
import (
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"net"
@ -249,21 +250,15 @@ func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
return nil, err
}
// If the user provided an API for fee estimation, activate it now.
// Map the deprecated neutrino feeurl flag to the general fee
// url.
if cfg.NeutrinoMode.FeeURL != "" {
ltndLog.Infof("Using API fee estimator!")
estimator := chainfee.NewWebAPIEstimator(
chainfee.SparseConfFeeSource{
URL: cfg.NeutrinoMode.FeeURL,
},
defaultBitcoinStaticFeePerKW,
)
if err := estimator.Start(); err != nil {
return nil, err
if cfg.FeeURL != "" {
return nil, errors.New("feeurl and " +
"neutrino.feeurl are mutually exclusive")
}
cc.feeEstimator = estimator
cfg.FeeURL = cfg.NeutrinoMode.FeeURL
}
walletConfig.ChainSource = chain.NewNeutrinoClient(
@ -366,9 +361,6 @@ func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
if err != nil {
return nil, err
}
if err := cc.feeEstimator.Start(); err != nil {
return nil, err
}
} else if cfg.Litecoin.Active && !cfg.Litecoin.RegTest {
ltndLog.Infof("Initializing litecoind backed fee estimator in "+
"%s mode", bitcoindMode.EstimateMode)
@ -385,9 +377,6 @@ func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
if err != nil {
return nil, err
}
if err := cc.feeEstimator.Start(); err != nil {
return nil, err
}
}
case "btcd", "ltcd":
// Otherwise, we'll be speaking directly via RPC to a node.
@ -490,15 +479,34 @@ func newChainControlFromConfig(cfg *Config, localDB, remoteDB *channeldb.DB,
if err != nil {
return nil, err
}
if err := cc.feeEstimator.Start(); err != nil {
return nil, err
}
}
default:
return nil, fmt.Errorf("unknown node type: %s",
homeChainConfig.Node)
}
// Override default fee estimator if an external service is specified.
if cfg.FeeURL != "" {
// Do not cache fees on regtest to make it easier to execute
// manual or automated test cases.
cacheFees := !cfg.Bitcoin.RegTest
ltndLog.Infof("Using external fee estimator %v: cached=%v",
cfg.FeeURL, cacheFees)
cc.feeEstimator = chainfee.NewWebAPIEstimator(
chainfee.SparseConfFeeSource{
URL: cfg.FeeURL,
},
!cacheFees,
)
}
// Start fee estimator.
if err := cc.feeEstimator.Start(); err != nil {
return nil, err
}
wc, err := btcwallet.New(*walletConfig)
if err != nil {
fmt.Printf("unable to create wallet controller: %v\n", err)

@ -218,6 +218,8 @@ type Config struct {
MaxPendingChannels int `long:"maxpendingchannels" description:"The maximum number of incoming pending channels permitted per peer."`
BackupFilePath string `long:"backupfilepath" description:"The target location of the channel backup file"`
FeeURL string `long:"feeurl" description:"Optional URL for external fee estimation. If no URL is specified, the method for fee estimation will depend on the chosen backend and network."`
Bitcoin *lncfg.Chain `group:"Bitcoin" namespace:"bitcoin"`
BtcdMode *lncfg.Btcd `group:"btcd" namespace:"btcd"`
BitcoindMode *lncfg.Bitcoind `group:"bitcoind" namespace:"bitcoind"`

@ -10,7 +10,6 @@ import (
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/sweep"
)
@ -86,41 +85,32 @@ func (c *anchorResolver) Resolve() (ContractResolver, error) {
// situation. We don't want to force sweep anymore, because the anchor
// lost its special purpose to get the commitment confirmed. It is just
// an output that we want to sweep only if it is economical to do so.
//
// An exclusive group is not necessary anymore, because we know that
// this is the only anchor that can be swept.
//
// After a restart or when the remote force closes, the sweeper is not
// yet aware of the anchor. In that case, it will be added as new input
// to the sweeper.
relayFeeRate := c.Sweeper.RelayFeePerKW()
resultChan, err := c.Sweeper.UpdateParams(
c.anchor,
sweep.ParamsUpdate{
anchorInput := input.MakeBaseInput(
&c.anchor,
input.CommitmentAnchor,
&c.anchorSignDescriptor,
c.broadcastHeight,
)
resultChan, err := c.Sweeper.SweepInput(
&anchorInput,
sweep.Params{
Fee: sweep.FeePreference{
FeeRate: relayFeeRate,
},
Force: false,
},
)
// After a restart or when the remote force closes, the sweeper is not
// yet aware of the anchor. In that case, offer it as a new input to the
// sweeper. An exclusive group is not necessary anymore, because we know
// that this is the only anchor that can be swept.
if err == lnwallet.ErrNotMine {
anchorInput := input.MakeBaseInput(
&c.anchor,
input.CommitmentAnchor,
&c.anchorSignDescriptor,
c.broadcastHeight,
)
resultChan, err = c.Sweeper.SweepInput(
&anchorInput,
sweep.Params{
Fee: sweep.FeePreference{
FeeRate: relayFeeRate,
},
},
)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
var (

@ -2237,9 +2237,9 @@ func TestChannelArbitratorAnchors(t *testing.T) {
t.Fatalf("expected anchor resolver, got %T", resolver)
}
// The anchor resolver is expected to offer the anchor input to the
// The anchor resolver is expected to re-offer the anchor input to the
// sweeper.
<-chanArbCtx.sweeper.updatedInputs
<-chanArbCtx.sweeper.sweptInputs
// The mock sweeper immediately signals success for that input. This
// should transition the channel to the resolved state.

@ -10,6 +10,6 @@ type Neutrino struct {
MaxPeers int `long:"maxpeers" description:"Max number of inbound and outbound peers"`
BanDuration time.Duration `long:"banduration" description:"How long to ban misbehaving peers. Valid time units are {s, m, h}. Minimum 1 second"`
BanThreshold uint32 `long:"banthreshold" description:"Maximum allowed ban score before disconnecting and banning misbehaving peers."`
FeeURL string `long:"feeurl" description:"Optional URL for fee estimation. If a URL is not specified, static fees will be used for estimation."`
FeeURL string `long:"feeurl" description:"DEPRECATED: Optional URL for fee estimation. If a URL is not specified, static fees will be used for estimation."`
AssertFilterHeader string `long:"assertfilterheader" description:"Optional filter header in height:hash format to assert the state of neutrino's filter header chain on startup. If the assertion does not hold, then the filter header chain will be re-synced from the genesis block."`
}

106
lntest/fee_service.go Normal file

@ -0,0 +1,106 @@
package lntest
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)
const (
// feeServiceTarget is the confirmation target for which a fee estimate
// is returned. Requests for higher confirmation targets will fall back
// to this.
feeServiceTarget = 2
// feeServicePort is the tcp port on which the service runs.
feeServicePort = 16534
)
// feeService runs a web service that provides fee estimation information.
type feeService struct {
feeEstimates
srv *http.Server
wg sync.WaitGroup
url string
lock sync.Mutex
}
// feeEstimates contains the current fee estimates.
type feeEstimates struct {
Fees map[uint32]uint32 `json:"fee_by_block_target"`
}
// startFeeService spins up a go-routine to serve fee estimates.
func startFeeService() *feeService {
f := feeService{
url: fmt.Sprintf(
"http://localhost:%v/fee-estimates.json", feeServicePort,
),
}
// Initialize default fee estimate.
f.Fees = map[uint32]uint32{feeServiceTarget: 50000}
listenAddr := fmt.Sprintf(":%v", feeServicePort)
f.srv = &http.Server{
Addr: listenAddr,
}
http.HandleFunc("/fee-estimates.json", f.handleRequest)
f.wg.Add(1)
go func() {
defer f.wg.Done()
if err := f.srv.ListenAndServe(); err != http.ErrServerClosed {
fmt.Printf("error: cannot start fee api: %v", err)
}
}()
return &f
}
// handleRequest handles a client request for fee estimates.
func (f *feeService) handleRequest(w http.ResponseWriter, r *http.Request) {
f.lock.Lock()
defer f.lock.Unlock()
bytes, err := json.Marshal(f.feeEstimates)
if err != nil {
fmt.Printf("error: cannot serialize "+
"estimates: %v", err)
return
}
_, err = io.WriteString(w, string(bytes))
if err != nil {
fmt.Printf("error: cannot send estimates: %v",
err)
}
}
// stop stops the web server.
func (f *feeService) stop() {
if err := f.srv.Shutdown(context.Background()); err != nil {
fmt.Printf("error: cannot stop fee api: %v", err)
}
f.wg.Wait()
}
// setFee changes the current fee estimate for the fixed confirmation target.
func (f *feeService) setFee(fee chainfee.SatPerKWeight) {
f.lock.Lock()
defer f.lock.Unlock()
f.Fees[feeServiceTarget] = uint32(fee.FeePerKVByte())
}

@ -0,0 +1,39 @@
package lntest
import (
"io/ioutil"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// TestFeeService tests the itest fee estimating web service.
func TestFeeService(t *testing.T) {
service := startFeeService()
defer service.stop()
service.setFee(5000)
// Wait for service to start accepting connections.
var resp *http.Response
require.Eventually(
t,
func() bool {
var err error
resp, err = http.Get(service.url) // nolint:bodyclose
return err == nil
},
10*time.Second, time.Second,
)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(
t, "{\"fee_by_block_target\":{\"2\":20000}}", string(body),
)
}

@ -22,6 +22,7 @@ import (
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"google.golang.org/grpc/grpclog"
)
@ -63,6 +64,10 @@ type NetworkHarness struct {
// to main process.
lndErrorChan chan error
// feeService is a web service that provides external fee estimates to
// lnd.
feeService *feeService
quit chan struct{}
mtx sync.Mutex
@ -75,6 +80,8 @@ type NetworkHarness struct {
func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string) (
*NetworkHarness, error) {
feeService := startFeeService()
n := NetworkHarness{
activeNodes: make(map[int]*HarnessNode),
nodesByPub: make(map[string]*HarnessNode),
@ -84,6 +91,7 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string) (
netParams: r.ActiveNet,
Miner: r,
BackendCfg: b,
feeService: feeService,
quit: make(chan struct{}),
lndBinary: lndBinary,
}
@ -251,6 +259,8 @@ func (n *NetworkHarness) TearDownAll() error {
close(n.lndErrorChan)
close(n.quit)
n.feeService.stop()
return nil
}
@ -358,6 +368,7 @@ func (n *NetworkHarness) newNode(name string, extraArgs []string,
BackendCfg: n.BackendCfg,
NetParams: n.netParams,
ExtraArgs: extraArgs,
FeeURL: n.feeService.url,
})
if err != nil {
return nil, err
@ -1404,6 +1415,10 @@ func (n *NetworkHarness) sendCoins(ctx context.Context, amt btcutil.Amount,
return target.WaitForBalance(expectedBalance, true)
}
func (n *NetworkHarness) SetFeeEstimate(fee chainfee.SatPerKWeight) {
n.feeService.setFee(fee)
}
// CopyFile copies the file src to dest.
func CopyFile(dest, src string) error {
s, err := os.Open(src)

@ -154,6 +154,8 @@ type NodeConfig struct {
ProfilePort int
AcceptKeySend bool
FeeURL string
}
func (cfg NodeConfig) P2PAddr() string {
@ -232,6 +234,10 @@ func (cfg NodeConfig) genArgs() []string {
args = append(args, "--accept-keysend")
}
if cfg.FeeURL != "" {
args = append(args, "--feeurl="+cfg.FeeURL)
}
return args
}

@ -540,9 +540,9 @@ type WebAPIEstimator struct {
feesMtx sync.Mutex
feeByBlockTarget map[uint32]uint32
// defaultFeePerKw is a fallback value that we'll use if we're unable
// to query the API for any reason.
defaultFeePerKw SatPerKWeight
// noCache determines whether the web estimator should cache fee
// estimates.
noCache bool
quit chan struct{}
wg sync.WaitGroup
@ -550,13 +550,11 @@ type WebAPIEstimator struct {
// NewWebAPIEstimator creates a new WebAPIEstimator from a given URL and a
// fallback default fee. The fees are updated whenever a new block is mined.
func NewWebAPIEstimator(
api WebAPIFeeSource, defaultFee SatPerKWeight) *WebAPIEstimator {
func NewWebAPIEstimator(api WebAPIFeeSource, noCache bool) *WebAPIEstimator {
return &WebAPIEstimator{
apiSource: api,
feeByBlockTarget: make(map[uint32]uint32),
defaultFeePerKw: defaultFee,
noCache: noCache,
quit: make(chan struct{}),
}
}
@ -573,6 +571,11 @@ func (w *WebAPIEstimator) EstimateFeePerKW(numBlocks uint32) (SatPerKWeight, err
"accepted is %v", numBlocks, minBlockTarget)
}
// Get fee estimates now if we don't refresh periodically.
if w.noCache {
w.updateFeeEstimates()
}
feePerKb, err := w.getCachedFee(numBlocks)
if err != nil {
return 0, err
@ -596,6 +599,11 @@ func (w *WebAPIEstimator) EstimateFeePerKW(numBlocks uint32) (SatPerKWeight, err
//
// NOTE: This method is part of the Estimator interface.
func (w *WebAPIEstimator) Start() error {
// No update loop is needed when we don't cache.
if w.noCache {
return nil
}
var err error
w.started.Do(func() {
log.Infof("Starting web API fee estimator")
@ -615,6 +623,11 @@ func (w *WebAPIEstimator) Start() error {
//
// NOTE: This method is part of the Estimator interface.
func (w *WebAPIEstimator) Stop() error {
// Update loop is not running when we don't cache.
if w.noCache {
return nil
}
w.stopped.Do(func() {
log.Infof("Stopping web API fee estimator")

@ -192,7 +192,7 @@ func TestWebAPIFeeEstimator(t *testing.T) {
fees: testFees,
}
estimator := NewWebAPIEstimator(feeSource, 10)
estimator := NewWebAPIEstimator(feeSource, false)
// Test that requesting a fee when no fees have been cached fails.
_, err := estimator.EstimateFeePerKW(5)

@ -9,9 +9,20 @@ func (b bucket) tryAdd(input *pendingInput) bool {
if exclusiveGroup != nil {
for _, input := range b {
existingGroup := input.params.ExclusiveGroup
if existingGroup != nil &&
*existingGroup == *exclusiveGroup {
// Don't add an exclusive group input if other inputs
// are non-exclusive. The exclusive group input may be
// invalid (for example in the case of commitment
// anchors) and could thereby block sweeping of the
// other inputs.
if existingGroup == nil {
return false
}
// Don't combine inputs from the same exclusive group.
// Because only one input is valid, this may result in
// txes that are always invalid.
if *existingGroup == *exclusiveGroup {
return false
}
}

@ -505,6 +505,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
log.Debugf("Already pending input %v received",
outpoint)
// Update sweep parameters.
pendInput.params = input.params
// Add additional result channel to signal
// spend of this input.
pendInput.listeners = append(
@ -1131,8 +1134,9 @@ func (s *UtxoSweeper) handlePendingSweepsReq(
// UpdateParams allows updating the sweep parameters of a pending input in the
// UtxoSweeper. This function can be used to provide an updated fee preference
// that will be used for a new sweep transaction of the input that will act as a
// replacement transaction (RBF) of the original sweeping transaction, if any.
// and force flag that will be used for a new sweep transaction of the input
// that will act as a replacement transaction (RBF) of the original sweeping
// transaction, if any. The exclusive group is left unchanged.
//
// NOTE: This currently doesn't do any fee rate validation to ensure that a bump
// is actually successful. The responsibility of doing so should be handled by

@ -30,9 +30,7 @@ const (
constraintsForce
)
// txInputSet is an object that accumulates tx inputs and keeps running counters
// on various properties of the tx.
type txInputSet struct {
type txInputSetState struct {
// weightEstimate is the (worst case) tx weight with the current set of
// inputs.
weightEstimate input.TxWeightEstimator
@ -43,12 +41,39 @@ type txInputSet struct {
// outputValue is the value of the tx output.
outputValue btcutil.Amount
// feePerKW is the fee rate used to calculate the tx fee.
feePerKW chainfee.SatPerKWeight
// inputs is the set of tx inputs.
inputs []input.Input
// walletInputTotal is the total value of inputs coming from the wallet.
walletInputTotal btcutil.Amount
// force indicates that this set must be swept even if the total yield
// is negative.
force bool
}
func (t *txInputSetState) clone() txInputSetState {
s := txInputSetState{
weightEstimate: t.weightEstimate,
inputTotal: t.inputTotal,
outputValue: t.outputValue,
walletInputTotal: t.walletInputTotal,
force: t.force,
inputs: make([]input.Input, len(t.inputs)),
}
copy(s.inputs, t.inputs)
return s
}
// txInputSet is an object that accumulates tx inputs and keeps running counters
// on various properties of the tx.
type txInputSet struct {
txInputSetState
// feePerKW is the fee rate used to calculate the tx fee.
feePerKW chainfee.SatPerKWeight
// dustLimit is the minimum output value of the tx.
dustLimit btcutil.Amount
@ -56,16 +81,9 @@ type txInputSet struct {
// the set.
maxInputs int
// walletInputTotal is the total value of inputs coming from the wallet.
walletInputTotal btcutil.Amount
// wallet contains wallet functionality required by the input set to
// retrieve utxos.
wallet Wallet
// force indicates that this set must be swept even if the total yield
// is negative.
force bool
}
// newTxInputSet constructs a new, empty input set.
@ -99,56 +117,57 @@ func (t *txInputSet) dustLimitReached() bool {
// add adds a new input to the set. It returns a bool indicating whether the
// input was added to the set. An input is rejected if it decreases the tx
// output value after paying fees.
func (t *txInputSet) add(input input.Input, constraints addConstraints) bool {
func (t *txInputSet) addToState(inp input.Input, constraints addConstraints) *txInputSetState {
// Stop if max inputs is reached. Do not count additional wallet inputs,
// because we don't know in advance how many we may need.
if constraints != constraintsWallet &&
len(t.inputs) >= t.maxInputs {
return false
return nil
}
// Can ignore error, because it has already been checked when
// calculating the yields.
size, isNestedP2SH, _ := input.WitnessType().SizeUpperBound()
size, isNestedP2SH, _ := inp.WitnessType().SizeUpperBound()
// Add weight of this new candidate input to a copy of the weight
// estimator.
newWeightEstimate := t.weightEstimate
// Clone the current set state.
s := t.clone()
// Add the new input.
s.inputs = append(s.inputs, inp)
// Add weight of the new input.
if isNestedP2SH {
newWeightEstimate.AddNestedP2WSHInput(size)
s.weightEstimate.AddNestedP2WSHInput(size)
} else {
newWeightEstimate.AddWitnessInput(size)
s.weightEstimate.AddWitnessInput(size)
}
value := btcutil.Amount(input.SignDesc().Output.Value)
newInputTotal := t.inputTotal + value
// Add the value of the new input.
value := btcutil.Amount(inp.SignDesc().Output.Value)
s.inputTotal += value
weight := newWeightEstimate.Weight()
// Recalculate the tx fee.
weight := s.weightEstimate.Weight()
fee := t.feePerKW.FeeForWeight(int64(weight))
// Calculate the output value if the current input would be
// added to the set.
newOutputValue := newInputTotal - fee
// Initialize new wallet total with the current wallet total. This is
// updated below if this input is a wallet input.
newWalletTotal := t.walletInputTotal
// Calculate the new output value.
s.outputValue = s.inputTotal - fee
// Calculate the yield of this input from the change in tx output value.
inputYield := newOutputValue - t.outputValue
inputYield := s.outputValue - t.outputValue
switch constraints {
// Don't sweep inputs that cost us more to sweep than they give us.
case constraintsRegular:
if inputYield <= 0 {
return false
return nil
}
// For force adds, no further constraints apply.
case constraintsForce:
t.force = true
s.force = true
// We are attaching a wallet input to raise the tx output value above
// the dust limit.
@ -156,12 +175,12 @@ func (t *txInputSet) add(input input.Input, constraints addConstraints) bool {
// Skip this wallet input if adding it would lower the output
// value.
if inputYield <= 0 {
return false
return nil
}
// Calculate the total value that we spend in this tx from the
// wallet if we'd add this wallet input.
newWalletTotal += value
s.walletInputTotal += value
// In any case, we don't want to lose money by sweeping. If we
// don't get more out of the tx then we put in ourselves, do not
@ -176,24 +195,29 @@ func (t *txInputSet) add(input input.Input, constraints addConstraints) bool {
// value of the wallet input and what we get out of this
// transaction. To prevent attaching and locking a big utxo for
// very little benefit.
if !t.force && newWalletTotal >= newOutputValue {
if !s.force && s.walletInputTotal >= s.outputValue {
log.Debugf("Rejecting wallet input of %v, because it "+
"would make a negative yielding transaction "+
"(%v)",
value, newOutputValue-newWalletTotal)
value, s.outputValue-s.walletInputTotal)
return false
return nil
}
}
// Update running values.
//
// TODO: Return new instance?
t.inputTotal = newInputTotal
t.outputValue = newOutputValue
t.inputs = append(t.inputs, input)
t.weightEstimate = newWeightEstimate
t.walletInputTotal = newWalletTotal
return &s
}
// add adds a new input to the set. It returns a bool indicating whether the
// input was added to the set. An input is rejected if it decreases the tx
// output value after paying fees.
func (t *txInputSet) add(input input.Input, constraints addConstraints) bool {
newState := t.addToState(input, constraints)
if newState == nil {
return false
}
t.txInputSetState = *newState
return true
}