From 9b0b945a3dd35eefb60781ea1e057ace4ed8ee22 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 16 Oct 2017 14:53:38 -0700 Subject: [PATCH] Revert "server: BroadcastMessage will no longer block until completion" This reverts commit 6db90ef09ab974df0ff09aeaf75a3d80414d4f50. The root cause was fixed by commit f4e7c36c80e8e2be9edb78b3b317c69d28d6d78f. As a result, this commit is no longer needed. --- server.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/server.go b/server.go index 8e2f3ec0..4425fb72 100644 --- a/server.go +++ b/server.go @@ -818,7 +818,9 @@ func (s *server) establishPersistentConnections() error { // messages to all peers other than the one specified by the `skip` parameter. // // NOTE: This function is safe for concurrent access. -func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error { +func (s *server) BroadcastMessage(skip *btcec.PublicKey, + msgs ...lnwire.Message) error { + s.mu.Lock() defer s.mu.Unlock() @@ -829,7 +831,9 @@ func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) // peers except the one specified by `skip`. // // NOTE: This method MUST be called while the server's mutex is locked. -func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) error { +func (s *server) broadcastMessages( + skip *btcec.PublicKey, + msgs []lnwire.Message) error { srvrLog.Debugf("Broadcasting %v messages", len(msgs)) @@ -837,15 +841,23 @@ func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) // all messages to each of peers. We synchronize access to peersByPub // throughout this process to ensure we deliver messages to exact set // of peers present at the time of invocation. + var wg sync.WaitGroup for pubStr, sPeer := range s.peersByPub { if skip != nil && sPeer.addr.IdentityKey.IsEqual(skip) { srvrLog.Debugf("Skipping %v in broadcast", pubStr) continue } - go s.sendPeerMessages(sPeer, msgs) + // Dispatch a go routine to enqueue all messages to this peer. + wg.Add(1) + s.wg.Add(1) + go s.sendPeerMessages(sPeer, msgs, &wg) } + // Wait for all messages to have been dispatched before returning to + // caller. + wg.Wait() + return nil } @@ -854,7 +866,9 @@ func (s *server) broadcastMessages(skip *btcec.PublicKey, msgs []lnwire.Message) // method will return a non-nil error. // // NOTE: This function is safe for concurrent access. -func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { +func (s *server) SendToPeer(target *btcec.PublicKey, + msgs ...lnwire.Message) error { + s.mu.Lock() defer s.mu.Unlock() @@ -865,7 +879,8 @@ func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) err // particular peer comes online. // // NOTE: This function is safe for concurrent access. -func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- struct{}) { +func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { s.mu.Lock() defer s.mu.Unlock() @@ -889,7 +904,8 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, connectedChan chan<- st // sendToPeer is an internal method that delivers messages to the specified // `target` peer. -func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) error { +func (s *server) sendToPeer(target *btcec.PublicKey, + msgs []lnwire.Message) error { // Compute the target peer's identifier. targetPubBytes := target.SerializeCompressed() @@ -908,14 +924,37 @@ func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) erro return err } - s.sendPeerMessages(targetPeer, msgs) + s.sendPeerMessages(targetPeer, msgs, nil) return nil } // sendPeerMessages enqueues a list of messages into the outgoingQueue of the -// `targetPeer`. -func (s *server) sendPeerMessages(targetPeer *peer, msgs []lnwire.Message) { +// `targetPeer`. This method supports additional broadcast-level +// synchronization by using the additional `wg` to coordinate a particular +// broadcast. +// +// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a +// go routine--both `wg` and the server's WaitGroup should be incremented +// beforehand. If this method is not spawned as a go routine, the provided +// `wg` should be nil, and the server's WaitGroup should not be tracking this +// invocation. +func (s *server) sendPeerMessages( + targetPeer *peer, + msgs []lnwire.Message, + wg *sync.WaitGroup) { + + // If a WaitGroup is provided, we assume that this method was spawned + // as a go routine, and that it is being tracked by both the server's + // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this + // event, we defer a call to Done on both WaitGroups to 1) ensure that + // server will be able to shutdown after its go routines exit, and 2) + // so the server can return to the caller of BroadcastMessage. + if wg != nil { + defer s.wg.Done() + defer wg.Done() + } + for _, msg := range msgs { targetPeer.queueMsg(msg, nil) }