From 99150b41d6377981a65bad48e44c9fb022f1a704 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 26 Mar 2019 16:40:07 -0700 Subject: [PATCH] peer: retry writes with delay on timeout errors This commit modifies the writeHandler to catch timeout errors, and retry writes to the socket after a small backoff, which increases exponentially from 5s to 1m. With the growing channel graph size, some lower-powered devices can be slow to pull messages off the wire during validation. The current behavior will cause us to disconnect the peer, and resend all of the messages that the remote peer is slow to validate. Catching the timeout helps in preventing such expensive reconnection cycles, especially as the network continues to grow. This is also a preliminary step to reducing the write timeout constant. This will allow concurrent usage of the write pools w/out devoting excessive amounts of time blocking the pool for slow peers. --- peer.go | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/peer.go b/peer.go index f1fe9490..84cfe9ea 100644 --- a/peer.go +++ b/peer.go @@ -1429,27 +1429,80 @@ func (p *peer) writeMessage(msg lnwire.Message) error { func (p *peer) writeHandler() { var exitErr error + const ( + minRetryDelay = 5 * time.Second + maxRetryDelay = time.Minute + ) + out: for { select { case outMsg := <-p.sendQueue: - switch outMsg.msg.(type) { + // Record the time at which we first attempt to send the + // message. + startTime := time.Now() + + // Initialize a retry delay of zero, which will be + // increased if we encounter a write timeout on the + // send. + var retryDelay time.Duration + retryWithDelay: + if retryDelay > 0 { + select { + case <-time.After(retryDelay): + case <-p.quit: + // Inform synchronous writes that the + // peer is exiting. + if outMsg.errChan != nil { + outMsg.errChan <- ErrPeerExiting + } + exitErr = ErrPeerExiting + break out + } + } + // If we're about to send a ping message, then log the // exact time in which we send the message so we can // use the delay as a rough estimate of latency to the // remote peer. - case *lnwire.Ping: + if _, ok := outMsg.msg.(*lnwire.Ping); ok { // TODO(roasbeef): do this before the write? // possibly account for processing within func? now := time.Now().UnixNano() atomic.StoreInt64(&p.pingLastSend, now) } - // Write out the message to the socket, responding with - // error if `errChan` is non-nil. The `errChan` allows - // callers to optionally synchronize sends with the - // writeHandler. + // Write out the message to the socket. If a timeout + // error is encountered, we will catch this and retry + // after backing off in case the remote peer is just + // slow to process messages from the wire. err := p.writeMessage(outMsg.msg) + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + // Increase the retry delay in the event of a + // timeout error, this prevents us from + // disconnecting if the remote party is slow to + // pull messages off the wire. We back off + // exponentially up to our max delay to prevent + // blocking the write pool. + if retryDelay == 0 { + retryDelay = minRetryDelay + } else { + retryDelay *= 2 + if retryDelay > maxRetryDelay { + retryDelay = maxRetryDelay + } + } + + peerLog.Debugf("Write timeout detected for "+ + "peer %s, retrying after %v, "+ + "first attempted %v ago", p, retryDelay, + time.Since(startTime)) + + goto retryWithDelay + } + + // If the peer requested a synchronous write, respond + // with the error. if outMsg.errChan != nil { outMsg.errChan <- err }