multi: update WalletController PublishTransaction to include label

Add label parameter to PublishTransaction in WalletController
interface. A labels package is added to store generic labels that are
used for the different types of transactions that are published by lnd.

To keep commit size down, the two endpoints that require a label
parameter be passed down have a todo added, which will be removed in
subsequent commits.
This commit is contained in:
carla 2020-05-18 14:13:23 +02:00
parent c2516d9352
commit 75370ce6b4
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
25 changed files with 74 additions and 52 deletions

@ -90,7 +90,7 @@ type BreachConfig struct {
// PublishTransaction facilitates the process of broadcasting a
// transaction to the network.
PublishTransaction func(*wire.MsgTx) error
PublishTransaction func(*wire.MsgTx, string) error
// ContractBreaches is a channel where the breachArbiter will receive
// notifications in the event of a contract breach being observed. A
@ -566,7 +566,7 @@ justiceTxBroadcast:
// We'll now attempt to broadcast the transaction which finalized the
// channel's retribution against the cheating counter party.
err = b.cfg.PublishTransaction(finalTx)
err = b.cfg.PublishTransaction(finalTx, "")
if err != nil {
brarLog.Errorf("Unable to broadcast justice tx: %v", err)

@ -1371,7 +1371,7 @@ func testBreachSpends(t *testing.T, test breachTest) {
// Make PublishTransaction always return ErrDoubleSpend to begin with.
publErr = lnwallet.ErrDoubleSpend
brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error {
brar.cfg.PublishTransaction = func(tx *wire.MsgTx, _ string) error {
publTx <- tx
publMtx.Lock()
@ -1681,7 +1681,7 @@ func createTestArbiter(t *testing.T, contractBreaches chan *ContractBreachEvent,
ContractBreaches: contractBreaches,
Signer: signer,
Notifier: notifier,
PublishTransaction: func(_ *wire.MsgTx) error { return nil },
PublishTransaction: func(_ *wire.MsgTx, _ string) error { return nil },
Store: store,
})

@ -83,7 +83,7 @@ type chanCloseCfg struct {
unregisterChannel func(lnwire.ChannelID)
// broadcastTx broadcasts the passed transaction to the network.
broadcastTx func(*wire.MsgTx) error
broadcastTx func(*wire.MsgTx, string) error
// disableChannel disables a channel, resulting in it not being able to
// forward payments.
@ -544,7 +544,8 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b
newLogClosure(func() string {
return spew.Sdump(closeTx)
}))
if err := c.cfg.broadcastTx(closeTx); err != nil {
err = c.cfg.broadcastTx(closeTx, "")
if err != nil {
return nil, false, err
}

@ -79,7 +79,7 @@ type ChainArbitratorConfig struct {
// PublishTx reliably broadcasts a transaction to the network. Once
// this function exits without an error, then they transaction MUST
// continually be rebroadcast if needed.
PublishTx func(*wire.MsgTx) error
PublishTx func(*wire.MsgTx, string) error
// DeliverResolutionMsg is a function that will append an outgoing
// message to the "out box" for a ChannelLink. This is used to cancel
@ -699,7 +699,7 @@ func (c *ChainArbitrator) rebroadcast(channel *channeldb.OpenChannel,
log.Infof("Re-publishing %s close tx(%v) for channel %v",
kind, closeTx.TxHash(), chanPoint)
err = c.cfg.PublishTx(closeTx)
err = c.cfg.PublishTx(closeTx, "")
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Warnf("Unable to broadcast %s close tx(%v): %v",
kind, closeTx.TxHash(), err)

@ -82,7 +82,7 @@ func TestChainArbitratorRepublishCloses(t *testing.T) {
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
PublishTx: func(tx *wire.MsgTx, _ string) error {
published[tx.TxHash()]++
return nil
},
@ -174,7 +174,7 @@ func TestResolveContract(t *testing.T) {
chainArbCfg := ChainArbitratorConfig{
ChainIO: &mockChainIO{},
Notifier: &mockNotifier{},
PublishTx: func(tx *wire.MsgTx) error {
PublishTx: func(tx *wire.MsgTx, _ string) error {
return nil
},
Clock: clock.NewDefaultClock(),

@ -851,7 +851,7 @@ func (c *ChannelArbitrator) stateStep(
// At this point, we'll now broadcast the commitment
// transaction itself.
if err := c.cfg.PublishTx(closeTx); err != nil {
if err := c.cfg.PublishTx(closeTx, ""); err != nil {
log.Errorf("ChannelArbitrator(%v): unable to broadcast "+
"close tx: %v", c.cfg.ChanPoint, err)
if err != lnwallet.ErrDoubleSpend {

@ -324,7 +324,7 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
mockSweeper := newMockSweeper()
chainArbCfg := ChainArbitratorConfig{
ChainIO: chainIO,
PublishTx: func(*wire.MsgTx) error {
PublishTx: func(*wire.MsgTx, string) error {
return nil
},
DeliverResolutionMsg: func(msgs ...ResolutionMsg) error {
@ -575,7 +575,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) {
// We create a channel we can use to pause the ChannelArbitrator at the
// point where it broadcasts the close tx, and check its state.
stateChan := make(chan ArbitratorState)
chanArb.cfg.PublishTx = func(*wire.MsgTx) error {
chanArb.cfg.PublishTx = func(*wire.MsgTx, string) error {
// When the force close tx is being broadcasted, check that the
// state is correct at that point.
select {
@ -998,7 +998,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) {
// Create a channel we can use to assert the state when it publishes
// the close tx.
stateChan := make(chan ArbitratorState)
chanArb.cfg.PublishTx = func(*wire.MsgTx) error {
chanArb.cfg.PublishTx = func(*wire.MsgTx, string) error {
// When the force close tx is being broadcasted, check that the
// state is correct at that point.
select {
@ -1106,7 +1106,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) {
// Return ErrDoubleSpend when attempting to publish the tx.
stateChan := make(chan ArbitratorState)
chanArb.cfg.PublishTx = func(*wire.MsgTx) error {
chanArb.cfg.PublishTx = func(*wire.MsgTx, string) error {
// When the force close tx is being broadcasted, check that the
// state is correct at that point.
select {
@ -1339,7 +1339,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) {
// unexpected publication error, causing the state machine to halt.
expErr := errors.New("intentional publication error")
stateChan := make(chan ArbitratorState)
chanArb.cfg.PublishTx = func(*wire.MsgTx) error {
chanArb.cfg.PublishTx = func(*wire.MsgTx, string) error {
// When the force close tx is being broadcasted, check that the
// state is correct at that point.
select {

@ -155,7 +155,7 @@ func (h *htlcSuccessResolver) Resolve() (ContractResolver, error) {
// Regardless of whether an existing transaction was found or newly
// constructed, we'll broadcast the sweep transaction to the
// network.
err := h.PublishTx(h.sweepTx)
err := h.PublishTx(h.sweepTx, "")
if err != nil {
log.Infof("%T(%x): unable to publish tx: %v",
h, h.htlc.RHash[:], err)
@ -199,7 +199,7 @@ func (h *htlcSuccessResolver) Resolve() (ContractResolver, error) {
// the claiming process.
//
// TODO(roasbeef): after changing sighashes send to tx bundler
err := h.PublishTx(h.htlcResolution.SignedSuccessTx)
err := h.PublishTx(h.htlcResolution.SignedSuccessTx, "")
if err != nil {
return nil, err
}

@ -236,7 +236,7 @@ type fundingConfig struct {
// PublishTransaction facilitates the process of broadcasting a
// transaction to the network.
PublishTransaction func(*wire.MsgTx) error
PublishTransaction func(*wire.MsgTx, string) error
// FeeEstimator calculates appropriate fee rates based on historical
// transaction information.
@ -553,7 +553,7 @@ func (f *fundingManager) start() error {
channel.IsInitiator {
err := f.cfg.PublishTransaction(
channel.FundingTxn,
channel.FundingTxn, "",
)
if err != nil {
fndgLog.Errorf("Unable to rebroadcast "+
@ -1995,7 +1995,7 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
fndgLog.Infof("Broadcasting funding tx for ChannelPoint(%v): %v",
completeChan.FundingOutpoint, spew.Sdump(fundingTx))
err = f.cfg.PublishTransaction(fundingTx)
err = f.cfg.PublishTransaction(fundingTx, "")
if err != nil {
fndgLog.Errorf("Unable to broadcast funding tx for "+
"ChannelPoint(%v): %v",

@ -412,7 +412,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
ReportShortChanID: func(wire.OutPoint) error {
return nil
},
PublishTransaction: func(txn *wire.MsgTx) error {
PublishTransaction: func(txn *wire.MsgTx, _ string) error {
publTxChan <- txn
return nil
},
@ -515,7 +515,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
},
DefaultMinHtlcIn: 5,
RequiredRemoteMaxValue: oldCfg.RequiredRemoteMaxValue,
PublishTransaction: func(txn *wire.MsgTx) error {
PublishTransaction: func(txn *wire.MsgTx, _ string) error {
publishChan <- txn
return nil
},

8
labels/labels.go Normal file

@ -0,0 +1,8 @@
// Package labels contains labels used to label transactions broadcast by lnd.
// These labels are used across packages, so they are declared in a separate
// package to avoid dependency issues.
package labels
// External labels a transaction as user initiated via the api. This
// label is only used when a custom user provided label is not given.
const External = "external"

@ -16,6 +16,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnwallet"
@ -273,7 +274,7 @@ func (w *WalletKit) PublishTransaction(ctx context.Context,
return nil, err
}
err := w.cfg.Wallet.PublishTransaction(tx)
err := w.cfg.Wallet.PublishTransaction(tx, labels.External)
if err != nil {
return nil, err
}

@ -433,8 +433,8 @@ func (b *BtcWallet) ListUnspentWitness(minConfs, maxConfs int32) (
// publishing the transaction fails, an error describing the reason is returned
// (currently ErrDoubleSpend). If the transaction is already published to the
// network (either in the mempool or chain) no error will be returned.
func (b *BtcWallet) PublishTransaction(tx *wire.MsgTx) error {
if err := b.wallet.PublishTransaction(tx, ""); err != nil {
func (b *BtcWallet) PublishTransaction(tx *wire.MsgTx, label string) error {
if err := b.wallet.PublishTransaction(tx, label); err != nil {
// If we failed to publish the transaction, check whether we
// got an error of known type.

@ -224,8 +224,9 @@ type WalletController interface {
// already known transaction, ErrDoubleSpend is returned. If the
// transaction is already known (published already), no error will be
// returned. Other error returned depends on the currently active chain
// backend.
PublishTransaction(tx *wire.MsgTx) error
// backend. It takes an optional label which will save a label with the
// published transaction.
PublishTransaction(tx *wire.MsgTx, label string) error
// SubscribeTransactions returns a TransactionSubscription client which
// is capable of receiving async notifications as new transactions

@ -38,6 +38,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/labels"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/btcwallet"
@ -529,7 +530,8 @@ func testDualFundingReservationWorkflow(miner *rpctest.Harness,
}
// Let Alice publish the funding transaction.
if err := alice.PublishTransaction(fundingTx); err != nil {
err = alice.PublishTransaction(fundingTx, "")
if err != nil {
t.Fatalf("unable to publish funding tx: %v", err)
}
@ -1024,7 +1026,8 @@ func testSingleFunderReservationWorkflow(miner *rpctest.Harness,
}
// Let Alice publish the funding transaction.
if err := alice.PublishTransaction(fundingTx); err != nil {
err = alice.PublishTransaction(fundingTx, "")
if err != nil {
t.Fatalf("unable to publish funding tx: %v", err)
}
@ -1721,7 +1724,8 @@ func testPublishTransaction(r *rpctest.Harness,
tx1 := newTx(t, r, keyDesc.PubKey, alice, false)
// Publish the transaction.
if err := alice.PublishTransaction(tx1); err != nil {
err = alice.PublishTransaction(tx1, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1733,7 +1737,8 @@ func testPublishTransaction(r *rpctest.Harness,
// Publish the exact same transaction again. This should not return an
// error, even though the transaction is already in the mempool.
if err := alice.PublishTransaction(tx1); err != nil {
err = alice.PublishTransaction(tx1, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1752,7 +1757,8 @@ func testPublishTransaction(r *rpctest.Harness,
tx2 := newTx(t, r, keyDesc.PubKey, alice, false)
// Publish this tx.
if err := alice.PublishTransaction(tx2); err != nil {
err = alice.PublishTransaction(tx2, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1763,7 +1769,8 @@ func testPublishTransaction(r *rpctest.Harness,
// Publish the transaction again. It is already mined, and we don't
// expect this to return an error.
if err := alice.PublishTransaction(tx2); err != nil {
err = alice.PublishTransaction(tx2, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1779,7 +1786,8 @@ func testPublishTransaction(r *rpctest.Harness,
// transaction. Create a new tx and publish it. This is the
// output we'll try to double spend.
tx3 = newTx(t, r, keyDesc.PubKey, alice, false)
if err := alice.PublishTransaction(tx3); err != nil {
err := alice.PublishTransaction(tx3, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1799,7 +1807,8 @@ func testPublishTransaction(r *rpctest.Harness,
}
// This should be accepted into the mempool.
if err := alice.PublishTransaction(tx4); err != nil {
err = alice.PublishTransaction(tx4, labels.External)
if err != nil {
t.Fatalf("unable to publish: %v", err)
}
@ -1833,7 +1842,7 @@ func testPublishTransaction(r *rpctest.Harness,
t.Fatal(err)
}
err = alice.PublishTransaction(tx5)
err = alice.PublishTransaction(tx5, labels.External)
if err != lnwallet.ErrDoubleSpend {
t.Fatalf("expected ErrDoubleSpend, got: %v", err)
}
@ -1861,7 +1870,7 @@ func testPublishTransaction(r *rpctest.Harness,
expErr = nil
tx3Spend = tx6
}
err = alice.PublishTransaction(tx6)
err = alice.PublishTransaction(tx6, labels.External)
if err != expErr {
t.Fatalf("expected ErrDoubleSpend, got: %v", err)
}
@ -1896,7 +1905,7 @@ func testPublishTransaction(r *rpctest.Harness,
}
// Expect rejection.
err = alice.PublishTransaction(tx7)
err = alice.PublishTransaction(tx7, labels.External)
if err != lnwallet.ErrDoubleSpend {
t.Fatalf("expected ErrDoubleSpend, got: %v", err)
}

@ -322,7 +322,7 @@ func (*mockWalletController) ListTransactionDetails(_, _ int32) ([]*lnwallet.Tra
}
func (*mockWalletController) LockOutpoint(o wire.OutPoint) {}
func (*mockWalletController) UnlockOutpoint(o wire.OutPoint) {}
func (m *mockWalletController) PublishTransaction(tx *wire.MsgTx) error {
func (m *mockWalletController) PublishTransaction(tx *wire.MsgTx, _ string) error {
m.publishedTransactions <- tx
return nil
}

@ -1187,7 +1187,9 @@ func (r *rpcServer) SendCoins(ctx context.Context,
// As our sweep transaction was created, successfully, we'll
// now attempt to publish it, cancelling the sweep pkg to
// return all outputs if it fails.
err = wallet.PublishTransaction(sweepTxPkg.SweepTx)
err = wallet.PublishTransaction(
sweepTxPkg.SweepTx, "",
)
if err != nil {
sweepTxPkg.CancelSweepAttempt()

@ -75,7 +75,7 @@ func (b *mockBackend) publishTransaction(tx *wire.MsgTx) error {
return nil
}
func (b *mockBackend) PublishTransaction(tx *wire.MsgTx) error {
func (b *mockBackend) PublishTransaction(tx *wire.MsgTx, _ string) error {
log.Tracef("Publishing tx %v", tx.TxHash())
err := b.publishTransaction(tx)
select {

@ -9,7 +9,7 @@ import (
type Wallet interface {
// PublishTransaction performs cursory validation (dust checks, etc) and
// broadcasts the passed transaction to the Bitcoin network.
PublishTransaction(tx *wire.MsgTx) error
PublishTransaction(tx *wire.MsgTx, label string) error
// ListUnspentWitness returns all unspent outputs which are version 0
// witness programs. The 'minconfirms' and 'maxconfirms' parameters

@ -355,7 +355,7 @@ func (s *UtxoSweeper) Start() error {
// Error can be ignored. Because we are starting up, there are
// no pending inputs to update based on the publish result.
err := s.cfg.Wallet.PublishTransaction(lastTx)
err := s.cfg.Wallet.PublishTransaction(lastTx, "")
if err != nil && err != lnwallet.ErrDoubleSpend {
log.Errorf("last tx publish: %v", err)
}
@ -988,7 +988,7 @@ func (s *UtxoSweeper) sweep(inputs inputSet, feeRate chainfee.SatPerKWeight,
}),
)
err = s.cfg.Wallet.PublishTransaction(tx)
err = s.cfg.Wallet.PublishTransaction(tx, "")
// In case of an unexpected error, don't try to recover.
if err != nil && err != lnwallet.ErrDoubleSpend {

@ -194,7 +194,7 @@ type NurseryConfig struct {
// PublishTransaction facilitates the process of broadcasting a signed
// transaction to the appropriate network.
PublishTransaction func(*wire.MsgTx) error
PublishTransaction func(*wire.MsgTx, string) error
// Store provides access to and modification of the persistent state
// maintained about the utxo nursery's incubating outputs.
@ -867,7 +867,7 @@ func (u *utxoNursery) sweepCribOutput(classHeight uint32, baby *babyOutput) erro
// We'll now broadcast the HTLC transaction, then wait for it to be
// confirmed before transitioning it to kindergarten.
err := u.cfg.PublishTransaction(baby.timeoutTx)
err := u.cfg.PublishTransaction(baby.timeoutTx, "")
if err != nil && err != lnwallet.ErrDoubleSpend {
utxnLog.Errorf("Unable to broadcast baby tx: "+
"%v, %v", err, spew.Sdump(baby.timeoutTx))

@ -467,7 +467,7 @@ func createNurseryTestContext(t *testing.T,
Store: storeIntercepter,
ChainIO: chainIO,
SweepInput: sweeper.sweepInput,
PublishTransaction: func(tx *wire.MsgTx) error {
PublishTransaction: func(tx *wire.MsgTx, _ string) error {
return publishFunc(tx, "nursery")
},
}

@ -72,7 +72,7 @@ type Config struct {
//
// TODO(conner): replace with lnwallet.WalletController interface to
// have stronger guarantees wrt. returned error types.
PublishTx func(*wire.MsgTx) error
PublishTx func(*wire.MsgTx, string) error
// ListenAddrs specifies the listening addresses of the tower.
ListenAddrs []net.Addr

@ -293,7 +293,7 @@ func testJusticeDescriptor(t *testing.T, blobType blob.Type) {
// over the buffered channel.
publications := make(chan *wire.MsgTx, 1)
punisher := lookout.NewBreachPunisher(&lookout.PunisherConfig{
PublishTx: func(tx *wire.MsgTx) error {
PublishTx: func(tx *wire.MsgTx, _ string) error {
publications <- tx
return nil
},

@ -8,7 +8,7 @@ import (
type PunisherConfig struct {
// PublishTx provides the ability to send a signed transaction to the
// network.
PublishTx func(*wire.MsgTx) error
PublishTx func(*wire.MsgTx, string) error
// TODO(conner) add DB tracking and spend ntfn registration to see if
// ours confirmed or not
@ -42,7 +42,7 @@ func (p *BreachPunisher) Punish(desc *JusticeDescriptor, quit <-chan struct{}) e
log.Infof("Publishing justice transaction for client=%s with txid=%s",
desc.SessionInfo.ID, justiceTxn.TxHash())
err = p.cfg.PublishTx(justiceTxn)
err = p.cfg.PublishTx(justiceTxn, "")
if err != nil {
log.Errorf("Unable to publish justice txn for client=%s"+
"with breach-txid=%s: %v",