From f4e7c36c80e8e2be9edb78b3b317c69d28d6d78f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 15 Oct 2017 15:19:45 -0700 Subject: [PATCH] peer: eliminate excessive queueHandler spinning w/ blocked writeHandler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes an existing bug within the iteration between the queueHandler and the writeHandler. Under certain scenarios, if the writeHandler was blocked for a non negligible period of time, then the queueHandler would enter a very tight spinning loop. This was due to the fact that the break statement in the inner select loop of the queueHandler wouldn’t actually break the inner for loop, instead it would cause the execution logic to re-enter that same select loop, causing a very tight spin. In this commit, we fix the issue by adding to things: we now label the inner select loop so we can break out of it if we detect that the writeHandler has blocked. Secondly, we introduce a new channel between the queueHandler and the writeHandler to signal the queueHandler that the writeHandler has finished processing the last message. --- peer.go | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/peer.go b/peer.go index 433bb8e0..898714af 100644 --- a/peer.go +++ b/peer.go @@ -111,6 +111,14 @@ type peer struct { // written onto the wire. Note that this channel is unbuffered. sendQueue chan outgoinMsg + // sendQueueSync is a channel that's used to synchronize sends between + // the queueHandler and the writeHandler. At times the writeHandler may + // get blocked on sending messages. As a result we require a + // synchronization mechanism between the two otherwise the queueHandler + // would need to continually spin checking to see if the writeHandler + // is ready for an additional message. + sendQueueSync chan struct{} + // outgoingQueue is a buffered channel which allows second/third party // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoinMsg @@ -172,6 +180,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, server: server, sendQueue: make(chan outgoinMsg), + sendQueueSync: make(chan struct{}), outgoingQueue: make(chan outgoinMsg), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), @@ -810,6 +819,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error { n, err := lnwire.WriteMessage(b, msg, 0) atomic.AddUint64(&p.bytesSent, uint64(n)) + // TODO(roasbeef): add write deadline? + // Finally, write the message itself in a single swoop. _, err = p.conn.Write(b.Bytes()) return err @@ -817,7 +828,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // writeHandler is a goroutine dedicated to reading messages off of an incoming // queue, and writing them out to the wire. This goroutine coordinates with the -// queueHandler in order to ensure the incoming message queue is quickly drained. +// queueHandler in order to ensure the incoming message queue is quickly +// drained. // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { @@ -852,6 +864,14 @@ out: break out } + // If the queueHandler was waiting for us to complete + // the last write, then we'll send it a sginal that + // we're done and are awaiting additional messages. + select { + case p.sendQueueSync <- struct{}{}: + default: + } + case <-p.quit: exitErr = errors.Errorf("peer exiting") break out @@ -877,6 +897,7 @@ func (p *peer) queueHandler() { // Before add a queue'd message our pending message queue, // we'll first try to aggressively empty out our pending list of // messaging. + drain: for { // Examine the front of the queue. If this message is // nil, then we've emptied out the queue and can accept @@ -892,21 +913,27 @@ func (p *peer) queueHandler() { case <-p.quit: return default: - break + // If the write handler is currently blocked, + // then we'll break out of this loop, to avoid + // tightly spinning waiting for a blocked write + // handler. + break drain } } // If there weren't any messages to send, or the writehandler // is still blocked, then we'll accept a new message into the - // queue from outside sub-systems. - // - // TODO(roasbeef): need send clause here as well to account for - // writeHandler blocking? + // queue from outside sub-systems. We'll also attempt to send + // to the writeHandler again, as if this succeeds we'll once + // again try to aggressively drain the pending message queue. select { case <-p.quit: return case msg := <-p.outgoingQueue: pendingMsgs.PushBack(msg) + case <-p.sendQueueSync: + // Fall through so we can go back to the top of the + // drain loop. } }