From 5d8488871c8e9d49409649ac9696beeeca63ecbb Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Thu, 4 Mar 2021 23:15:04 +0100 Subject: [PATCH] itest: basic failover itest when using leader election on etcd --- lntest/bitcoind_common.go | 8 +- lntest/fee_service.go | 2 +- lntest/harness.go | 102 +++++++++++-- lntest/itest/lnd_etcd_failover_test.go | 143 ++++++++++++++++++ .../itest/lnd_no_etcd_dummy_failover_test.go | 11 ++ lntest/itest/lnd_test_list_on_test.go | 4 + lntest/node.go | 72 +++++++-- 7 files changed, 309 insertions(+), 33 deletions(-) create mode 100644 lntest/itest/lnd_etcd_failover_test.go create mode 100644 lntest/itest/lnd_no_etcd_dummy_failover_test.go diff --git a/lntest/bitcoind_common.go b/lntest/bitcoind_common.go index f673400a..586c927d 100644 --- a/lntest/bitcoind_common.go +++ b/lntest/bitcoind_common.go @@ -93,10 +93,10 @@ func newBackend(miner string, netParams *chaincfg.Params, extraArgs []string) ( fmt.Errorf("unable to create temp directory: %v", err) } - zmqBlockAddr := fmt.Sprintf("tcp://127.0.0.1:%d", nextAvailablePort()) - zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d", nextAvailablePort()) - rpcPort := nextAvailablePort() - p2pPort := nextAvailablePort() + zmqBlockAddr := fmt.Sprintf("tcp://127.0.0.1:%d", NextAvailablePort()) + zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d", NextAvailablePort()) + rpcPort := NextAvailablePort() + p2pPort := NextAvailablePort() cmdArgs := []string{ "-datadir=" + tempBitcoindDir, diff --git a/lntest/fee_service.go b/lntest/fee_service.go index d71dae7d..4f61649c 100644 --- a/lntest/fee_service.go +++ b/lntest/fee_service.go @@ -37,7 +37,7 @@ type feeEstimates struct { // startFeeService spins up a go-routine to serve fee estimates. func startFeeService() *feeService { - port := nextAvailablePort() + port := NextAvailablePort() f := feeService{ url: fmt.Sprintf("http://localhost:%v/fee-estimates.json", port), } diff --git a/lntest/harness.go b/lntest/harness.go index 3cfde484..af747891 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -21,6 +21,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd" + "github.com/lightningnetwork/lnd/channeldb/kvdb/etcd" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwallet/chainfee" @@ -61,9 +62,9 @@ type NetworkHarness struct { Alice *HarnessNode Bob *HarnessNode - // useEtcd is set to true if new nodes are to be created with an + // embeddedEtcd is set to true if new nodes are to be created with an // embedded etcd backend instead of just bbolt. - useEtcd bool + embeddedEtcd bool // Channel for transmitting stderr output from failed lightning node // to main process. @@ -83,7 +84,7 @@ type NetworkHarness struct { // current repo. 
This will save developers from having to manually `go install`
// within the repo each time before changes
 func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string,
-	useEtcd bool) (*NetworkHarness, error) {
+	embeddedEtcd bool) (*NetworkHarness, error) {
 
 	feeService := startFeeService()
 
@@ -97,7 +98,7 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string,
 		feeService: feeService,
 		quit:       make(chan struct{}),
 		lndBinary:  lndBinary,
-		useEtcd:    useEtcd,
+		embeddedEtcd: embeddedEtcd,
 	}
 	return &n, nil
 }
@@ -270,11 +271,70 @@ func (n *NetworkHarness) Stop() {
 	n.feeService.stop()
 }
 
+// extraArgsEtcd returns extra args for configuring LND to use an external etcd
+// database (for remote channel DB and wallet DB).
+func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool) []string {
+	extraArgs := []string{
+		"--db.backend=etcd",
+		fmt.Sprintf("--db.etcd.host=%v", etcdCfg.Host),
+		fmt.Sprintf("--db.etcd.user=%v", etcdCfg.User),
+		fmt.Sprintf("--db.etcd.pass=%v", etcdCfg.Pass),
+		fmt.Sprintf("--db.etcd.namespace=%v", etcdCfg.Namespace),
+	}
+
+	if etcdCfg.InsecureSkipVerify {
+		extraArgs = append(extraArgs, "--db.etcd.insecure_skip_verify")
+	}
+
+	if cluster {
+		extraArgs = append(extraArgs, "--cluster.enable-leader-election")
+		extraArgs = append(
+			extraArgs, fmt.Sprintf("--cluster.id=%v", name),
+		)
+	}
+
+	return extraArgs
+}
+
+// NewNodeWithSeedEtcd starts a new node with seed that'll use an external
+// etcd database as its (remote) channel and wallet DB. The passed cluster
+// flag indicates that we'd like the node to join the cluster leader election.
+func (n *NetworkHarness) NewNodeWithSeedEtcd(name string, etcdCfg *etcd.Config,
+	password []byte, entropy []byte, statelessInit, cluster bool) (
+	*HarnessNode, []string, []byte, error) {
+
+	// We don't want to use the embedded etcd instance.
+	const embeddedEtcd = false
+
+	extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
+	return n.newNodeWithSeed(
+		name, extraArgs, password, entropy, statelessInit, embeddedEtcd,
+	)
+}
+
+// NewNodeEtcd starts a new node that'll use an external etcd database as its
+// (remote) channel and wallet DB. The passed cluster flag indicates that we'd
+// like the node to join the cluster leader election. If the wait flag is false
+// then we won't wait until RPC is available (this is useful when the node is
+// not expected to become the leader right away).
+func (n *NetworkHarness) NewNodeEtcd(name string, etcdCfg *etcd.Config,
+	password []byte, cluster, wait bool) (*HarnessNode, error) {
+
+	// We don't want to use the embedded etcd instance.
+	const embeddedEtcd = false
+
+	extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
+	return n.newNode(name, extraArgs, true, password, embeddedEtcd, wait)
+}
+
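Reviewer note: the three helpers above are meant to compose into a leader/standby pair. A minimal sketch of that intended flow, assuming an already-running harness and an external etcd config (node names and the 30s timeout are illustrative; the authoritative flow is the `testEtcdFailover` case added further down):

```go
package itest

import (
	"time"

	"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
	"github.com/lightningnetwork/lnd/lntest"
)

// failoverSketch shows how the new harness helpers are meant to compose.
func failoverSketch(ht *harnessTest, net *lntest.NetworkHarness,
	etcdCfg *etcd.Config, password, entropy []byte) {

	// The first clustered node initializes the wallet and, being first,
	// wins the leader election.
	leader, _, _, err := net.NewNodeWithSeedEtcd(
		"Carol-1", etcdCfg, password, entropy, false, true,
	)
	if err != nil {
		ht.Fatalf("unable to start leader: %v", err)
	}

	// wait=false: the standby blocks on the election, so we must not wait
	// for its RPC server to come up yet.
	standby, err := net.NewNodeEtcd("Carol-2", etcdCfg, password, true, false)
	if err != nil {
		ht.Fatalf("unable to start standby: %v", err)
	}

	// Once the leader goes away, the standby wins the election, finishes
	// starting up, and then awaits the wallet unlock.
	shutdownAndAssert(net, ht, leader)
	if err := standby.WaitUntilLeader(30 * time.Second); err != nil {
		ht.Fatalf("standby never became leader: %v", err)
	}
}
```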
 // NewNode fully initializes and returns a new HarnessNode bound to the
 // current instance of the network harness. The created node is running, but
 // not yet connected to other nodes within the network.
-func (n *NetworkHarness) NewNode(name string, extraArgs []string) (*HarnessNode, error) {
-	return n.newNode(name, extraArgs, false, nil)
+func (n *NetworkHarness) NewNode(name string, extraArgs []string) (*HarnessNode,
+	error) {
+
+	return n.newNode(name, extraArgs, false, nil, n.embeddedEtcd, true)
 }
 
 // NewNodeWithSeed fully initializes a new HarnessNode after creating a fresh
@@ -285,7 +345,18 @@ func (n *NetworkHarness) NewNodeWithSeed(name string, extraArgs []string,
 	password []byte, statelessInit bool) (*HarnessNode, []string, []byte,
 	error) {
 
-	node, err := n.newNode(name, extraArgs, true, password)
+	return n.newNodeWithSeed(
+		name, extraArgs, password, nil, statelessInit, n.embeddedEtcd,
+	)
+}
+
+// newNodeWithSeed is the internal variant of NewNodeWithSeed that additionally
+// accepts optional seed entropy and a flag selecting the embedded etcd backend
+// instead of bbolt.
+func (n *NetworkHarness) newNodeWithSeed(name string, extraArgs []string,
+	password, entropy []byte, statelessInit, embeddedEtcd bool) (
+	*HarnessNode, []string, []byte, error) {
+
+	node, err := n.newNode(
+		name, extraArgs, true, password, embeddedEtcd, true,
+	)
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -296,6 +367,7 @@ func (n *NetworkHarness) NewNodeWithSeed(name string, extraArgs []string,
 	// same password as the internal wallet.
 	genSeedReq := &lnrpc.GenSeedRequest{
 		AezeedPassphrase: password,
+		SeedEntropy:      entropy,
 	}
 
 	ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
@@ -345,7 +417,9 @@ func (n *NetworkHarness) RestoreNodeWithSeed(name string, extraArgs []string,
 	password []byte, mnemonic []string, recoveryWindow int32,
 	chanBackups *lnrpc.ChanBackupSnapshot) (*HarnessNode, error) {
 
-	node, err := n.newNode(name, extraArgs, true, password)
+	node, err := n.newNode(
+		name, extraArgs, true, password, n.embeddedEtcd, true,
+	)
 	if err != nil {
 		return nil, err
 	}
@@ -375,7 +449,8 @@
 // can be used immediately. Otherwise, the node will require an additional
 // initialization phase where the wallet is either created or restored.
 func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
-	password []byte) (*HarnessNode, error) {
+	password []byte, embeddedEtcd, wait bool) (
+	*HarnessNode, error) {
 
 	node, err := newNode(NodeConfig{
 		Name: name,
@@ -386,7 +461,7 @@ func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
 		NetParams: n.netParams,
 		ExtraArgs: extraArgs,
 		FeeURL:    n.feeService.url,
-		Etcd:      n.useEtcd,
+		Etcd:      embeddedEtcd,
 	})
 	if err != nil {
 		return nil, err
 	}
@@ -398,7 +473,8 @@ func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
 	n.activeNodes[node.NodeID] = node
 	n.mtx.Unlock()
 
-	if err := node.start(n.lndBinary, n.lndErrorChan); err != nil {
+	err = node.start(n.lndBinary, n.lndErrorChan, wait)
+	if err != nil {
 		return nil, err
 	}
 
@@ -684,7 +760,7 @@ func (n *NetworkHarness) RestartNodeNoUnlock(node *HarnessNode,
 		}
 	}
 
-	return node.start(n.lndBinary, n.lndErrorChan)
+	return node.start(n.lndBinary, n.lndErrorChan, true)
 }
 
 // SuspendNode stops the given node and returns a callback that can be used to
@@ -695,7 +771,7 @@ func (n *NetworkHarness) SuspendNode(node *HarnessNode) (func() error, error) {
 	}
 
 	restart := func() error {
-		return node.start(n.lndBinary, n.lndErrorChan)
+		return node.start(n.lndBinary, n.lndErrorChan, true)
 	}
 
 	return restart, nil
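Background for the new test file below: lnd's `cluster` package, reached through `cluster.MakeLeaderElector`, builds on etcd's session and election primitives. A self-contained sketch of those primitives, independent of lnd; the endpoint, election prefix, and TTL are invented here (the real prefix comes from `lncfg.DefaultEtcdElectionPrefix`), and the import paths assume the etcd v3.4 module line:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Every contender keeps a lease-backed session alive. If the process
	// dies, the session lapses and leadership moves on; that is what makes
	// failover automatic.
	session, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
	if err != nil {
		panic(err)
	}
	defer session.Close()

	election := concurrency.NewElection(session, "/lnd/leader/")

	// Campaign blocks until this contender holds the election key.
	if err := election.Campaign(context.Background(), "Carol-1"); err != nil {
		panic(err)
	}

	// Any client (like the test's "observer") can ask who leads right now.
	resp, err := election.Leader(context.Background())
	if err != nil {
		panic(err)
	}
	fmt.Printf("current leader: %s\n", resp.Kvs[0].Value)
}
```

The `assertLeader` helper in the test is the same observer pattern, expressed through lnd's `cluster.LeaderElector` interface.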
diff --git a/lntest/itest/lnd_etcd_failover_test.go b/lntest/itest/lnd_etcd_failover_test.go
new file mode 100644
index 00000000..e76502e7
--- /dev/null
+++ b/lntest/itest/lnd_etcd_failover_test.go
@@ -0,0 +1,143 @@
+// +build kvdb_etcd
+
+package itest
+
+import (
+	"context"
+	"io/ioutil"
+	"time"
+
+	"github.com/btcsuite/btcutil"
+	"github.com/lightningnetwork/lnd/channeldb/kvdb"
+	"github.com/lightningnetwork/lnd/cluster"
+	"github.com/lightningnetwork/lnd/lncfg"
+	"github.com/lightningnetwork/lnd/lnrpc"
+	"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
+	"github.com/lightningnetwork/lnd/lntest"
+)
+
+func assertLeader(ht *harnessTest, observer cluster.LeaderElector,
+	expected string) {
+
+	leader, err := observer.Leader(context.Background())
+	if err != nil {
+		ht.Fatalf("Unable to query leader: %v", err)
+	}
+
+	if leader != expected {
+		ht.Fatalf("Leader should be '%v', got: '%v'", expected, leader)
+	}
+}
+
+func testEtcdFailover(net *lntest.NetworkHarness, ht *harnessTest) {
+	ctxb := context.Background()
+
+	tmpDir, err := ioutil.TempDir("", "etcd")
+	if err != nil {
+		ht.Fatalf("Failed to create temp dir: %v", err)
+	}
+
+	etcdCfg, cleanup, err := kvdb.StartEtcdTestBackend(
+		tmpDir, uint16(lntest.NextAvailablePort()),
+		uint16(lntest.NextAvailablePort()),
+	)
+	if err != nil {
+		ht.Fatalf("Failed to start etcd instance: %v", err)
+	}
+	defer cleanup()
+
+	observer, err := cluster.MakeLeaderElector(
+		ctxb, cluster.EtcdLeaderElector, "observer",
+		lncfg.DefaultEtcdElectionPrefix, etcdCfg,
+	)
+	if err != nil {
+		ht.Fatalf("Cannot start election observer: %v", err)
+	}
+
+	password := []byte("the quick brown fox jumps the lazy dog")
+	entropy := [16]byte{1, 2, 3}
+	stateless := false
+	cluster := true
+
+	carol1, _, _, err := net.NewNodeWithSeedEtcd(
+		"Carol-1", etcdCfg, password, entropy[:], stateless, cluster,
+	)
+	if err != nil {
+		ht.Fatalf("unable to start Carol-1: %v", err)
+	}
+
+	ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
+	info1, err := carol1.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
+	if err != nil {
+		ht.Fatalf("unable to get info of Carol-1: %v", err)
+	}
+
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	if err := net.ConnectNodes(ctxt, carol1, net.Alice); err != nil {
+		ht.Fatalf("unable to connect Carol to Alice: %v", err)
+	}
+
+	// Open a channel with 100k satoshis between Carol and Alice with Alice
+	// being the sole funder of the channel.
+	chanAmt := btcutil.Amount(100000)
+	ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout)
+	_ = openChannelAndAssert(
+		ctxt, ht, net, net.Alice, carol1,
+		lntest.OpenChannelParams{
+			Amt: chanAmt,
+		},
+	)
+
+	// At this point Carol-1 is the elected leader, while Carol-2 will wait
+	// to become the leader when Carol-1 stops.
+	carol2, err := net.NewNodeEtcd(
+		"Carol-2", etcdCfg, password, cluster, false,
+	)
+	if err != nil {
+		ht.Fatalf("Unable to start Carol-2: %v", err)
+	}
+
+	assertLeader(ht, observer, "Carol-1")
+
+	amt := btcutil.Amount(1000)
+	payReqs, _, _, err := createPayReqs(carol1, amt, 2)
+	if err != nil {
+		ht.Fatalf("Carol-1 is unable to create payment requests: %v",
+			err)
+	}
+	sendAndAssertSuccess(ctxb, ht, net.Alice, &routerrpc.SendPaymentRequest{
+		PaymentRequest: payReqs[0],
+		TimeoutSeconds: 60,
+		FeeLimitMsat:   noFeeLimitMsat,
+	})
+
+	// Shut down Carol-1 and wait for Carol-2 to become the leader.
+	shutdownAndAssert(net, ht, carol1)
+	err = carol2.WaitUntilLeader(30 * time.Second)
+	if err != nil {
+		ht.Fatalf("Waiting for Carol-2 to become the leader failed: %v",
+			err)
+	}
+
+	assertLeader(ht, observer, "Carol-2")
+
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+	err = carol2.Unlock(ctxt, &lnrpc.UnlockWalletRequest{
+		WalletPassword: password,
+	})
+	if err != nil {
+		ht.Fatalf("Unlocking Carol-2 was not successful: %v", err)
+	}
+
+	ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
+
+	// Make sure Carol-1 and Carol-2 have the same identity.
+	info2, err := carol2.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
+	if err != nil {
+		ht.Fatalf("unable to get info of Carol-2: %v", err)
+	}
+	if info1.IdentityPubkey != info2.IdentityPubkey {
+		ht.Fatalf("Carol-1 and Carol-2 must have the same identity: "+
+			"%v vs %v", info1.IdentityPubkey, info2.IdentityPubkey)
+	}
+
+	// Now let Alice pay the second invoice but this time we expect Carol-2
+	// to receive the payment.
+	sendAndAssertSuccess(ctxb, ht, net.Alice, &routerrpc.SendPaymentRequest{
+		PaymentRequest: payReqs[1],
+		TimeoutSeconds: 60,
+		FeeLimitMsat:   noFeeLimitMsat,
+	})
+
+	shutdownAndAssert(net, ht, carol2)
+}
diff --git a/lntest/itest/lnd_no_etcd_dummy_failover_test.go b/lntest/itest/lnd_no_etcd_dummy_failover_test.go
new file mode 100644
index 00000000..25eb3214
--- /dev/null
+++ b/lntest/itest/lnd_no_etcd_dummy_failover_test.go
@@ -0,0 +1,11 @@
+// +build !kvdb_etcd
+
+package itest
+
+import (
+	"github.com/lightningnetwork/lnd/lntest"
+)
+
+// testEtcdFailover is an empty itest when LND is not compiled with etcd
+// support.
+func testEtcdFailover(net *lntest.NetworkHarness, ht *harnessTest) {}
diff --git a/lntest/itest/lnd_test_list_on_test.go b/lntest/itest/lnd_test_list_on_test.go
index 736f832d..15a660f0 100644
--- a/lntest/itest/lnd_test_list_on_test.go
+++ b/lntest/itest/lnd_test_list_on_test.go
@@ -315,4 +315,8 @@ var allTestCases = []*testCase{
 		name: "wallet import pubkey",
 		test: testWalletImportPubKey,
 	},
+	{
+		name: "etcd_failover",
+		test: testEtcdFailover,
+	},
 }
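The node.go diff that follows exports `nextAvailablePort` as `NextAvailablePort` so the test above can allocate ports for the etcd test backend; only part of its body is visible in the hunk. For reference, a self-contained sketch of the same pattern (the counter's start value and the listener address format are assumptions):

```go
package sketch

import (
	"fmt"
	"net"
	"sync/atomic"
)

// lastPort is the base for the rolling port counter (start value assumed).
var lastPort uint32 = 5000

// NextAvailablePort hands out ports from an atomically incremented counter,
// probing each candidate with a short-lived listener so that concurrent
// callers rarely collide. It panics once the TCP port range is exhausted.
func NextAvailablePort() int {
	port := atomic.AddUint32(&lastPort, 1)
	for port < 65535 {
		// If we can listen on the port, it is free; release it right
		// away and hand it to the caller.
		addr := fmt.Sprintf("127.0.0.1:%d", port)
		l, err := net.Listen("tcp4", addr)
		if err == nil {
			if err := l.Close(); err == nil {
				return int(port)
			}
		}
		port = atomic.AddUint32(&lastPort, 1)
	}

	panic("no ports available for listening")
}
```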
diff --git a/lntest/node.go b/lntest/node.go
index 1a155466..988b4249 100644
--- a/lntest/node.go
+++ b/lntest/node.go
@@ -91,10 +91,10 @@ var (
 	)
 )
 
-// nextAvailablePort returns the first port that is available for listening by
+// NextAvailablePort returns the first port that is available for listening by
 // a new node. It panics if no port is found and the maximum available TCP port
 // is reached.
-func nextAvailablePort() int {
+func NextAvailablePort() int {
 	port := atomic.AddUint32(&lastPort, 1)
 	for port < 65535 {
 		// If there are no errors while attempting to listen on this
@@ -148,18 +148,18 @@ func GetBtcdBinary() string {
 // addresses with unique ports and should be used to overwrite rpctest's default
 // generator which is prone to use colliding ports.
 func GenerateBtcdListenerAddresses() (string, string) {
-	return fmt.Sprintf(listenerFormat, nextAvailablePort()),
-		fmt.Sprintf(listenerFormat, nextAvailablePort())
+	return fmt.Sprintf(listenerFormat, NextAvailablePort()),
+		fmt.Sprintf(listenerFormat, NextAvailablePort())
 }
 
 // generateListeningPorts returns four ints representing ports to listen on
 // designated for the current lightning network test. This returns the next
 // available ports for the p2p, rpc, rest and profiling services.
 func generateListeningPorts() (int, int, int, int) {
-	p2p := nextAvailablePort()
-	rpc := nextAvailablePort()
-	rest := nextAvailablePort()
-	profile := nextAvailablePort()
+	p2p := NextAvailablePort()
+	rpc := NextAvailablePort()
+	rest := NextAvailablePort()
+	profile := NextAvailablePort()
 
 	return p2p, rpc, rest, profile
 }
@@ -303,16 +303,15 @@ func (cfg NodeConfig) genArgs() []string {
 		args = append(
 			args, fmt.Sprintf(
 				"--db.etcd.embedded_client_port=%v",
-				nextAvailablePort(),
+				NextAvailablePort(),
 			),
 		)
 		args = append(
 			args, fmt.Sprintf(
 				"--db.etcd.embedded_peer_port=%v",
-				nextAvailablePort(),
+				NextAvailablePort(),
 			),
 		)
-		args = append(args, "--db.etcd.embedded")
 	}
 
 	if cfg.FeeURL != "" {
@@ -535,7 +534,9 @@ func (hn *HarnessNode) InvoiceMacPath() string {
 //
 // This may not clean up properly if an error is returned, so the caller should
 // call shutdown() regardless of the return value.
-func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error {
+func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
+	wait bool) error {
+
 	hn.quit = make(chan struct{})
 
 	args := hn.Cfg.genArgs()
@@ -643,6 +644,12 @@
 		return err
 	}
 
+	// We may want to skip waiting for the node to come up (e.g. the node
+	// is waiting to become the leader).
+	if !wait {
+		return nil
+	}
+
 	// Since Stop uses the LightningClient to stop the node, if we fail to get a
 	// connected client, we have to kill the process.
 	useMacaroons := !hn.Cfg.HasSeed
@@ -711,10 +718,45 @@ func (hn *HarnessNode) waitUntilStarted(conn grpc.ClientConnInterface,
 	return err
 }
 
+// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
+// connection and setting up the wallet unlocker client. This is needed for a
+// node that was started without waiting for RPC (wait=false) and is now
+// expected to have become the leader, awaiting its wallet unlock.
+func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
+	var (
+		conn    *grpc.ClientConn
+		connErr error
+	)
+
+	startTs := time.Now()
+	if err := wait.NoError(func() error {
+		conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed)
+		return connErr
+	}, timeout); err != nil {
+		return err
+	}
+	timeout -= time.Since(startTs)
+
+	if err := hn.waitUntilStarted(conn, timeout); err != nil {
+		return err
+	}
+
+	// If the node was created with a seed, we will need to perform an
+	// additional step to unlock the wallet. The connection returned will
+	// only use the TLS certs, and can only perform operations necessary to
+	// unlock the daemon.
+	if hn.Cfg.HasSeed {
+		hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
+		return nil
+	}
+
+	return hn.initLightningClient(conn)
+}
+
 // initClientWhenReady waits until the main gRPC server is detected as active,
 // then completes the normal HarnessNode gRPC connection creation. This can be
 // used if a node has just been unlocked, or has its wallet state initialized.
-func (hn *HarnessNode) initClientWhenReady() error {
+func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error {
 	var (
 		conn    *grpc.ClientConn
 		connErr error
 	)
@@ -722,7 +764,7 @@
 	if err := wait.NoError(func() error {
 		conn, connErr = hn.ConnectRPC(true)
 		return connErr
-	}, DefaultTimeout); err != nil {
+	}, timeout); err != nil {
 		return err
 	}
 
@@ -831,7 +873,7 @@ func (hn *HarnessNode) Unlock(ctx context.Context,
 
 	// Now that the wallet has been unlocked, we'll wait for the RPC client
 	// to be ready, then establish the normal gRPC connection.
-	return hn.initClientWhenReady()
+	return hn.initClientWhenReady(DefaultTimeout)
 }
 
 // initLightningClient constructs the grpc LightningClient from the given client
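Both `WaitUntilLeader` and `initClientWhenReady` drive their retries through `wait.NoError` with an explicit timeout, and `WaitUntilLeader` subtracts the time already spent connecting before handing the remainder to `waitUntilStarted`. A rough standalone equivalent of that budget-passing pattern (the 200ms retry interval is an assumption; lntest's `wait` package defines its own):

```go
package sketch

import (
	"fmt"
	"time"
)

// pollUntil retries f until it succeeds or the budget runs out, returning
// the unspent budget so the caller can fund the next startup phase with it.
func pollUntil(timeout time.Duration, f func() error) (time.Duration, error) {
	var (
		startTs  = time.Now()
		deadline = startTs.Add(timeout)
		lastErr  error
	)

	for {
		if lastErr = f(); lastErr == nil {
			return timeout - time.Since(startTs), nil
		}
		if time.Now().After(deadline) {
			return 0, fmt.Errorf("timeout after %v: %w", timeout, lastErr)
		}
		time.Sleep(200 * time.Millisecond)
	}
}
```

A caller mirrors `WaitUntilLeader` by doing `remaining, err := pollUntil(timeout, connect)` and then spending `remaining` on the post-connection wait.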