server: don't hold mutex while sending to peer
This commit removes the sendToPeer method from the server, and instead moves the necessary logic into SendToPeer. This let's us make the mutex acquisition more fine-grained, only holding it while reading from the peer map. Earlier it was required to be held during the whole call to sendToPeer, as the method would access the map internally.
This commit is contained in:
parent
991c9fb7dd
commit
b8512c3568
54
server.go
54
server.go
@ -1205,16 +1205,29 @@ func (s *server) broadcastMessages(
|
|||||||
func (s *server) SendToPeer(target *btcec.PublicKey,
|
func (s *server) SendToPeer(target *btcec.PublicKey,
|
||||||
msgs ...lnwire.Message) error {
|
msgs ...lnwire.Message) error {
|
||||||
|
|
||||||
// Queue the incoming messages in the peer's outgoing message buffer.
|
// Compute the target peer's identifier.
|
||||||
// We acquire the shared lock here to ensure the peer map doesn't change
|
targetPubBytes := target.SerializeCompressed()
|
||||||
// from underneath us.
|
|
||||||
|
srvrLog.Tracef("Attempting to send msgs %v to: %x",
|
||||||
|
len(msgs), targetPubBytes)
|
||||||
|
|
||||||
|
// Lookup intended target in peersByPub, returning an error to the
|
||||||
|
// caller if the peer is unknown. Access to peersByPub is synchronized
|
||||||
|
// here to ensure we consider the exact set of peers present at the
|
||||||
|
// time of invocation.
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
targetPeer, errChans, err := s.sendToPeer(target, msgs)
|
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
if err != nil {
|
if err == ErrPeerNotConnected {
|
||||||
|
srvrLog.Errorf("unable to send message to %x, "+
|
||||||
|
"peer is not connected", targetPubBytes)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send messages to the peer and get the error channels that will be
|
||||||
|
// signaled by the peer's write handler.
|
||||||
|
errChans := s.sendPeerMessages(targetPeer, msgs, nil)
|
||||||
|
|
||||||
// With the server's shared lock released, we now handle all of the
|
// With the server's shared lock released, we now handle all of the
|
||||||
// errors being returned from the target peer's write handler.
|
// errors being returned from the target peer's write handler.
|
||||||
for _, errChan := range errChans {
|
for _, errChan := range errChans {
|
||||||
@ -1259,37 +1272,6 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
|
|||||||
s.peerConnectedListeners[pubStr], connectedChan)
|
s.peerConnectedListeners[pubStr], connectedChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendToPeer is an internal method that queues the given messages in the
|
|
||||||
// outgoing buffer of the specified `target` peer. Upon success, this method
|
|
||||||
// returns the peer instance and a slice of error chans that will contain
|
|
||||||
// responses from the write handler.
|
|
||||||
func (s *server) sendToPeer(target *btcec.PublicKey,
|
|
||||||
msgs []lnwire.Message) (*peer, []chan error, error) {
|
|
||||||
|
|
||||||
// Compute the target peer's identifier.
|
|
||||||
targetPubBytes := target.SerializeCompressed()
|
|
||||||
|
|
||||||
srvrLog.Tracef("Attempting to send msgs %v to: %x",
|
|
||||||
len(msgs), targetPubBytes)
|
|
||||||
|
|
||||||
// Lookup intended target in peersByPub, returning an error to the
|
|
||||||
// caller if the peer is unknown. Access to peersByPub is synchronized
|
|
||||||
// here to ensure we consider the exact set of peers present at the
|
|
||||||
// time of invocation.
|
|
||||||
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
|
|
||||||
if err == ErrPeerNotConnected {
|
|
||||||
srvrLog.Errorf("unable to send message to %x, "+
|
|
||||||
"peer is not connected", targetPubBytes)
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send messages to the peer and return the error channels that will be
|
|
||||||
// signaled by the peer's write handler.
|
|
||||||
errChans := s.sendPeerMessages(targetPeer, msgs, nil)
|
|
||||||
|
|
||||||
return targetPeer, errChans, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
||||||
// `targetPeer`. This method supports additional broadcast-level
|
// `targetPeer`. This method supports additional broadcast-level
|
||||||
// synchronization by using the additional `wg` to coordinate a particular
|
// synchronization by using the additional `wg` to coordinate a particular
|
||||||
|
Loading…
Reference in New Issue
Block a user