peer: retry writes with delay on timeout errors

This commit modifies the writeHandler to catch timeout
errors, and retry writes to the socket after a small
backoff, which increases exponentially from 5s to 1m.
With the growing channel graph size, some lower-powered
devices can be slow to pull messages off the wire during
validation. The current behavior will cause us to
disconnect the peer, and resend all of the messages that
the remote peer is slow to validate. Catching the timeout
helps in preventing such expensive reconnection cycles,
especially as the network continues to grow.

This is also a preliminary step to reducing the
write timeout constant. This will allow concurrent usage
of the write pools w/out devoting excessive amounts of
time blocking the pool for slow peers.
This commit is contained in:
Conner Fromknecht 2019-03-26 16:40:07 -07:00
parent 1a8e4b0316
commit 99150b41d6
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

65
peer.go

@ -1429,27 +1429,80 @@ func (p *peer) writeMessage(msg lnwire.Message) error {
func (p *peer) writeHandler() { func (p *peer) writeHandler() {
var exitErr error var exitErr error
const (
minRetryDelay = 5 * time.Second
maxRetryDelay = time.Minute
)
out: out:
for { for {
select { select {
case outMsg := <-p.sendQueue: case outMsg := <-p.sendQueue:
switch outMsg.msg.(type) { // Record the time at which we first attempt to send the
// message.
startTime := time.Now()
// Initialize a retry delay of zero, which will be
// increased if we encounter a write timeout on the
// send.
var retryDelay time.Duration
retryWithDelay:
if retryDelay > 0 {
select {
case <-time.After(retryDelay):
case <-p.quit:
// Inform synchronous writes that the
// peer is exiting.
if outMsg.errChan != nil {
outMsg.errChan <- ErrPeerExiting
}
exitErr = ErrPeerExiting
break out
}
}
// If we're about to send a ping message, then log the // If we're about to send a ping message, then log the
// exact time in which we send the message so we can // exact time in which we send the message so we can
// use the delay as a rough estimate of latency to the // use the delay as a rough estimate of latency to the
// remote peer. // remote peer.
case *lnwire.Ping: if _, ok := outMsg.msg.(*lnwire.Ping); ok {
// TODO(roasbeef): do this before the write? // TODO(roasbeef): do this before the write?
// possibly account for processing within func? // possibly account for processing within func?
now := time.Now().UnixNano() now := time.Now().UnixNano()
atomic.StoreInt64(&p.pingLastSend, now) atomic.StoreInt64(&p.pingLastSend, now)
} }
// Write out the message to the socket, responding with // Write out the message to the socket. If a timeout
// error if `errChan` is non-nil. The `errChan` allows // error is encountered, we will catch this and retry
// callers to optionally synchronize sends with the // after backing off in case the remote peer is just
// writeHandler. // slow to process messages from the wire.
err := p.writeMessage(outMsg.msg) err := p.writeMessage(outMsg.msg)
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
// Increase the retry delay in the event of a
// timeout error, this prevents us from
// disconnecting if the remote party is slow to
// pull messages off the wire. We back off
// exponentially up to our max delay to prevent
// blocking the write pool.
if retryDelay == 0 {
retryDelay = minRetryDelay
} else {
retryDelay *= 2
if retryDelay > maxRetryDelay {
retryDelay = maxRetryDelay
}
}
peerLog.Debugf("Write timeout detected for "+
"peer %s, retrying after %v, "+
"first attempted %v ago", p, retryDelay,
time.Since(startTime))
goto retryWithDelay
}
// If the peer requested a synchronous write, respond
// with the error.
if outMsg.errChan != nil { if outMsg.errChan != nil {
outMsg.errChan <- err outMsg.errChan <- err
} }