diff --git a/chanrestore.go b/chanrestore.go index 3e219489..7aaada2f 100644 --- a/chanrestore.go +++ b/chanrestore.go @@ -121,7 +121,8 @@ func (c *chanDBRestorer) RestoreChansFromSingles(backups ...chanbackup.Single) e channelShells = append(channelShells, chanShell) } - ltndLog.Infof("Inserting %v SCB channel shells into DB") + ltndLog.Infof("Inserting %v SCB channel shells into DB", + len(channelShells)) // Now that we have all the backups mapped into a series of Singles, // we'll insert them all into the database. diff --git a/server.go b/server.go index bfce1d58..02fd2e4d 100644 --- a/server.go +++ b/server.go @@ -81,8 +81,11 @@ var ( // Additionally, the server is also used as a central messaging bus to interact // with any of its companion objects. type server struct { - started int32 // atomic - shutdown int32 // atomic + active int32 // atomic + stopping int32 // atomic + + start sync.Once + stop sync.Once // identityPriv is the private key used to authenticate any incoming // connections. @@ -1035,148 +1038,176 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, // Started returns true if the server has been started, and false otherwise. // NOTE: This function is safe for concurrent access. func (s *server) Started() bool { - return atomic.LoadInt32(&s.started) != 0 + return atomic.LoadInt32(&s.active) != 0 } // Start starts the main daemon server, all requested listeners, and any helper // goroutines. // NOTE: This function is safe for concurrent access. func (s *server) Start() error { - // Already running? - if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { - return nil - } - - if s.torController != nil { - if err := s.initTorController(); err != nil { - return err - } - } - - if s.natTraversal != nil { - s.wg.Add(1) - go s.watchExternalIP() - } - - // Start the notification server. This is used so channel management - // goroutines can be notified when a funding transaction reaches a - // sufficient number of confirmations, or when the input for the - // funding transaction is spent in an attempt at an uncooperative close - // by the counterparty. - if err := s.sigPool.Start(); err != nil { - return err - } - if err := s.writePool.Start(); err != nil { - return err - } - if err := s.readPool.Start(); err != nil { - return err - } - if err := s.cc.chainNotifier.Start(); err != nil { - return err - } - if err := s.channelNotifier.Start(); err != nil { - return err - } - if err := s.sphinx.Start(); err != nil { - return err - } - if err := s.htlcSwitch.Start(); err != nil { - return err - } - if err := s.sweeper.Start(); err != nil { - return err - } - if err := s.utxoNursery.Start(); err != nil { - return err - } - if err := s.chainArb.Start(); err != nil { - return err - } - if err := s.breachArbiter.Start(); err != nil { - return err - } - if err := s.authGossiper.Start(); err != nil { - return err - } - if err := s.chanRouter.Start(); err != nil { - return err - } - if err := s.fundingMgr.Start(); err != nil { - return err - } - if err := s.invoices.Start(); err != nil { - return err - } - if err := s.chanStatusMgr.Start(); err != nil { - return err - } - - // Before we start the connMgr, we'll check to see if we have any - // backups to recover. We do this now as we want to ensure that have - // all the information we need to handle channel recovery _before_ we - // even accept connections from any peers. - chanRestorer := &chanDBRestorer{ - db: s.chanDB, - secretKeys: s.cc.keyRing, - } - if len(s.chansToRestore.PackedSingleChanBackups) != 0 { - err := chanbackup.UnpackAndRecoverSingles( - s.chansToRestore.PackedSingleChanBackups, - s.cc.keyRing, chanRestorer, s, - ) - if err != nil { - return fmt.Errorf("unable to unpack single "+ - "backups: %v", err) - } - } - if len(s.chansToRestore.PackedMultiChanBackup) != 0 { - err := chanbackup.UnpackAndRecoverMulti( - s.chansToRestore.PackedMultiChanBackup, - s.cc.keyRing, chanRestorer, s, - ) - if err != nil { - return fmt.Errorf("unable to unpack chan "+ - "backup: %v", err) - } - } - - if err := s.chanSubSwapper.Start(); err != nil { - return err - } - - s.connMgr.Start() - - // With all the relevant sub-systems started, we'll now attempt to - // establish persistent connections to our direct channel collaborators - // within the network. Before doing so however, we'll prune our set of - // link nodes found within the database to ensure we don't reconnect to - // any nodes we no longer have open channels with. - if err := s.chanDB.PruneLinkNodes(); err != nil { - return err - } - if err := s.establishPersistentConnections(); err != nil { - return err - } - - // If network bootstrapping hasn't been disabled, then we'll configure - // the set of active bootstrappers, and launch a dedicated goroutine to - // maintain a set of persistent connections. - if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) && - !(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) { - - bootstrappers, err := initNetworkBootstrappers(s) - if err != nil { - return err + var startErr error + s.start.Do(func() { + if s.torController != nil { + if err := s.initTorController(); err != nil { + startErr = err + return + } } - s.wg.Add(1) - go s.peerBootstrapper(defaultMinPeers, bootstrappers) - } else { - srvrLog.Infof("Auto peer bootstrapping is disabled") - } + if s.natTraversal != nil { + s.wg.Add(1) + go s.watchExternalIP() + } - return nil + // Start the notification server. This is used so channel + // management goroutines can be notified when a funding + // transaction reaches a sufficient number of confirmations, or + // when the input for the funding transaction is spent in an + // attempt at an uncooperative close by the counterparty. + if err := s.sigPool.Start(); err != nil { + startErr = err + return + } + if err := s.writePool.Start(); err != nil { + startErr = err + return + } + if err := s.readPool.Start(); err != nil { + startErr = err + return + } + if err := s.cc.chainNotifier.Start(); err != nil { + startErr = err + return + } + if err := s.channelNotifier.Start(); err != nil { + startErr = err + return + } + if err := s.sphinx.Start(); err != nil { + startErr = err + return + } + if err := s.htlcSwitch.Start(); err != nil { + startErr = err + return + } + if err := s.sweeper.Start(); err != nil { + startErr = err + return + } + if err := s.utxoNursery.Start(); err != nil { + startErr = err + return + } + if err := s.chainArb.Start(); err != nil { + startErr = err + return + } + if err := s.breachArbiter.Start(); err != nil { + startErr = err + return + } + if err := s.authGossiper.Start(); err != nil { + startErr = err + return + } + if err := s.chanRouter.Start(); err != nil { + startErr = err + return + } + if err := s.fundingMgr.Start(); err != nil { + startErr = err + return + } + if err := s.invoices.Start(); err != nil { + startErr = err + return + } + if err := s.chanStatusMgr.Start(); err != nil { + startErr = err + return + } + + // Before we start the connMgr, we'll check to see if we have + // any backups to recover. We do this now as we want to ensure + // that have all the information we need to handle channel + // recovery _before_ we even accept connections from any peers. + chanRestorer := &chanDBRestorer{ + db: s.chanDB, + secretKeys: s.cc.keyRing, + chainArb: s.chainArb, + } + if len(s.chansToRestore.PackedSingleChanBackups) != 0 { + err := chanbackup.UnpackAndRecoverSingles( + s.chansToRestore.PackedSingleChanBackups, + s.cc.keyRing, chanRestorer, s, + ) + if err != nil { + startErr = fmt.Errorf("unable to unpack single "+ + "backups: %v", err) + return + } + } + if len(s.chansToRestore.PackedMultiChanBackup) != 0 { + err := chanbackup.UnpackAndRecoverMulti( + s.chansToRestore.PackedMultiChanBackup, + s.cc.keyRing, chanRestorer, s, + ) + if err != nil { + startErr = fmt.Errorf("unable to unpack chan "+ + "backup: %v", err) + return + } + } + + if err := s.chanSubSwapper.Start(); err != nil { + startErr = err + return + } + + s.connMgr.Start() + + // With all the relevant sub-systems started, we'll now attempt + // to establish persistent connections to our direct channel + // collaborators within the network. Before doing so however, + // we'll prune our set of link nodes found within the database + // to ensure we don't reconnect to any nodes we no longer have + // open channels with. + if err := s.chanDB.PruneLinkNodes(); err != nil { + startErr = err + return + } + if err := s.establishPersistentConnections(); err != nil { + startErr = err + return + } + + // If network bootstrapping hasn't been disabled, then we'll + // configure the set of active bootstrappers, and launch a + // dedicated goroutine to maintain a set of persistent + // connections. + if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) && + !(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) { + + bootstrappers, err := initNetworkBootstrappers(s) + if err != nil { + startErr = err + return + } + + s.wg.Add(1) + go s.peerBootstrapper(defaultMinPeers, bootstrappers) + } else { + srvrLog.Infof("Auto peer bootstrapping is disabled") + } + + // Set the active flag now that we've completed the full + // startup. + atomic.StoreInt32(&s.active, 1) + }) + + return startErr } // Stop gracefully shutsdown the main daemon server. This function will signal @@ -1184,49 +1215,48 @@ func (s *server) Start() error { // all successfully exited. Additionally, any/all listeners are closed. // NOTE: This function is safe for concurrent access. func (s *server) Stop() error { - // Bail if we're already shutting down. - if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) { - return nil - } + s.stop.Do(func() { + atomic.LoadInt32(&s.stopping) - close(s.quit) + close(s.quit) - if s.torController != nil { - s.torController.Stop() - } + if s.torController != nil { + s.torController.Stop() + } - // Shutdown the wallet, funding manager, and the rpc server. - s.chanStatusMgr.Stop() - s.cc.chainNotifier.Stop() - s.chanRouter.Stop() - s.htlcSwitch.Stop() - s.sphinx.Stop() - s.utxoNursery.Stop() - s.breachArbiter.Stop() - s.authGossiper.Stop() - s.chainArb.Stop() - s.sweeper.Stop() - s.channelNotifier.Stop() - s.cc.wallet.Shutdown() - s.cc.chainView.Stop() - s.connMgr.Stop() - s.cc.feeEstimator.Stop() - s.invoices.Stop() - s.fundingMgr.Stop() - s.chanSubSwapper.Start() + // Shutdown the wallet, funding manager, and the rpc server. + s.chanStatusMgr.Stop() + s.cc.chainNotifier.Stop() + s.chanRouter.Stop() + s.htlcSwitch.Stop() + s.sphinx.Stop() + s.utxoNursery.Stop() + s.breachArbiter.Stop() + s.authGossiper.Stop() + s.chainArb.Stop() + s.sweeper.Stop() + s.channelNotifier.Stop() + s.cc.wallet.Shutdown() + s.cc.chainView.Stop() + s.connMgr.Stop() + s.cc.feeEstimator.Stop() + s.invoices.Stop() + s.fundingMgr.Stop() + s.chanSubSwapper.Start() - // Disconnect from each active peers to ensure that - // peerTerminationWatchers signal completion to each peer. - for _, peer := range s.Peers() { - s.DisconnectPeer(peer.addr.IdentityKey) - } + // Disconnect from each active peers to ensure that + // peerTerminationWatchers signal completion to each peer. + for _, peer := range s.Peers() { + s.DisconnectPeer(peer.addr.IdentityKey) + } - // Wait for all lingering goroutines to quit. - s.wg.Wait() + // Wait for all lingering goroutines to quit. + s.wg.Wait() - s.sigPool.Stop() - s.writePool.Stop() - s.readPool.Stop() + s.sigPool.Stop() + s.writePool.Stop() + s.readPool.Stop() + }) return nil } @@ -1234,7 +1264,7 @@ func (s *server) Stop() error { // Stopped returns true if the server has been instructed to shutdown. // NOTE: This function is safe for concurrent access. func (s *server) Stopped() bool { - return atomic.LoadInt32(&s.shutdown) != 0 + return atomic.LoadInt32(&s.stopping) != 0 } // configurePortForwarding attempts to set up port forwarding for the different @@ -2784,7 +2814,6 @@ type openChanReq struct { // // NOTE: This function is safe for concurrent access. func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { - targetPub := string(addr.IdentityKey.SerializeCompressed()) // Acquire mutex, but use explicit unlocking instead of defer for