peer: avoid blocked writeHandler<->queueHandler interaction

This commit addresses an issue that could occur if a
message was attempted added to the sendQueue by the
queueHandler before the writeHandler had started.

If a message was sent to the queueHandler before the
writeHandler was ready to accept messages on the
sendQueue, the message would be added to the
pendingMsg queue, but would not be attempted sent
on the sendQueue again before a new incoming message
triggered a new attempt.
This commit is contained in:
Johan T. Halseth 2017-11-15 17:56:33 -08:00 committed by Olaoluwa Osuntokun
parent 1ca53ae906
commit 555cead5ad

75
peer.go

@ -111,14 +111,6 @@ type peer struct {
// written onto the wire. Note that this channel is unbuffered.
sendQueue chan outgoinMsg
// sendQueueSync is a channel that's used to synchronize sends between
// the queueHandler and the writeHandler. At times the writeHandler may
// get blocked on sending messages. As a result we require a
// synchronization mechanism between the two otherwise the queueHandler
// would need to continually spin checking to see if the writeHandler
// is ready for an additional message.
sendQueueSync chan struct{}
// outgoingQueue is a buffered channel which allows second/third party
// objects to queue messages to be sent out on the wire.
outgoingQueue chan outgoinMsg
@ -185,7 +177,6 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
localFeatures: localFeatures,
sendQueue: make(chan outgoinMsg),
sendQueueSync: make(chan struct{}),
outgoingQueue: make(chan outgoinMsg),
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
@ -1030,14 +1021,6 @@ out:
break out
}
// If the queueHandler was waiting for us to complete
// the last write, then we'll send it a sginal that
// we're done and are awaiting additional messages.
select {
case p.sendQueueSync <- struct{}{}:
default:
}
case <-p.quit:
exitErr = errors.Errorf("peer exiting")
break out
@ -1058,50 +1041,38 @@ out:
func (p *peer) queueHandler() {
defer p.wg.Done()
// pendingMsgs will hold all messages waiting to be added
// to the sendQueue.
pendingMsgs := list.New()
for {
// Before add a queue'd message our pending message queue,
// we'll first try to aggressively empty out our pending list of
// messaging.
drain:
for {
// Examine the front of the queue. If this message is
// nil, then we've emptied out the queue and can accept
// new messages from outside sub-systems.
elem := pendingMsgs.Front()
if elem == nil {
break
}
for {
// Examine the front of the queue.
elem := pendingMsgs.Front()
if elem != nil {
// There's an element on the queue, try adding
// it to the sendQueue. We also watch for
// messages on the outgoingQueue, in case the
// writeHandler cannot accept messages on the
// sendQueue.
select {
case p.sendQueue <- elem.Value.(outgoinMsg):
pendingMsgs.Remove(elem)
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
case <-p.quit:
return
}
} else {
// If there weren't any messages to send to the
// writeHandler, then we'll accept a new message
// into the queue from outside sub-systems.
select {
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
case <-p.quit:
return
default:
// If the write handler is currently blocked,
// then we'll break out of this loop, to avoid
// tightly spinning waiting for a blocked write
// handler.
break drain
}
}
// If there weren't any messages to send, or the writehandler
// is still blocked, then we'll accept a new message into the
// queue from outside sub-systems. We'll also attempt to send
// to the writeHandler again, as if this succeeds we'll once
// again try to aggressively drain the pending message queue.
select {
case <-p.quit:
return
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
case <-p.sendQueueSync:
// Fall through so we can go back to the top of the
// drain loop.
}
}
}