server: BroadcastMessage will no longer block until completion
In this commit, we make the BroadcastMessage method on the server more asynchronous by abandoning the two wait groups that it used for synchronization. It has been observed that a circular waiting loop between the AuthenticatedGossiper and a peer’s readHandler can cause the system to dead lock. By removing this unnecessary synchronization, we avoid the deadlock case and allow the gossiper itself to no longer block in this scenario.
This commit is contained in:
parent
7aa64d58da
commit
6db90ef09a
57
server.go
57
server.go
@ -818,9 +818,7 @@ func (s *server) establishPersistentConnections() error {
|
||||
// messages to all peers other than the one specified by the `skip` parameter.
|
||||
//
|
||||
// NOTE: This function is safe for concurrent access.
|
||||
func (s *server) BroadcastMessage(skip *btcec.PublicKey,
|
||||
msgs ...lnwire.Message) error {
|
||||
|
||||
func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -831,9 +829,7 @@ func (s *server) BroadcastMessage(skip *btcec.PublicKey,
|
||||
// peers except the one specified by `skip`.
|
||||
//
|
||||
// NOTE: This method MUST be called while the server's mutex is locked.
|
||||
func (s *server) broadcastMessages(
|
||||
skip *btcec.PublicKey,
|
||||
msgs []lnwire.Message) error {
|
||||
func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) error {
|
||||
|
||||
srvrLog.Debugf("Broadcasting %v messages", len(msgs))
|
||||
|
||||
@ -841,23 +837,15 @@ func (s *server) broadcastMessages(
|
||||
// all messages to each of peers. We synchronize access to peersByPub
|
||||
// throughout this process to ensure we deliver messages to exact set
|
||||
// of peers present at the time of invocation.
|
||||
var wg sync.WaitGroup
|
||||
for pubStr, sPeer := range s.peersByPub {
|
||||
if skip != nil && sPeer.addr.IdentityKey.IsEqual(skip) {
|
||||
srvrLog.Debugf("Skipping %v in broadcast", pubStr)
|
||||
continue
|
||||
}
|
||||
|
||||
// Dispatch a go routine to enqueue all messages to this peer.
|
||||
wg.Add(1)
|
||||
s.wg.Add(1)
|
||||
go s.sendPeerMessages(sPeer, msgs, &wg)
|
||||
go s.sendPeerMessages(sPeer, msgs)
|
||||
}
|
||||
|
||||
// Wait for all messages to have been dispatched before returning to
|
||||
// caller.
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -866,9 +854,7 @@ func (s *server) broadcastMessages(
|
||||
// method will return a non-nil error.
|
||||
//
|
||||
// NOTE: This function is safe for concurrent access.
|
||||
func (s *server) SendToPeer(target *btcec.PublicKey,
|
||||
msgs ...lnwire.Message) error {
|
||||
|
||||
func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -879,8 +865,7 @@ func (s *server) SendToPeer(target *btcec.PublicKey,
|
||||
// particular peer comes online.
|
||||
//
|
||||
// NOTE: This function is safe for concurrent access.
|
||||
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
|
||||
connectedChan chan<- struct{}) {
|
||||
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- struct{}) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
@ -904,8 +889,7 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
|
||||
|
||||
// sendToPeer is an internal method that delivers messages to the specified
|
||||
// `target` peer.
|
||||
func (s *server) sendToPeer(target *btcec.PublicKey,
|
||||
msgs []lnwire.Message) error {
|
||||
func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) error {
|
||||
|
||||
// Compute the target peer's identifier.
|
||||
targetPubBytes := target.SerializeCompressed()
|
||||
@ -924,37 +908,14 @@ func (s *server) sendToPeer(target *btcec.PublicKey,
|
||||
return err
|
||||
}
|
||||
|
||||
s.sendPeerMessages(targetPeer, msgs, nil)
|
||||
s.sendPeerMessages(targetPeer, msgs)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
||||
// `targetPeer`. This method supports additional broadcast-level
|
||||
// synchronization by using the additional `wg` to coordinate a particular
|
||||
// broadcast.
|
||||
//
|
||||
// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
|
||||
// go routine--both `wg` and the server's WaitGroup should be incremented
|
||||
// beforehand. If this method is not spawned as a go routine, the provided
|
||||
// `wg` should be nil, and the server's WaitGroup should not be tracking this
|
||||
// invocation.
|
||||
func (s *server) sendPeerMessages(
|
||||
targetPeer *peer,
|
||||
msgs []lnwire.Message,
|
||||
wg *sync.WaitGroup) {
|
||||
|
||||
// If a WaitGroup is provided, we assume that this method was spawned
|
||||
// as a go routine, and that it is being tracked by both the server's
|
||||
// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
|
||||
// event, we defer a call to Done on both WaitGroups to 1) ensure that
|
||||
// server will be able to shutdown after its go routines exit, and 2)
|
||||
// so the server can return to the caller of BroadcastMessage.
|
||||
if wg != nil {
|
||||
defer s.wg.Done()
|
||||
defer wg.Done()
|
||||
}
|
||||
|
||||
// `targetPeer`.
|
||||
func (s *server) sendPeerMessages(targetPeer *peer, msgs []lnwire.Message) {
|
||||
for _, msg := range msgs {
|
||||
targetPeer.queueMsg(msg, nil)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user