itest: basic failover itest when using leader election on etcd

This commit is contained in:
Andras Banki-Horvath 2021-03-04 23:15:04 +01:00
parent 5e215a7a66
commit 5d8488871c
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
7 changed files with 309 additions and 33 deletions

View File

@ -93,10 +93,10 @@ func newBackend(miner string, netParams *chaincfg.Params, extraArgs []string) (
fmt.Errorf("unable to create temp directory: %v", err) fmt.Errorf("unable to create temp directory: %v", err)
} }
zmqBlockAddr := fmt.Sprintf("tcp://127.0.0.1:%d", nextAvailablePort()) zmqBlockAddr := fmt.Sprintf("tcp://127.0.0.1:%d", NextAvailablePort())
zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d", nextAvailablePort()) zmqTxAddr := fmt.Sprintf("tcp://127.0.0.1:%d", NextAvailablePort())
rpcPort := nextAvailablePort() rpcPort := NextAvailablePort()
p2pPort := nextAvailablePort() p2pPort := NextAvailablePort()
cmdArgs := []string{ cmdArgs := []string{
"-datadir=" + tempBitcoindDir, "-datadir=" + tempBitcoindDir,

View File

@ -37,7 +37,7 @@ type feeEstimates struct {
// startFeeService spins up a go-routine to serve fee estimates. // startFeeService spins up a go-routine to serve fee estimates.
func startFeeService() *feeService { func startFeeService() *feeService {
port := nextAvailablePort() port := NextAvailablePort()
f := feeService{ f := feeService{
url: fmt.Sprintf("http://localhost:%v/fee-estimates.json", port), url: fmt.Sprintf("http://localhost:%v/fee-estimates.json", port),
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd" "github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwallet/chainfee"
@ -61,9 +62,9 @@ type NetworkHarness struct {
Alice *HarnessNode Alice *HarnessNode
Bob *HarnessNode Bob *HarnessNode
// useEtcd is set to true if new nodes are to be created with an // embeddedEtcd is set to true if new nodes are to be created with an
// embedded etcd backend instead of just bbolt. // embedded etcd backend instead of just bbolt.
useEtcd bool embeddedEtcd bool
// Channel for transmitting stderr output from failed lightning node // Channel for transmitting stderr output from failed lightning node
// to main process. // to main process.
@ -83,7 +84,7 @@ type NetworkHarness struct {
// current repo. This will save developers from having to manually `go install` // current repo. This will save developers from having to manually `go install`
// within the repo each time before changes // within the repo each time before changes
func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string, func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string,
useEtcd bool) (*NetworkHarness, error) { embeddedEtcd bool) (*NetworkHarness, error) {
feeService := startFeeService() feeService := startFeeService()
@ -97,7 +98,7 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string,
feeService: feeService, feeService: feeService,
quit: make(chan struct{}), quit: make(chan struct{}),
lndBinary: lndBinary, lndBinary: lndBinary,
useEtcd: useEtcd, embeddedEtcd: embeddedEtcd,
} }
return &n, nil return &n, nil
} }
@ -270,11 +271,70 @@ func (n *NetworkHarness) Stop() {
n.feeService.stop() n.feeService.stop()
} }
// extraArgsEtcd returns extra args for configuring LND to use an external etcd
// database (for remote channel DB and wallet DB).
func extraArgsEtcd(etcdCfg *etcd.Config, name string, cluster bool) []string {
extraArgs := []string{
"--db.backend=etcd",
fmt.Sprintf("--db.etcd.host=%v", etcdCfg.Host),
fmt.Sprintf("--db.etcd.user=%v", etcdCfg.User),
fmt.Sprintf("--db.etcd.pass=%v", etcdCfg.Pass),
fmt.Sprintf("--db.etcd.namespace=%v", etcdCfg.Namespace),
}
if etcdCfg.InsecureSkipVerify {
extraArgs = append(extraArgs, "--db.etcd.insecure_skip_verify")
}
if cluster {
extraArgs = append(extraArgs, "--cluster.enable-leader-election")
extraArgs = append(
extraArgs, fmt.Sprintf("--cluster.id=%v", name),
)
}
return extraArgs
}
// NewNodeWithSeedEtcd starts a new node with seed that'll use an external
// etcd database as its (remote) channel and wallet DB. The passsed cluster
// flag indicates that we'd like the node to join the cluster leader election.
func (n *NetworkHarness) NewNodeWithSeedEtcd(name string, etcdCfg *etcd.Config,
password []byte, entropy []byte, statelessInit, cluster bool) (
*HarnessNode, []string, []byte, error) {
// We don't want to use the embedded etcd instance.
const embeddedEtcd = false
extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
return n.newNodeWithSeed(
name, extraArgs, password, entropy, statelessInit, embeddedEtcd,
)
}
// NewNodeWithSeedEtcd starts a new node with seed that'll use an external
// etcd database as its (remote) channel and wallet DB. The passsed cluster
// flag indicates that we'd like the node to join the cluster leader election.
// If the wait flag is false then we won't wait until RPC is available (this is
// useful when the node is not expected to become the leader right away).
func (n *NetworkHarness) NewNodeEtcd(name string, etcdCfg *etcd.Config,
password []byte, cluster, wait bool) (*HarnessNode, error) {
// We don't want to use the embedded etcd instance.
const embeddedEtcd = false
extraArgs := extraArgsEtcd(etcdCfg, name, cluster)
return n.newNode(name, extraArgs, true, password, embeddedEtcd, wait)
}
// NewNode fully initializes a returns a new HarnessNode bound to the // NewNode fully initializes a returns a new HarnessNode bound to the
// current instance of the network harness. The created node is running, but // current instance of the network harness. The created node is running, but
// not yet connected to other nodes within the network. // not yet connected to other nodes within the network.
func (n *NetworkHarness) NewNode(name string, extraArgs []string) (*HarnessNode, error) { func (n *NetworkHarness) NewNode(name string, extraArgs []string) (*HarnessNode,
return n.newNode(name, extraArgs, false, nil) error) {
return n.newNode(name, extraArgs, false, nil, n.embeddedEtcd, true)
} }
// NewNodeWithSeed fully initializes a new HarnessNode after creating a fresh // NewNodeWithSeed fully initializes a new HarnessNode after creating a fresh
@ -285,7 +345,18 @@ func (n *NetworkHarness) NewNodeWithSeed(name string, extraArgs []string,
password []byte, statelessInit bool) (*HarnessNode, []string, []byte, password []byte, statelessInit bool) (*HarnessNode, []string, []byte,
error) { error) {
node, err := n.newNode(name, extraArgs, true, password) return n.newNodeWithSeed(
name, extraArgs, password, nil, statelessInit, n.embeddedEtcd,
)
}
func (n *NetworkHarness) newNodeWithSeed(name string, extraArgs []string,
password, entropy []byte, statelessInit, embeddedEtcd bool) (
*HarnessNode, []string, []byte, error) {
node, err := n.newNode(
name, extraArgs, true, password, embeddedEtcd, true,
)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
@ -296,6 +367,7 @@ func (n *NetworkHarness) NewNodeWithSeed(name string, extraArgs []string,
// same password as the internal wallet. // same password as the internal wallet.
genSeedReq := &lnrpc.GenSeedRequest{ genSeedReq := &lnrpc.GenSeedRequest{
AezeedPassphrase: password, AezeedPassphrase: password,
SeedEntropy: entropy,
} }
ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout) ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
@ -345,7 +417,9 @@ func (n *NetworkHarness) RestoreNodeWithSeed(name string, extraArgs []string,
password []byte, mnemonic []string, recoveryWindow int32, password []byte, mnemonic []string, recoveryWindow int32,
chanBackups *lnrpc.ChanBackupSnapshot) (*HarnessNode, error) { chanBackups *lnrpc.ChanBackupSnapshot) (*HarnessNode, error) {
node, err := n.newNode(name, extraArgs, true, password) node, err := n.newNode(
name, extraArgs, true, password, n.embeddedEtcd, true,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -375,7 +449,8 @@ func (n *NetworkHarness) RestoreNodeWithSeed(name string, extraArgs []string,
// can be used immediately. Otherwise, the node will require an additional // can be used immediately. Otherwise, the node will require an additional
// initialization phase where the wallet is either created or restored. // initialization phase where the wallet is either created or restored.
func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool, func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
password []byte) (*HarnessNode, error) { password []byte, embeddedEtcd, wait bool) (
*HarnessNode, error) {
node, err := newNode(NodeConfig{ node, err := newNode(NodeConfig{
Name: name, Name: name,
@ -386,7 +461,7 @@ func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
NetParams: n.netParams, NetParams: n.netParams,
ExtraArgs: extraArgs, ExtraArgs: extraArgs,
FeeURL: n.feeService.url, FeeURL: n.feeService.url,
Etcd: n.useEtcd, Etcd: embeddedEtcd,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -398,7 +473,8 @@ func (n *NetworkHarness) newNode(name string, extraArgs []string, hasSeed bool,
n.activeNodes[node.NodeID] = node n.activeNodes[node.NodeID] = node
n.mtx.Unlock() n.mtx.Unlock()
if err := node.start(n.lndBinary, n.lndErrorChan); err != nil { err = node.start(n.lndBinary, n.lndErrorChan, wait)
if err != nil {
return nil, err return nil, err
} }
@ -684,7 +760,7 @@ func (n *NetworkHarness) RestartNodeNoUnlock(node *HarnessNode,
} }
} }
return node.start(n.lndBinary, n.lndErrorChan) return node.start(n.lndBinary, n.lndErrorChan, true)
} }
// SuspendNode stops the given node and returns a callback that can be used to // SuspendNode stops the given node and returns a callback that can be used to
@ -695,7 +771,7 @@ func (n *NetworkHarness) SuspendNode(node *HarnessNode) (func() error, error) {
} }
restart := func() error { restart := func() error {
return node.start(n.lndBinary, n.lndErrorChan) return node.start(n.lndBinary, n.lndErrorChan, true)
} }
return restart, nil return restart, nil

View File

@ -0,0 +1,143 @@
// +build kvdb_etcd
package itest
import (
"context"
"io/ioutil"
"time"
"github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
"github.com/lightningnetwork/lnd/cluster"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
)
func assertLeader(ht *harnessTest, observer cluster.LeaderElector,
expected string) {
leader, err := observer.Leader(context.Background())
if err != nil {
ht.Fatalf("Unable to query leader: %v", err)
}
if leader != expected {
ht.Fatalf("Leader should be '%v', got: '%v'", expected, leader)
}
}
func testEtcdFailover(net *lntest.NetworkHarness, ht *harnessTest) {
ctxb := context.Background()
tmpDir, err := ioutil.TempDir("", "etcd")
etcdCfg, cleanup, err := kvdb.StartEtcdTestBackend(
tmpDir, uint16(lntest.NextAvailablePort()),
uint16(lntest.NextAvailablePort()),
)
if err != nil {
ht.Fatalf("Failed to start etcd instance: %v", err)
}
defer cleanup()
observer, err := cluster.MakeLeaderElector(
ctxb, cluster.EtcdLeaderElector, "observer",
lncfg.DefaultEtcdElectionPrefix, etcdCfg,
)
if err != nil {
ht.Fatalf("Cannot start election observer")
}
password := []byte("the quick brown fox jumps the lazy dog")
entropy := [16]byte{1, 2, 3}
stateless := false
cluster := true
carol1, _, _, err := net.NewNodeWithSeedEtcd(
"Carol-1", etcdCfg, password, entropy[:], stateless, cluster,
)
if err != nil {
ht.Fatalf("unable to start Carol-1: %v", err)
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
info1, err := carol1.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
if err := net.ConnectNodes(ctxt, carol1, net.Alice); err != nil {
ht.Fatalf("unable to connect Carol to Alice: %v", err)
}
// Open a channel with 100k satoshis between Carol and Alice with Alice
// being the sole funder of the channel.
chanAmt := btcutil.Amount(100000)
ctxt, _ = context.WithTimeout(ctxb, channelOpenTimeout)
_ = openChannelAndAssert(
ctxt, ht, net, net.Alice, carol1,
lntest.OpenChannelParams{
Amt: chanAmt,
},
)
// At this point Carol-1 is the elected leader, while Carol-2 will wait
// to become the leader when Carol-1 stops.
carol2, err := net.NewNodeEtcd(
"Carol-2", etcdCfg, password, cluster, false,
)
if err != nil {
ht.Fatalf("Unable to start Carol-2: %v", err)
}
assertLeader(ht, observer, "Carol-1")
amt := btcutil.Amount(1000)
payReqs, _, _, err := createPayReqs(carol1, amt, 2)
if err != nil {
ht.Fatalf("Carol-2 is unable to create payment requests: %v",
err)
}
sendAndAssertSuccess(ctxb, ht, net.Alice, &routerrpc.SendPaymentRequest{
PaymentRequest: payReqs[0],
TimeoutSeconds: 60,
FeeLimitSat: noFeeLimitMsat,
})
// Shut down Carol-1 and wait for Carol-2 to become the leader.
shutdownAndAssert(net, ht, carol1)
err = carol2.WaitUntilLeader(30 * time.Second)
if err != nil {
ht.Fatalf("Waiting for Carol-2 to become the leader failed: %v",
err)
}
assertLeader(ht, observer, "Carol-2")
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
err = carol2.Unlock(ctxt, &lnrpc.UnlockWalletRequest{
WalletPassword: password,
})
if err != nil {
ht.Fatalf("Unlocking Carol-2 was not successful: %v", err)
}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
// Make sure Carol-1 and Carol-2 have the same identity.
info2, err := carol2.GetInfo(ctxt, &lnrpc.GetInfoRequest{})
if info1.IdentityPubkey != info2.IdentityPubkey {
ht.Fatalf("Carol-1 and Carol-2 must have the same identity: "+
"%v vs %v", info1.IdentityPubkey, info2.IdentityPubkey)
}
// Now let Alice pay the second invoice but this time we expect Carol-2
// to receive the payment.
sendAndAssertSuccess(ctxb, ht, net.Alice, &routerrpc.SendPaymentRequest{
PaymentRequest: payReqs[1],
TimeoutSeconds: 60,
FeeLimitSat: noFeeLimitMsat,
})
shutdownAndAssert(net, ht, carol2)
}

View File

@ -0,0 +1,11 @@
// +build !kvdb_etcd
package itest
import (
"github.com/lightningnetwork/lnd/lntest"
)
// testEtcdFailover is an empty itest when LND is not compiled with etcd
// support.
func testEtcdFailover(net *lntest.NetworkHarness, ht *harnessTest) {}

View File

@ -315,4 +315,8 @@ var allTestCases = []*testCase{
name: "wallet import pubkey", name: "wallet import pubkey",
test: testWalletImportPubKey, test: testWalletImportPubKey,
}, },
{
name: "etcd_failover",
test: testEtcdFailover,
},
} }

View File

@ -91,10 +91,10 @@ var (
) )
) )
// nextAvailablePort returns the first port that is available for listening by // NextAvailablePort returns the first port that is available for listening by
// a new node. It panics if no port is found and the maximum available TCP port // a new node. It panics if no port is found and the maximum available TCP port
// is reached. // is reached.
func nextAvailablePort() int { func NextAvailablePort() int {
port := atomic.AddUint32(&lastPort, 1) port := atomic.AddUint32(&lastPort, 1)
for port < 65535 { for port < 65535 {
// If there are no errors while attempting to listen on this // If there are no errors while attempting to listen on this
@ -148,18 +148,18 @@ func GetBtcdBinary() string {
// addresses with unique ports and should be used to overwrite rpctest's default // addresses with unique ports and should be used to overwrite rpctest's default
// generator which is prone to use colliding ports. // generator which is prone to use colliding ports.
func GenerateBtcdListenerAddresses() (string, string) { func GenerateBtcdListenerAddresses() (string, string) {
return fmt.Sprintf(listenerFormat, nextAvailablePort()), return fmt.Sprintf(listenerFormat, NextAvailablePort()),
fmt.Sprintf(listenerFormat, nextAvailablePort()) fmt.Sprintf(listenerFormat, NextAvailablePort())
} }
// generateListeningPorts returns four ints representing ports to listen on // generateListeningPorts returns four ints representing ports to listen on
// designated for the current lightning network test. This returns the next // designated for the current lightning network test. This returns the next
// available ports for the p2p, rpc, rest and profiling services. // available ports for the p2p, rpc, rest and profiling services.
func generateListeningPorts() (int, int, int, int) { func generateListeningPorts() (int, int, int, int) {
p2p := nextAvailablePort() p2p := NextAvailablePort()
rpc := nextAvailablePort() rpc := NextAvailablePort()
rest := nextAvailablePort() rest := NextAvailablePort()
profile := nextAvailablePort() profile := NextAvailablePort()
return p2p, rpc, rest, profile return p2p, rpc, rest, profile
} }
@ -303,16 +303,15 @@ func (cfg NodeConfig) genArgs() []string {
args = append( args = append(
args, fmt.Sprintf( args, fmt.Sprintf(
"--db.etcd.embedded_client_port=%v", "--db.etcd.embedded_client_port=%v",
nextAvailablePort(), NextAvailablePort(),
), ),
) )
args = append( args = append(
args, fmt.Sprintf( args, fmt.Sprintf(
"--db.etcd.embedded_peer_port=%v", "--db.etcd.embedded_peer_port=%v",
nextAvailablePort(), NextAvailablePort(),
), ),
) )
args = append(args, "--db.etcd.embedded")
} }
if cfg.FeeURL != "" { if cfg.FeeURL != "" {
@ -535,7 +534,9 @@ func (hn *HarnessNode) InvoiceMacPath() string {
// //
// This may not clean up properly if an error is returned, so the caller should // This may not clean up properly if an error is returned, so the caller should
// call shutdown() regardless of the return value. // call shutdown() regardless of the return value.
func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error { func (hn *HarnessNode) start(lndBinary string, lndError chan<- error,
wait bool) error {
hn.quit = make(chan struct{}) hn.quit = make(chan struct{})
args := hn.Cfg.genArgs() args := hn.Cfg.genArgs()
@ -643,6 +644,12 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error {
return err return err
} }
// We may want to skip waiting for the node to come up (eg. the node
// is waiting to become the leader).
if !wait {
return nil
}
// Since Stop uses the LightningClient to stop the node, if we fail to get a // Since Stop uses the LightningClient to stop the node, if we fail to get a
// connected client, we have to kill the process. // connected client, we have to kill the process.
useMacaroons := !hn.Cfg.HasSeed useMacaroons := !hn.Cfg.HasSeed
@ -711,10 +718,45 @@ func (hn *HarnessNode) waitUntilStarted(conn grpc.ClientConnInterface,
return err return err
} }
// WaitUntilLeader attempts to finish the start procedure by initiating an RPC
// connection and setting up the wallet unlocker client. This is needed when
// a node that has recently been started was waiting to become the leader and
// we're at the point when we expect that it is the leader now (awaiting unlock).
func (hn *HarnessNode) WaitUntilLeader(timeout time.Duration) error {
var (
conn *grpc.ClientConn
connErr error
)
startTs := time.Now()
if err := wait.NoError(func() error {
conn, connErr = hn.ConnectRPC(!hn.Cfg.HasSeed)
return connErr
}, timeout); err != nil {
return err
}
timeout -= time.Since(startTs)
if err := hn.waitUntilStarted(conn, timeout); err != nil {
return err
}
// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
// unlock the daemon.
if hn.Cfg.HasSeed {
hn.WalletUnlockerClient = lnrpc.NewWalletUnlockerClient(conn)
return nil
}
return hn.initLightningClient(conn)
}
// initClientWhenReady waits until the main gRPC server is detected as active, // initClientWhenReady waits until the main gRPC server is detected as active,
// then complete the normal HarnessNode gRPC connection creation. This can be // then complete the normal HarnessNode gRPC connection creation. This can be
// used it a node has just been unlocked, or has its wallet state initialized. // used it a node has just been unlocked, or has its wallet state initialized.
func (hn *HarnessNode) initClientWhenReady() error { func (hn *HarnessNode) initClientWhenReady(timeout time.Duration) error {
var ( var (
conn *grpc.ClientConn conn *grpc.ClientConn
connErr error connErr error
@ -722,7 +764,7 @@ func (hn *HarnessNode) initClientWhenReady() error {
if err := wait.NoError(func() error { if err := wait.NoError(func() error {
conn, connErr = hn.ConnectRPC(true) conn, connErr = hn.ConnectRPC(true)
return connErr return connErr
}, DefaultTimeout); err != nil { }, timeout); err != nil {
return err return err
} }
@ -831,7 +873,7 @@ func (hn *HarnessNode) Unlock(ctx context.Context,
// Now that the wallet has been unlocked, we'll wait for the RPC client // Now that the wallet has been unlocked, we'll wait for the RPC client
// to be ready, then establish the normal gRPC connection. // to be ready, then establish the normal gRPC connection.
return hn.initClientWhenReady() return hn.initClientWhenReady(DefaultTimeout)
} }
// initLightningClient constructs the grpc LightningClient from the given client // initLightningClient constructs the grpc LightningClient from the given client