From 26c52892cb6184995ddb74a32ee8f3b3107887b3 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 11 Mar 2019 16:12:15 -0700 Subject: [PATCH] server: convert Start/Stop methods to use sync.Once In this commit, we convert the server's Start/Stop methods to use the sync.Once. We do this in order to fix concurrency issues that would allow certain queries to be sent to the server before it has actually fully start up. Before this commit, we would set started to 1 at the very top of the method, allowing certain queries to pass before the rest of the daemon was had started up. In order to fix this issue, we've converted the server to using a sync.Once, and two new atomic variables for clients to query to see if the server has fully started up, or is in the process of stopping. --- chanrestore.go | 3 +- server.go | 377 ++++++++++++++++++++++++++----------------------- 2 files changed, 205 insertions(+), 175 deletions(-) diff --git a/chanrestore.go b/chanrestore.go index 3e219489..7aaada2f 100644 --- a/chanrestore.go +++ b/chanrestore.go @@ -121,7 +121,8 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e channelShells = append(channelShells, chanShell) } - ltndLog.Infof("Inserting %v SCB channel shells into DB") + ltndLog.Infof("Inserting %v SCB channel shells into DB", + len(channelShells)) // Now that we have all the backups mapped into a series of Singles, // we'll insert them all into the database. diff --git a/server.go b/server.go index bfce1d58..02fd2e4d 100644 --- a/server.go +++ b/server.go @@ -81,8 +81,11 @@ var ( // Additionally, the server is also used as a central messaging bus to interact // with any of its companion objects. type server struct { - started int32 // atomic - shutdown int32 // atomic + active int32 // atomic + stopping int32 // atomic + + start sync.Once + stop sync.Once // identityPriv is the private key used to authenticate any incoming // connections. @@ -1035,148 +1038,176 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, // Started returns true if the server has been started, and false otherwise. // NOTE: This function is safe for concurrent access. func (s *server) Started() bool { - return atomic.LoadInt32(&s.started) != 0 + return atomic.LoadInt32(&s.active) != 0 } // Start starts the main daemon server, all requested listeners, and any helper // goroutines. // NOTE: This function is safe for concurrent access. func (s *server) Start() error { - // Already running? - if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { - return nil - } - - if s.torController != nil { - if err := s.initTorController(); err != nil { - return err - } - } - - if s.natTraversal != nil { - s.wg.Add(1) - go s.watchExternalIP() - } - - // Start the notification server. This is used so channel management - // goroutines can be notified when a funding transaction reaches a - // sufficient number of confirmations, or when the input for the - // funding transaction is spent in an attempt at an uncooperative close - // by the counterparty. - if err := s.sigPool.Start(); err != nil { - return err - } - if err := s.writePool.Start(); err != nil { - return err - } - if err := s.readPool.Start(); err != nil { - return err - } - if err := s.cc.chainNotifier.Start(); err != nil { - return err - } - if err := s.channelNotifier.Start(); err != nil { - return err - } - if err := s.sphinx.Start(); err != nil { - return err - } - if err := s.htlcSwitch.Start(); err != nil { - return err - } - if err := s.sweeper.Start(); err != nil { - return err - } - if err := s.utxoNursery.Start(); err != nil { - return err - } - if err := s.chainArb.Start(); err != nil { - return err - } - if err := s.breachArbiter.Start(); err != nil { - return err - } - if err := s.authGossiper.Start(); err != nil { - return err - } - if err := s.chanRouter.Start(); err != nil { - return err - } - if err := s.fundingMgr.Start(); err != nil { - return err - } - if err := s.invoices.Start(); err != nil { - return err - } - if err := s.chanStatusMgr.Start(); err != nil { - return err - } - - // Before we start the connMgr, we'll check to see if we have any - // backups to recover. We do this now as we want to ensure that have - // all the information we need to handle channel recovery _before_ we - // even accept connections from any peers. - chanRestorer := &chanDBRestorer{ - db: s.chanDB, - secretKeys: s.cc.keyRing, - } - if len(s.chansToRestore.PackedSingleChanBackups) != 0 { - err := chanbackup.UnpackAndRecoverSingles( - s.chansToRestore.PackedSingleChanBackups, - s.cc.keyRing, chanRestorer, s, - ) - if err != nil { - return fmt.Errorf("unable to unpack single "+ - "backups: %v", err) - } - } - if len(s.chansToRestore.PackedMultiChanBackup) != 0 { - err := chanbackup.UnpackAndRecoverMulti( - s.chansToRestore.PackedMultiChanBackup, - s.cc.keyRing, chanRestorer, s, - ) - if err != nil { - return fmt.Errorf("unable to unpack chan "+ - "backup: %v", err) - } - } - - if err := s.chanSubSwapper.Start(); err != nil { - return err - } - - s.connMgr.Start() - - // With all the relevant sub-systems started, we'll now attempt to - // establish persistent connections to our direct channel collaborators - // within the network. Before doing so however, we'll prune our set of - // link nodes found within the database to ensure we don't reconnect to - // any nodes we no longer have open channels with. - if err := s.chanDB.PruneLinkNodes(); err != nil { - return err - } - if err := s.establishPersistentConnections(); err != nil { - return err - } - - // If network bootstrapping hasn't been disabled, then we'll configure - // the set of active bootstrappers, and launch a dedicated goroutine to - // maintain a set of persistent connections. - if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) && - !(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) { - - bootstrappers, err := initNetworkBootstrappers(s) - if err != nil { - return err + var startErr error + s.start.Do(func() { + if s.torController != nil { + if err := s.initTorController(); err != nil { + startErr = err + return + } } - s.wg.Add(1) - go s.peerBootstrapper(defaultMinPeers, bootstrappers) - } else { - srvrLog.Infof("Auto peer bootstrapping is disabled") - } + if s.natTraversal != nil { + s.wg.Add(1) + go s.watchExternalIP() + } - return nil + // Start the notification server. This is used so channel + // management goroutines can be notified when a funding + // transaction reaches a sufficient number of confirmations, or + // when the input for the funding transaction is spent in an + // attempt at an uncooperative close by the counterparty. + if err := s.sigPool.Start(); err != nil { + startErr = err + return + } + if err := s.writePool.Start(); err != nil { + startErr = err + return + } + if err := s.readPool.Start(); err != nil { + startErr = err + return + } + if err := s.cc.chainNotifier.Start(); err != nil { + startErr = err + return + } + if err := s.channelNotifier.Start(); err != nil { + startErr = err + return + } + if err := s.sphinx.Start(); err != nil { + startErr = err + return + } + if err := s.htlcSwitch.Start(); err != nil { + startErr = err + return + } + if err := s.sweeper.Start(); err != nil { + startErr = err + return + } + if err := s.utxoNursery.Start(); err != nil { + startErr = err + return + } + if err := s.chainArb.Start(); err != nil { + startErr = err + return + } + if err := s.breachArbiter.Start(); err != nil { + startErr = err + return + } + if err := s.authGossiper.Start(); err != nil { + startErr = err + return + } + if err := s.chanRouter.Start(); err != nil { + startErr = err + return + } + if err := s.fundingMgr.Start(); err != nil { + startErr = err + return + } + if err := s.invoices.Start(); err != nil { + startErr = err + return + } + if err := s.chanStatusMgr.Start(); err != nil { + startErr = err + return + } + + // Before we start the connMgr, we'll check to see if we have + // any backups to recover. We do this now as we want to ensure + // that have all the information we need to handle channel + // recovery _before_ we even accept connections from any peers. + chanRestorer := &chanDBRestorer{ + db: s.chanDB, + secretKeys: s.cc.keyRing, + chainArb: s.chainArb, + } + if len(s.chansToRestore.PackedSingleChanBackups) != 0 { + err := chanbackup.UnpackAndRecoverSingles( + s.chansToRestore.PackedSingleChanBackups, + s.cc.keyRing, chanRestorer, s, + ) + if err != nil { + startErr = fmt.Errorf("unable to unpack single "+ + "backups: %v", err) + return + } + } + if len(s.chansToRestore.PackedMultiChanBackup) != 0 { + err := chanbackup.UnpackAndRecoverMulti( + s.chansToRestore.PackedMultiChanBackup, + s.cc.keyRing, chanRestorer, s, + ) + if err != nil { + startErr = fmt.Errorf("unable to unpack chan "+ + "backup: %v", err) + return + } + } + + if err := s.chanSubSwapper.Start(); err != nil { + startErr = err + return + } + + s.connMgr.Start() + + // With all the relevant sub-systems started, we'll now attempt + // to establish persistent connections to our direct channel + // collaborators within the network. Before doing so however, + // we'll prune our set of link nodes found within the database + // to ensure we don't reconnect to any nodes we no longer have + // open channels with. + if err := s.chanDB.PruneLinkNodes(); err != nil { + startErr = err + return + } + if err := s.establishPersistentConnections(); err != nil { + startErr = err + return + } + + // If network bootstrapping hasn't been disabled, then we'll + // configure the set of active bootstrappers, and launch a + // dedicated goroutine to maintain a set of persistent + // connections. + if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) && + !(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) { + + bootstrappers, err := initNetworkBootstrappers(s) + if err != nil { + startErr = err + return + } + + s.wg.Add(1) + go s.peerBootstrapper(defaultMinPeers, bootstrappers) + } else { + srvrLog.Infof("Auto peer bootstrapping is disabled") + } + + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&s.active, 1) + }) + + return startErr } // Stop gracefully shutsdown the main daemon server. This function will signal @@ -1184,49 +1215,48 @@ func (s *server) Start() error { // all successfully exited. Additionally, any/all listeners are closed. // NOTE: This function is safe for concurrent access. func (s *server) Stop() error { - // Bail if we're already shutting down. - if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { - return nil - } + s.stop.Do(func() { + atomic.LoadInt32(&s.stopping) - close(s.quit) + close(s.quit) - if s.torController != nil { - s.torController.Stop() - } + if s.torController != nil { + s.torController.Stop() + } - // Shutdown the wallet, funding manager, and the rpc server. - s.chanStatusMgr.Stop() - s.cc.chainNotifier.Stop() - s.chanRouter.Stop() - s.htlcSwitch.Stop() - s.sphinx.Stop() - s.utxoNursery.Stop() - s.breachArbiter.Stop() - s.authGossiper.Stop() - s.chainArb.Stop() - s.sweeper.Stop() - s.channelNotifier.Stop() - s.cc.wallet.Shutdown() - s.cc.chainView.Stop() - s.connMgr.Stop() - s.cc.feeEstimator.Stop() - s.invoices.Stop() - s.fundingMgr.Stop() - s.chanSubSwapper.Start() + // Shutdown the wallet, funding manager, and the rpc server. + s.chanStatusMgr.Stop() + s.cc.chainNotifier.Stop() + s.chanRouter.Stop() + s.htlcSwitch.Stop() + s.sphinx.Stop() + s.utxoNursery.Stop() + s.breachArbiter.Stop() + s.authGossiper.Stop() + s.chainArb.Stop() + s.sweeper.Stop() + s.channelNotifier.Stop() + s.cc.wallet.Shutdown() + s.cc.chainView.Stop() + s.connMgr.Stop() + s.cc.feeEstimator.Stop() + s.invoices.Stop() + s.fundingMgr.Stop() + s.chanSubSwapper.Start() - // Disconnect from each active peers to ensure that - // peerTerminationWatchers signal completion to each peer. - for _, peer := range s.Peers() { - s.DisconnectPeer(peer.addr.IdentityKey) - } + // Disconnect from each active peers to ensure that + // peerTerminationWatchers signal completion to each peer. + for _, peer := range s.Peers() { + s.DisconnectPeer(peer.addr.IdentityKey) + } - // Wait for all lingering goroutines to quit. - s.wg.Wait() + // Wait for all lingering goroutines to quit. + s.wg.Wait() - s.sigPool.Stop() - s.writePool.Stop() - s.readPool.Stop() + s.sigPool.Stop() + s.writePool.Stop() + s.readPool.Stop() + }) return nil } @@ -1234,7 +1264,7 @@ func (s *server) Stop() error { // Stopped returns true if the server has been instructed to shutdown. // NOTE: This function is safe for concurrent access. func (s *server) Stopped() bool { - return atomic.LoadInt32(&s.shutdown) != 0 + return atomic.LoadInt32(&s.stopping) != 0 } // configurePortForwarding attempts to set up port forwarding for the different @@ -2784,7 +2814,6 @@ type openChanReq struct { // // NOTE: This function is safe for concurrent access. func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { - targetPub := string(addr.IdentityKey.SerializeCompressed()) // Acquire mutex, but use explicit unlocking instead of defer for