From aea52f4befa6a069a21ec7bc764256f8a1174b14 Mon Sep 17 00:00:00 2001 From: Federico Bond Date: Wed, 23 Jan 2019 14:15:57 -0300 Subject: [PATCH] multi: replace manual CAS with sync.Once in component start/stop This guarantees callers that the method will not return until it has executed completely at least once. --- autopilot/agent.go | 26 ++++++++++++++++---------- autopilot/manager.go | 27 ++++++++++----------------- breacharbiter.go | 28 ++++++++++++++-------------- chanbackup/pubsub.go | 31 ++++++++++++------------------- fundingmanager.go | 26 ++++++++++++++++---------- 5 files changed, 68 insertions(+), 70 deletions(-) diff --git a/autopilot/agent.go b/autopilot/agent.go index 9610427c..d40abf28 100644 --- a/autopilot/agent.go +++ b/autopilot/agent.go @@ -6,7 +6,6 @@ import ( "math/rand" "net" "sync" - "sync/atomic" "time" "github.com/btcsuite/btcd/btcec" @@ -105,9 +104,8 @@ func (c channelState) ConnectedNodes() map[NodeID]struct{} { // // TODO(roasbeef): prob re-word type Agent struct { - // Only to be used atomically. - started uint32 - stopped uint32 + started sync.Once + stopped sync.Once // cfg houses the configuration state of the Ant. cfg Config @@ -197,10 +195,14 @@ func New(cfg Config, initialState []Channel) (*Agent, error) { // Start starts the agent along with any goroutines it needs to perform its // normal duties. func (a *Agent) Start() error { - if !atomic.CompareAndSwapUint32(&a.started, 0, 1) { - return nil - } + var err error + a.started.Do(func() { + err = a.start() + }) + return err +} +func (a *Agent) start() error { rand.Seed(time.Now().Unix()) log.Infof("Autopilot Agent starting") @@ -213,10 +215,14 @@ func (a *Agent) Start() error { // Stop signals the Agent to gracefully shutdown. This function will block // until all goroutines have exited. func (a *Agent) Stop() error { - if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { - return nil - } + var err error + a.stopped.Do(func() { + err = a.stop() + }) + return err +} +func (a *Agent) stop() error { log.Infof("Autopilot Agent stopping") close(a.quit) diff --git a/autopilot/manager.go b/autopilot/manager.go index f078b2a6..3253fdaf 100644 --- a/autopilot/manager.go +++ b/autopilot/manager.go @@ -3,7 +3,6 @@ package autopilot import ( "fmt" "sync" - "sync/atomic" "github.com/btcsuite/btcd/btcec" "github.com/lightningnetwork/lnd/lnwallet" @@ -40,8 +39,8 @@ type ManagerCfg struct { // It implements the autopilot grpc service, which is used to get data about // the running autopilot, and give it relevant information. type Manager struct { - started uint32 // To be used atomically. - stopped uint32 // To be used atomically. + started sync.Once + stopped sync.Once cfg *ManagerCfg @@ -64,27 +63,21 @@ func NewManager(cfg *ManagerCfg) (*Manager, error) { // Start starts the Manager. func (m *Manager) Start() error { - if !atomic.CompareAndSwapUint32(&m.started, 0, 1) { - return nil - } - + m.started.Do(func() {}) return nil } // Stop stops the Manager. If an autopilot agent is active, it will also be // stopped. func (m *Manager) Stop() error { - if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { - return nil - } - - if err := m.StopAgent(); err != nil { - log.Errorf("Unable to stop pilot: %v", err) - } - - close(m.quit) - m.wg.Wait() + m.stopped.Do(func() { + if err := m.StopAgent(); err != nil { + log.Errorf("Unable to stop pilot: %v", err) + } + close(m.quit) + m.wg.Wait() + }) return nil } diff --git a/breacharbiter.go b/breacharbiter.go index 33dd0fd8..fce38289 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "sync" - "sync/atomic" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/chaincfg/chainhash" @@ -118,8 +117,8 @@ type BreachConfig struct { // counterparties. // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { - started uint32 // To be used atomically. - stopped uint32 // To be used atomically. + started sync.Once + stopped sync.Once cfg *BreachConfig @@ -140,10 +139,14 @@ func newBreachArbiter(cfg *BreachConfig) *breachArbiter { // Start is an idempotent method that officially starts the breachArbiter along // with all other goroutines it needs to perform its functions. func (b *breachArbiter) Start() error { - if !atomic.CompareAndSwapUint32(&b.started, 0, 1) { - return nil - } + var err error + b.started.Do(func() { + err = b.start() + }) + return err +} +func (b *breachArbiter) start() error { brarLog.Tracef("Starting breach arbiter") // Load all retributions currently persisted in the retribution store. @@ -226,15 +229,12 @@ func (b *breachArbiter) Start() error { // graceful shutdown. This function will block until all goroutines spawned by // the breachArbiter have gracefully exited. func (b *breachArbiter) Stop() error { - if !atomic.CompareAndSwapUint32(&b.stopped, 0, 1) { - return nil - } - - brarLog.Infof("Breach arbiter shutting down") - - close(b.quit) - b.wg.Wait() + b.stopped.Do(func() { + brarLog.Infof("Breach arbiter shutting down") + close(b.quit) + b.wg.Wait() + }) return nil } diff --git a/chanbackup/pubsub.go b/chanbackup/pubsub.go index 580fb927..b9331820 100644 --- a/chanbackup/pubsub.go +++ b/chanbackup/pubsub.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "sync" - "sync/atomic" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" @@ -79,8 +78,8 @@ type ChannelNotifier interface { // // TODO(roasbeef): better name lol type SubSwapper struct { - started uint32 - stopped uint32 + started sync.Once + stopped sync.Once // backupState are the set of SCBs for all open channels we know of. backupState map[wire.OutPoint]Single @@ -135,29 +134,23 @@ func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier, // Start starts the chanbackup.SubSwapper. func (s *SubSwapper) Start() error { - if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { - return nil - } - - log.Infof("Starting chanbackup.SubSwapper") - - s.wg.Add(1) - go s.backupUpdater() + s.started.Do(func() { + log.Infof("Starting chanbackup.SubSwapper") + s.wg.Add(1) + go s.backupUpdater() + }) return nil } // Stop signals the SubSwapper to being a graceful shutdown. func (s *SubSwapper) Stop() error { - if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { - return nil - } - - log.Infof("Stopping chanbackup.SubSwapper") - - close(s.quit) - s.wg.Wait() + s.stopped.Do(func() { + log.Infof("Stopping chanbackup.SubSwapper") + close(s.quit) + s.wg.Wait() + }) return nil } diff --git a/fundingmanager.go b/fundingmanager.go index 2216e1ff..1b0a4b4a 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "fmt" "sync" - "sync/atomic" "time" "github.com/btcsuite/btcd/btcec" @@ -353,9 +352,8 @@ type fundingConfig struct { // controls between the wallet and remote peers are enforced via the funding // manager. type fundingManager struct { - // MUST be used atomically. - started int32 - stopped int32 + started sync.Once + stopped sync.Once // cfg is a copy of the configuration struct that the FundingManager // was initialized with. @@ -471,10 +469,14 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) { // Start launches all helper goroutines required for handling requests sent // to the funding manager. func (f *fundingManager) Start() error { - if atomic.AddInt32(&f.started, 1) != 1 { // TODO(roasbeef): CAS instead - return nil - } + var err error + f.started.Do(func() { + err = f.start() + }) + return err +} +func (f *fundingManager) start() error { fndgLog.Tracef("Funding manager running") // Upon restart, the Funding Manager will check the database to load any @@ -710,10 +712,14 @@ func (f *fundingManager) Start() error { // Stop signals all helper goroutines to execute a graceful shutdown. This // method will block until all goroutines have exited. func (f *fundingManager) Stop() error { - if atomic.AddInt32(&f.stopped, 1) != 1 { - return nil - } + var err error + f.stopped.Do(func() { + err = f.stop() + }) + return err +} +func (f *fundingManager) stop() error { fndgLog.Infof("Funding manager shutting down") close(f.quit)