diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5956a0fb..b39f78a4 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -34,7 +34,8 @@ const ( type BitcoindNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *chain.BitcoindClient @@ -96,11 +97,46 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params, // Start connects to the running bitcoind node over websockets, registers for // block notifications, and finally launches all related helper goroutines. func (b *BitcoindNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Stop shutsdown the BitcoindNotifier. +func (b *BitcoindNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from bitcoind, + // and cleans up all related resources. + b.chainConn.Stop() + + close(b.quit) + b.wg.Wait() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +// Started returns true if this instance has been started, and false otherwise. +func (b *BitcoindNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + +func (b *BitcoindNotifier) startNotifier() error { // Connect to bitcoind, and register for notifications on connected, // and disconnected blocks. if err := b.chainConn.Start(); err != nil { @@ -128,32 +164,9 @@ func (b *BitcoindNotifier) Start() error { b.wg.Add(1) go b.notificationDispatcher() - return nil -} - -// Stop shutsdown the BitcoindNotifier. -func (b *BitcoindNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from bitcoind, - // and cleans up all related resources. - b.chainConn.Stop() - - close(b.quit) - b.wg.Wait() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) return nil } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 7b179209..d23a3059 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -53,7 +53,8 @@ type txUpdate struct { type BtcdNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. chainConn *rpcclient.Client @@ -134,11 +135,49 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params, // Start connects to the running btcd node over websockets, registers for block // notifications, and finally launches all related helper goroutines. func (b *BtcdNotifier) Start() error { - // Already started? - if atomic.AddInt32(&b.started, 1) != 1 { + var startErr error + b.start.Do(func() { + startErr = b.startNotifier() + }) + return startErr +} + +// Started returns true if this instance has been started, and false otherwise. +func (b *BtcdNotifier) Started() bool { + return atomic.LoadInt32(&b.active) != 0 +} + +// Stop shutsdown the BtcdNotifier. +func (b *BtcdNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } + // Shutdown the rpc client, this gracefully disconnects from btcd, and + // cleans up all related resources. + b.chainConn.Shutdown() + + close(b.quit) + b.wg.Wait() + + b.chainUpdates.Stop() + b.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + b.txNotifier.TearDown() + + return nil +} + +func (b *BtcdNotifier) startNotifier() error { // Start our concurrent queues before starting the chain connection, to // ensure onBlockConnected and onRedeemingTx callbacks won't be // blocked. @@ -179,35 +218,9 @@ func (b *BtcdNotifier) Start() error { b.wg.Add(1) go b.notificationDispatcher() - return nil -} - -// Stop shutsdown the BtcdNotifier. -func (b *BtcdNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&b.stopped, 1) != 1 { - return nil - } - - // Shutdown the rpc client, this gracefully disconnects from btcd, and - // cleans up all related resources. - b.chainConn.Shutdown() - - close(b.quit) - b.wg.Wait() - - b.chainUpdates.Stop() - b.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range b.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - b.txNotifier.TearDown() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&b.active, 1) return nil } diff --git a/chainntnfs/interface.go b/chainntnfs/interface.go index ace86e77..c224181c 100644 --- a/chainntnfs/interface.go +++ b/chainntnfs/interface.go @@ -139,6 +139,9 @@ type ChainNotifier interface { // ready, and able to receive notification registrations from clients. Start() error + // Started returns true if this instance has been started, and false otherwise. + Started() bool + // Stops the concrete ChainNotifier. Once stopped, the ChainNotifier // should disallow any future requests from potential clients. // Additionally, all pending client notifications will be canceled diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 771c4b86..567fa520 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -39,7 +39,8 @@ const ( type NeutrinoNotifier struct { epochClientCounter uint64 // To be used atomically. - started int32 // To be used atomically. + start sync.Once + active int32 // To be used atomically. stopped int32 // To be used atomically. bestBlockMtx sync.RWMutex @@ -111,11 +112,45 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache, // Start contacts the running neutrino light client and kicks off an initial // empty rescan. func (n *NeutrinoNotifier) Start() error { - // Already started? - if atomic.AddInt32(&n.started, 1) != 1 { + var startErr error + n.start.Do(func() { + startErr = n.startNotifier() + }) + return startErr +} + +// Stop shuts down the NeutrinoNotifier. +func (n *NeutrinoNotifier) Stop() error { + // Already shutting down? + if atomic.AddInt32(&n.stopped, 1) != 1 { return nil } + close(n.quit) + n.wg.Wait() + + n.chainUpdates.Stop() + n.txUpdates.Stop() + + // Notify all pending clients of our shutdown by closing the related + // notification channels. + for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + + close(epochClient.epochChan) + } + n.txNotifier.TearDown() + + return nil +} + +// Started returns true if this instance has been started, and false otherwise. +func (n *NeutrinoNotifier) Started() bool { + return atomic.LoadInt32(&n.active) != 0 +} + +func (n *NeutrinoNotifier) startNotifier() error { // Start our concurrent queues before starting the rescan, to ensure // onFilteredBlockConnected and onRelavantTx callbacks won't be // blocked. @@ -171,31 +206,9 @@ func (n *NeutrinoNotifier) Start() error { n.wg.Add(1) go n.notificationDispatcher() - return nil -} - -// Stop shuts down the NeutrinoNotifier. -func (n *NeutrinoNotifier) Stop() error { - // Already shutting down? - if atomic.AddInt32(&n.stopped, 1) != 1 { - return nil - } - - close(n.quit) - n.wg.Wait() - - n.chainUpdates.Stop() - n.txUpdates.Stop() - - // Notify all pending clients of our shutdown by closing the related - // notification channels. - for _, epochClient := range n.blockEpochClients { - close(epochClient.cancelChan) - epochClient.wg.Wait() - - close(epochClient.epochChan) - } - n.txNotifier.TearDown() + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&n.active, 1) return nil } diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index 6dc47f53..62c76871 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index 75d3cef6..5281d4aa 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 408c1052..40d1f12a 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index e9a2a1ef..0ce5adf6 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error { return nil } +func (m *mockNotifier) Started() bool { + return true +} + func (m *mockNotifier) Stop() error { return nil } diff --git a/lnrpc/chainrpc/chainnotifier_server.go b/lnrpc/chainrpc/chainnotifier_server.go index 2d662c8e..261f7605 100644 --- a/lnrpc/chainrpc/chainnotifier_server.go +++ b/lnrpc/chainrpc/chainnotifier_server.go @@ -63,6 +63,11 @@ var ( // has been shut down. ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " + "subserver shutting down") + + // ErrChainNotifierServerNotActive indicates that the chain notifier hasn't + // finished the startup process. + ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" + + "still in the process of starting") ) // fileExists reports whether the named file or directory exists. @@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error { func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, confStream ChainNotifier_RegisterConfirmationsNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var txid chainhash.Hash @@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, func (s *Server) RegisterSpendNtfn(in *SpendRequest, spendStream ChainNotifier_RegisterSpendNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var op *wire.OutPoint @@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest, func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch, epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error { + if !s.cfg.ChainNotifier.Started() { + return ErrChainNotifierServerNotActive + } + // We'll start by reconstructing the RPC request into what the // underlying ChainNotifier expects. var hash chainhash.Hash diff --git a/mock.go b/mock.go index 0a7306c4..50b9d98e 100644 --- a/mock.go +++ b/mock.go @@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error { return nil } +func (m *mockNotfier) Started() bool { + return true +} + func (m *mockNotfier) Stop() error { return nil } diff --git a/sweep/test_utils.go b/sweep/test_utils.go index 7c28710b..5b83d730 100644 --- a/sweep/test_utils.go +++ b/sweep/test_utils.go @@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error { return nil } +// Started checks if started +func (m *MockNotifier) Started() bool { + return true +} + // Stop the notifier. func (m *MockNotifier) Stop() error { return nil