lnrpc: add "waiting to start" state to state service

This commit adds a new "waiting to start" state which may be used to
query if we're still waiting to become the cluster leader. Once leader
we advance the state to "wallet not exist" or "wallet locked" given
wallet availablity.
This commit is contained in:
Andras Banki-Horvath 2021-03-19 15:53:06 +01:00
parent b6a620e6b2
commit 5e215a7a66
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
8 changed files with 196 additions and 104 deletions

138
lnd.go
View File

@ -316,6 +316,68 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
}
}
// Create a new RPC interceptor that we'll add to the GRPC server. This
// will be used to log the API calls invoked on the GRPC server.
interceptorChain := rpcperms.NewInterceptorChain(
rpcsLog, cfg.NoMacaroons,
)
if err := interceptorChain.Start(); err != nil {
return err
}
defer func() {
err := interceptorChain.Stop()
if err != nil {
ltndLog.Warnf("error stopping RPC interceptor "+
"chain: %v", err)
}
}()
rpcServerOpts := interceptorChain.CreateServerOpts()
serverOpts = append(serverOpts, rpcServerOpts...)
grpcServer := grpc.NewServer(serverOpts...)
defer grpcServer.Stop()
// We'll also register the RPC interceptor chain as the StateServer, as
// it can be used to query for the current state of the wallet.
lnrpc.RegisterStateServer(grpcServer, interceptorChain)
// Register the WalletUnlockerService with the GRPC server.
pwService := createWalletUnlockerService(cfg)
lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService)
// Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer.
rpcServer := newRPCServer(
cfg, interceptorChain, lisCfg.ExternalRPCSubserverCfg,
lisCfg.ExternalRestRegistrar,
interceptor,
)
err = rpcServer.RegisterWithGrpcServer(grpcServer)
if err != nil {
return err
}
// Now that both the WalletUnlocker and LightningService have been
// registered with the GRPC server, we can start listening.
err = startGrpcListen(cfg, grpcServer, grpcListeners)
if err != nil {
return err
}
// Now start the REST proxy for our gRPC server above. We'll ensure
// we direct LND to connect to its loopback address rather than a
// wildcard to prevent certificate issues when accessing the proxy
// externally.
stopProxy, err := startRestProxy(
cfg, rpcServer, restDialOpts, restListen,
)
if err != nil {
return err
}
defer stopProxy()
// Start leader election if we're running on etcd. Continuation will be
// blocked until this instance is elected as the current leader or
// shutting down.
@ -401,77 +463,17 @@ func Main(cfg *Config, lisCfg ListenerCfg, interceptor signal.Interceptor) error
)
}
// We'll create the WalletUnlockerService and check whether the wallet
// already exists.
pwService := createWalletUnlockerService(cfg,
[]btcwallet.LoaderOption{loaderOpt},
)
pwService.SetLoaderOpts([]btcwallet.LoaderOption{loaderOpt})
walletExists, err := pwService.WalletExists()
if err != nil {
return err
}
// Create a new RPC interceptor that we'll add to the GRPC server. This
// will be used to log the API calls invoked on the GRPC server.
interceptorChain := rpcperms.NewInterceptorChain(
rpcsLog, cfg.NoMacaroons, walletExists,
)
if err := interceptorChain.Start(); err != nil {
return err
if !walletExists {
interceptorChain.SetWalletNotCreated()
} else {
interceptorChain.SetWalletLocked()
}
defer func() {
err := interceptorChain.Stop()
if err != nil {
ltndLog.Warnf("error stopping RPC interceptor "+
"chain: %v", err)
}
}()
rpcServerOpts := interceptorChain.CreateServerOpts()
serverOpts = append(serverOpts, rpcServerOpts...)
grpcServer := grpc.NewServer(serverOpts...)
defer grpcServer.Stop()
// Register the WalletUnlockerService with the GRPC server.
lnrpc.RegisterWalletUnlockerServer(grpcServer, pwService)
// We'll also register the RPC interceptor chain as the StateServer, as
// it can be used to query for the current state of the wallet.
lnrpc.RegisterStateServer(grpcServer, interceptorChain)
// Initialize, and register our implementation of the gRPC interface
// exported by the rpcServer.
rpcServer := newRPCServer(
cfg, interceptorChain, lisCfg.ExternalRPCSubserverCfg,
lisCfg.ExternalRestRegistrar,
interceptor,
)
err = rpcServer.RegisterWithGrpcServer(grpcServer)
if err != nil {
return err
}
// Now that both the WalletUnlocker and LightningService have been
// registered with the GRPC server, we can start listening.
err = startGrpcListen(cfg, grpcServer, grpcListeners)
if err != nil {
return err
}
// Now start the REST proxy for our gRPC server above. We'll ensure
// we direct LND to connect to its loopback address rather than a
// wildcard to prevent certificate issues when accessing the proxy
// externally.
stopProxy, err := startRestProxy(
cfg, rpcServer, restDialOpts, restListen,
)
if err != nil {
return err
}
defer stopProxy()
// We wait until the user provides a password over RPC. In case lnd is
// started with the --noseedbackup flag, we use the default password
@ -1234,9 +1236,7 @@ type WalletUnlockParams struct {
// createWalletUnlockerService creates a WalletUnlockerService from the passed
// config.
func createWalletUnlockerService(cfg *Config,
loaderOpts []btcwallet.LoaderOption) *walletunlocker.UnlockerService {
func createWalletUnlockerService(cfg *Config) *walletunlocker.UnlockerService {
chainConfig := cfg.Bitcoin
if cfg.registeredChains.PrimaryChain() == chainreg.LitecoinChain {
chainConfig = cfg.Litecoin
@ -1252,7 +1252,7 @@ func createWalletUnlockerService(cfg *Config,
return walletunlocker.New(
chainConfig.ChainDir, cfg.ActiveNetParams.Params,
!cfg.SyncFreelist, macaroonFiles, cfg.DB.Bolt.DBTimeout,
cfg.ResetWalletTransactions, loaderOpts,
cfg.ResetWalletTransactions, nil,
)
}

View File

@ -32,25 +32,28 @@ const _ = proto.ProtoPackageIsVersion4
type WalletState int32
const (
WalletState_NON_EXISTING WalletState = 0
WalletState_LOCKED WalletState = 1
WalletState_UNLOCKED WalletState = 2
WalletState_RPC_ACTIVE WalletState = 3
WalletState_NON_EXISTING WalletState = 0
WalletState_LOCKED WalletState = 1
WalletState_UNLOCKED WalletState = 2
WalletState_RPC_ACTIVE WalletState = 3
WalletState_WAITING_TO_START WalletState = 255
)
// Enum value maps for WalletState.
var (
WalletState_name = map[int32]string{
0: "NON_EXISTING",
1: "LOCKED",
2: "UNLOCKED",
3: "RPC_ACTIVE",
0: "NON_EXISTING",
1: "LOCKED",
2: "UNLOCKED",
3: "RPC_ACTIVE",
255: "WAITING_TO_START",
}
WalletState_value = map[string]int32{
"NON_EXISTING": 0,
"LOCKED": 1,
"UNLOCKED": 2,
"RPC_ACTIVE": 3,
"NON_EXISTING": 0,
"LOCKED": 1,
"UNLOCKED": 2,
"RPC_ACTIVE": 3,
"WAITING_TO_START": 255,
}
)
@ -176,20 +179,22 @@ var file_stateservice_proto_rawDesc = []byte{
0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28,
0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x12, 0x2e,
0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2e, 0x57, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74,
0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2a, 0x49, 0x0a, 0x0b, 0x57, 0x61, 0x6c, 0x6c,
0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x2a, 0x60, 0x0a, 0x0b, 0x57, 0x61, 0x6c, 0x6c,
0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x10, 0x0a, 0x0c, 0x4e, 0x4f, 0x4e, 0x5f, 0x45,
0x58, 0x49, 0x53, 0x54, 0x49, 0x4e, 0x47, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x4c, 0x4f, 0x43,
0x4b, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x55, 0x4e, 0x4c, 0x4f, 0x43, 0x4b, 0x45,
0x44, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x50, 0x43, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56,
0x45, 0x10, 0x03, 0x32, 0x58, 0x0a, 0x05, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x4f, 0x0a, 0x0e,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c,
0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6c,
0x6e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74,
0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x27, 0x5a,
0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x67, 0x68,
0x74, 0x6e, 0x69, 0x6e, 0x67, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x6c, 0x6e, 0x64,
0x2f, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x45, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x10, 0x57, 0x41, 0x49, 0x54, 0x49, 0x4e, 0x47, 0x5f, 0x54,
0x4f, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x10, 0xff, 0x01, 0x32, 0x58, 0x0a, 0x05, 0x53, 0x74,
0x61, 0x74, 0x65, 0x12, 0x4f, 0x0a, 0x0e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65,
0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1c, 0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2e, 0x53, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x30, 0x01, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63,
0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x6e, 0x69, 0x6e, 0x67, 0x6e, 0x65, 0x74, 0x77,
0x6f, 0x72, 0x6b, 0x2f, 0x6c, 0x6e, 0x64, 0x2f, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@ -36,6 +36,8 @@ enum WalletState {
LOCKED = 1;
UNLOCKED = 2;
RPC_ACTIVE = 3;
WAITING_TO_START = 255;
}
message SubscribeStateRequest {

View File

@ -59,7 +59,8 @@
"NON_EXISTING",
"LOCKED",
"UNLOCKED",
"RPC_ACTIVE"
"RPC_ACTIVE",
"WAITING_TO_START"
],
"default": "NON_EXISTING"
},

View File

@ -275,3 +275,4 @@
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeInvoices]: context canceled
<time> [ERR] RPCS: [/lnrpc.Lightning/SubscribeChannelGraph]: context deadline exceeded
<time> [ERR] RPCS: [/invoicesrpc.Invoices/SubscribeSingleInvoice]: context canceled
<time> [ERR] RPCS: [/lnrpc.State/SubscribeState]: context canceled

View File

@ -652,6 +652,10 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error {
return err
}
if err := hn.waitUntilStarted(conn, DefaultTimeout); err != nil {
return err
}
// If the node was created with a seed, we will need to perform an
// additional step to unlock the wallet. The connection returned will
// only use the TLS certs, and can only perform operations necessary to
@ -664,6 +668,49 @@ func (hn *HarnessNode) start(lndBinary string, lndError chan<- error) error {
return hn.initLightningClient(conn)
}
// waitUntilStarted waits until the wallet state flips from "WAITING_TO_START".
func (hn *HarnessNode) waitUntilStarted(conn grpc.ClientConnInterface,
timeout time.Duration) error {
stateClient := lnrpc.NewStateClient(conn)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stateStream, err := stateClient.SubscribeState(
ctx, &lnrpc.SubscribeStateRequest{},
)
if err != nil {
return err
}
errChan := make(chan error, 1)
started := make(chan struct{})
go func() {
for {
resp, err := stateStream.Recv()
if err != nil {
errChan <- err
}
if resp.State != lnrpc.WalletState_WAITING_TO_START {
close(started)
return
}
}
}()
select {
case <-started:
case err = <-errChan:
case <-time.After(timeout):
return fmt.Errorf("WaitUntilLeader timed out")
}
return err
}
// initClientWhenReady waits until the main gRPC server is detected as active,
// then complete the normal HarnessNode gRPC connection creation. This can be
// used it a node has just been unlocked, or has its wallet state initialized.

View File

@ -21,10 +21,16 @@ import (
type rpcState uint8
const (
// waitingToStart indicates that we're at the beginning of the startup
// process. In a cluster evironment this may mean that we're waiting to
// become the leader in which case RPC calls will be disabled until
// this instance has been elected as leader.
waitingToStart rpcState = iota
// walletNotCreated is the starting state if the RPC server is active,
// but the wallet is not yet created. In this state we'll only allow
// calls to the WalletUnlockerService.
walletNotCreated rpcState = iota
walletNotCreated
// walletLocked indicates the RPC server is active, but the wallet is
// locked. In this state we'll only allow calls to the
@ -40,6 +46,11 @@ const (
)
var (
// ErrWaitingToStart is returned if LND is still wating to start,
// possibly blocked until elected as the leader.
ErrWaitingToStart = fmt.Errorf("waiting to start, RPC services not " +
"available")
// ErrNoWallet is returned if the wallet does not exist.
ErrNoWallet = fmt.Errorf("wallet not created, create one to enable " +
"full RPC access")
@ -110,16 +121,9 @@ type InterceptorChain struct {
var _ lnrpc.StateServer = (*InterceptorChain)(nil)
// NewInterceptorChain creates a new InterceptorChain.
func NewInterceptorChain(log btclog.Logger, noMacaroons,
walletExists bool) *InterceptorChain {
startState := walletNotCreated
if walletExists {
startState = walletLocked
}
func NewInterceptorChain(log btclog.Logger, noMacaroons bool) *InterceptorChain {
return &InterceptorChain{
state: startState,
state: waitingToStart,
ntfnServer: subscribe.NewServer(),
noMacaroons: noMacaroons,
permissionMap: make(map[string][]bakery.Op),
@ -150,6 +154,26 @@ func (r *InterceptorChain) Stop() error {
return err
}
// SetWalletNotCreated moves the RPC state from either waitingToStart to
// walletNotCreated.
func (r *InterceptorChain) SetWalletNotCreated() {
r.Lock()
defer r.Unlock()
r.state = walletNotCreated
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetWalletLocked moves the RPC state from either walletNotCreated to
// walletLocked.
func (r *InterceptorChain) SetWalletLocked() {
r.Lock()
defer r.Unlock()
r.state = walletLocked
_ = r.ntfnServer.SendUpdate(r.state)
}
// SetWalletUnlocked moves the RPC state from either walletNotCreated or
// walletLocked to walletUnlocked.
func (r *InterceptorChain) SetWalletUnlocked() {
@ -179,6 +203,8 @@ func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest,
sendStateUpdate := func(state rpcState) error {
resp := &lnrpc.SubscribeStateResponse{}
switch state {
case waitingToStart:
resp.State = lnrpc.WalletState_WAITING_TO_START
case walletNotCreated:
resp.State = lnrpc.WalletState_NON_EXISTING
case walletLocked:
@ -189,8 +215,7 @@ func (r *InterceptorChain) SubscribeState(req *lnrpc.SubscribeStateRequest,
resp.State = lnrpc.WalletState_RPC_ACTIVE
default:
return fmt.Errorf("unknown wallet "+
"state %v", state)
return fmt.Errorf("unknown wallet state %v", state)
}
return stream.Send(resp)
@ -459,6 +484,11 @@ func (r *InterceptorChain) checkRPCState(srv interface{}) error {
switch state {
// Do not accept any RPC calls (unless to the state service) until LND
// has not started.
case waitingToStart:
return ErrWaitingToStart
// If the wallet does not exists, only calls to the WalletUnlocker are
// accepted.
case walletNotCreated:

View File

@ -165,6 +165,12 @@ func New(chainDir string, params *chaincfg.Params, noFreelistSync bool,
}
}
// SetLoaderOpts can be used to inject wallet loader options after the unlocker
// service has been hooked to the main RPC server.
func (u *UnlockerService) SetLoaderOpts(loaderOpts []btcwallet.LoaderOption) {
u.loaderOpts = loaderOpts
}
func (u *UnlockerService) newLoader(recoveryWindow uint32) (*wallet.Loader,
error) {