From ac1ab6f516da330e214f92fad2383278d1a01212 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 6 Jun 2018 12:18:44 +0200 Subject: [PATCH] server: hold mutex shorter during broadcast We remove the internale broadcastMessage method, and instead handle the mutex handling within BroadcastMessage. This lets us hold the mutex only when neccessary. --- server.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/server.go b/server.go index 49d71ae2..e32df058 100644 --- a/server.go +++ b/server.go @@ -1148,33 +1148,19 @@ func (s *server) establishPersistentConnections() error { } // BroadcastMessage sends a request to the server to broadcast a set of -// messages to all peers other than the one specified by the `skip` parameter. +// messages to all peers other than the one specified by the `skips` parameter. // // NOTE: This function is safe for concurrent access. -func (s *server) BroadcastMessage(skip map[routing.Vertex]struct{}, +func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, msgs ...lnwire.Message) error { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.broadcastMessages(skip, msgs) -} - -// broadcastMessages is an internal method that delivers messages to all active -// peers except the one specified by `skip`. -// -// NOTE: This method MUST be called while the server's mutex is locked. -func (s *server) broadcastMessages( - skips map[routing.Vertex]struct{}, - msgs []lnwire.Message) error { - srvrLog.Debugf("Broadcasting %v messages", len(msgs)) - // Iterate over all known peers, dispatching a go routine to enqueue - // all messages to each of peers. We synchronize access to peersByPub - // throughout this process to ensure we deliver messages to exact set - // of peers present at the time of invocation. - var wg sync.WaitGroup + // Filter out peers found in the skips map. We synchronize access to + // peersByPub throughout this process to ensure we deliver messages to + // exact set of peers present at the time of invocation. + s.mu.RLock() + peers := make([]*peer, 0, len(s.peersByPub)) for _, sPeer := range s.peersByPub { if skips != nil { if _, ok := skips[sPeer.pubKeyBytes]; ok { @@ -1184,6 +1170,14 @@ func (s *server) broadcastMessages( } } + peers = append(peers, sPeer) + } + s.mu.RUnlock() + + // Iterate over all known peers, dispatching a go routine to enqueue + // all messages to each of peers. + var wg sync.WaitGroup + for _, sPeer := range peers { // Dispatch a go routine to enqueue all messages to this peer. wg.Add(1) s.wg.Add(1)