diff --git a/peer.go b/peer.go index 433bb8e0..898714af 100644 --- a/peer.go +++ b/peer.go @@ -111,6 +111,14 @@ type peer struct { // written onto the wire. Note that this channel is unbuffered. sendQueue chan outgoinMsg + // sendQueueSync is a channel that's used to synchronize sends between + // the queueHandler and the writeHandler. At times the writeHandler may + // get blocked on sending messages. As a result we require a + // synchronization mechanism between the two otherwise the queueHandler + // would need to continually spin checking to see if the writeHandler + // is ready for an additional message. + sendQueueSync chan struct{} + // outgoingQueue is a buffered channel which allows second/third party // objects to queue messages to be sent out on the wire. outgoingQueue chan outgoinMsg @@ -172,6 +180,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, server: server, sendQueue: make(chan outgoinMsg), + sendQueueSync: make(chan struct{}), outgoingQueue: make(chan outgoinMsg), activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), @@ -810,6 +819,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error { n, err := lnwire.WriteMessage(b, msg, 0) atomic.AddUint64(&p.bytesSent, uint64(n)) + // TODO(roasbeef): add write deadline? + // Finally, write the message itself in a single swoop. _, err = p.conn.Write(b.Bytes()) return err @@ -817,7 +828,8 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // writeHandler is a goroutine dedicated to reading messages off of an incoming // queue, and writing them out to the wire. This goroutine coordinates with the -// queueHandler in order to ensure the incoming message queue is quickly drained. +// queueHandler in order to ensure the incoming message queue is quickly +// drained. // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { @@ -852,6 +864,14 @@ out: break out } + // If the queueHandler was waiting for us to complete + // the last write, then we'll send it a sginal that + // we're done and are awaiting additional messages. + select { + case p.sendQueueSync <- struct{}{}: + default: + } + case <-p.quit: exitErr = errors.Errorf("peer exiting") break out @@ -877,6 +897,7 @@ func (p *peer) queueHandler() { // Before add a queue'd message our pending message queue, // we'll first try to aggressively empty out our pending list of // messaging. + drain: for { // Examine the front of the queue. If this message is // nil, then we've emptied out the queue and can accept @@ -892,21 +913,27 @@ func (p *peer) queueHandler() { case <-p.quit: return default: - break + // If the write handler is currently blocked, + // then we'll break out of this loop, to avoid + // tightly spinning waiting for a blocked write + // handler. + break drain } } // If there weren't any messages to send, or the writehandler // is still blocked, then we'll accept a new message into the - // queue from outside sub-systems. - // - // TODO(roasbeef): need send clause here as well to account for - // writeHandler blocking? + // queue from outside sub-systems. We'll also attempt to send + // to the writeHandler again, as if this succeeds we'll once + // again try to aggressively drain the pending message queue. select { case <-p.quit: return case msg := <-p.outgoingQueue: pendingMsgs.PushBack(msg) + case <-p.sendQueueSync: + // Fall through so we can go back to the top of the + // drain loop. } }