Merge pull request #4226 from breez/fix-chainrpc-crash

Fix crash when ChainNotifier is being accessed too early.
This commit is contained in:
Olaoluwa Osuntokun 2020-05-15 16:17:40 -07:00 committed by GitHub
commit 280bf8d26c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 173 additions and 89 deletions

@ -34,7 +34,8 @@ const (
type BitcoindNotifier struct { type BitcoindNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *chain.BitcoindClient chainConn *chain.BitcoindClient
@ -96,11 +97,46 @@ func New(chainConn *chain.BitcoindConn, chainParams *chaincfg.Params,
// Start connects to the running bitcoind node over websockets, registers for // Start connects to the running bitcoind node over websockets, registers for
// block notifications, and finally launches all related helper goroutines. // block notifications, and finally launches all related helper goroutines.
func (b *BitcoindNotifier) Start() error { func (b *BitcoindNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&b.started, 1) != 1 { b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
// Stop shutsdown the BitcoindNotifier.
func (b *BitcoindNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil return nil
} }
// Shutdown the rpc client, this gracefully disconnects from bitcoind,
// and cleans up all related resources.
b.chainConn.Stop()
close(b.quit)
b.wg.Wait()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
// Started returns true if this instance has been started, and false otherwise.
func (b *BitcoindNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}
func (b *BitcoindNotifier) startNotifier() error {
// Connect to bitcoind, and register for notifications on connected, // Connect to bitcoind, and register for notifications on connected,
// and disconnected blocks. // and disconnected blocks.
if err := b.chainConn.Start(); err != nil { if err := b.chainConn.Start(); err != nil {
@ -128,32 +164,9 @@ func (b *BitcoindNotifier) Start() error {
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
return nil // Set the active flag now that we've completed the full
} // startup.
atomic.StoreInt32(&b.active, 1)
// Stop shutsdown the BitcoindNotifier.
func (b *BitcoindNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from bitcoind,
// and cleans up all related resources.
b.chainConn.Stop()
close(b.quit)
b.wg.Wait()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil return nil
} }

@ -53,7 +53,8 @@ type txUpdate struct {
type BtcdNotifier struct { type BtcdNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
chainConn *rpcclient.Client chainConn *rpcclient.Client
@ -134,11 +135,49 @@ func New(config *rpcclient.ConnConfig, chainParams *chaincfg.Params,
// Start connects to the running btcd node over websockets, registers for block // Start connects to the running btcd node over websockets, registers for block
// notifications, and finally launches all related helper goroutines. // notifications, and finally launches all related helper goroutines.
func (b *BtcdNotifier) Start() error { func (b *BtcdNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&b.started, 1) != 1 { b.start.Do(func() {
startErr = b.startNotifier()
})
return startErr
}
// Started returns true if this instance has been started, and false otherwise.
func (b *BtcdNotifier) Started() bool {
return atomic.LoadInt32(&b.active) != 0
}
// Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil return nil
} }
// Shutdown the rpc client, this gracefully disconnects from btcd, and
// cleans up all related resources.
b.chainConn.Shutdown()
close(b.quit)
b.wg.Wait()
b.chainUpdates.Stop()
b.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil
}
func (b *BtcdNotifier) startNotifier() error {
// Start our concurrent queues before starting the chain connection, to // Start our concurrent queues before starting the chain connection, to
// ensure onBlockConnected and onRedeemingTx callbacks won't be // ensure onBlockConnected and onRedeemingTx callbacks won't be
// blocked. // blocked.
@ -179,35 +218,9 @@ func (b *BtcdNotifier) Start() error {
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
return nil // Set the active flag now that we've completed the full
} // startup.
atomic.StoreInt32(&b.active, 1)
// Stop shutsdown the BtcdNotifier.
func (b *BtcdNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&b.stopped, 1) != 1 {
return nil
}
// Shutdown the rpc client, this gracefully disconnects from btcd, and
// cleans up all related resources.
b.chainConn.Shutdown()
close(b.quit)
b.wg.Wait()
b.chainUpdates.Stop()
b.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range b.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
b.txNotifier.TearDown()
return nil return nil
} }

@ -139,6 +139,9 @@ type ChainNotifier interface {
// ready, and able to receive notification registrations from clients. // ready, and able to receive notification registrations from clients.
Start() error Start() error
// Started returns true if this instance has been started, and false otherwise.
Started() bool
// Stops the concrete ChainNotifier. Once stopped, the ChainNotifier // Stops the concrete ChainNotifier. Once stopped, the ChainNotifier
// should disallow any future requests from potential clients. // should disallow any future requests from potential clients.
// Additionally, all pending client notifications will be canceled // Additionally, all pending client notifications will be canceled

@ -39,7 +39,8 @@ const (
type NeutrinoNotifier struct { type NeutrinoNotifier struct {
epochClientCounter uint64 // To be used atomically. epochClientCounter uint64 // To be used atomically.
started int32 // To be used atomically. start sync.Once
active int32 // To be used atomically.
stopped int32 // To be used atomically. stopped int32 // To be used atomically.
bestBlockMtx sync.RWMutex bestBlockMtx sync.RWMutex
@ -111,11 +112,45 @@ func New(node *neutrino.ChainService, spendHintCache chainntnfs.SpendHintCache,
// Start contacts the running neutrino light client and kicks off an initial // Start contacts the running neutrino light client and kicks off an initial
// empty rescan. // empty rescan.
func (n *NeutrinoNotifier) Start() error { func (n *NeutrinoNotifier) Start() error {
// Already started? var startErr error
if atomic.AddInt32(&n.started, 1) != 1 { n.start.Do(func() {
startErr = n.startNotifier()
})
return startErr
}
// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&n.stopped, 1) != 1 {
return nil return nil
} }
close(n.quit)
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txNotifier.TearDown()
return nil
}
// Started returns true if this instance has been started, and false otherwise.
func (n *NeutrinoNotifier) Started() bool {
return atomic.LoadInt32(&n.active) != 0
}
func (n *NeutrinoNotifier) startNotifier() error {
// Start our concurrent queues before starting the rescan, to ensure // Start our concurrent queues before starting the rescan, to ensure
// onFilteredBlockConnected and onRelavantTx callbacks won't be // onFilteredBlockConnected and onRelavantTx callbacks won't be
// blocked. // blocked.
@ -171,31 +206,9 @@ func (n *NeutrinoNotifier) Start() error {
n.wg.Add(1) n.wg.Add(1)
go n.notificationDispatcher() go n.notificationDispatcher()
return nil // Set the active flag now that we've completed the full
} // startup.
atomic.StoreInt32(&n.active, 1)
// Stop shuts down the NeutrinoNotifier.
func (n *NeutrinoNotifier) Stop() error {
// Already shutting down?
if atomic.AddInt32(&n.stopped, 1) != 1 {
return nil
}
close(n.quit)
n.wg.Wait()
n.chainUpdates.Stop()
n.txUpdates.Stop()
// Notify all pending clients of our shutdown by closing the related
// notification channels.
for _, epochClient := range n.blockEpochClients {
close(epochClient.cancelChan)
epochClient.wg.Wait()
close(epochClient.epochChan)
}
n.txNotifier.TearDown()
return nil return nil
} }

@ -43,6 +43,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -453,6 +453,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -131,6 +131,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -931,6 +931,10 @@ func (m *mockNotifier) Start() error {
return nil return nil
} }
func (m *mockNotifier) Started() bool {
return true
}
func (m *mockNotifier) Stop() error { func (m *mockNotifier) Stop() error {
return nil return nil
} }

@ -63,6 +63,11 @@ var (
// has been shut down. // has been shut down.
ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " + ErrChainNotifierServerShuttingDown = errors.New("chain notifier RPC " +
"subserver shutting down") "subserver shutting down")
// ErrChainNotifierServerNotActive indicates that the chain notifier hasn't
// finished the startup process.
ErrChainNotifierServerNotActive = errors.New("chain notifier RPC is" +
"still in the process of starting")
) )
// fileExists reports whether the named file or directory exists. // fileExists reports whether the named file or directory exists.
@ -196,6 +201,10 @@ func (s *Server) RegisterWithRootServer(grpcServer *grpc.Server) error {
func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest, func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
confStream ChainNotifier_RegisterConfirmationsNtfnServer) error { confStream ChainNotifier_RegisterConfirmationsNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var txid chainhash.Hash var txid chainhash.Hash
@ -292,6 +301,10 @@ func (s *Server) RegisterConfirmationsNtfn(in *ConfRequest,
func (s *Server) RegisterSpendNtfn(in *SpendRequest, func (s *Server) RegisterSpendNtfn(in *SpendRequest,
spendStream ChainNotifier_RegisterSpendNtfnServer) error { spendStream ChainNotifier_RegisterSpendNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var op *wire.OutPoint var op *wire.OutPoint
@ -399,6 +412,10 @@ func (s *Server) RegisterSpendNtfn(in *SpendRequest,
func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch, func (s *Server) RegisterBlockEpochNtfn(in *BlockEpoch,
epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error { epochStream ChainNotifier_RegisterBlockEpochNtfnServer) error {
if !s.cfg.ChainNotifier.Started() {
return ErrChainNotifierServerNotActive
}
// We'll start by reconstructing the RPC request into what the // We'll start by reconstructing the RPC request into what the
// underlying ChainNotifier expects. // underlying ChainNotifier expects.
var hash chainhash.Hash var hash chainhash.Hash

@ -112,6 +112,10 @@ func (m *mockNotfier) Start() error {
return nil return nil
} }
func (m *mockNotfier) Started() bool {
return true
}
func (m *mockNotfier) Stop() error { func (m *mockNotfier) Stop() error {
return nil return nil
} }

@ -200,6 +200,11 @@ func (m *MockNotifier) Start() error {
return nil return nil
} }
// Started checks if started
func (m *MockNotifier) Started() bool {
return true
}
// Stop the notifier. // Stop the notifier.
func (m *MockNotifier) Stop() error { func (m *MockNotifier) Stop() error {
return nil return nil