Merge pull request #1335 from halseth/peer-mutex-relaxation

Peer mutex relaxation
This commit is contained in:
Olaoluwa Osuntokun 2018-06-06 16:29:53 -07:00 committed by GitHub
commit 2b2b83f97c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1148,33 +1148,19 @@ func (s *server) establishPersistentConnections() error {
}
// BroadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skip` parameter.
// messages to all peers other than the one specified by the `skips` parameter.
//
// NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage(skip map[routing.Vertex]struct{},
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
msgs ...lnwire.Message) error {
s.mu.RLock()
defer s.mu.RUnlock()
return s.broadcastMessages(skip, msgs)
}
// broadcastMessages is an internal method that delivers messages to all active
// peers except the one specified by `skip`.
//
// NOTE: This method MUST be called while the server's mutex is locked.
func (s *server) broadcastMessages(
skips map[routing.Vertex]struct{},
msgs []lnwire.Message) error {
srvrLog.Debugf("Broadcasting %v messages", len(msgs))
// Iterate over all known peers, dispatching a go routine to enqueue
// all messages to each of peers. We synchronize access to peersByPub
// throughout this process to ensure we deliver messages to exact set
// of peers present at the time of invocation.
var wg sync.WaitGroup
// Filter out peers found in the skips map. We synchronize access to
// peersByPub throughout this process to ensure we deliver messages to
// exact set of peers present at the time of invocation.
s.mu.RLock()
peers := make([]*peer, 0, len(s.peersByPub))
for _, sPeer := range s.peersByPub {
if skips != nil {
if _, ok := skips[sPeer.pubKeyBytes]; ok {
@ -1184,6 +1170,14 @@ func (s *server) broadcastMessages(
}
}
peers = append(peers, sPeer)
}
s.mu.RUnlock()
// Iterate over all known peers, dispatching a go routine to enqueue
// all messages to each of peers.
var wg sync.WaitGroup
for _, sPeer := range peers {
// Dispatch a go routine to enqueue all messages to this peer.
wg.Add(1)
s.wg.Add(1)
@ -1205,16 +1199,29 @@ func (s *server) broadcastMessages(
func (s *server) SendToPeer(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
// Queue the incoming messages in the peer's outgoing message buffer.
// We acquire the shared lock here to ensure the peer map doesn't change
// from underneath us.
// Compute the target peer's identifier.
targetPubBytes := target.SerializeCompressed()
srvrLog.Tracef("Attempting to send msgs %v to: %x",
len(msgs), targetPubBytes)
// Lookup intended target in peersByPub, returning an error to the
// caller if the peer is unknown. Access to peersByPub is synchronized
// here to ensure we consider the exact set of peers present at the
// time of invocation.
s.mu.RLock()
targetPeer, errChans, err := s.sendToPeer(target, msgs)
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
s.mu.RUnlock()
if err != nil {
if err == ErrPeerNotConnected {
srvrLog.Errorf("unable to send message to %x, "+
"peer is not connected", targetPubBytes)
return err
}
// Send messages to the peer and get the error channels that will be
// signaled by the peer's write handler.
errChans := s.sendPeerMessages(targetPeer, msgs, nil)
// With the server's shared lock released, we now handle all of the
// errors being returned from the target peer's write handler.
for _, errChan := range errChans {
@ -1259,37 +1266,6 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
s.peerConnectedListeners[pubStr], connectedChan)
}
// sendToPeer is an internal method that queues the given messages in the
// outgoing buffer of the specified `target` peer. Upon success, this method
// returns the peer instance and a slice of error chans that will contain
// responses from the write handler.
func (s *server) sendToPeer(target *btcec.PublicKey,
msgs []lnwire.Message) (*peer, []chan error, error) {
// Compute the target peer's identifier.
targetPubBytes := target.SerializeCompressed()
srvrLog.Tracef("Attempting to send msgs %v to: %x",
len(msgs), targetPubBytes)
// Lookup intended target in peersByPub, returning an error to the
// caller if the peer is unknown. Access to peersByPub is synchronized
// here to ensure we consider the exact set of peers present at the
// time of invocation.
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
if err == ErrPeerNotConnected {
srvrLog.Errorf("unable to send message to %x, "+
"peer is not connected", targetPubBytes)
return nil, nil, err
}
// Send messages to the peer and return the error channels that will be
// signaled by the peer's write handler.
errChans := s.sendPeerMessages(targetPeer, msgs, nil)
return targetPeer, errChans, nil
}
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
// `targetPeer`. This method supports additional broadcast-level
// synchronization by using the additional `wg` to coordinate a particular