multi: replace manual CAS with sync.Once in several more modules
This commit is contained in:
parent
e45d4d703a
commit
0a9141763e
@ -1,7 +1,7 @@
|
|||||||
package channelnotifier
|
package channelnotifier
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync/atomic"
|
"sync"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
@ -12,8 +12,8 @@ import (
|
|||||||
// events pipe through. It takes subscriptions for its events, and whenever
|
// events pipe through. It takes subscriptions for its events, and whenever
|
||||||
// it receives a new event it notifies its subscribers over the proper channel.
|
// it receives a new event it notifies its subscribers over the proper channel.
|
||||||
type ChannelNotifier struct {
|
type ChannelNotifier struct {
|
||||||
started uint32
|
started sync.Once
|
||||||
stopped uint32
|
stopped sync.Once
|
||||||
|
|
||||||
ntfnServer *subscribe.Server
|
ntfnServer *subscribe.Server
|
||||||
|
|
||||||
@ -57,26 +57,19 @@ func New(chanDB *channeldb.DB) *ChannelNotifier {
|
|||||||
|
|
||||||
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
|
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
|
||||||
func (c *ChannelNotifier) Start() error {
|
func (c *ChannelNotifier) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&c.started, 0, 1) {
|
var err error
|
||||||
return nil
|
c.started.Do(func() {
|
||||||
}
|
log.Tracef("ChannelNotifier %v starting", c)
|
||||||
|
err = c.ntfnServer.Start()
|
||||||
log.Tracef("ChannelNotifier %v starting", c)
|
})
|
||||||
|
return err
|
||||||
if err := c.ntfnServer.Start(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop signals the notifier for a graceful shutdown.
|
// Stop signals the notifier for a graceful shutdown.
|
||||||
func (c *ChannelNotifier) Stop() {
|
func (c *ChannelNotifier) Stop() {
|
||||||
if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
|
c.stopped.Do(func() {
|
||||||
return
|
c.ntfnServer.Stop()
|
||||||
}
|
})
|
||||||
|
|
||||||
c.ntfnServer.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
|
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
|
||||||
|
@ -225,9 +225,9 @@ type Config struct {
|
|||||||
// will be rejected by this struct.
|
// will be rejected by this struct.
|
||||||
type AuthenticatedGossiper struct {
|
type AuthenticatedGossiper struct {
|
||||||
// Parameters which are needed to properly handle the start and stop of
|
// Parameters which are needed to properly handle the start and stop of
|
||||||
// the service. To be used atomically.
|
// the service.
|
||||||
started uint32
|
started sync.Once
|
||||||
stopped uint32
|
stopped sync.Once
|
||||||
|
|
||||||
// bestHeight is the height of the block at the tip of the main chain
|
// bestHeight is the height of the block at the tip of the main chain
|
||||||
// as we know it. To be used atomically.
|
// as we know it. To be used atomically.
|
||||||
@ -468,10 +468,14 @@ func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
|
|||||||
// Start spawns network messages handler goroutine and registers on new block
|
// Start spawns network messages handler goroutine and registers on new block
|
||||||
// notifications in order to properly handle the premature announcements.
|
// notifications in order to properly handle the premature announcements.
|
||||||
func (d *AuthenticatedGossiper) Start() error {
|
func (d *AuthenticatedGossiper) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&d.started, 0, 1) {
|
var err error
|
||||||
return nil
|
d.started.Do(func() {
|
||||||
}
|
err = d.start()
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *AuthenticatedGossiper) start() error {
|
||||||
log.Info("Authenticated Gossiper is starting")
|
log.Info("Authenticated Gossiper is starting")
|
||||||
|
|
||||||
// First we register for new notifications of newly discovered blocks.
|
// First we register for new notifications of newly discovered blocks.
|
||||||
@ -506,10 +510,10 @@ func (d *AuthenticatedGossiper) Start() error {
|
|||||||
|
|
||||||
// Stop signals any active goroutines for a graceful closure.
|
// Stop signals any active goroutines for a graceful closure.
|
||||||
func (d *AuthenticatedGossiper) Stop() {
|
func (d *AuthenticatedGossiper) Stop() {
|
||||||
if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) {
|
d.stopped.Do(d.stop)
|
||||||
return
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
func (d *AuthenticatedGossiper) stop() {
|
||||||
log.Info("Authenticated Gossiper is stopping")
|
log.Info("Authenticated Gossiper is stopping")
|
||||||
|
|
||||||
d.blockEpochs.Cancel()
|
d.blockEpochs.Cancel()
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"container/list"
|
"container/list"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
@ -60,8 +59,8 @@ type MailBox interface {
|
|||||||
// memoryMailBox is an implementation of the MailBox struct backed by purely
|
// memoryMailBox is an implementation of the MailBox struct backed by purely
|
||||||
// in-memory queues.
|
// in-memory queues.
|
||||||
type memoryMailBox struct {
|
type memoryMailBox struct {
|
||||||
started uint32 // To be used atomically.
|
started sync.Once
|
||||||
stopped uint32 // To be used atomically.
|
stopped sync.Once
|
||||||
|
|
||||||
wireMessages *list.List
|
wireMessages *list.List
|
||||||
wireHead *list.Element
|
wireHead *list.Element
|
||||||
@ -123,14 +122,11 @@ const (
|
|||||||
//
|
//
|
||||||
// NOTE: This method is part of the MailBox interface.
|
// NOTE: This method is part of the MailBox interface.
|
||||||
func (m *memoryMailBox) Start() error {
|
func (m *memoryMailBox) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
|
m.started.Do(func() {
|
||||||
return nil
|
m.wg.Add(2)
|
||||||
}
|
go m.mailCourier(wireCourier)
|
||||||
|
go m.mailCourier(pktCourier)
|
||||||
m.wg.Add(2)
|
})
|
||||||
go m.mailCourier(wireCourier)
|
|
||||||
go m.mailCourier(pktCourier)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,15 +210,12 @@ func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
|
|||||||
//
|
//
|
||||||
// NOTE: This method is part of the MailBox interface.
|
// NOTE: This method is part of the MailBox interface.
|
||||||
func (m *memoryMailBox) Stop() error {
|
func (m *memoryMailBox) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
|
m.stopped.Do(func() {
|
||||||
return nil
|
close(m.quit)
|
||||||
}
|
|
||||||
|
|
||||||
close(m.quit)
|
|
||||||
|
|
||||||
m.wireCond.Signal()
|
|
||||||
m.pktCond.Signal()
|
|
||||||
|
|
||||||
|
m.wireCond.Signal()
|
||||||
|
m.pktCond.Signal()
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync/atomic"
|
"sync"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
@ -81,8 +81,8 @@ func fileExists(name string) bool {
|
|||||||
// to lnd, even backed by multiple distinct lnd across independent failure
|
// to lnd, even backed by multiple distinct lnd across independent failure
|
||||||
// domains.
|
// domains.
|
||||||
type Server struct {
|
type Server struct {
|
||||||
started uint32
|
started sync.Once
|
||||||
stopped uint32
|
stopped sync.Once
|
||||||
|
|
||||||
cfg Config
|
cfg Config
|
||||||
|
|
||||||
@ -146,10 +146,7 @@ var _ lnrpc.SubServer = (*Server)(nil)
|
|||||||
//
|
//
|
||||||
// NOTE: This is part of the lnrpc.SubServer interface.
|
// NOTE: This is part of the lnrpc.SubServer interface.
|
||||||
func (s *Server) Start() error {
|
func (s *Server) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
s.started.Do(func() {})
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,12 +154,9 @@ func (s *Server) Start() error {
|
|||||||
//
|
//
|
||||||
// NOTE: This is part of the lnrpc.SubServer interface.
|
// NOTE: This is part of the lnrpc.SubServer interface.
|
||||||
func (s *Server) Stop() error {
|
func (s *Server) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
s.stopped.Do(func() {
|
||||||
return nil
|
close(s.quit)
|
||||||
}
|
})
|
||||||
|
|
||||||
close(s.quit)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package lnwallet
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
"github.com/btcsuite/btcd/wire"
|
"github.com/btcsuite/btcd/wire"
|
||||||
@ -125,8 +124,8 @@ type SignJobResp struct {
|
|||||||
// to sign or verify) can be sent to the pool of workers which will
|
// to sign or verify) can be sent to the pool of workers which will
|
||||||
// asynchronously perform the specified job.
|
// asynchronously perform the specified job.
|
||||||
type SigPool struct {
|
type SigPool struct {
|
||||||
started uint32 // To be used atomically.
|
started sync.Once
|
||||||
stopped uint32 // To be used atomically.
|
stopped sync.Once
|
||||||
|
|
||||||
signer input.Signer
|
signer input.Signer
|
||||||
|
|
||||||
@ -155,28 +154,22 @@ func NewSigPool(numWorkers int, signer input.Signer) *SigPool {
|
|||||||
// Start starts of all goroutines that the sigPool sig pool needs to
|
// Start starts of all goroutines that the sigPool sig pool needs to
|
||||||
// carry out its duties.
|
// carry out its duties.
|
||||||
func (s *SigPool) Start() error {
|
func (s *SigPool) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
s.started.Do(func() {
|
||||||
return nil
|
for i := 0; i < s.numWorkers; i++ {
|
||||||
}
|
s.wg.Add(1)
|
||||||
|
go s.poolWorker()
|
||||||
for i := 0; i < s.numWorkers; i++ {
|
}
|
||||||
s.wg.Add(1)
|
})
|
||||||
go s.poolWorker()
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop signals any active workers carrying out jobs to exit so the sigPool can
|
// Stop signals any active workers carrying out jobs to exit so the sigPool can
|
||||||
// gracefully shutdown.
|
// gracefully shutdown.
|
||||||
func (s *SigPool) Stop() error {
|
func (s *SigPool) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
s.stopped.Do(func() {
|
||||||
return nil
|
close(s.quit)
|
||||||
}
|
s.wg.Wait()
|
||||||
|
})
|
||||||
close(s.quit)
|
|
||||||
s.wg.Wait()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package netann
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcd/btcec"
|
"github.com/btcsuite/btcd/btcec"
|
||||||
@ -85,8 +84,8 @@ type ChanStatusConfig struct {
|
|||||||
// passively. The ChanStatusManager state machine is designed to reduce the
|
// passively. The ChanStatusManager state machine is designed to reduce the
|
||||||
// likelihood of spamming the network with updates for flapping peers.
|
// likelihood of spamming the network with updates for flapping peers.
|
||||||
type ChanStatusManager struct {
|
type ChanStatusManager struct {
|
||||||
started uint32 // to be used atomically
|
started sync.Once
|
||||||
stopped uint32 // to be used atomically
|
stopped sync.Once
|
||||||
|
|
||||||
cfg *ChanStatusConfig
|
cfg *ChanStatusConfig
|
||||||
|
|
||||||
@ -155,10 +154,14 @@ func NewChanStatusManager(cfg *ChanStatusConfig) (*ChanStatusManager, error) {
|
|||||||
|
|
||||||
// Start safely starts the ChanStatusManager.
|
// Start safely starts the ChanStatusManager.
|
||||||
func (m *ChanStatusManager) Start() error {
|
func (m *ChanStatusManager) Start() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.started, 0, 1) {
|
var err error
|
||||||
return nil
|
m.started.Do(func() {
|
||||||
}
|
err = m.start()
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *ChanStatusManager) start() error {
|
||||||
channels, err := m.fetchChannels()
|
channels, err := m.fetchChannels()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -192,13 +195,10 @@ func (m *ChanStatusManager) Start() error {
|
|||||||
|
|
||||||
// Stop safely shuts down the ChanStatusManager.
|
// Stop safely shuts down the ChanStatusManager.
|
||||||
func (m *ChanStatusManager) Stop() error {
|
func (m *ChanStatusManager) Stop() error {
|
||||||
if !atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
|
m.stopped.Do(func() {
|
||||||
return nil
|
close(m.quit)
|
||||||
}
|
m.wg.Wait()
|
||||||
|
})
|
||||||
close(m.quit)
|
|
||||||
m.wg.Wait()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,7 +3,6 @@ package queue
|
|||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
|
// ConcurrentQueue is a concurrent-safe FIFO queue with unbounded capacity.
|
||||||
@ -12,8 +11,8 @@ import (
|
|||||||
// items from the in channel to the out channel in the correct order that must
|
// items from the in channel to the out channel in the correct order that must
|
||||||
// be started by calling Start().
|
// be started by calling Start().
|
||||||
type ConcurrentQueue struct {
|
type ConcurrentQueue struct {
|
||||||
started uint32 // to be used atomically
|
started sync.Once
|
||||||
stopped uint32 // to be used atomically
|
stopped sync.Once
|
||||||
|
|
||||||
chanIn chan interface{}
|
chanIn chan interface{}
|
||||||
chanOut chan interface{}
|
chanOut chan interface{}
|
||||||
@ -51,10 +50,10 @@ func (cq *ConcurrentQueue) ChanOut() <-chan interface{} {
|
|||||||
// minimize overhead, but if the out channel is full it pushes items to an
|
// minimize overhead, but if the out channel is full it pushes items to an
|
||||||
// overflow queue. This must be called before using the queue.
|
// overflow queue. This must be called before using the queue.
|
||||||
func (cq *ConcurrentQueue) Start() {
|
func (cq *ConcurrentQueue) Start() {
|
||||||
if !atomic.CompareAndSwapUint32(&cq.started, 0, 1) {
|
cq.started.Do(cq.start)
|
||||||
return
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
func (cq *ConcurrentQueue) start() {
|
||||||
cq.wg.Add(1)
|
cq.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer cq.wg.Done()
|
defer cq.wg.Done()
|
||||||
@ -96,10 +95,8 @@ func (cq *ConcurrentQueue) Start() {
|
|||||||
// channel. This does not clear the queue state, so the queue can be restarted
|
// channel. This does not clear the queue state, so the queue can be restarted
|
||||||
// without dropping items.
|
// without dropping items.
|
||||||
func (cq *ConcurrentQueue) Stop() {
|
func (cq *ConcurrentQueue) Stop() {
|
||||||
if !atomic.CompareAndSwapUint32(&cq.stopped, 0, 1) {
|
cq.stopped.Do(func() {
|
||||||
return
|
close(cq.quit)
|
||||||
}
|
cq.wg.Wait()
|
||||||
|
})
|
||||||
close(cq.quit)
|
|
||||||
cq.wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user