From 0a9141763e51e3871d6b8d379c1c6d84c6e8bcf9 Mon Sep 17 00:00:00 2001 From: Federico Bond Date: Thu, 6 Jun 2019 14:15:11 -0300 Subject: [PATCH] multi: replace manual CAS with sync.Once in several more modules --- channelnotifier/channelnotifier.go | 31 +++++++++++---------------- discovery/gossiper.go | 22 +++++++++++-------- htlcswitch/mailbox.go | 31 +++++++++++---------------- lnrpc/chainrpc/chainnotifer_server.go | 20 ++++++----------- lnwallet/sigpool.go | 31 +++++++++++---------------- netann/chan_status_manager.go | 26 +++++++++++----------- queue/queue.go | 21 ++++++++---------- 7 files changed, 78 insertions(+), 104 deletions(-) diff --git a/channelnotifier/channelnotifier.go b/channelnotifier/channelnotifier.go index 58a1bf1c..0f0e537f 100644 --- a/channelnotifier/channelnotifier.go +++ b/channelnotifier/channelnotifier.go @@ -1,7 +1,7 @@ package channelnotifier import ( - "sync/atomic" + "sync" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" @@ -12,8 +12,8 @@ import ( // events pipe through. It takes subscriptions for its events, and whenever // it receives a new event it notifies its subscribers over the proper channel. type ChannelNotifier struct { - started uint32 - stopped uint32 + started sync.Once + stopped sync.Once ntfnServer *subscribe.Server @@ -57,26 +57,19 @@ func New(chanDB *channeldb.DB) *ChannelNotifier { // Start starts the ChannelNotifier and all goroutines it needs to carry out its task. func (c *ChannelNotifier) Start() error { - if !atomic.CompareAndSwapUint32(&c.started, 0, 1) { - return nil - } - - log.Tracef("ChannelNotifier %v starting", c) - - if err := c.ntfnServer.Start(); err != nil { - return err - } - - return nil + var err error + c.started.Do(func() { + log.Tracef("ChannelNotifier %v starting", c) + err = c.ntfnServer.Start() + }) + return err } // Stop signals the notifier for a graceful shutdown. func (c *ChannelNotifier) Stop() { - if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { - return - } - - c.ntfnServer.Stop() + c.stopped.Do(func() { + c.ntfnServer.Stop() + }) } // SubscribeChannelEvents returns a subscribe.Client that will receive updates diff --git a/discovery/gossiper.go b/discovery/gossiper.go index cd2d5573..dec3d3a8 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -225,9 +225,9 @@ type Config struct { // will be rejected by this struct. type AuthenticatedGossiper struct { // Parameters which are needed to properly handle the start and stop of - // the service. To be used atomically. - started uint32 - stopped uint32 + // the service. + started sync.Once + stopped sync.Once // bestHeight is the height of the block at the tip of the main chain // as we know it. To be used atomically. @@ -468,10 +468,14 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate( // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. func (d *AuthenticatedGossiper) Start() error { - if !atomic.CompareAndSwapUint32(&d.started, 0, 1) { - return nil - } + var err error + d.started.Do(func() { + err = d.start() + }) + return err +} +func (d *AuthenticatedGossiper) start() error { log.Info("Authenticated Gossiper is starting") // First we register for new notifications of newly discovered blocks. @@ -506,10 +510,10 @@ func (d *AuthenticatedGossiper) Start() error { // Stop signals any active goroutines for a graceful closure. func (d *AuthenticatedGossiper) Stop() { - if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) { - return - } + d.stopped.Do(d.stop) +} +func (d *AuthenticatedGossiper) stop() { log.Info("Authenticated Gossiper is stopping") d.blockEpochs.Cancel() diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 38872a12..99ec9e1e 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -4,7 +4,6 @@ import ( "container/list" "errors" "sync" - "sync/atomic" "time" "github.com/lightningnetwork/lnd/lnwire" @@ -60,8 +59,8 @@ type MailBox interface { // memoryMailBox is an implementation of the MailBox struct backed by purely // in-memory queues. type memoryMailBox struct { - started uint32 // To be used atomically. - stopped uint32 // To be used atomically. + started sync.Once + stopped sync.Once wireMessages *list.List wireHead *list.Element @@ -123,14 +122,11 @@ const ( // // NOTE: This method is part of the MailBox interface. func (m *memoryMailBox) Start() error { - if !atomic.CompareAndSwapUint32(&m.started, 0, 1) { - return nil - } - - m.wg.Add(2) - go m.mailCourier(wireCourier) - go m.mailCourier(pktCourier) - + m.started.Do(func() { + m.wg.Add(2) + go m.mailCourier(wireCourier) + go m.mailCourier(pktCourier) + }) return nil } @@ -214,15 +210,12 @@ func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool { // // NOTE: This method is part of the MailBox interface. func (m *memoryMailBox) Stop() error { - if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { - return nil - } - - close(m.quit) - - m.wireCond.Signal() - m.pktCond.Signal() + m.stopped.Do(func() { + close(m.quit) + m.wireCond.Signal() + m.pktCond.Signal() + }) return nil } diff --git a/lnrpc/chainrpc/chainnotifer_server.go b/lnrpc/chainrpc/chainnotifer_server.go index 494f0c0a..2d662c8e 100644 --- a/lnrpc/chainrpc/chainnotifer_server.go +++ b/lnrpc/chainrpc/chainnotifer_server.go @@ -9,7 +9,7 @@ import ( "io/ioutil" "os" "path/filepath" - "sync/atomic" + "sync" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" @@ -81,8 +81,8 @@ func fileExists(name string) bool { // to lnd, even backed by multiple distinct lnd across independent failure // domains. type Server struct { - started uint32 - stopped uint32 + started sync.Once + stopped sync.Once cfg Config @@ -146,10 +146,7 @@ var _ lnrpc.SubServer = (*Server)(nil) // // NOTE: This is part of the lnrpc.SubServer interface. func (s *Server) Start() error { - if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { - return nil - } - + s.started.Do(func() {}) return nil } @@ -157,12 +154,9 @@ func (s *Server) Start() error { // // NOTE: This is part of the lnrpc.SubServer interface. func (s *Server) Stop() error { - if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { - return nil - } - - close(s.quit) - + s.stopped.Do(func() { + close(s.quit) + }) return nil } diff --git a/lnwallet/sigpool.go b/lnwallet/sigpool.go index be22491e..57b84988 100644 --- a/lnwallet/sigpool.go +++ b/lnwallet/sigpool.go @@ -3,7 +3,6 @@ package lnwallet import ( "fmt" "sync" - "sync/atomic" "github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/wire" @@ -125,8 +124,8 @@ type SignJobResp struct { // to sign or verify) can be sent to the pool of workers which will // asynchronously perform the specified job. type SigPool struct { - started uint32 // To be used atomically. - stopped uint32 // To be used atomically. + started sync.Once + stopped sync.Once signer input.Signer @@ -155,28 +154,22 @@ func NewSigPool(numWorkers int, signer input.Signer) *SigPool { // Start starts of all goroutines that the sigPool sig pool needs to // carry out its duties. func (s *SigPool) Start() error { - if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { - return nil - } - - for i := 0; i < s.numWorkers; i++ { - s.wg.Add(1) - go s.poolWorker() - } - + s.started.Do(func() { + for i := 0; i < s.numWorkers; i++ { + s.wg.Add(1) + go s.poolWorker() + } + }) return nil } // Stop signals any active workers carrying out jobs to exit so the sigPool can // gracefully shutdown. func (s *SigPool) Stop() error { - if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { - return nil - } - - close(s.quit) - s.wg.Wait() - + s.stopped.Do(func() { + close(s.quit) + s.wg.Wait() + }) return nil } diff --git a/netann/chan_status_manager.go b/netann/chan_status_manager.go index cefa4fe6..146fef47 100644 --- a/netann/chan_status_manager.go +++ b/netann/chan_status_manager.go @@ -3,7 +3,6 @@ package netann import ( "errors" "sync" - "sync/atomic" "time" "github.com/btcsuite/btcd/btcec" @@ -85,8 +84,8 @@ type ChanStatusConfig struct { // passively. The ChanStatusManager state machine is designed to reduce the // likelihood of spamming the network with updates for flapping peers. type ChanStatusManager struct { - started uint32 // to be used atomically - stopped uint32 // to be used atomically + started sync.Once + stopped sync.Once cfg *ChanStatusConfig @@ -155,10 +154,14 @@ func NewChanStatusManager(cfg *ChanStatusConfig) (*ChanStatusManager, error) { // Start safely starts the ChanStatusManager. func (m *ChanStatusManager) Start() error { - if !atomic.CompareAndSwapUint32(&m.started, 0, 1) { - return nil - } + var err error + m.started.Do(func() { + err = m.start() + }) + return err +} +func (m *ChanStatusManager) start() error { channels, err := m.fetchChannels() if err != nil { return err @@ -192,13 +195,10 @@ func (m *ChanStatusManager) Start() error { // Stop safely shuts down the ChanStatusManager. func (m *ChanStatusManager) Stop() error { - if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) { - return nil - } - - close(m.quit) - m.wg.Wait() - + m.stopped.Do(func() { + close(m.quit) + m.wg.Wait() + }) return nil } diff --git a/queue/queue.go b/queue/queue.go index 07eca3fa..e3b01b26 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -3,7 +3,6 @@ package queue import ( "container/list" "sync" - "sync/atomic" ) // ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity. @@ -12,8 +11,8 @@ import ( // items from the in channel to the out channel in the correct order that must // be started by calling Start(). type ConcurrentQueue struct { - started uint32 // to be used atomically - stopped uint32 // to be used atomically + started sync.Once + stopped sync.Once chanIn chan interface{} chanOut chan interface{} @@ -51,10 +50,10 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} { // minimize overhead, but if the out channel is full it pushes items to an // overflow queue. This must be called before using the queue. func (cq *ConcurrentQueue) Start() { - if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) { - return - } + cq.started.Do(cq.start) +} +func (cq *ConcurrentQueue) start() { cq.wg.Add(1) go func() { defer cq.wg.Done() @@ -96,10 +95,8 @@ func (cq *ConcurrentQueue) Start() { // channel. This does not clear the queue state, so the queue can be restarted // without dropping items. func (cq *ConcurrentQueue) Stop() { - if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) { - return - } - - close(cq.quit) - cq.wg.Wait() + cq.stopped.Do(func() { + close(cq.quit) + cq.wg.Wait() + }) }