diff --git a/peer.go b/peer.go index 9a5edc05..4832ddcb 100644 --- a/peer.go +++ b/peer.go @@ -1403,16 +1403,58 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) { })) } -// writeMessage writes the target lnwire.Message to the remote peer. +// writeMessage writes and flushes the target lnwire.Message to the remote peer. +// If the passed message is nil, this method will only try to flush an existing +// message buffered on the connection. It is safe to recall this method with a +// nil message iff a timeout error is returned. This will continue to flush the +// pending message to the wire. func (p *peer) writeMessage(msg lnwire.Message) error { // Simply exit if we're shutting down. if atomic.LoadInt32(&p.disconnect) != 0 { return ErrPeerExiting } - p.logWireMessage(msg, false) + // Only log the message on the first attempt. + if msg != nil { + p.logWireMessage(msg, false) + } - var n int + noiseConn, ok := p.conn.(*brontide.Conn) + if !ok { + return fmt.Errorf("brontide.Conn required to write messages") + } + + flushMsg := func() error { + // Ensure the write deadline is set before we attempt to send + // the message. + writeDeadline := time.Now().Add(writeMessageTimeout) + err := noiseConn.SetWriteDeadline(writeDeadline) + if err != nil { + return err + } + + // Flush the pending message to the wire. If an error is + // encountered, e.g. write timeout, the number of bytes written + // so far will be returned. + n, err := noiseConn.Flush() + + // Record the number of bytes written on the wire, if any. + if n > 0 { + atomic.AddUint64(&p.bytesSent, uint64(n)) + } + + return err + } + + // If the current message has already been serialized, encrypted, and + // buffered on the underlying connection we will skip straight to + // flushing it to the wire. + if msg == nil { + return flushMsg() + } + + // Otherwise, this is a new message. We'll acquire a write buffer to + // serialize the message and buffer the ciphertext on the connection. err := p.writePool.Submit(func(buf *bytes.Buffer) error { // Using a buffer allocated by the write pool, encode the // message directly into the buffer. @@ -1421,25 +1463,17 @@ func (p *peer) writeMessage(msg lnwire.Message) error { return writeErr } - // Ensure the write deadline is set before we attempt to send - // the message. - writeDeadline := time.Now().Add(writeMessageTimeout) - writeErr = p.conn.SetWriteDeadline(writeDeadline) - if writeErr != nil { - return writeErr - } - - // Finally, write the message itself in a single swoop. - n, writeErr = p.conn.Write(buf.Bytes()) - return writeErr + // Finally, write the message itself in a single swoop. This + // will buffer the ciphertext on the underlying connection. We + // will defer flushing the message until the write pool has been + // released. + return noiseConn.WriteMessage(buf.Bytes()) }) - - // Record the number of bytes written on the wire, if any. - if n > 0 { - atomic.AddUint64(&p.bytesSent, uint64(n)) + if err != nil { + return err } - return err + return flushMsg() } // writeHandler is a goroutine dedicated to reading messages off of an incoming @@ -1528,6 +1562,15 @@ out: "first attempted %v ago", p, retryDelay, time.Since(startTime)) + // If we received a timeout error, this implies + // that the message was buffered on the + // connection successfully and that a flush was + // attempted. We'll set the message to nil so + // that on a subsequent pass we only try to + // flush the buffered message, and forgo + // reserializing or reencrypting it. + outMsg.msg = nil + goto retryWithDelay }