multi: replace manual CAS with sync.Once in component start/stop

This guarantees callers that the method will not return until it
has executed completely at least once.
This commit is contained in:
Federico Bond 2019-01-23 14:15:57 -03:00
parent f802ebddba
commit aea52f4bef
5 changed files with 68 additions and 70 deletions

@ -6,7 +6,6 @@ import (
"math/rand" "math/rand"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
@ -105,9 +104,8 @@ func (c channelState) ConnectedNodes() map[NodeID]struct{} {
// //
// TODO(roasbeef): prob re-word // TODO(roasbeef): prob re-word
type Agent struct { type Agent struct {
// Only to be used atomically. started sync.Once
started uint32 stopped sync.Once
stopped uint32
// cfg houses the configuration state of the Ant. // cfg houses the configuration state of the Ant.
cfg Config cfg Config
@ -197,10 +195,14 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
// Start starts the agent along with any goroutines it needs to perform its // Start starts the agent along with any goroutines it needs to perform its
// normal duties. // normal duties.
func (a *Agent) Start() error { func (a *Agent) Start() error {
if !atomic.CompareAndSwapUint32(&a.started, 0, 1) { var err error
return nil a.started.Do(func() {
} err = a.start()
})
return err
}
func (a *Agent) start() error {
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())
log.Infof("Autopilot Agent starting") log.Infof("Autopilot Agent starting")
@ -213,10 +215,14 @@ func (a *Agent) Start() error {
// Stop signals the Agent to gracefully shutdown. This function will block // Stop signals the Agent to gracefully shutdown. This function will block
// until all goroutines have exited. // until all goroutines have exited.
func (a *Agent) Stop() error { func (a *Agent) Stop() error {
if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { var err error
return nil a.stopped.Do(func() {
} err = a.stop()
})
return err
}
func (a *Agent) stop() error {
log.Infof("Autopilot Agent stopping") log.Infof("Autopilot Agent stopping")
close(a.quit) close(a.quit)

@ -3,7 +3,6 @@ package autopilot
import ( import (
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
@ -40,8 +39,8 @@ type ManagerCfg struct {
// It implements the autopilot grpc service, which is used to get data about // It implements the autopilot grpc service, which is used to get data about
// the running autopilot, and give it relevant information. // the running autopilot, and give it relevant information.
type Manager struct { type Manager struct {
started uint32 // To be used atomically. started sync.Once
stopped uint32 // To be used atomically. stopped sync.Once
cfg *ManagerCfg cfg *ManagerCfg
@ -64,27 +63,21 @@ func NewManager(cfg *ManagerCfg) (*Manager, error) {
// Start starts the Manager. // Start starts the Manager.
func (m *Manager) Start() error { func (m *Manager) Start() error {
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) { m.started.Do(func() {})
return nil
}
return nil return nil
} }
// Stop stops the Manager. If an autopilot agent is active, it will also be // Stop stops the Manager. If an autopilot agent is active, it will also be
// stopped. // stopped.
func (m *Manager) Stop() error { func (m *Manager) Stop() error {
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { m.stopped.Do(func() {
return nil if err := m.StopAgent(); err != nil {
} log.Errorf("Unable to stop pilot: %v", err)
}
if err := m.StopAgent(); err != nil {
log.Errorf("Unable to stop pilot: %v", err)
}
close(m.quit)
m.wg.Wait()
close(m.quit)
m.wg.Wait()
})
return nil return nil
} }

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io" "io"
"sync" "sync"
"sync/atomic"
"github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
@ -118,8 +117,8 @@ type BreachConfig struct {
// counterparties. // counterparties.
// TODO(roasbeef): closures in config for subsystem pointers to decouple? // TODO(roasbeef): closures in config for subsystem pointers to decouple?
type breachArbiter struct { type breachArbiter struct {
started uint32 // To be used atomically. started sync.Once
stopped uint32 // To be used atomically. stopped sync.Once
cfg *BreachConfig cfg *BreachConfig
@ -140,10 +139,14 @@ func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
// Start is an idempotent method that officially starts the breachArbiter along // Start is an idempotent method that officially starts the breachArbiter along
// with all other goroutines it needs to perform its functions. // with all other goroutines it needs to perform its functions.
func (b *breachArbiter) Start() error { func (b *breachArbiter) Start() error {
if !atomic.CompareAndSwapUint32(&b.started, 0, 1) { var err error
return nil b.started.Do(func() {
} err = b.start()
})
return err
}
func (b *breachArbiter) start() error {
brarLog.Tracef("Starting breach arbiter") brarLog.Tracef("Starting breach arbiter")
// Load all retributions currently persisted in the retribution store. // Load all retributions currently persisted in the retribution store.
@ -226,15 +229,12 @@ func (b *breachArbiter) Start() error {
// graceful shutdown. This function will block until all goroutines spawned by // graceful shutdown. This function will block until all goroutines spawned by
// the breachArbiter have gracefully exited. // the breachArbiter have gracefully exited.
func (b *breachArbiter) Stop() error { func (b *breachArbiter) Stop() error {
if !atomic.CompareAndSwapUint32(&b.stopped, 0, 1) { b.stopped.Do(func() {
return nil brarLog.Infof("Breach arbiter shutting down")
}
brarLog.Infof("Breach arbiter shutting down")
close(b.quit)
b.wg.Wait()
close(b.quit)
b.wg.Wait()
})
return nil return nil
} }

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"sync/atomic"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
@ -79,8 +78,8 @@ type ChannelNotifier interface {
// //
// TODO(roasbeef): better name lol // TODO(roasbeef): better name lol
type SubSwapper struct { type SubSwapper struct {
started uint32 started sync.Once
stopped uint32 stopped sync.Once
// backupState are the set of SCBs for all open channels we know of. // backupState are the set of SCBs for all open channels we know of.
backupState map[wire.OutPoint]Single backupState map[wire.OutPoint]Single
@ -135,29 +134,23 @@ func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
// Start starts the chanbackup.SubSwapper. // Start starts the chanbackup.SubSwapper.
func (s *SubSwapper) Start() error { func (s *SubSwapper) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { s.started.Do(func() {
return nil log.Infof("Starting chanbackup.SubSwapper")
}
log.Infof("Starting chanbackup.SubSwapper")
s.wg.Add(1)
go s.backupUpdater()
s.wg.Add(1)
go s.backupUpdater()
})
return nil return nil
} }
// Stop signals the SubSwapper to being a graceful shutdown. // Stop signals the SubSwapper to being a graceful shutdown.
func (s *SubSwapper) Stop() error { func (s *SubSwapper) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { s.stopped.Do(func() {
return nil log.Infof("Stopping chanbackup.SubSwapper")
}
log.Infof("Stopping chanbackup.SubSwapper")
close(s.quit)
s.wg.Wait()
close(s.quit)
s.wg.Wait()
})
return nil return nil
} }

@ -5,7 +5,6 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
@ -353,9 +352,8 @@ type fundingConfig struct {
// controls between the wallet and remote peers are enforced via the funding // controls between the wallet and remote peers are enforced via the funding
// manager. // manager.
type fundingManager struct { type fundingManager struct {
// MUST be used atomically. started sync.Once
started int32 stopped sync.Once
stopped int32
// cfg is a copy of the configuration struct that the FundingManager // cfg is a copy of the configuration struct that the FundingManager
// was initialized with. // was initialized with.
@ -471,10 +469,14 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
// Start launches all helper goroutines required for handling requests sent // Start launches all helper goroutines required for handling requests sent
// to the funding manager. // to the funding manager.
func (f *fundingManager) Start() error { func (f *fundingManager) Start() error {
if atomic.AddInt32(&f.started, 1) != 1 { // TODO(roasbeef): CAS instead var err error
return nil f.started.Do(func() {
} err = f.start()
})
return err
}
func (f *fundingManager) start() error {
fndgLog.Tracef("Funding manager running") fndgLog.Tracef("Funding manager running")
// Upon restart, the Funding Manager will check the database to load any // Upon restart, the Funding Manager will check the database to load any
@ -710,10 +712,14 @@ func (f *fundingManager) Start() error {
// Stop signals all helper goroutines to execute a graceful shutdown. This // Stop signals all helper goroutines to execute a graceful shutdown. This
// method will block until all goroutines have exited. // method will block until all goroutines have exited.
func (f *fundingManager) Stop() error { func (f *fundingManager) Stop() error {
if atomic.AddInt32(&f.stopped, 1) != 1 { var err error
return nil f.stopped.Do(func() {
} err = f.stop()
})
return err
}
func (f *fundingManager) stop() error {
fndgLog.Infof("Funding manager shutting down") fndgLog.Infof("Funding manager shutting down")
close(f.quit) close(f.quit)