server: convert Start/Stop methods to use sync.Once

In this commit, we convert the server's Start/Stop methods to use the
sync.Once. We do this in order to fix concurrency issues that would
allow certain queries to be sent to the server before it has actually
fully start up. Before this commit, we would set started to 1 at the
very top of the method, allowing certain queries to pass before the rest
of the daemon was had started up.

In order to fix this issue, we've converted the server to using a
sync.Once, and two new atomic variables for clients to query to see if
the server has fully started up, or is in the process of stopping.
This commit is contained in:
Olaoluwa Osuntokun 2019-03-11 16:12:15 -07:00
parent 1f187c0116
commit 26c52892cb
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2
2 changed files with 205 additions and 175 deletions

@ -121,7 +121,8 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e
channelShells = append(channelShells, chanShell)
}
ltndLog.Infof("Inserting %v SCB channel shells into DB")
ltndLog.Infof("Inserting %v SCB channel shells into DB",
len(channelShells))
// Now that we have all the backups mapped into a series of Singles,
// we'll insert them all into the database.

377
server.go

@ -81,8 +81,11 @@ var (
// Additionally, the server is also used as a central messaging bus to interact
// with any of its companion objects.
type server struct {
started int32 // atomic
shutdown int32 // atomic
active int32 // atomic
stopping int32 // atomic
start sync.Once
stop sync.Once
// identityPriv is the private key used to authenticate any incoming
// connections.
@ -1035,148 +1038,176 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
// Started returns true if the server has been started, and false otherwise.
// NOTE: This function is safe for concurrent access.
func (s *server) Started() bool {
return atomic.LoadInt32(&s.started) != 0
return atomic.LoadInt32(&s.active) != 0
}
// Start starts the main daemon server, all requested listeners, and any helper
// goroutines.
// NOTE: This function is safe for concurrent access.
func (s *server) Start() error {
// Already running?
if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
return nil
}
if s.torController != nil {
if err := s.initTorController(); err != nil {
return err
}
}
if s.natTraversal != nil {
s.wg.Add(1)
go s.watchExternalIP()
}
// Start the notification server. This is used so channel management
// goroutines can be notified when a funding transaction reaches a
// sufficient number of confirmations, or when the input for the
// funding transaction is spent in an attempt at an uncooperative close
// by the counterparty.
if err := s.sigPool.Start(); err != nil {
return err
}
if err := s.writePool.Start(); err != nil {
return err
}
if err := s.readPool.Start(); err != nil {
return err
}
if err := s.cc.chainNotifier.Start(); err != nil {
return err
}
if err := s.channelNotifier.Start(); err != nil {
return err
}
if err := s.sphinx.Start(); err != nil {
return err
}
if err := s.htlcSwitch.Start(); err != nil {
return err
}
if err := s.sweeper.Start(); err != nil {
return err
}
if err := s.utxoNursery.Start(); err != nil {
return err
}
if err := s.chainArb.Start(); err != nil {
return err
}
if err := s.breachArbiter.Start(); err != nil {
return err
}
if err := s.authGossiper.Start(); err != nil {
return err
}
if err := s.chanRouter.Start(); err != nil {
return err
}
if err := s.fundingMgr.Start(); err != nil {
return err
}
if err := s.invoices.Start(); err != nil {
return err
}
if err := s.chanStatusMgr.Start(); err != nil {
return err
}
// Before we start the connMgr, we'll check to see if we have any
// backups to recover. We do this now as we want to ensure that have
// all the information we need to handle channel recovery _before_ we
// even accept connections from any peers.
chanRestorer := &chanDBRestorer{
db: s.chanDB,
secretKeys: s.cc.keyRing,
}
if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
err := chanbackup.UnpackAndRecoverSingles(
s.chansToRestore.PackedSingleChanBackups,
s.cc.keyRing, chanRestorer, s,
)
if err != nil {
return fmt.Errorf("unable to unpack single "+
"backups: %v", err)
}
}
if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
err := chanbackup.UnpackAndRecoverMulti(
s.chansToRestore.PackedMultiChanBackup,
s.cc.keyRing, chanRestorer, s,
)
if err != nil {
return fmt.Errorf("unable to unpack chan "+
"backup: %v", err)
}
}
if err := s.chanSubSwapper.Start(); err != nil {
return err
}
s.connMgr.Start()
// With all the relevant sub-systems started, we'll now attempt to
// establish persistent connections to our direct channel collaborators
// within the network. Before doing so however, we'll prune our set of
// link nodes found within the database to ensure we don't reconnect to
// any nodes we no longer have open channels with.
if err := s.chanDB.PruneLinkNodes(); err != nil {
return err
}
if err := s.establishPersistentConnections(); err != nil {
return err
}
// If network bootstrapping hasn't been disabled, then we'll configure
// the set of active bootstrappers, and launch a dedicated goroutine to
// maintain a set of persistent connections.
if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) &&
!(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) {
bootstrappers, err := initNetworkBootstrappers(s)
if err != nil {
return err
var startErr error
s.start.Do(func() {
if s.torController != nil {
if err := s.initTorController(); err != nil {
startErr = err
return
}
}
s.wg.Add(1)
go s.peerBootstrapper(defaultMinPeers, bootstrappers)
} else {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
if s.natTraversal != nil {
s.wg.Add(1)
go s.watchExternalIP()
}
return nil
// Start the notification server. This is used so channel
// management goroutines can be notified when a funding
// transaction reaches a sufficient number of confirmations, or
// when the input for the funding transaction is spent in an
// attempt at an uncooperative close by the counterparty.
if err := s.sigPool.Start(); err != nil {
startErr = err
return
}
if err := s.writePool.Start(); err != nil {
startErr = err
return
}
if err := s.readPool.Start(); err != nil {
startErr = err
return
}
if err := s.cc.chainNotifier.Start(); err != nil {
startErr = err
return
}
if err := s.channelNotifier.Start(); err != nil {
startErr = err
return
}
if err := s.sphinx.Start(); err != nil {
startErr = err
return
}
if err := s.htlcSwitch.Start(); err != nil {
startErr = err
return
}
if err := s.sweeper.Start(); err != nil {
startErr = err
return
}
if err := s.utxoNursery.Start(); err != nil {
startErr = err
return
}
if err := s.chainArb.Start(); err != nil {
startErr = err
return
}
if err := s.breachArbiter.Start(); err != nil {
startErr = err
return
}
if err := s.authGossiper.Start(); err != nil {
startErr = err
return
}
if err := s.chanRouter.Start(); err != nil {
startErr = err
return
}
if err := s.fundingMgr.Start(); err != nil {
startErr = err
return
}
if err := s.invoices.Start(); err != nil {
startErr = err
return
}
if err := s.chanStatusMgr.Start(); err != nil {
startErr = err
return
}
// Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure
// that have all the information we need to handle channel
// recovery _before_ we even accept connections from any peers.
chanRestorer := &chanDBRestorer{
db: s.chanDB,
secretKeys: s.cc.keyRing,
chainArb: s.chainArb,
}
if len(s.chansToRestore.PackedSingleChanBackups) != 0 {
err := chanbackup.UnpackAndRecoverSingles(
s.chansToRestore.PackedSingleChanBackups,
s.cc.keyRing, chanRestorer, s,
)
if err != nil {
startErr = fmt.Errorf("unable to unpack single "+
"backups: %v", err)
return
}
}
if len(s.chansToRestore.PackedMultiChanBackup) != 0 {
err := chanbackup.UnpackAndRecoverMulti(
s.chansToRestore.PackedMultiChanBackup,
s.cc.keyRing, chanRestorer, s,
)
if err != nil {
startErr = fmt.Errorf("unable to unpack chan "+
"backup: %v", err)
return
}
}
if err := s.chanSubSwapper.Start(); err != nil {
startErr = err
return
}
s.connMgr.Start()
// With all the relevant sub-systems started, we'll now attempt
// to establish persistent connections to our direct channel
// collaborators within the network. Before doing so however,
// we'll prune our set of link nodes found within the database
// to ensure we don't reconnect to any nodes we no longer have
// open channels with.
if err := s.chanDB.PruneLinkNodes(); err != nil {
startErr = err
return
}
if err := s.establishPersistentConnections(); err != nil {
startErr = err
return
}
// If network bootstrapping hasn't been disabled, then we'll
// configure the set of active bootstrappers, and launch a
// dedicated goroutine to maintain a set of persistent
// connections.
if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) &&
!(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) {
bootstrappers, err := initNetworkBootstrappers(s)
if err != nil {
startErr = err
return
}
s.wg.Add(1)
go s.peerBootstrapper(defaultMinPeers, bootstrappers)
} else {
srvrLog.Infof("Auto peer bootstrapping is disabled")
}
// Set the active flag now that we've completed the full
// startup.
atomic.StoreInt32(&s.active, 1)
})
return startErr
}
// Stop gracefully shutsdown the main daemon server. This function will signal
@ -1184,49 +1215,48 @@ func (s *server) Start() error {
// all successfully exited. Additionally, any/all listeners are closed.
// NOTE: This function is safe for concurrent access.
func (s *server) Stop() error {
// Bail if we're already shutting down.
if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
return nil
}
s.stop.Do(func() {
atomic.LoadInt32(&s.stopping)
close(s.quit)
close(s.quit)
if s.torController != nil {
s.torController.Stop()
}
if s.torController != nil {
s.torController.Stop()
}
// Shutdown the wallet, funding manager, and the rpc server.
s.chanStatusMgr.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
s.htlcSwitch.Stop()
s.sphinx.Stop()
s.utxoNursery.Stop()
s.breachArbiter.Stop()
s.authGossiper.Stop()
s.chainArb.Stop()
s.sweeper.Stop()
s.channelNotifier.Stop()
s.cc.wallet.Shutdown()
s.cc.chainView.Stop()
s.connMgr.Stop()
s.cc.feeEstimator.Stop()
s.invoices.Stop()
s.fundingMgr.Stop()
s.chanSubSwapper.Start()
// Shutdown the wallet, funding manager, and the rpc server.
s.chanStatusMgr.Stop()
s.cc.chainNotifier.Stop()
s.chanRouter.Stop()
s.htlcSwitch.Stop()
s.sphinx.Stop()
s.utxoNursery.Stop()
s.breachArbiter.Stop()
s.authGossiper.Stop()
s.chainArb.Stop()
s.sweeper.Stop()
s.channelNotifier.Stop()
s.cc.wallet.Shutdown()
s.cc.chainView.Stop()
s.connMgr.Stop()
s.cc.feeEstimator.Stop()
s.invoices.Stop()
s.fundingMgr.Stop()
s.chanSubSwapper.Start()
// Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer.
for _, peer := range s.Peers() {
s.DisconnectPeer(peer.addr.IdentityKey)
}
// Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer.
for _, peer := range s.Peers() {
s.DisconnectPeer(peer.addr.IdentityKey)
}
// Wait for all lingering goroutines to quit.
s.wg.Wait()
// Wait for all lingering goroutines to quit.
s.wg.Wait()
s.sigPool.Stop()
s.writePool.Stop()
s.readPool.Stop()
s.sigPool.Stop()
s.writePool.Stop()
s.readPool.Stop()
})
return nil
}
@ -1234,7 +1264,7 @@ func (s *server) Stop() error {
// Stopped returns true if the server has been instructed to shutdown.
// NOTE: This function is safe for concurrent access.
func (s *server) Stopped() bool {
return atomic.LoadInt32(&s.shutdown) != 0
return atomic.LoadInt32(&s.stopping) != 0
}
// configurePortForwarding attempts to set up port forwarding for the different
@ -2784,7 +2814,6 @@ type openChanReq struct {
//
// NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
targetPub := string(addr.IdentityKey.SerializeCompressed())
// Acquire mutex, but use explicit unlocking instead of defer for