peer: add message reordering based on priority

This commit is contained in:
Conner Fromknecht 2019-03-05 17:08:05 -08:00
parent 63273e195e
commit 660bbaf646
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

71
peer.go
View File

@ -64,8 +64,9 @@ const (
// a buffered channel which will be sent upon once the write is complete. This
// buffered channel acts as a semaphore to be used for synchronization purposes.
type outgoingMsg struct {
msg lnwire.Message
errChan chan error // MUST be buffered.
priority bool
msg lnwire.Message
errChan chan error // MUST be buffered.
}
// newChannelMsg packages a channeldb.OpenChannel with a channel that allows
@ -1479,24 +1480,45 @@ out:
func (p *peer) queueHandler() {
defer p.wg.Done()
// pendingMsgs will hold all messages waiting to be added
// to the sendQueue.
pendingMsgs := list.New()
// priorityMsgs holds an in order list of messages deemed high-priority
// to be added to the sendQueue. This predominately includes messages
// from the funding manager and htlcswitch.
priorityMsgs := list.New()
// lazyMsgs holds an in order list of messages deemed low-priority to be
// added to the sendQueue only after all high-priority messages have
// been queued. This predominately includes messages from the gossiper.
lazyMsgs := list.New()
for {
// Examine the front of the queue.
elem := pendingMsgs.Front()
// Examine the front of the priority queue, if it is empty check
// the low priority queue.
elem := priorityMsgs.Front()
if elem == nil {
elem = lazyMsgs.Front()
}
if elem != nil {
front := elem.Value.(outgoingMsg)
// There's an element on the queue, try adding
// it to the sendQueue. We also watch for
// messages on the outgoingQueue, in case the
// writeHandler cannot accept messages on the
// sendQueue.
select {
case p.sendQueue <- elem.Value.(outgoingMsg):
pendingMsgs.Remove(elem)
case p.sendQueue <- front:
if front.priority {
priorityMsgs.Remove(elem)
} else {
lazyMsgs.Remove(elem)
}
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
if msg.priority {
priorityMsgs.PushBack(msg)
} else {
lazyMsgs.PushBack(msg)
}
case <-p.quit:
return
}
@ -1506,7 +1528,11 @@ func (p *peer) queueHandler() {
// into the queue from outside sub-systems.
select {
case msg := <-p.outgoingQueue:
pendingMsgs.PushBack(msg)
if msg.priority {
priorityMsgs.PushBack(msg)
} else {
lazyMsgs.PushBack(msg)
}
case <-p.quit:
return
}
@ -1544,13 +1570,26 @@ func (p *peer) PingTime() int64 {
return atomic.LoadInt64(&p.pingTime)
}
// queueMsg queues a new lnwire.Message to be eventually sent out on the
// wire. It returns an error if we failed to queue the message. An error
// is sent on errChan if the message fails being sent to the peer, or
// nil otherwise.
// queueMsg adds the lnwire.Message to the back of the high priority send queue.
// If the errChan is non-nil, an error is sent back if the msg failed to queue
// or failed to write, and nil otherwise.
func (p *peer) queueMsg(msg lnwire.Message, errChan chan error) {
p.queue(true, msg, errChan)
}
// queueMsgLazy adds the lnwire.Message to the back of the low priority send
// queue. If the errChan is non-nil, an error is sent back if the msg failed to
// queue or failed to write, and nil otherwise.
func (p *peer) queueMsgLazy(msg lnwire.Message, errChan chan error) {
p.queue(false, msg, errChan)
}
// queue sends a given message to the queueHandler using the passed priority. If
// the errChan is non-nil, an error is sent back if the msg failed to queue or
// failed to write, and nil otherwise.
func (p *peer) queue(priority bool, msg lnwire.Message, errChan chan error) {
select {
case p.outgoingQueue <- outgoingMsg{msg, errChan}:
case p.outgoingQueue <- outgoingMsg{priority, msg, errChan}:
case <-p.quit:
peerLog.Tracef("Peer shutting down, could not enqueue msg.")
if errChan != nil {