cluster: add etcd based leader elector and factory
This commit is contained in:
parent
e62dbca11a
commit
41532ce634
112
cluster/etcd_elector.go
Normal file
112
cluster/etcd_elector.go
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
// +build kvdb_etcd
|
||||||
|
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/etcd/clientv3"
|
||||||
|
"github.com/coreos/etcd/clientv3/concurrency"
|
||||||
|
"github.com/coreos/etcd/clientv3/namespace"
|
||||||
|
"github.com/coreos/etcd/pkg/transport"
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// etcdConnectionTimeout is the timeout until successful connection to
|
||||||
|
// the etcd instance.
|
||||||
|
etcdConnectionTimeout = 10 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// Enforce that etcdLeaderElector implements the LeaderElector interface.
|
||||||
|
var _ LeaderElector = (*etcdLeaderElector)(nil)
|
||||||
|
|
||||||
|
// etcdLeaderElector is an implemetation of LeaderElector using etcd as the
|
||||||
|
// election governor.
|
||||||
|
type etcdLeaderElector struct {
|
||||||
|
id string
|
||||||
|
ctx context.Context
|
||||||
|
cli *clientv3.Client
|
||||||
|
session *concurrency.Session
|
||||||
|
election *concurrency.Election
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEtcdLeaderElector constructs a new etcdLeaderElector.
|
||||||
|
func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
|
||||||
|
cfg *etcd.Config) (*etcdLeaderElector, error) {
|
||||||
|
|
||||||
|
clientCfg := clientv3.Config{
|
||||||
|
Context: ctx,
|
||||||
|
Endpoints: []string{cfg.Host},
|
||||||
|
DialTimeout: etcdConnectionTimeout,
|
||||||
|
Username: cfg.User,
|
||||||
|
Password: cfg.Pass,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !cfg.DisableTLS {
|
||||||
|
tlsInfo := transport.TLSInfo{
|
||||||
|
CertFile: cfg.CertFile,
|
||||||
|
KeyFile: cfg.KeyFile,
|
||||||
|
InsecureSkipVerify: cfg.InsecureSkipVerify,
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsConfig, err := tlsInfo.ClientConfig()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
clientCfg.TLS = tlsConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
cli, err := clientv3.New(clientCfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to connect to etcd: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply the namespace.
|
||||||
|
cli.KV = namespace.NewKV(cli.KV, cfg.Namespace)
|
||||||
|
cli.Watcher = namespace.NewWatcher(cli.Watcher, cfg.Namespace)
|
||||||
|
cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
|
||||||
|
log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)
|
||||||
|
|
||||||
|
session, err := concurrency.NewSession(cli)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("Unable to start new leader election session: %v",
|
||||||
|
err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &etcdLeaderElector{
|
||||||
|
id: id,
|
||||||
|
ctx: ctx,
|
||||||
|
cli: cli,
|
||||||
|
session: session,
|
||||||
|
election: concurrency.NewElection(
|
||||||
|
session, electionPrefix,
|
||||||
|
),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Leader returns the leader value for the current election.
|
||||||
|
func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
|
||||||
|
resp, err := e.election.Leader(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(resp.Kvs[0].Value), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Campaign will start a new leader election campaign. Campaign will block until
|
||||||
|
// the elector context is canceled or the the caller is elected as the leader.
|
||||||
|
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
|
||||||
|
return e.election.Campaign(ctx, e.id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resign resigns the leader role allowing other election members to take
|
||||||
|
// the place.
|
||||||
|
func (e *etcdLeaderElector) Resign() error {
|
||||||
|
return e.election.Resign(context.Background())
|
||||||
|
}
|
46
cluster/etcd_elector_factory.go
Normal file
46
cluster/etcd_elector_factory.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
// +build kvdb_etcd
|
||||||
|
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
|
||||||
|
)
|
||||||
|
|
||||||
|
// makeEtcdElector will construct a new etcdLeaderElector. It expects a cancel
|
||||||
|
// context a unique (in the cluster) LND id and a *etcd.Config as arguments.
|
||||||
|
func makeEtcdElector(ctx context.Context, args ...interface{}) (LeaderElector,
|
||||||
|
error) {
|
||||||
|
|
||||||
|
if len(args) != 3 {
|
||||||
|
return nil, fmt.Errorf("invalid number of arguments to "+
|
||||||
|
"cluster.makeEtcdElector(): expected 3, got %v",
|
||||||
|
len(args))
|
||||||
|
}
|
||||||
|
|
||||||
|
id, ok := args[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid argument (0) to " +
|
||||||
|
"cluster.makeEtcdElector(), expected: string")
|
||||||
|
}
|
||||||
|
|
||||||
|
electionPrefix, ok := args[1].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid argument (1) to " +
|
||||||
|
"cluster.makeEtcdElector(), expected: string")
|
||||||
|
}
|
||||||
|
|
||||||
|
etcdCfg, ok := args[2].(*etcd.Config)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("invalid argument (2) to " +
|
||||||
|
"cluster.makeEtcdElector(), expected: *etcd.Config")
|
||||||
|
}
|
||||||
|
|
||||||
|
return newEtcdLeaderElector(ctx, id, electionPrefix, etcdCfg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
RegisterLeaderElectorFactory(EtcdLeaderElector, makeEtcdElector)
|
||||||
|
}
|
104
cluster/etcd_elector_test.go
Normal file
104
cluster/etcd_elector_test.go
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
// +build kvdb_etcd
|
||||||
|
|
||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"runtime/pprof"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GuardTimeout implements a test level timeout guard.
|
||||||
|
func GuardTimeout(t *testing.T, timeout time.Duration) func() {
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-time.After(timeout):
|
||||||
|
err := pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
panic("test timeout")
|
||||||
|
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return func() {
|
||||||
|
close(done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEtcdElector tests that two candidates competing for leadership works as
|
||||||
|
// expected and that elected leader can resign and allow others to take on.
|
||||||
|
func TestEtcdElector(t *testing.T) {
|
||||||
|
guard := GuardTimeout(t, 5*time.Second)
|
||||||
|
defer guard()
|
||||||
|
|
||||||
|
tmpDir, err := ioutil.TempDir("", "etcd")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unable to create temp dir: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
etcdCfg, cleanup, err := etcd.NewEmbeddedEtcdInstance(tmpDir, 0, 0)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
const (
|
||||||
|
election = "/election/"
|
||||||
|
id1 = "e1"
|
||||||
|
id2 = "e2"
|
||||||
|
)
|
||||||
|
|
||||||
|
e1, err := newEtcdLeaderElector(
|
||||||
|
ctx, id1, election, etcdCfg,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
e2, err := newEtcdLeaderElector(
|
||||||
|
ctx, id2, election, etcdCfg,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
ch := make(chan *etcdLeaderElector)
|
||||||
|
|
||||||
|
wg.Add(2)
|
||||||
|
ctxb := context.Background()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
require.NoError(t, e1.Campaign(ctxb))
|
||||||
|
ch <- e1
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
require.NoError(t, e2.Campaign(ctxb))
|
||||||
|
ch <- e2
|
||||||
|
}()
|
||||||
|
|
||||||
|
tmp := <-ch
|
||||||
|
first, err := tmp.Leader(ctxb)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, tmp.Resign())
|
||||||
|
|
||||||
|
tmp = <-ch
|
||||||
|
second, err := tmp.Leader(ctxb)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, tmp.Resign())
|
||||||
|
|
||||||
|
require.Contains(t, []string{id1, id2}, first)
|
||||||
|
require.Contains(t, []string{id1, id2}, second)
|
||||||
|
require.NotEqual(t, first, second)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
37
cluster/factory.go
Normal file
37
cluster/factory.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
package cluster
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// leaderElectorFactoryFunc is a LeaderElector factory method type.
|
||||||
|
type leaderElectorFactoryFunc func(context.Context, ...interface{}) (
|
||||||
|
LeaderElector, error)
|
||||||
|
|
||||||
|
var leaderElectorFactories map[string]leaderElectorFactoryFunc
|
||||||
|
|
||||||
|
// RegisterLeaderElectorFactory will register a new LeaderElector factory
|
||||||
|
// method corresponding to the passed id.
|
||||||
|
func RegisterLeaderElectorFactory(id string, factory leaderElectorFactoryFunc) {
|
||||||
|
if leaderElectorFactories == nil {
|
||||||
|
leaderElectorFactories = make(
|
||||||
|
map[string]leaderElectorFactoryFunc,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
leaderElectorFactories[id] = factory
|
||||||
|
}
|
||||||
|
|
||||||
|
// MakeLeaderElector will constuct a LeaderElector identified by id with the
|
||||||
|
// passed arguments.
|
||||||
|
func MakeLeaderElector(ctx context.Context, id string, args ...interface{}) (
|
||||||
|
LeaderElector, error) {
|
||||||
|
|
||||||
|
if _, ok := leaderElectorFactories[id]; !ok {
|
||||||
|
return nil, fmt.Errorf("leader elector factory for '%v' "+
|
||||||
|
"not found", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return leaderElectorFactories[id](ctx, args...)
|
||||||
|
}
|
38
lncfg/cluster.go
Normal file
38
lncfg/cluster.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package lncfg
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/lightningnetwork/lnd/cluster"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Cluster holds configuration for clustered LND.
|
||||||
|
type Cluster struct {
|
||||||
|
LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."`
|
||||||
|
|
||||||
|
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector."`
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultCluster creates and returns a new default DB config.
|
||||||
|
func DefaultCluster() *Cluster {
|
||||||
|
return &Cluster{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates the Cluster config.
|
||||||
|
func (c *Cluster) Validate() error {
|
||||||
|
switch c.LeaderElector {
|
||||||
|
case cluster.EtcdLeaderElector:
|
||||||
|
if c.EtcdElectionPrefix == "" {
|
||||||
|
return fmt.Errorf("etcd-election-prefix must be set")
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown leader elector, valid values are: "+
|
||||||
|
"\"%v\"", cluster.EtcdLeaderElector)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compile-time constraint to ensure Workers implements the Validator interface.
|
||||||
|
var _ Validator = (*Cluster)(nil)
|
Loading…
Reference in New Issue
Block a user