diff --git a/peer.go b/peer.go
index e34a9c8d..0115966e 100644
--- a/peer.go
+++ b/peer.go
@@ -111,14 +111,6 @@ type peer struct {
 	// written onto the wire. Note that this channel is unbuffered.
 	sendQueue chan outgoinMsg
 
-	// sendQueueSync is a channel that's used to synchronize sends between
-	// the queueHandler and the writeHandler. At times the writeHandler may
-	// get blocked on sending messages. As a result we require a
-	// synchronization mechanism between the two otherwise the queueHandler
-	// would need to continually spin checking to see if the writeHandler
-	// is ready for an additional message.
-	sendQueueSync chan struct{}
-
 	// outgoingQueue is a buffered channel which allows second/third party
 	// objects to queue messages to be sent out on the wire.
 	outgoingQueue chan outgoinMsg
@@ -185,7 +177,6 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
 		localFeatures: localFeatures,
 
 		sendQueue:     make(chan outgoinMsg),
-		sendQueueSync: make(chan struct{}),
 		outgoingQueue: make(chan outgoinMsg),
 
 		activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
@@ -1030,14 +1021,6 @@ out:
 				break out
 			}
 
-			// If the queueHandler was waiting for us to complete
-			// the last write, then we'll send it a sginal that
-			// we're done and are awaiting additional messages.
-			select {
-			case p.sendQueueSync <- struct{}{}:
-			default:
-			}
-
 		case <-p.quit:
 			exitErr = errors.Errorf("peer exiting")
 			break out
@@ -1058,47 +1041,35 @@ func (p *peer) queueHandler() {
 	defer p.wg.Done()
 
+	// pendingMsgs will hold all messages waiting to be added
+	// to the sendQueue.
 	pendingMsgs := list.New()
-	for {
-		// Before add a queue'd message our pending message queue,
-		// we'll first try to aggressively empty out our pending list of
-		// messaging.
-	drain:
-		for {
-			// Examine the front of the queue. If this message is
-			// nil, then we've emptied out the queue and can accept
-			// new messages from outside sub-systems.
-			elem := pendingMsgs.Front()
-			if elem == nil {
-				break
-			}
-
+	for {
+		// Examine the front of the queue.
+		elem := pendingMsgs.Front()
+		if elem != nil {
+			// There's an element on the queue; try adding
+			// it to the sendQueue. We also watch for
+			// messages on the outgoingQueue, in case the
+			// writeHandler cannot accept messages on the
+			// sendQueue.
 			select {
 			case p.sendQueue <- elem.Value.(outgoinMsg):
 				pendingMsgs.Remove(elem)
+			case msg := <-p.outgoingQueue:
+				pendingMsgs.PushBack(msg)
+			case <-p.quit:
+				return
+			}
+		} else {
+			// If there weren't any messages to send to the
+			// writeHandler, then we'll accept a new message
+			// into the queue from outside sub-systems.
+			select {
+			case msg := <-p.outgoingQueue:
+				pendingMsgs.PushBack(msg)
 			case <-p.quit:
 				return
-			default:
-				// If the write handler is currently blocked,
-				// then we'll break out of this loop, to avoid
-				// tightly spinning waiting for a blocked write
-				// handler.
-				break drain
 			}
 		}
-
-		// If there weren't any messages to send, or the writehandler
-		// is still blocked, then we'll accept a new message into the
-		// queue from outside sub-systems. We'll also attempt to send
-		// to the writeHandler again, as if this succeeds we'll once
-		// again try to aggressively drain the pending message queue.
-		select {
-		case <-p.quit:
-			return
-		case msg := <-p.outgoingQueue:
-			pendingMsgs.PushBack(msg)
-		case <-p.sendQueueSync:
-			// Fall through so we can go back to the top of the
-			// drain loop.
-		}
 	}
 }
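
For reference, a minimal, self-contained sketch of the pattern the new queueHandler adopts (illustrative only, not part of the patch: the outgoinMsg payload field and the wiring in main are placeholder assumptions). A single goroutine owns an unbounded list.List buffer and bridges an inbound channel to an unbuffered outbound one, selecting on quit in both states so it can neither spin nor deadlock:

package main

import (
	"container/list"
	"fmt"
)

// outgoinMsg stands in for lnd's outgoing message type; the payload field is
// a placeholder for illustration.
type outgoinMsg struct{ payload string }

// queueHandler mirrors the refactored loop: when the buffer is non-empty it
// races "hand the oldest message to the writer" against "accept a new
// message"; when the buffer is empty it simply blocks for new input. Every
// select also watches quit, so shutdown is always honored.
func queueHandler(outgoingQueue <-chan outgoinMsg, sendQueue chan<- outgoinMsg,
	quit <-chan struct{}) {

	pendingMsgs := list.New()
	for {
		elem := pendingMsgs.Front()
		if elem != nil {
			select {
			case sendQueue <- elem.Value.(outgoinMsg):
				pendingMsgs.Remove(elem)
			case msg := <-outgoingQueue:
				pendingMsgs.PushBack(msg)
			case <-quit:
				return
			}
		} else {
			select {
			case msg := <-outgoingQueue:
				pendingMsgs.PushBack(msg)
			case <-quit:
				return
			}
		}
	}
}

func main() {
	outgoingQueue := make(chan outgoinMsg)
	sendQueue := make(chan outgoinMsg) // unbuffered, like peer.sendQueue
	quit := make(chan struct{})

	go queueHandler(outgoingQueue, sendQueue, quit)

	// A producer can queue messages without blocking on the consumer...
	for i := 0; i < 3; i++ {
		outgoingQueue <- outgoinMsg{payload: fmt.Sprintf("msg-%d", i)}
	}

	// ...and the consumer then receives them in FIFO order.
	for i := 0; i < 3; i++ {
		fmt.Println((<-sendQueue).payload)
	}
	close(quit)
}

This is also what makes sendQueueSync unnecessary: the old writeHandler had to signal the queueHandler after each completed write so that it would retry its non-blocking send (the drain loop's default case), whereas here the blocking send on sendQueue simply parks the goroutine until the writeHandler is ready, with no handshake channel and no tight spinning.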