server: broadcast using SendMessageLazy, remove unused sendPeerMessages
This commit is contained in:
parent
b50fd33915
commit
935ea7d450
59
server.go
59
server.go
@ -1884,6 +1884,8 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) {
|
|||||||
|
|
||||||
// BroadcastMessage sends a request to the server to broadcast a set of
|
// BroadcastMessage sends a request to the server to broadcast a set of
|
||||||
// messages to all peers other than the one specified by the `skips` parameter.
|
// messages to all peers other than the one specified by the `skips` parameter.
|
||||||
|
// All messages sent via BroadcastMessage will be queued for lazy delivery to
|
||||||
|
// the target peers.
|
||||||
//
|
//
|
||||||
// NOTE: This function is safe for concurrent access.
|
// NOTE: This function is safe for concurrent access.
|
||||||
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
||||||
@ -1916,7 +1918,12 @@ func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
|
|||||||
// Dispatch a go routine to enqueue all messages to this peer.
|
// Dispatch a go routine to enqueue all messages to this peer.
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.sendPeerMessages(sPeer, msgs, &wg)
|
go func(p lnpeer.Peer) {
|
||||||
|
defer s.wg.Done()
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
p.SendMessageLazy(false, msgs...)
|
||||||
|
}(sPeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all messages to have been dispatched before returning to
|
// Wait for all messages to have been dispatched before returning to
|
||||||
@ -1989,56 +1996,6 @@ func (s *server) NotifyWhenOffline(peerPubKey [33]byte) <-chan struct{} {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
|
|
||||||
// `targetPeer`. This method supports additional broadcast-level
|
|
||||||
// synchronization by using the additional `wg` to coordinate a particular
|
|
||||||
// broadcast. Since this method will wait for the return error from sending
|
|
||||||
// each message, it should be run as a goroutine (see comment below) and
|
|
||||||
// the error ignored if used for broadcasting messages, where the result
|
|
||||||
// from sending the messages is not of importance.
|
|
||||||
//
|
|
||||||
// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
|
|
||||||
// go routine--both `wg` and the server's WaitGroup should be incremented
|
|
||||||
// beforehand. If this method is not spawned as a go routine, the provided
|
|
||||||
// `wg` should be nil, and the server's WaitGroup should not be tracking this
|
|
||||||
// invocation.
|
|
||||||
func (s *server) sendPeerMessages(
|
|
||||||
targetPeer *peer,
|
|
||||||
msgs []lnwire.Message,
|
|
||||||
wg *sync.WaitGroup) []chan error {
|
|
||||||
|
|
||||||
// If a WaitGroup is provided, we assume that this method was spawned
|
|
||||||
// as a go routine, and that it is being tracked by both the server's
|
|
||||||
// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
|
|
||||||
// event, we defer a call to Done on both WaitGroups to 1) ensure that
|
|
||||||
// server will be able to shutdown after its go routines exit, and 2)
|
|
||||||
// so the server can return to the caller of BroadcastMessage.
|
|
||||||
isBroadcast := wg != nil
|
|
||||||
if isBroadcast {
|
|
||||||
defer s.wg.Done()
|
|
||||||
defer wg.Done()
|
|
||||||
}
|
|
||||||
|
|
||||||
// We queue each message, creating a slice of error channels that
|
|
||||||
// can be inspected after every message is successfully added to
|
|
||||||
// the queue.
|
|
||||||
var errChans []chan error
|
|
||||||
for _, msg := range msgs {
|
|
||||||
// If this is not broadcast, create error channels to provide
|
|
||||||
// synchronous feedback regarding the delivery of the message to
|
|
||||||
// a specific peer.
|
|
||||||
var errChan chan error
|
|
||||||
if !isBroadcast {
|
|
||||||
errChan = make(chan error, 1)
|
|
||||||
errChans = append(errChans, errChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
targetPeer.queueMsgLazy(msg, errChan)
|
|
||||||
}
|
|
||||||
|
|
||||||
return errChans
|
|
||||||
}
|
|
||||||
|
|
||||||
// FindPeer will return the peer that corresponds to the passed in public key.
|
// FindPeer will return the peer that corresponds to the passed in public key.
|
||||||
// This function is used by the funding manager, allowing it to update the
|
// This function is used by the funding manager, allowing it to update the
|
||||||
// daemon's local representation of the remote peer.
|
// daemon's local representation of the remote peer.
|
||||||
|
Loading…
Reference in New Issue
Block a user