Merge pull request #3178 from federicobond/once-refactor

multi: replace manual CAS with sync.Once in several more modules
This commit is contained in:
Conner Fromknecht 2019-07-08 20:33:44 -07:00 committed by GitHub
commit 933e723ec7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 78 additions and 104 deletions

@ -1,7 +1,7 @@
package channelnotifier
import (
"sync/atomic"
"sync"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
@ -12,8 +12,8 @@ import (
// events pipe through. It takes subscriptions for its events, and whenever
// it receives a new event it notifies its subscribers over the proper channel.
type ChannelNotifier struct {
started uint32
stopped uint32
started sync.Once
stopped sync.Once
ntfnServer *subscribe.Server
@ -57,26 +57,19 @@ func New(chanDB *channeldb.DB) *ChannelNotifier {
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
func (c *ChannelNotifier) Start() error {
if !atomic.CompareAndSwapUint32(&c.started, 0, 1) {
return nil
}
log.Tracef("ChannelNotifier %v starting", c)
if err := c.ntfnServer.Start(); err != nil {
var err error
c.started.Do(func() {
log.Trace("ChannelNotifier starting")
err = c.ntfnServer.Start()
})
return err
}
return nil
}
// Stop signals the notifier for a graceful shutdown.
func (c *ChannelNotifier) Stop() {
if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
return
}
c.stopped.Do(func() {
c.ntfnServer.Stop()
})
}
// SubscribeChannelEvents returns a subscribe.Client that will receive updates

@ -223,9 +223,9 @@ type Config struct {
// will be rejected by this struct.
type AuthenticatedGossiper struct {
// Parameters which are needed to properly handle the start and stop of
// the service. To be used atomically.
started uint32
stopped uint32
// the service.
started sync.Once
stopped sync.Once
// bestHeight is the height of the block at the tip of the main chain
// as we know it. To be used atomically.
@ -466,10 +466,14 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
// Start spawns network messages handler goroutine and registers on new block
// notifications in order to properly handle the premature announcements.
func (d *AuthenticatedGossiper) Start() error {
if !atomic.CompareAndSwapUint32(&d.started, 0, 1) {
return nil
var err error
d.started.Do(func() {
err = d.start()
})
return err
}
func (d *AuthenticatedGossiper) start() error {
log.Info("Authenticated Gossiper is starting")
// First we register for new notifications of newly discovered blocks.
@ -504,10 +508,10 @@ func (d *AuthenticatedGossiper) Start() error {
// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() {
if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) {
return
d.stopped.Do(d.stop)
}
func (d *AuthenticatedGossiper) stop() {
log.Info("Authenticated Gossiper is stopping")
d.blockEpochs.Cancel()

@ -4,7 +4,6 @@ import (
"container/list"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/lightningnetwork/lnd/lnwire"
@ -60,8 +59,8 @@ type MailBox interface {
// memoryMailBox is an implementation of the MailBox struct backed by purely
// in-memory queues.
type memoryMailBox struct {
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
started sync.Once
stopped sync.Once
wireMessages *list.List
wireHead *list.Element
@ -123,14 +122,11 @@ const (
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) Start() error {
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
return nil
}
m.started.Do(func() {
m.wg.Add(2)
go m.mailCourier(wireCourier)
go m.mailCourier(pktCourier)
})
return nil
}
@ -214,15 +210,12 @@ func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
//
// NOTE: This method is part of the MailBox interface.
func (m *memoryMailBox) Stop() error {
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
return nil
}
m.stopped.Do(func() {
close(m.quit)
m.wireCond.Signal()
m.pktCond.Signal()
})
return nil
}

@ -9,7 +9,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync/atomic"
"sync"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -81,8 +81,8 @@ func fileExists(name string) bool {
// to lnd, even backed by multiple distinct lnd across independent failure
// domains.
type Server struct {
started uint32
stopped uint32
started sync.Once
stopped sync.Once
cfg Config
@ -146,10 +146,7 @@ var _ lnrpc.SubServer = (*Server)(nil)
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return nil
}
s.started.Do(func() {})
return nil
}
@ -157,12 +154,9 @@ func (s *Server) Start() error {
//
// NOTE: This is part of the lnrpc.SubServer interface.
func (s *Server) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
s.stopped.Do(func() {
close(s.quit)
})
return nil
}

@ -3,7 +3,6 @@ package lnwallet
import (
"fmt"
"sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
@ -125,8 +124,8 @@ type SignJobResp struct {
// to sign or verify) can be sent to the pool of workers which will
// asynchronously perform the specified job.
type SigPool struct {
started uint32 // To be used atomically.
stopped uint32 // To be used atomically.
started sync.Once
stopped sync.Once
signer input.Signer
@ -155,28 +154,22 @@ func NewSigPool(numWorkers int, signer input.Signer) *SigPool {
// Start starts of all goroutines that the sigPool sig pool needs to
// carry out its duties.
func (s *SigPool) Start() error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return nil
}
s.started.Do(func() {
for i := 0; i < s.numWorkers; i++ {
s.wg.Add(1)
go s.poolWorker()
}
})
return nil
}
// Stop signals any active workers carrying out jobs to exit so the sigPool can
// gracefully shutdown.
func (s *SigPool) Stop() error {
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
return nil
}
s.stopped.Do(func() {
close(s.quit)
s.wg.Wait()
})
return nil
}

@ -3,7 +3,6 @@ package netann
import (
"errors"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
@ -85,8 +84,8 @@ type ChanStatusConfig struct {
// passively. The ChanStatusManager state machine is designed to reduce the
// likelihood of spamming the network with updates for flapping peers.
type ChanStatusManager struct {
started uint32 // to be used atomically
stopped uint32 // to be used atomically
started sync.Once
stopped sync.Once
cfg *ChanStatusConfig
@ -155,10 +154,14 @@ func NewChanStatusManager(cfg *ChanStatusConfig) (*ChanStatusManager, error) {
// Start safely starts the ChanStatusManager.
func (m *ChanStatusManager) Start() error {
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
return nil
var err error
m.started.Do(func() {
err = m.start()
})
return err
}
func (m *ChanStatusManager) start() error {
channels, err := m.fetchChannels()
if err != nil {
return err
@ -192,13 +195,10 @@ func (m *ChanStatusManager) Start() error {
// Stop safely shuts down the ChanStatusManager.
func (m *ChanStatusManager) Stop() error {
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
return nil
}
m.stopped.Do(func() {
close(m.quit)
m.wg.Wait()
})
return nil
}

@ -3,7 +3,6 @@ package queue
import (
"container/list"
"sync"
"sync/atomic"
)
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
@ -12,8 +11,8 @@ import (
// items from the in channel to the out channel in the correct order that must
// be started by calling Start().
type ConcurrentQueue struct {
started uint32 // to be used atomically
stopped uint32 // to be used atomically
started sync.Once
stopped sync.Once
chanIn chan interface{}
chanOut chan interface{}
@ -51,10 +50,10 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
// minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue) Start() {
if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) {
return
cq.started.Do(cq.start)
}
func (cq *ConcurrentQueue) start() {
cq.wg.Add(1)
go func() {
defer cq.wg.Done()
@ -96,10 +95,8 @@ func (cq *ConcurrentQueue) Start() {
// channel. This does not clear the queue state, so the queue can be restarted
// without dropping items.
func (cq *ConcurrentQueue) Stop() {
if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) {
return
}
cq.stopped.Do(func() {
close(cq.quit)
cq.wg.Wait()
})
}