lnd.xprv/cluster/etcd_elector.go
2021-05-04 17:33:09 +02:00

113 lines
2.8 KiB
Go

// +build kvdb_etcd
package cluster
import (
"context"
"time"
"github.com/lightningnetwork/lnd/channeldb/kvdb/etcd"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/pkg/transport"
)
// etcdConnectionTimeout bounds how long we wait for a successful connection
// to the etcd instance. It is used as the dial timeout of the etcd client.
const etcdConnectionTimeout = 10 * time.Second
// etcdLeaderElector is an implementation of LeaderElector using etcd as the
// election governor.
type etcdLeaderElector struct {
	// id is the value this participant proposes when campaigning.
	id string

	// ctx is the context the elector was constructed with; it is also the
	// context handed to the etcd client.
	ctx context.Context

	// cli is the etcd client the session was created from.
	cli *clientv3.Client

	// session is the etcd concurrency session backing the election.
	session *concurrency.Session

	// election is the etcd election that Leader, Campaign and Resign
	// operate on.
	election *concurrency.Election
}

// Compile-time check that etcdLeaderElector satisfies the LeaderElector
// interface.
var _ LeaderElector = (*etcdLeaderElector)(nil)
// newEtcdLeaderElector constructs a new etcdLeaderElector.
//
// It creates an etcd client for cfg.Host (using TLS unless cfg.DisableTLS is
// set), scopes the client's KV, Watcher and Lease APIs to cfg.Namespace, opens
// a concurrency session on that client and creates an election under
// electionPrefix. The passed id is stored and later proposed as this
// participant's value when campaigning. ctx is used as the etcd client's
// context and is kept on the returned elector.
//
// If the session cannot be started, the already created client is closed
// before the error is returned so that it isn't leaked.
func newEtcdLeaderElector(ctx context.Context, id, electionPrefix string,
	cfg *etcd.Config) (*etcdLeaderElector, error) {

	clientCfg := clientv3.Config{
		Context:     ctx,
		Endpoints:   []string{cfg.Host},
		DialTimeout: etcdConnectionTimeout,
		Username:    cfg.User,
		Password:    cfg.Pass,
	}

	// Only build a TLS configuration when TLS hasn't been explicitly
	// disabled in the etcd config.
	if !cfg.DisableTLS {
		tlsInfo := transport.TLSInfo{
			CertFile:           cfg.CertFile,
			KeyFile:            cfg.KeyFile,
			InsecureSkipVerify: cfg.InsecureSkipVerify,
		}

		tlsConfig, err := tlsInfo.ClientConfig()
		if err != nil {
			return nil, err
		}

		clientCfg.TLS = tlsConfig
	}

	cli, err := clientv3.New(clientCfg)
	if err != nil {
		log.Errorf("Unable to connect to etcd: %v", err)
		return nil, err
	}

	// Apply the namespace.
	cli.KV = namespace.NewKV(cli.KV, cfg.Namespace)
	cli.Watcher = namespace.NewWatcher(cli.Watcher, cfg.Namespace)
	cli.Lease = namespace.NewLease(cli.Lease, cfg.Namespace)
	log.Infof("Applied namespace to leader elector: %v", cfg.Namespace)

	session, err := concurrency.NewSession(cli)
	if err != nil {
		log.Errorf("Unable to start new leader election session: %v",
			err)

		// Without a session the client is never handed to the caller,
		// so release it here instead of leaking it.
		if closeErr := cli.Close(); closeErr != nil {
			log.Errorf("Unable to close etcd client: %v", closeErr)
		}

		return nil, err
	}

	return &etcdLeaderElector{
		id:      id,
		ctx:     ctx,
		cli:     cli,
		session: session,
		election: concurrency.NewElection(
			session, electionPrefix,
		),
	}, nil
}
// Leader returns the leader value for the current election. Any error from
// the underlying etcd election query is returned unchanged together with an
// empty string.
func (e *etcdLeaderElector) Leader(ctx context.Context) (string, error) {
	leaderResp, err := e.election.Leader(ctx)
	if err != nil {
		return "", err
	}

	// The leader's value is carried by the first key-value pair of the
	// response.
	//
	// NOTE(review): Kvs is indexed without a length check; this presumably
	// relies on the etcd election returning an error when no leader is
	// elected -- confirm against the etcd concurrency package.
	leaderKV := leaderResp.Kvs[0]

	return string(leaderKV.Value), nil
}
// Campaign will start a new leader election campaign, proposing the elector's
// id as the leader value. Campaign will block until the passed context is
// canceled or the caller is elected as the leader.
func (e *etcdLeaderElector) Campaign(ctx context.Context) error {
	err := e.election.Campaign(ctx, e.id)

	return err
}
// Resign gives up the leader role, allowing other election members to take
// over.
func (e *etcdLeaderElector) Resign() error {
	// The resignation is issued with a background context rather than the
	// context the elector was constructed with.
	resignCtx := context.Background()

	return e.election.Resign(resignCtx)
}