lnd: integrate leader elector to lnd main

This commit also changes the order of DB init to be run after the RPC
server is up. This will allow us to later add an RPC endpoint to be used
to query leadership status.
This commit is contained in:
Andras Banki-Horvath 2021-02-08 21:56:27 +01:00
parent 41532ce634
commit d161b34ee5
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
4 changed files with 117 additions and 15 deletions

@ -366,6 +366,8 @@ type Config struct {
DB *lncfg.DB `group:"db" namespace:"db"` DB *lncfg.DB `group:"db" namespace:"db"`
Cluster *lncfg.Cluster `group:"cluster" namespace:"cluster"`
// LogWriter is the root logger that all of the daemon's subloggers are // LogWriter is the root logger that all of the daemon's subloggers are
// hooked up to. // hooked up to.
LogWriter *build.RotatingLogWriter LogWriter *build.RotatingLogWriter
@ -532,6 +534,7 @@ func DefaultConfig() Config {
MaxCommitFeeRateAnchors: lnwallet.DefaultAnchorsCommitMaxFeeRateSatPerVByte, MaxCommitFeeRateAnchors: lnwallet.DefaultAnchorsCommitMaxFeeRateSatPerVByte,
LogWriter: build.NewRotatingLogWriter(), LogWriter: build.NewRotatingLogWriter(),
DB: lncfg.DefaultDB(), DB: lncfg.DefaultDB(),
Cluster: lncfg.DefaultCluster(),
registeredChains: chainreg.NewChainRegistry(), registeredChains: chainreg.NewChainRegistry(),
ActiveNetParams: chainreg.BitcoinTestNetParams, ActiveNetParams: chainreg.BitcoinTestNetParams,
ChannelCommitInterval: defaultChannelCommitInterval, ChannelCommitInterval: defaultChannelCommitInterval,
@ -1372,6 +1375,7 @@ func ValidateConfig(cfg Config, usageMessage string,
cfg.Caches, cfg.Caches,
cfg.WtClient, cfg.WtClient,
cfg.DB, cfg.DB,
cfg.Cluster,
cfg.HealthChecks, cfg.HealthChecks,
) )
if err != nil { if err != nil {

@ -1,37 +1,73 @@
package lncfg package lncfg
import ( import (
"context"
"fmt" "fmt"
"os"
"github.com/lightningnetwork/lnd/cluster" "github.com/lightningnetwork/lnd/cluster"
) )
const (
// DefaultEtcdElectionPrefix is used as election prefix if none is provided
// through the config.
DefaultEtcdElectionPrefix = "/leader/"
)
// Cluster holds configuration for clustered LND. // Cluster holds configuration for clustered LND.
type Cluster struct { type Cluster struct {
EnableLeaderElection bool `long:"enable-leader-election" description:"Enables leader election if set."`
LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."` LeaderElector string `long:"leader-elector" choice:"etcd" description:"Leader elector to use. Valid values: \"etcd\"."`
EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector."` EtcdElectionPrefix string `long:"etcd-election-prefix" description:"Election key prefix when using etcd leader elector. Defaults to \"/leader/\"."`
ID string `long:"id" description:"Identifier for this node inside the cluster (used in leader election). Defaults to the hostname."`
} }
// DefaultCluster creates and returns a new default Cluster config. // DefaultCluster creates and returns a new default Cluster config.
func DefaultCluster() *Cluster { func DefaultCluster() *Cluster {
return &Cluster{} hostname, _ := os.Hostname()
return &Cluster{
LeaderElector: cluster.EtcdLeaderElector,
EtcdElectionPrefix: DefaultEtcdElectionPrefix,
ID: hostname,
}
}
// MakeLeaderElector is a helper method to construct the concrete leader elector
// based on the current configuration.
func (c *Cluster) MakeLeaderElector(electionCtx context.Context, db *DB) (
cluster.LeaderElector, error) {
if c.LeaderElector == cluster.EtcdLeaderElector {
return cluster.MakeLeaderElector(
electionCtx, c.LeaderElector, c.ID,
c.EtcdElectionPrefix, db.Etcd,
)
}
return nil, fmt.Errorf("unsupported leader elector")
} }
// Validate validates the Cluster config. // Validate validates the Cluster config.
func (c *Cluster) Validate() error { func (c *Cluster) Validate() error {
if !c.EnableLeaderElection {
return nil
}
switch c.LeaderElector { switch c.LeaderElector {
case cluster.EtcdLeaderElector: case cluster.EtcdLeaderElector:
if c.EtcdElectionPrefix == "" { if c.EtcdElectionPrefix == "" {
return fmt.Errorf("etcd-election-prefix must be set") return fmt.Errorf("etcd-election-prefix must be set")
} }
return nil
default: default:
return fmt.Errorf("unknown leader elector, valid values are: "+ return fmt.Errorf("unknown leader elector, valid values are: "+
"\"%v\"", cluster.EtcdLeaderElector) "\"%v\"", cluster.EtcdLeaderElector)
} }
return nil
} }
// Compile-time constraint to ensure Cluster implements the Validator interface.

70
lnd.go

@ -241,17 +241,6 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
return err return err
} }
localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
switch {
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err)
return nil
case err != nil:
return fmt.Errorf("unable to open databases: %v", err)
}
defer cleanUp()
// Only process macaroons if --no-macaroons isn't set. // Only process macaroons if --no-macaroons isn't set.
serverOpts, restDialOpts, restListen, cleanUp, err := getTLSConfig(cfg) serverOpts, restDialOpts, restListen, cleanUp, err := getTLSConfig(cfg)
if err != nil { if err != nil {
@ -327,6 +316,65 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
} }
} }
// Start leader election if it is enabled in the cluster config. Execution
// blocks here until this instance is elected as the current leader or a
// shutdown is requested.
elected := false
if cfg.Cluster.EnableLeaderElection {
electionCtx, cancelElection := context.WithCancel(ctx)
go func() {
<-interceptor.ShutdownChannel()
cancelElection()
}()
ltndLog.Infof("Using %v leader elector",
cfg.Cluster.LeaderElector)
leaderElector, err := cfg.Cluster.MakeLeaderElector(
electionCtx, cfg.DB,
)
if err != nil {
return err
}
defer func() {
if !elected {
return
}
ltndLog.Infof("Attempting to resign from leader role "+
"(%v)", cfg.Cluster.ID)
if err := leaderElector.Resign(); err != nil {
ltndLog.Errorf("Leader elector failed to "+
"resign: %v", err)
}
}()
ltndLog.Infof("Starting leadership campaign (%v)",
cfg.Cluster.ID)
if err := leaderElector.Campaign(electionCtx); err != nil {
ltndLog.Errorf("Leadership campaign failed: %v", err)
return err
}
elected = true
ltndLog.Infof("Elected as leader (%v)", cfg.Cluster.ID)
}
localChanDB, remoteChanDB, cleanUp, err := initializeDatabases(ctx, cfg)
switch {
case err == channeldb.ErrDryRunMigrationOK:
ltndLog.Infof("%v, exiting", err)
return nil
case err != nil:
return fmt.Errorf("unable to open databases: %v", err)
}
defer cleanUp()
// We'll create the WalletUnlockerService and check whether the wallet // We'll create the WalletUnlockerService and check whether the wallet
// already exists. // already exists.
pwService := createWalletUnlockerService(cfg) pwService := createWalletUnlockerService(cfg)

@ -1066,6 +1066,20 @@ litecoin.node=ltcd
; If non zero, LND will use this as peer port for the embedded etcd instance. ; If non zero, LND will use this as peer port for the embedded etcd instance.
; db.etcd.embedded_peer_port=1235 ; db.etcd.embedded_peer_port=1235
[cluster]
; Enables leader election if set.
; cluster.enable-leader-election=true
; Leader elector to use. Valid values: "etcd" (default).
; cluster.leader-elector=etcd
; Election key prefix when using etcd leader elector. Defaults to "/leader/".
; cluster.etcd-election-prefix=/leader/
; Identifier for this node inside the cluster (used in leader election).
; Defaults to the hostname.
; cluster.id=example.com
[bolt] [bolt]
; If true, prevents the database from syncing its freelist to disk. ; If true, prevents the database from syncing its freelist to disk.
; db.bolt.nofreelistsync=1 ; db.bolt.nofreelistsync=1