peer: refactor the queueHandler to aggressively drain queue

This commit refactors the queueHandler slightly to be more aggressive
when attempting to drain the pending message queue.

With this new logic the queueHandler will now attempt to _completely_
drain the pendingMsgs queue by sending it all to the writeHandler
_before_ attempting to accept new messages from the outside
sub-systems. The previous queueHandler was a bit more passive and would
result in messages sitting the the pendingMessage queue longer than
required.
This commit is contained in:
Olaoluwa Osuntokun 2017-02-01 17:01:33 -08:00
parent 8731fadb46
commit 4eebc7c994
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

113
peer.go

@ -543,18 +543,19 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) writeHandler() { func (p *peer) writeHandler() {
out: defer func() {
p.wg.Done()
peerLog.Tracef("writeHandler for peer %v done", p)
}()
for { for {
select { select {
case outMsg := <-p.sendQueue: case outMsg := <-p.sendQueue:
if err := p.writeMessage(outMsg.msg); err != nil {
peerLog.Errorf("unable to write message: %v",
err)
p.Disconnect()
break out
}
switch outMsg.msg.(type) { switch outMsg.msg.(type) {
// If we're about to send a ping message, then log the
// exact time in which we send the message so we can
// use the delay as a rough estimate of latency to the
// remote peer.
case *lnwire.Ping: case *lnwire.Ping:
// TODO(roasbeef): do this before the write? // TODO(roasbeef): do this before the write?
// possibly account for processing within func? // possibly account for processing within func?
@ -562,33 +563,26 @@ out:
atomic.StoreInt64(&p.pingLastSend, now) atomic.StoreInt64(&p.pingLastSend, now)
} }
// Synchronize with the writeHandler. // Write out the message to the socket, closing the
p.sendQueueSync <- struct{}{} // 'sentChan' if it's non-nil, The 'sentChan' allows
// callers to optionally synchronize sends with the
// writeHandler.
err := p.writeMessage(outMsg.msg)
if outMsg.sentChan != nil {
close(outMsg.sentChan)
}
if err != nil {
peerLog.Errorf("unable to write message: %v",
err)
p.Disconnect()
return
}
case <-p.quit: case <-p.quit:
break out return
} }
} }
// Wait for the queueHandler to finish so we can empty out all pending
// messages avoiding a possible deadlock somewhere.
<-p.queueQuit
// Drain any lingering messages that we're meant to be sent. But since
// we're shutting down, just ignore them.
fin:
for {
select {
case msg := <-p.sendQueue:
if msg.sentChan != nil {
msg.sentChan <- struct{}{}
}
default:
break fin
}
}
p.wg.Done()
peerLog.Tracef("writeHandler for peer %v done", p)
} }
// queueHandler is responsible for accepting messages from outside subsystems // queueHandler is responsible for accepting messages from outside subsystems
@ -596,42 +590,45 @@ fin:
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) queueHandler() { func (p *peer) queueHandler() {
waitOnSync := false defer p.wg.Done()
pendingMsgs := list.New() pendingMsgs := list.New()
out:
for { for {
// Before add a queue'd message our pending message queue,
// we'll first try to aggressively empty out our pending list of
// messaging.
for {
// Examine the front of the queue. If this message is
// nil, then we've emptied out the queue and can accept
// new messages from outside sub-systems.
elem := pendingMsgs.Front()
if elem == nil {
break
}
select { select {
case p.sendQueue <- elem.Value.(outgoinMsg):
pendingMsgs.Remove(elem)
case <-p.quit:
return
default:
break
}
}
// If there weren't any messages to send, or the writehandler
// is still blocked, then we'll accept a new message into the
// queue from outside sub-systems.
select {
case <-p.quit:
return
case msg := <-p.outgoingQueue: case msg := <-p.outgoingQueue:
if !waitOnSync {
p.sendQueue <- msg
} else {
pendingMsgs.PushBack(msg) pendingMsgs.PushBack(msg)
} }
waitOnSync = true
case <-p.sendQueueSync:
// If there aren't any more remaining messages in the
// queue, then we're no longer waiting to synchronize
// with the writeHandler.
next := pendingMsgs.Front()
if next == nil {
waitOnSync = false
continue
}
// Notify the writeHandler about the next item to
// asynchronously send.
val := pendingMsgs.Remove(next)
p.sendQueue <- val.(outgoinMsg)
// TODO(roasbeef): other sync stuffs
case <-p.quit:
break out
} }
} }
close(p.queueQuit)
p.wg.Done()
}
// pingHandler is responsible for periodically sending ping messages to the // pingHandler is responsible for periodically sending ping messages to the
// remote peer in order to keep the connection alive and/or determine if the // remote peer in order to keep the connection alive and/or determine if the
// connection is still active. // connection is still active.