From b8512c3568013b237cc51704333ae69e43398577 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 6 Jun 2018 11:25:14 +0200 Subject: [PATCH 1/2] server: don't hold mutex while sending to peer This commit removes the sendToPeer method from the server, and instead moves the necessary logic into SendToPeer. This let's us make the mutex acquisition more fine-grained, only holding it while reading from the peer map. Earlier it was required to be held during the whole call to sendToPeer, as the method would access the map internally. --- server.go | 54 ++++++++++++++++++------------------------------------ 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/server.go b/server.go index fd7f8e96..49d71ae2 100644 --- a/server.go +++ b/server.go @@ -1205,16 +1205,29 @@ func (s *server) broadcastMessages( func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { - // Queue the incoming messages in the peer's outgoing message buffer. - // We acquire the shared lock here to ensure the peer map doesn't change - // from underneath us. + // Compute the target peer's identifier. + targetPubBytes := target.SerializeCompressed() + + srvrLog.Tracef("Attempting to send msgs %v to: %x", + len(msgs), targetPubBytes) + + // Lookup intended target in peersByPub, returning an error to the + // caller if the peer is unknown. Access to peersByPub is synchronized + // here to ensure we consider the exact set of peers present at the + // time of invocation. s.mu.RLock() - targetPeer, errChans, err := s.sendToPeer(target, msgs) + targetPeer, err := s.findPeerByPubStr(string(targetPubBytes)) s.mu.RUnlock() - if err != nil { + if err == ErrPeerNotConnected { + srvrLog.Errorf("unable to send message to %x, "+ + "peer is not connected", targetPubBytes) return err } + // Send messages to the peer and get the error channels that will be + // signaled by the peer's write handler. + errChans := s.sendPeerMessages(targetPeer, msgs, nil) + // With the server's shared lock released, we now handle all of the // errors being returned from the target peer's write handler. for _, errChan := range errChans { @@ -1259,37 +1272,6 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, s.peerConnectedListeners[pubStr], connectedChan) } -// sendToPeer is an internal method that queues the given messages in the -// outgoing buffer of the specified `target` peer. Upon success, this method -// returns the peer instance and a slice of error chans that will contain -// responses from the write handler. -func (s *server) sendToPeer(target *btcec.PublicKey, - msgs []lnwire.Message) (*peer, []chan error, error) { - - // Compute the target peer's identifier. - targetPubBytes := target.SerializeCompressed() - - srvrLog.Tracef("Attempting to send msgs %v to: %x", - len(msgs), targetPubBytes) - - // Lookup intended target in peersByPub, returning an error to the - // caller if the peer is unknown. Access to peersByPub is synchronized - // here to ensure we consider the exact set of peers present at the - // time of invocation. - targetPeer, err := s.findPeerByPubStr(string(targetPubBytes)) - if err == ErrPeerNotConnected { - srvrLog.Errorf("unable to send message to %x, "+ - "peer is not connected", targetPubBytes) - return nil, nil, err - } - - // Send messages to the peer and return the error channels that will be - // signaled by the peer's write handler. - errChans := s.sendPeerMessages(targetPeer, msgs, nil) - - return targetPeer, errChans, nil -} - // sendPeerMessages enqueues a list of messages into the outgoingQueue of the // `targetPeer`. This method supports additional broadcast-level // synchronization by using the additional `wg` to coordinate a particular From ac1ab6f516da330e214f92fad2383278d1a01212 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 6 Jun 2018 12:18:44 +0200 Subject: [PATCH 2/2] server: hold mutex shorter during broadcast We remove the internale broadcastMessage method, and instead handle the mutex handling within BroadcastMessage. This lets us hold the mutex only when neccessary. --- server.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/server.go b/server.go index 49d71ae2..e32df058 100644 --- a/server.go +++ b/server.go @@ -1148,33 +1148,19 @@ func (s *server) establishPersistentConnections() error { } // BroadcastMessage sends a request to the server to broadcast a set of -// messages to all peers other than the one specified by the `skip` parameter. +// messages to all peers other than the one specified by the `skips` parameter. // // NOTE: This function is safe for concurrent access. -func (s *server) BroadcastMessage(skip map[routing.Vertex]struct{}, +func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{}, msgs ...lnwire.Message) error { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.broadcastMessages(skip, msgs) -} - -// broadcastMessages is an internal method that delivers messages to all active -// peers except the one specified by `skip`. -// -// NOTE: This method MUST be called while the server's mutex is locked. -func (s *server) broadcastMessages( - skips map[routing.Vertex]struct{}, - msgs []lnwire.Message) error { - srvrLog.Debugf("Broadcasting %v messages", len(msgs)) - // Iterate over all known peers, dispatching a go routine to enqueue - // all messages to each of peers. We synchronize access to peersByPub - // throughout this process to ensure we deliver messages to exact set - // of peers present at the time of invocation. - var wg sync.WaitGroup + // Filter out peers found in the skips map. We synchronize access to + // peersByPub throughout this process to ensure we deliver messages to + // exact set of peers present at the time of invocation. + s.mu.RLock() + peers := make([]*peer, 0, len(s.peersByPub)) for _, sPeer := range s.peersByPub { if skips != nil { if _, ok := skips[sPeer.pubKeyBytes]; ok { @@ -1184,6 +1170,14 @@ func (s *server) broadcastMessages( } } + peers = append(peers, sPeer) + } + s.mu.RUnlock() + + // Iterate over all known peers, dispatching a go routine to enqueue + // all messages to each of peers. + var wg sync.WaitGroup + for _, sPeer := range peers { // Dispatch a go routine to enqueue all messages to this peer. wg.Add(1) s.wg.Add(1)