Revert "server: BroadcastMessage will no longer block until completion"

This reverts commit 6db90ef09ab974df0ff09aeaf75a3d80414d4f50.

The root cause was fixed by commit
f4e7c36c80e8e2be9edb78b3b317c69d28d6d78f. As a result, this commit is no
longer needed.
This commit is contained in:
Olaoluwa Osuntokun 2017-10-16 14:53:38 -07:00
parent 7300452225
commit 9b0b945a3d
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -818,7 +818,9 @@ func (s *server) establishPersistentConnections() error {
// messages to all peers other than the one specified by the `skip` parameter. // messages to all peers other than the one specified by the `skip` parameter.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error { func (s *server) BroadcastMessage(skip *btcec.PublicKey,
msgs ...lnwire.Message) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -829,7 +831,9 @@ func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message)
// peers except the one specified by `skip`. // peers except the one specified by `skip`.
// //
// NOTE: This method MUST be called while the server's mutex is locked. // NOTE: This method MUST be called while the server's mutex is locked.
func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) error { func (s *server) broadcastMessages(
skip *btcec.PublicKey,
msgs []lnwire.Message) error {
srvrLog.Debugf("Broadcasting %v messages", len(msgs)) srvrLog.Debugf("Broadcasting %v messages", len(msgs))
@ -837,15 +841,23 @@ func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message)
// all messages to each of peers. We synchronize access to peersByPub // all messages to each of peers. We synchronize access to peersByPub
// throughout this process to ensure we deliver messages to exact set // throughout this process to ensure we deliver messages to exact set
// of peers present at the time of invocation. // of peers present at the time of invocation.
var wg sync.WaitGroup
for pubStr, sPeer := range s.peersByPub { for pubStr, sPeer := range s.peersByPub {
if skip != nil && sPeer.addr.IdentityKey.IsEqual(skip) { if skip != nil && sPeer.addr.IdentityKey.IsEqual(skip) {
srvrLog.Debugf("Skipping %v in broadcast", pubStr) srvrLog.Debugf("Skipping %v in broadcast", pubStr)
continue continue
} }
go s.sendPeerMessages(sPeer, msgs) // Dispatch a go routine to enqueue all messages to this peer.
wg.Add(1)
s.wg.Add(1)
go s.sendPeerMessages(sPeer, msgs, &wg)
} }
// Wait for all messages to have been dispatched before returning to
// caller.
wg.Wait()
return nil return nil
} }
@ -854,7 +866,9 @@ func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message)
// method will return a non-nil error. // method will return a non-nil error.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { func (s *server) SendToPeer(target *btcec.PublicKey,
msgs ...lnwire.Message) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -865,7 +879,8 @@ func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) err
// particular peer comes online. // particular peer comes online.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- struct{}) { func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
connectedChan chan<- struct{}) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -889,7 +904,8 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- st
// sendToPeer is an internal method that delivers messages to the specified // sendToPeer is an internal method that delivers messages to the specified
// `target` peer. // `target` peer.
func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) error { func (s *server) sendToPeer(target *btcec.PublicKey,
msgs []lnwire.Message) error {
// Compute the target peer's identifier. // Compute the target peer's identifier.
targetPubBytes := target.SerializeCompressed() targetPubBytes := target.SerializeCompressed()
@ -908,14 +924,37 @@ func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) erro
return err return err
} }
s.sendPeerMessages(targetPeer, msgs) s.sendPeerMessages(targetPeer, msgs, nil)
return nil return nil
} }
// sendPeerMessages enqueues a list of messages into the outgoingQueue of the // sendPeerMessages enqueues a list of messages into the outgoingQueue of the
// `targetPeer`. // `targetPeer`. This method supports additional broadcast-level
func (s *server) sendPeerMessages(targetPeer *peer, msgs []lnwire.Message) { // synchronization by using the additional `wg` to coordinate a particular
// broadcast.
//
// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
// go routine--both `wg` and the server's WaitGroup should be incremented
// beforehand. If this method is not spawned as a go routine, the provided
// `wg` should be nil, and the server's WaitGroup should not be tracking this
// invocation.
func (s *server) sendPeerMessages(
targetPeer *peer,
msgs []lnwire.Message,
wg *sync.WaitGroup) {
// If a WaitGroup is provided, we assume that this method was spawned
// as a go routine, and that it is being tracked by both the server's
// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
// event, we defer a call to Done on both WaitGroups to 1) ensure that
// server will be able to shutdown after its go routines exit, and 2)
// so the server can return to the caller of BroadcastMessage.
if wg != nil {
defer s.wg.Done()
defer wg.Done()
}
for _, msg := range msgs { for _, msg := range msgs {
targetPeer.queueMsg(msg, nil) targetPeer.queueMsg(msg, nil)
} }