peer: modify readMessage/writeMessage to be message oriented

This commit modifies both readMessage and writeMessage to be further
message oriented. This means that message will be read and written _as
a whole_ rather than piece wise. This also fixes two bugs: the
readHandler could be blocked due to an sync read, and the writeHandler
would unnecessarily chunk up wire messages into distinct crypto
messages rather than writing it in one swoop.

Also with these series of changes, we’re now able to properly parse
messages that have been padded out with additional data as is allowed
by the current specification draft.
This commit is contained in:
Olaoluwa Osuntokun 2017-04-20 15:45:04 -07:00
parent aa2ca81762
commit 010373fe0f
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

36
peer.go

@ -13,6 +13,7 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
@ -373,10 +374,25 @@ func (p *peer) String() string {
// readNextMessage reads, and returns the next message on the wire along with // readNextMessage reads, and returns the next message on the wire along with
// any additional raw payload. // any additional raw payload.
func (p *peer) readNextMessage() (lnwire.Message, error) { func (p *peer) readNextMessage() (lnwire.Message, error) {
// TODO(roasbeef): should take diff of what was read noiseConn, ok := p.conn.(*brontide.Conn)
// * also switch to message oriented reading if !ok {
n, nextMsg, err := lnwire.ReadMessage(p.conn, 0) return nil, fmt.Errorf("brontide.Conn required to read messages")
atomic.AddUint64(&p.bytesReceived, uint64(n)) }
// First we'll read the next _full_ message. We do this rather than
// reading incrementally from the stream as the Lightning wire protocol
// is message oriented and allows nodes to pad on additional data to
// the message stream.
rawMsg, err := noiseConn.ReadNextMessage()
atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
if err != nil {
return nil, err
}
// Next, create a new io.Reader implementation from the raw message,
// and use this to decode the message directly from.
msgReader := bytes.NewReader(rawMsg)
nextMsg, err := lnwire.ReadMessage(msgReader, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -586,9 +602,19 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
// TODO(roasbeef): add message summaries // TODO(roasbeef): add message summaries
p.logWireMessage(msg, false) p.logWireMessage(msg, false)
n, err := lnwire.WriteMessage(p.conn, msg, 0) // As the Lightning wire protocol is fully message oriented, we only
// allows one wire message per outer encapsulated crypto message. So
// we'll create a temporary buffer to write the message directly to.
var msgPayload [lnwire.MaxMessagePayload]byte
b := bytes.NewBuffer(msgPayload[0:0:len(msgPayload)])
// With the temp buffer created and sliced properly (length zero, full
// capacity), we'll now encode the message directly into this buffer.
n, err := lnwire.WriteMessage(b, msg, 0)
atomic.AddUint64(&p.bytesSent, uint64(n)) atomic.AddUint64(&p.bytesSent, uint64(n))
// Finally, write the message itself in a single swoop.
_, err = p.conn.Write(b.Bytes())
return err return err
} }