peer: estimate RTT to peer using the ping and pong messages

This commit enhances the peer struct slightly be attempting to measure
the ping time to the remote node based on when we send a ping message
an dhow long it takes for us to receive a pong response.

The current method used to measure RTT is rather rough and could be
make much more accurate via the usage of an EMA/WMA and also via
attempting to measure processing time within the readMessage and
writeMessage functions.
This commit is contained in:
Olaoluwa Osuntokun 2017-01-25 18:20:55 -08:00
parent 3988473d45
commit 3c5a23b2c1
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

38
peer.go

@ -76,6 +76,15 @@ type peer struct {
inbound bool inbound bool
id int32 id int32
// pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer. This time is expressed in micro seconds.
// TODO(roasbeef): also use a WMA or EMA?
pingTime int64
// pingLastSend is the Unix time expressed in nanoseconds when we sent
// our last ping message.
pingLastSend int64
// For purposes of detecting retransmits, etc. // For purposes of detecting retransmits, etc.
lastNMessages map[lnwire.Message]struct{} lastNMessages map[lnwire.Message]struct{}
@ -382,6 +391,15 @@ out:
var targetChan *wire.OutPoint var targetChan *wire.OutPoint
switch msg := nextMsg.(type) { switch msg := nextMsg.(type) {
case *lnwire.Pong:
// When we receive a Pong message in response to our
// last ping message, we'll use the time in which we
// sent the ping message to measure a rough estimate of
// round trip time.
pingSendTime := atomic.LoadInt64(&p.pingLastSend)
delay := (time.Now().UnixNano() - pingSendTime) / 1000
atomic.StoreInt64(&p.pingTime, delay)
case *lnwire.Ping: case *lnwire.Ping:
p.queueMsg(lnwire.NewPong(msg.Nonce), nil) p.queueMsg(lnwire.NewPong(msg.Nonce), nil)
@ -531,16 +549,21 @@ out:
for { for {
select { select {
case outMsg := <-p.sendQueue: case outMsg := <-p.sendQueue:
switch m := outMsg.msg.(type) {
// TODO(roasbeef): handle special write cases
}
if err := p.writeMessage(outMsg.msg); err != nil { if err := p.writeMessage(outMsg.msg); err != nil {
peerLog.Errorf("unable to write message: %v", err) peerLog.Errorf("unable to write message: %v",
err)
p.Disconnect() p.Disconnect()
break out break out
} }
switch outMsg.msg.(type) {
case *lnwire.Ping:
// TODO(roasbeef): do this before the write?
// possibly account for processing within func?
now := time.Now().UnixNano()
atomic.StoreInt64(&p.pingLastSend, now)
}
// Synchronize with the writeHandler. // Synchronize with the writeHandler.
p.sendQueueSync <- struct{}{} p.sendQueueSync <- struct{}{}
case <-p.quit: case <-p.quit:
@ -647,6 +670,11 @@ out:
p.wg.Done() p.wg.Done()
} }
// PingTime returns the estimated ping time to the peer in microseconds.
func (p *peer) PingTime() int64 {
return atomic.LoadInt64(&p.pingTime)
}
// queueMsg queues a new lnwire.Message to be eventually sent out on the // queueMsg queues a new lnwire.Message to be eventually sent out on the
// wire. // wire.
func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) {