peer: eliminate excessive queueHandler spinning w/ blocked writeHandler

This commit fixes an existing bug within the iteration between the
queueHandler and the writeHandler. Under certain scenarios, if the
writeHandler was blocked for a non negligible period of time, then the
queueHandler would enter a very tight spinning loop. This was due to
the fact that the break statement in the inner select loop of the
queueHandler wouldn’t actually break the inner for loop, instead it
would cause the execution logic to re-enter that same select loop,
causing a very tight spin.

In this commit, we fix the issue by adding to things: we now label the
inner select loop so we can break out of it if we detect that the
writeHandler has blocked. Secondly, we introduce a new channel between
the queueHandler and the writeHandler to signal the queueHandler that
the writeHandler has finished processing the last message.
This commit is contained in:
Olaoluwa Osuntokun 2017-10-15 15:19:45 -07:00
parent cbdf139696
commit f4e7c36c80
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

39
peer.go

@ -111,6 +111,14 @@ type peer struct {
// written onto the wire. Note that this channel is unbuffered.
sendQueue chan outgoinMsg
// sendQueueSync is a channel that's used to synchronize sends between
// the queueHandler and the writeHandler. At times the writeHandler may
// get blocked on sending messages. As a result we require a
// synchronization mechanism between the two otherwise the queueHandler
// would need to continually spin checking to see if the writeHandler
// is ready for an additional message.
sendQueueSync chan struct{}
// outgoingQueue is a buffered channel which allows second/third party
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoinMsg
@ -172,6 +180,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
server: server,
sendQueue: make(chan outgoinMsg),
sendQueueSync: make(chan struct{}),
outgoingQueue: make(chan outgoinMsg),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
@ -810,6 +819,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
n, err := lnwire.WriteMessage(b, msg, 0)
atomic.AddUint64(&p.bytesSent, uint64(n))
// TODO(roasbeef): add write deadline?
// Finally, write the message itself in a single swoop.
_, err = p.conn.Write(b.Bytes())
return err
@ -817,7 +828,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
// writeHandler is a goroutine dedicated to reading messages off of an incoming
// queue, and writing them out to the wire. This goroutine coordinates with the
// queueHandler in order to ensure the incoming message queue is quickly drained.
// queueHandler in order to ensure the incoming message queue is quickly
// drained.
//
// NOTE: This method MUST be run as a goroutine.
func (p *peer) writeHandler() {
@ -852,6 +864,14 @@ out:
break out
}
// If the queueHandler was waiting for us to complete
// the last write, then we'll send it a sginal that
// we're done and are awaiting additional messages.
select {
case p.sendQueueSync <- struct{}{}:
default:
}
case <-p.quit:
exitErr = errors.Errorf("peer exiting")
break out
@ -877,6 +897,7 @@ func (p *peer) queueHandler() {
// Before add a queue'd message our pending message queue,
// we'll first try to aggressively empty out our pending list of
// messaging.
drain:
for {
// Examine the front of the queue. If this message is
// nil, then we've emptied out the queue and can accept
@ -892,21 +913,27 @@ func (p *peer) queueHandler() {
case <-p.quit:
return
default:
break
// If the write handler is currently blocked,
// then we'll break out of this loop, to avoid
// tightly spinning waiting for a blocked write
// handler.
break drain
}
}
// If there weren't any messages to send, or the writehandler
// is still blocked, then we'll accept a new message into the
// queue from outside sub-systems.
//
// TODO(roasbeef): need send clause here as well to account for
// writeHandler blocking?
// queue from outside sub-systems. We'll also attempt to send
// to the writeHandler again, as if this succeeds we'll once
// again try to aggressively drain the pending message queue.
select {
case <-p.quit:
return
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
case <-p.sendQueueSync:
// Fall through so we can go back to the top of the
// drain loop.
}
}