Merge pull request #2535 from federicobond/once-refactor
multi: replace manual CAS with sync.Once in component start/stop
This commit is contained in:
commit
c338fdc2f2
@ -6,7 +6,6 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
@ -105,9 +104,8 @@ func (c channelState) ConnectedNodes() map[NodeID]struct{} {
|
|||||||
//
|
//
|
||||||
// TODO(roasbeef): prob re-word
|
// TODO(roasbeef): prob re-word
|
||||||
type Agent struct {
|
type Agent struct {
|
||||||
// Only to be used atomically.
|
started sync.Once
|
||||||
started uint32
|
stopped sync.Once
|
||||||
stopped uint32
|
|
||||||
|
|
||||||
// cfg houses the configuration state of the Ant.
|
// cfg houses the configuration state of the Ant.
|
||||||
cfg Config
|
cfg Config
|
||||||
@ -197,10 +195,14 @@ func New(cfg Config, initialState []Channel) (*Agent, error) {
|
|||||||
// Start starts the agent along with any goroutines it needs to perform its
|
// Start starts the agent along with any goroutines it needs to perform its
|
||||||
// normal duties.
|
// normal duties.
|
||||||
func (a *Agent) Start() error {
|
func (a *Agent) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&a.started, 0, 1) {
|
var err error
|
||||||
return nil
|
a.started.Do(func() {
|
||||||
|
err = a.start()
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) start() error {
|
||||||
rand.Seed(time.Now().Unix())
|
rand.Seed(time.Now().Unix())
|
||||||
log.Infof("Autopilot Agent starting")
|
log.Infof("Autopilot Agent starting")
|
||||||
|
|
||||||
@ -213,10 +215,14 @@ func (a *Agent) Start() error {
|
|||||||
// Stop signals the Agent to gracefully shutdown. This function will block
|
// Stop signals the Agent to gracefully shutdown. This function will block
|
||||||
// until all goroutines have exited.
|
// until all goroutines have exited.
|
||||||
func (a *Agent) Stop() error {
|
func (a *Agent) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&a.stopped, 0, 1) {
|
var err error
|
||||||
return nil
|
a.stopped.Do(func() {
|
||||||
|
err = a.stop()
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) stop() error {
|
||||||
log.Infof("Autopilot Agent stopping")
|
log.Infof("Autopilot Agent stopping")
|
||||||
|
|
||||||
close(a.quit)
|
close(a.quit)
|
||||||
|
@ -3,7 +3,6 @@ package autopilot
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
"github.com/lightningnetwork/lnd/lnwallet"
|
"github.com/lightningnetwork/lnd/lnwallet"
|
||||||
@ -40,8 +39,8 @@ type ManagerCfg struct {
|
|||||||
// It implements the autopilot grpc service, which is used to get data about
|
// It implements the autopilot grpc service, which is used to get data about
|
||||||
// the running autopilot, and give it relevant information.
|
// the running autopilot, and give it relevant information.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
started uint32 // To be used atomically.
|
started sync.Once
|
||||||
stopped uint32 // To be used atomically.
|
stopped sync.Once
|
||||||
|
|
||||||
cfg *ManagerCfg
|
cfg *ManagerCfg
|
||||||
|
|
||||||
@ -64,27 +63,21 @@ func NewManager(cfg *ManagerCfg) (*Manager, error) {
|
|||||||
|
|
||||||
// Start starts the Manager.
|
// Start starts the Manager.
|
||||||
func (m *Manager) Start() error {
|
func (m *Manager) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
|
m.started.Do(func() {})
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the Manager. If an autopilot agent is active, it will also be
|
// Stop stops the Manager. If an autopilot agent is active, it will also be
|
||||||
// stopped.
|
// stopped.
|
||||||
func (m *Manager) Stop() error {
|
func (m *Manager) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
|
m.stopped.Do(func() {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := m.StopAgent(); err != nil {
|
if err := m.StopAgent(); err != nil {
|
||||||
log.Errorf("Unable to stop pilot: %v", err)
|
log.Errorf("Unable to stop pilot: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
close(m.quit)
|
close(m.quit)
|
||||||
m.wg.Wait()
|
m.wg.Wait()
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/blockchain"
|
"github.com/btcsuite/btcd/blockchain"
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
@ -118,8 +117,8 @@ type BreachConfig struct {
|
|||||||
// counterparties.
|
// counterparties.
|
||||||
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
|
// TODO(roasbeef): closures in config for subsystem pointers to decouple?
|
||||||
type breachArbiter struct {
|
type breachArbiter struct {
|
||||||
started uint32 // To be used atomically.
|
started sync.Once
|
||||||
stopped uint32 // To be used atomically.
|
stopped sync.Once
|
||||||
|
|
||||||
cfg *BreachConfig
|
cfg *BreachConfig
|
||||||
|
|
||||||
@ -140,10 +139,14 @@ func newBreachArbiter(cfg *BreachConfig) *breachArbiter {
|
|||||||
// Start is an idempotent method that officially starts the breachArbiter along
|
// Start is an idempotent method that officially starts the breachArbiter along
|
||||||
// with all other goroutines it needs to perform its functions.
|
// with all other goroutines it needs to perform its functions.
|
||||||
func (b *breachArbiter) Start() error {
|
func (b *breachArbiter) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&b.started, 0, 1) {
|
var err error
|
||||||
return nil
|
b.started.Do(func() {
|
||||||
|
err = b.start()
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *breachArbiter) start() error {
|
||||||
brarLog.Tracef("Starting breach arbiter")
|
brarLog.Tracef("Starting breach arbiter")
|
||||||
|
|
||||||
// Load all retributions currently persisted in the retribution store.
|
// Load all retributions currently persisted in the retribution store.
|
||||||
@ -226,15 +229,12 @@ func (b *breachArbiter) Start() error {
|
|||||||
// graceful shutdown. This function will block until all goroutines spawned by
|
// graceful shutdown. This function will block until all goroutines spawned by
|
||||||
// the breachArbiter have gracefully exited.
|
// the breachArbiter have gracefully exited.
|
||||||
func (b *breachArbiter) Stop() error {
|
func (b *breachArbiter) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&b.stopped, 0, 1) {
|
b.stopped.Do(func() {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
brarLog.Infof("Breach arbiter shutting down")
|
brarLog.Infof("Breach arbiter shutting down")
|
||||||
|
|
||||||
close(b.quit)
|
close(b.quit)
|
||||||
b.wg.Wait()
|
b.wg.Wait()
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
@ -79,8 +78,8 @@ type ChannelNotifier interface {
|
|||||||
//
|
//
|
||||||
// TODO(roasbeef): better name lol
|
// TODO(roasbeef): better name lol
|
||||||
type SubSwapper struct {
|
type SubSwapper struct {
|
||||||
started uint32
|
started sync.Once
|
||||||
stopped uint32
|
stopped sync.Once
|
||||||
|
|
||||||
// backupState are the set of SCBs for all open channels we know of.
|
// backupState are the set of SCBs for all open channels we know of.
|
||||||
backupState map[wire.OutPoint]Single
|
backupState map[wire.OutPoint]Single
|
||||||
@ -135,29 +134,23 @@ func NewSubSwapper(startingChans []Single, chanNotifier ChannelNotifier,
|
|||||||
|
|
||||||
// Start starts the chanbackup.SubSwapper.
|
// Start starts the chanbackup.SubSwapper.
|
||||||
func (s *SubSwapper) Start() error {
|
func (s *SubSwapper) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
s.started.Do(func() {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Starting chanbackup.SubSwapper")
|
log.Infof("Starting chanbackup.SubSwapper")
|
||||||
|
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.backupUpdater()
|
go s.backupUpdater()
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop signals the SubSwapper to being a graceful shutdown.
|
// Stop signals the SubSwapper to being a graceful shutdown.
|
||||||
func (s *SubSwapper) Stop() error {
|
func (s *SubSwapper) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
s.stopped.Do(func() {
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof("Stopping chanbackup.SubSwapper")
|
log.Infof("Stopping chanbackup.SubSwapper")
|
||||||
|
|
||||||
close(s.quit)
|
close(s.quit)
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
@ -353,9 +352,8 @@ type fundingConfig struct {
|
|||||||
// controls between the wallet and remote peers are enforced via the funding
|
// controls between the wallet and remote peers are enforced via the funding
|
||||||
// manager.
|
// manager.
|
||||||
type fundingManager struct {
|
type fundingManager struct {
|
||||||
// MUST be used atomically.
|
started sync.Once
|
||||||
started int32
|
stopped sync.Once
|
||||||
stopped int32
|
|
||||||
|
|
||||||
// cfg is a copy of the configuration struct that the FundingManager
|
// cfg is a copy of the configuration struct that the FundingManager
|
||||||
// was initialized with.
|
// was initialized with.
|
||||||
@ -471,10 +469,14 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
|
|||||||
// Start launches all helper goroutines required for handling requests sent
|
// Start launches all helper goroutines required for handling requests sent
|
||||||
// to the funding manager.
|
// to the funding manager.
|
||||||
func (f *fundingManager) Start() error {
|
func (f *fundingManager) Start() error {
|
||||||
if atomic.AddInt32(&f.started, 1) != 1 { // TODO(roasbeef): CAS instead
|
var err error
|
||||||
return nil
|
f.started.Do(func() {
|
||||||
|
err = f.start()
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fundingManager) start() error {
|
||||||
fndgLog.Tracef("Funding manager running")
|
fndgLog.Tracef("Funding manager running")
|
||||||
|
|
||||||
// Upon restart, the Funding Manager will check the database to load any
|
// Upon restart, the Funding Manager will check the database to load any
|
||||||
@ -710,10 +712,14 @@ func (f *fundingManager) Start() error {
|
|||||||
// Stop signals all helper goroutines to execute a graceful shutdown. This
|
// Stop signals all helper goroutines to execute a graceful shutdown. This
|
||||||
// method will block until all goroutines have exited.
|
// method will block until all goroutines have exited.
|
||||||
func (f *fundingManager) Stop() error {
|
func (f *fundingManager) Stop() error {
|
||||||
if atomic.AddInt32(&f.stopped, 1) != 1 {
|
var err error
|
||||||
return nil
|
f.stopped.Do(func() {
|
||||||
|
err = f.stop()
|
||||||
|
})
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fundingManager) stop() error {
|
||||||
fndgLog.Infof("Funding manager shutting down")
|
fndgLog.Infof("Funding manager shutting down")
|
||||||
|
|
||||||
close(f.quit)
|
close(f.quit)
|
||||||
|
Loading…
Reference in New Issue
Block a user