peer: replace Write with split WriteMessage then Flush methods

This commit modifies the way the link writes messages to the wire, by
first buffering ciphertexts to the connection using WriteMessage, and
then calling Flush separately. Currently, the call to Write tries to do
both, which can result in a blocking operation for up to the duration of
the write timeout. Splitting these operations permits less blocking in
the write pool, since now we only need to use a write worker to
serialize and encrypt the plaintext.

After the write pool is released, the peer then attempts to flush the
message using the appropriate write timeout. If a timeout error occurs,
the peer will continue to flush the message w/o serializing or
encrypting the message again, until the message is fully written to the
wire or the write idle timer disconnects the peer.
This commit is contained in:
Conner Fromknecht 2019-04-22 16:05:26 -07:00
parent f8345d38fb
commit bfbf3015ad
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

79
peer.go

@ -1403,16 +1403,58 @@ func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
})) }))
} }
// writeMessage writes the target lnwire.Message to the remote peer. // writeMessage writes and flushes the target lnwire.Message to the remote peer.
// If the passed message is nil, this method will only try to flush an existing
// message buffered on the connection. It is safe to recall this method with a
// nil message iff a timeout error is returned. This will continue to flush the
// pending message to the wire.
func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeMessage(msg lnwire.Message) error {
// Simply exit if we're shutting down. // Simply exit if we're shutting down.
if atomic.LoadInt32(&p.disconnect) != 0 { if atomic.LoadInt32(&p.disconnect) != 0 {
return ErrPeerExiting return ErrPeerExiting
} }
// Only log the message on the first attempt.
if msg != nil {
p.logWireMessage(msg, false) p.logWireMessage(msg, false)
}
var n int noiseConn, ok := p.conn.(*brontide.Conn)
if !ok {
return fmt.Errorf("brontide.Conn required to write messages")
}
flushMsg := func() error {
// Ensure the write deadline is set before we attempt to send
// the message.
writeDeadline := time.Now().Add(writeMessageTimeout)
err := noiseConn.SetWriteDeadline(writeDeadline)
if err != nil {
return err
}
// Flush the pending message to the wire. If an error is
// encountered, e.g. write timeout, the number of bytes written
// so far will be returned.
n, err := noiseConn.Flush()
// Record the number of bytes written on the wire, if any.
if n > 0 {
atomic.AddUint64(&p.bytesSent, uint64(n))
}
return err
}
// If the current message has already been serialized, encrypted, and
// buffered on the underlying connection we will skip straight to
// flushing it to the wire.
if msg == nil {
return flushMsg()
}
// Otherwise, this is a new message. We'll acquire a write buffer to
// serialize the message and buffer the ciphertext on the connection.
err := p.writePool.Submit(func(buf *bytes.Buffer) error { err := p.writePool.Submit(func(buf *bytes.Buffer) error {
// Using a buffer allocated by the write pool, encode the // Using a buffer allocated by the write pool, encode the
// message directly into the buffer. // message directly into the buffer.
@ -1421,25 +1463,17 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
return writeErr return writeErr
} }
// Ensure the write deadline is set before we attempt to send // Finally, write the message itself in a single swoop. This
// the message. // will buffer the ciphertext on the underlying connection. We
writeDeadline := time.Now().Add(writeMessageTimeout) // will defer flushing the message until the write pool has been
writeErr = p.conn.SetWriteDeadline(writeDeadline) // released.
if writeErr != nil { return noiseConn.WriteMessage(buf.Bytes())
return writeErr
}
// Finally, write the message itself in a single swoop.
n, writeErr = p.conn.Write(buf.Bytes())
return writeErr
}) })
if err != nil {
// Record the number of bytes written on the wire, if any. return err
if n > 0 {
atomic.AddUint64(&p.bytesSent, uint64(n))
} }
return err return flushMsg()
} }
// writeHandler is a goroutine dedicated to reading messages off of an incoming // writeHandler is a goroutine dedicated to reading messages off of an incoming
@ -1528,6 +1562,15 @@ out:
"first attempted %v ago", p, retryDelay, "first attempted %v ago", p, retryDelay,
time.Since(startTime)) time.Since(startTime))
// If we received a timeout error, this implies
// that the message was buffered on the
// connection successfully and that a flush was
// attempted. We'll set the message to nil so
// that on a subsequent pass we only try to
// flush the buffered message, and forgo
// reserializing or reencrypting it.
outMsg.msg = nil
goto retryWithDelay goto retryWithDelay
} }