server: hold mutex shorter during broadcast

We remove the internale broadcastMessage method, and instead handle the
mutex handling within BroadcastMessage. This lets us hold the mutex only
when neccessary.
This commit is contained in:
Johan T. Halseth 2018-06-06 12:18:44 +02:00
parent b8512c3568
commit ac1ab6f516
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -1148,33 +1148,19 @@ func (s *server) establishPersistentConnections() error {
} }
// BroadcastMessage sends a request to the server to broadcast a set of // BroadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skip` parameter. // messages to all peers other than the one specified by the `skips` parameter.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage(skip map[routing.Vertex]struct{}, func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.broadcastMessages(skip, msgs)
}
// broadcastMessages is an internal method that delivers messages to all active
// peers except the one specified by `skip`.
//
// NOTE: This method MUST be called while the server's mutex is locked.
func (s *server) broadcastMessages(
skips map[routing.Vertex]struct{},
msgs []lnwire.Message) error {
srvrLog.Debugf("Broadcasting %v messages", len(msgs)) srvrLog.Debugf("Broadcasting %v messages", len(msgs))
// Iterate over all known peers, dispatching a go routine to enqueue // Filter out peers found in the skips map. We synchronize access to
// all messages to each of peers. We synchronize access to peersByPub // peersByPub throughout this process to ensure we deliver messages to
// throughout this process to ensure we deliver messages to exact set // exact set of peers present at the time of invocation.
// of peers present at the time of invocation. s.mu.RLock()
var wg sync.WaitGroup peers := make([]*peer, 0, len(s.peersByPub))
for _, sPeer := range s.peersByPub { for _, sPeer := range s.peersByPub {
if skips != nil { if skips != nil {
if _, ok := skips[sPeer.pubKeyBytes]; ok { if _, ok := skips[sPeer.pubKeyBytes]; ok {
@ -1184,6 +1170,14 @@ func (s *server) broadcastMessages(
} }
} }
peers = append(peers, sPeer)
}
s.mu.RUnlock()
// Iterate over all known peers, dispatching a go routine to enqueue
// all messages to each of peers.
var wg sync.WaitGroup
for _, sPeer := range peers {
// Dispatch a go routine to enqueue all messages to this peer. // Dispatch a go routine to enqueue all messages to this peer.
wg.Add(1) wg.Add(1)
s.wg.Add(1) s.wg.Add(1)