Ensure chain notifier is started before accessed.

The use case comes from the RPC layer that is ready before the
chain notifier which is used in the sub server.
This commit is contained in:
Roei Erez 2020-04-30 12:54:33 +03:00
parent cfe0babd78
commit ae2c37e043
11 changed files with 75 additions and 0 deletions

@ -35,6 +35,7 @@ type BitcoindNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
start sync.Once start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *chain.BitcoindClient chainConn *chain.BitcoindClient
@ -130,6 +131,11 @@ func (b *BitcoindNotifier) Stop() error {
return nil return nil
} }
// Started returns true if this instance has been started, and false otherwise.
func (b *BitcoindNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}
func (b *BitcoindNotifier) startNotifier() error { func (b *BitcoindNotifier) startNotifier() error {
// Connect to bitcoind, and register for notifications on connected, // Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks. // and disconnected blocks.
@ -158,6 +164,10 @@ func (b *BitcoindNotifier) startNotifier() error {
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&b.active, 1)
return nil return nil
} }

@ -54,6 +54,7 @@ type BtcdNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
start sync.Once start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *rpcclient.Client chainConn *rpcclient.Client
@ -141,6 +142,11 @@ func (b *BtcdNotifier) Start() error {
return startErr return startErr
} }
// Started returns true if this instance has been started, and false otherwise.
func (b *BtcdNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}
// Stop shutsdown the BtcdNotifier. // Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error { func (b *BtcdNotifier) Stop() error {
// Already shutting down? // Already shutting down?
@ -212,6 +218,10 @@ func (b *BtcdNotifier) startNotifier() error {
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&b.active, 1)
return nil return nil
} }

@ -139,6 +139,9 @@ type ChainNotifier interface {
// ready, and able to receive notification registrations from clients. // ready, and able to receive notification registrations from clients.
Start() error Start() error
// Started returns true if this instance has been started, and false otherwise.
Started() bool
// Stops the concrete ChainNotifier. Once stopped, the ChainNotifier // Stops the concrete ChainNotifier. Once stopped, the ChainNotifier
// should disallow any future requests from potential clients. // should disallow any future requests from potential clients.
// Additionally, all pending client notifications will be canceled // Additionally, all pending client notifications will be canceled

@ -40,6 +40,7 @@ type NeutrinoNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
start sync.Once start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
bestBlockMtx sync.RWMutex bestBlockMtx sync.RWMutex
@ -144,6 +145,11 @@ func (n *NeutrinoNotifier) Stop() error {
return nil return nil
} }
// Started returns true if this instance has been started, and false otherwise.
func (n *NeutrinoNotifier) Started() bool {
return atomic.LoadInt32(&n.active) != 0
}
func (n *NeutrinoNotifier) startNotifier() error { func (n *NeutrinoNotifier) startNotifier() error {
// Start our concurrent queues before starting the rescan, to ensure // Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelavantTx callbacks won't be // onFilteredBlockConnected and onRelavantTx callbacks won't be
@ -200,6 +206,10 @@ func (n *NeutrinoNotifier) startNotifier() error {
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()
// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&n.active, 1)
return nil return nil
} }

@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -63,6 +63,11 @@ var (
// has been shut down. // has been shut down.
ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " + ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " +
"subserver shutting down") "subserver shutting down")
// ErrChainNotifierServerNotActive indicates that the chain notifier hasn't
// finished the startup process.
ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" +
"still in the process of starting")
) )
// fileExists reports whether the named file or directory exists. // fileExists reports whether the named file or directory exists.
@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
confStream ChainNotifier_RegisterConfirmationsNtfnServer) error { confStream ChainNotifier_RegisterConfirmationsNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var txid chainhash.Hash var txid chainhash.Hash
@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
func (s *Server) RegisterSpendNtfn(in *SpendRequest, func (s *Server) RegisterSpendNtfn(in *SpendRequest,
spendStream ChainNotifier_RegisterSpendNtfnServer) error { spendStream ChainNotifier_RegisterSpendNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var op *wire.OutPoint var op *wire.OutPoint
@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest,
func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch, func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch,
epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error { epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var hash chainhash.Hash var hash chainhash.Hash

@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error {
return nil return nil
} }
func (m *mockNotfier) Started() bool {
return true
}
func (m *mockNotfier) Stop() error { func (m *mockNotfier) Stop() error {
return nil return nil
} }

@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error {
return nil return nil
} }
// Started checks if started
func (m *MockNotifier) Started() bool {
return true
}
// Stop the notifier. // Stop the notifier.
func (m *MockNotifier) Stop() error { func (m *MockNotifier) Stop() error {
return nil return nil