From bfbf3015ada1c139cf565d0c0c71dd0e696071d8 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 22 Apr 2019 16:05:26 -0700 Subject: [PATCH] peer: replace Write with split WriteMessage then Flush methods This commit modifies the way the link writes messages to the wire, by first buffering ciphertexts to the connection using WriteMessage, and then calling Flush separately. Currently, the call to Write tries to do both, which can result in a blocking operation for up to the duration of the write timeout. Splitting these operations permits less blocking in the write pool, since now we only need to use a write worker to serialize and encrypt the plaintext. After the write pool is released, the peer then attempts to flush the message using the appropriate write timeout. If a timeout error occurs, the peer will continue to flush the message w/o serializing or encrypting the message again, until the message is fully written to the wire or the write idle timer disconnects the peer. --- peer.go | 81 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 62 insertions(+), 19 deletions(-) diff --git a/peer.go b/peer.go index 9a5edc05..4832ddcb 100644 --- a/peer.go +++ b/peer.go @@ -1403,16 +1403,58 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { })) } -// writeMessage writes the target lnwire.Message to the remote peer. +// writeMessage writes and flushes the target lnwire.Message to the remote peer. +// If the passed message is nil, this method will only try to flush an existing +// message buffered on the connection. It is safe to recall this method with a +// nil message iff a timeout error is returned. This will continue to flush the +// pending message to the wire. func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return ErrPeerExiting } - p.logWireMessage(msg, false) + // Only log the message on the first attempt. + if msg != nil { + p.logWireMessage(msg, false) + } - var n int + noiseConn, ok := p.conn.(*brontide.Conn) + if !ok { + return fmt.Errorf("brontide.Conn required to write messages") + } + + flushMsg := func() error { + // Ensure the write deadline is set before we attempt to send + // the message. + writeDeadline := time.Now().Add(writeMessageTimeout) + err := noiseConn.SetWriteDeadline(writeDeadline) + if err != nil { + return err + } + + // Flush the pending message to the wire. If an error is + // encountered, e.g. write timeout, the number of bytes written + // so far will be returned. + n, err := noiseConn.Flush() + + // Record the number of bytes written on the wire, if any. + if n > 0 { + atomic.AddUint64(&p.bytesSent, uint64(n)) + } + + return err + } + + // If the current message has already been serialized, encrypted, and + // buffered on the underlying connection we will skip straight to + // flushing it to the wire. + if msg == nil { + return flushMsg() + } + + // Otherwise, this is a new message. We'll acquire a write buffer to + // serialize the message and buffer the ciphertext on the connection. err := p.writePool.Submit(func(buf *bytes.Buffer) error { // Using a buffer allocated by the write pool, encode the // message directly into the buffer. @@ -1421,25 +1463,17 @@ func (p *peer) writeMessage(msg lnwire.Message) error { return writeErr } - // Ensure the write deadline is set before we attempt to send - // the message. - writeDeadline := time.Now().Add(writeMessageTimeout) - writeErr = p.conn.SetWriteDeadline(writeDeadline) - if writeErr != nil { - return writeErr - } - - // Finally, write the message itself in a single swoop. - n, writeErr = p.conn.Write(buf.Bytes()) - return writeErr + // Finally, write the message itself in a single swoop. This + // will buffer the ciphertext on the underlying connection. We + // will defer flushing the message until the write pool has been + // released. + return noiseConn.WriteMessage(buf.Bytes()) }) - - // Record the number of bytes written on the wire, if any. - if n > 0 { - atomic.AddUint64(&p.bytesSent, uint64(n)) + if err != nil { + return err } - return err + return flushMsg() } // writeHandler is a goroutine dedicated to reading messages off of an incoming @@ -1528,6 +1562,15 @@ out: "first attempted %v ago", p, retryDelay, time.Since(startTime)) + // If we received a timeout error, this implies + // that the message was buffered on the + // connection successfully and that a flush was + // attempted. We'll set the message to nil so + // that on a subsequent pass we only try to + // flush the buffered message, and forgo + // reserializing or reencrypting it. + outMsg.msg = nil + goto retryWithDelay }