diff --git a/cluster/etcd_elector.go b/cluster/etcd_elector.go
new file mode 100644
index 00000000..01c92975
--- /dev/null
+++ b/cluster/etcd_elector.go
@@ -0,0 +1,123 @@
+// +build kvdb_etcd
+
+package cluster
+
+import (
+	"context"
+	"time"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
+	"github.com/coreos/etcd/clientv3/namespace"
+	"github.com/coreos/etcd/pkg/transport"
+	"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
+)
+
+const (
+	// etcdConnectionTimeout is the timeout until successful connection to
+	// the etcd instance.
+	etcdConnectionTimeout = 10 * time.Second
+)
+
+// Enforce that etcdLeaderElector implements the LeaderElector interface.
+var _ LeaderElector = (*etcdLeaderElector)(nil)
+
+// etcdLeaderElector is an implementation of LeaderElector using etcd as the
+// election governor.
+type etcdLeaderElector struct {
+	id       string
+	ctx      context.Context
+	cli      *clientv3.Client
+	session  *concurrency.Session
+	election *concurrency.Election
+}
+
+// newEtcdLeaderElector constructs a new etcdLeaderElector. It connects to the
+// etcd instance described by cfg (using TLS unless cfg.DisableTLS is set),
+// applies cfg.Namespace to the client and starts a new session on which the
+// election identified by electionPrefix is created. The passed id is the value
+// proposed when campaigning.
+func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
+	cfg *etcd.Config) (*etcdLeaderElector, error) {
+
+	clientCfg := clientv3.Config{
+		Context:     ctx,
+		Endpoints:   []string{cfg.Host},
+		DialTimeout: etcdConnectionTimeout,
+		Username:    cfg.User,
+		Password:    cfg.Pass,
+	}
+
+	if !cfg.DisableTLS {
+		tlsInfo := transport.TLSInfo{
+			CertFile:           cfg.CertFile,
+			KeyFile:            cfg.KeyFile,
+			InsecureSkipVerify: cfg.InsecureSkipVerify,
+		}
+
+		tlsConfig, err := tlsInfo.ClientConfig()
+		if err != nil {
+			return nil, err
+		}
+
+		clientCfg.TLS = tlsConfig
+	}
+
+	cli, err := clientv3.New(clientCfg)
+	if err != nil {
+		log.Errorf("Unable to connect to etcd: %v", err)
+		return nil, err
+	}
+
+	// Apply the namespace.
+	cli.KV = namespace.NewKV(cli.KV, cfg.Namespace)
+	cli.Watcher = namespace.NewWatcher(cli.Watcher, cfg.Namespace)
+	cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
+	log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)
+
+	session, err := concurrency.NewSession(cli)
+	if err != nil {
+		log.Errorf("Unable to start new leader election session: %v",
+			err)
+
+		// The client is not handed to the caller on this path, so
+		// close it here to avoid leaking its connection.
+		if closeErr := cli.Close(); closeErr != nil {
+			log.Errorf("Unable to close etcd client: %v", closeErr)
+		}
+
+		return nil, err
+	}
+
+	return &etcdLeaderElector{
+		id:      id,
+		ctx:     ctx,
+		cli:     cli,
+		session: session,
+		election: concurrency.NewElection(
+			session, electionPrefix,
+		),
+	}, nil
+}
+
+// Leader returns the leader value for the current election.
+func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
+	resp, err := e.election.Leader(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	return string(resp.Kvs[0].Value), nil
+}
+
+// Campaign will start a new leader election campaign. Campaign will block until
+// the passed context is canceled or the caller is elected as the leader.
+func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
+	return e.election.Campaign(ctx, e.id)
+}
+
+// Resign resigns the leader role allowing other election members to take
+// the place.
+func (e *etcdLeaderElector) Resign() error {
+	return e.election.Resign(context.Background())
+}
diff --git a/cluster/etcd_elector_factory.go b/cluster/etcd_elector_factory.go
new file mode 100644
index 00000000..b8d48030
--- /dev/null
+++ b/cluster/etcd_elector_factory.go
@@ -0,0 +1,47 @@
+// +build kvdb_etcd
+
+package cluster
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
+)
+
+// makeEtcdElector will construct a new etcdLeaderElector. It expects a cancel
+// context, a unique (in the cluster) LND id, the election key prefix and a
+// *etcd.Config as arguments.
+func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
+	error) {
+
+	if len(args) != 3 {
+		return nil, fmt.Errorf("invalid number of arguments to "+
+			"cluster.makeEtcdElector(): expected 3, got %v",
+			len(args))
+	}
+
+	id, ok := args[0].(string)
+	if !ok {
+		return nil, fmt.Errorf("invalid argument (0) to " +
+			"cluster.makeEtcdElector(), expected: string")
+	}
+
+	electionPrefix, ok := args[1].(string)
+	if !ok {
+		return nil, fmt.Errorf("invalid argument (1) to " +
+			"cluster.makeEtcdElector(), expected: string")
+	}
+
+	etcdCfg, ok := args[2].(*etcd.Config)
+	if !ok {
+		return nil, fmt.Errorf("invalid argument (2) to " +
+			"cluster.makeEtcdElector(), expected: *etcd.Config")
+	}
+
+	return newEtcdLeaderElector(ctx, id, electionPrefix, etcdCfg)
+}
+
+func init() {
+	RegisterLeaderElectorFactory(EtcdLeaderElector, makeEtcdElector)
+}
diff --git a/cluster/etcd_elector_test.go b/cluster/etcd_elector_test.go
new file mode 100644
index 00000000..32688f1a
--- /dev/null
+++ b/cluster/etcd_elector_test.go
@@ -0,0 +1,113 @@
+// +build kvdb_etcd
+
+package cluster
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"runtime/pprof"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
+	"github.com/stretchr/testify/require"
+)
+
+// GuardTimeout implements a test level timeout guard. Unless the returned
+// function is called before the timeout expires, all goroutine stacks are
+// dumped to stdout and the guard panics.
+func GuardTimeout(t *testing.T, timeout time.Duration) func() {
+	done := make(chan struct{})
+	go func() {
+		select {
+		case <-time.After(timeout):
+			// Surface a failed dump through the panic message:
+			// require.NoError would call t.FailNow, which must
+			// only be used on the test goroutine and would end
+			// this goroutine before the panic is reached.
+			err := pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+			if err != nil {
+				panic("test timeout (goroutine dump failed: " +
+					err.Error() + ")")
+			}
+			panic("test timeout")
+
+		case <-done:
+		}
+	}()
+
+	return func() {
+		close(done)
+	}
+}
+
+// TestEtcdElector tests that two candidates competing for leadership work as
+// expected and that the elected leader can resign to let the other take over.
+func TestEtcdElector(t *testing.T) {
+	guard := GuardTimeout(t, 5*time.Second)
+	defer guard()
+
+	tmpDir, err := ioutil.TempDir("", "etcd")
+	if err != nil {
+		t.Fatalf("unable to create temp dir: %v", err)
+	}
+	// Deferred first, so the dir is removed only after cleanup ran.
+	defer os.RemoveAll(tmpDir)
+
+	etcdCfg, cleanup, err := etcd.NewEmbeddedEtcdInstance(tmpDir, 0, 0)
+	require.NoError(t, err)
+	defer cleanup()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	const (
+		election = "/election/"
+		id1      = "e1"
+		id2      = "e2"
+	)
+
+	e1, err := newEtcdLeaderElector(ctx, id1, election, etcdCfg)
+	require.NoError(t, err)
+
+	e2, err := newEtcdLeaderElector(ctx, id2, election, etcdCfg)
+	require.NoError(t, err)
+
+	var wg sync.WaitGroup
+	ch := make(chan *etcdLeaderElector)
+
+	wg.Add(2)
+	ctxb := context.Background()
+
+	// campaign reports errors with t.Errorf as require's FailNow must only
+	// be called from the test goroutine. The elector is always sent so
+	// that a failed campaign can't leave the receives below blocked.
+	campaign := func(e *etcdLeaderElector) {
+		defer wg.Done()
+		if err := e.Campaign(ctxb); err != nil {
+			t.Errorf("campaign of %v failed: %v", e.id, err)
+		}
+		ch <- e
+	}
+
+	go campaign(e1)
+	go campaign(e2)
+
+	tmp := <-ch
+	first, err := tmp.Leader(ctxb)
+	require.NoError(t, err)
+	require.NoError(t, tmp.Resign())
+
+	tmp = <-ch
+	second, err := tmp.Leader(ctxb)
+	require.NoError(t, err)
+	require.NoError(t, tmp.Resign())
+
+	require.Contains(t, []string{id1, id2}, first)
+	require.Contains(t, []string{id1, id2}, second)
+	require.NotEqual(t, first, second)
+
+	wg.Wait()
+}
diff --git a/cluster/factory.go b/cluster/factory.go
new file mode 100644
index 00000000..52ae360b
--- /dev/null
+++ b/cluster/factory.go
@@ -0,0 +1,39 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+)
+
+// leaderElectorFactoryFunc is a LeaderElector factory method type.
+type leaderElectorFactoryFunc func(context.Context, ...interface{}) (
+	LeaderElector, error)
+
+// leaderElectorFactories maps elector ids to their registered factory methods.
+// It is created on first use by RegisterLeaderElectorFactory.
+var leaderElectorFactories map[string]leaderElectorFactoryFunc
+
+// RegisterLeaderElectorFactory will register a new LeaderElector factory
+// method corresponding to the passed id, replacing any previous one.
+func RegisterLeaderElectorFactory(id string, factory leaderElectorFactoryFunc) {
+	if leaderElectorFactories == nil {
+		leaderElectorFactories = make(
+			map[string]leaderElectorFactoryFunc,
+		)
+	}
+
+	leaderElectorFactories[id] = factory
+}
+
+// MakeLeaderElector will construct a LeaderElector identified by id with the
+// passed arguments.
+func MakeLeaderElector(ctx context.Context, id string, args ...interface{}) (
+	LeaderElector, error) {
+
+	if factory, ok := leaderElectorFactories[id]; ok {
+		return factory(ctx, args...)
+	}
+
+	return nil, fmt.Errorf("leader elector factory for '%v' "+
+		"not found", id)
+}
diff --git a/lncfg/cluster.go b/lncfg/cluster.go
new file mode 100644
index 00000000..61b92cc2
--- /dev/null
+++ b/lncfg/cluster.go
@@ -0,0 +1,42 @@
+package lncfg
+
+import (
+	"fmt"
+
+	"github.com/lightningnetwork/lnd/cluster"
+)
+
+// Cluster holds configuration for clustered LND.
+type Cluster struct {
+	LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."`
+
+	EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector."`
+}
+
+// DefaultCluster creates and returns a new default cluster config.
+func DefaultCluster() *Cluster {
+	return &Cluster{}
+}
+
+// Validate validates the Cluster config. The only supported leader elector is
+// etcd, which additionally requires a non-empty election prefix.
+//
+// NOTE(review): an empty LeaderElector, as returned by DefaultCluster, is
+// rejected too. Confirm callers only validate when clustering is enabled.
+func (c *Cluster) Validate() error {
+	switch c.LeaderElector {
+	case cluster.EtcdLeaderElector:
+		if c.EtcdElectionPrefix == "" {
+			return fmt.Errorf("etcd-election-prefix must be set")
+		}
+
+	default:
+		return fmt.Errorf("unknown leader elector, valid values are: "+
+			"\"%v\"", cluster.EtcdLeaderElector)
+	}
+
+	return nil
+}
+
+// Compile-time constraint to ensure Cluster implements the Validator interface.
+var _ Validator = (*Cluster)(nil)