diff --git a/peer.go b/peer.go index b27d6b51..433bb8e0 100644 --- a/peer.go +++ b/peer.go @@ -37,6 +37,9 @@ const ( // pingInterval is the interval at which ping messages are sent. pingInterval = 1 * time.Minute + // idleTimeout is the duration of inactivity before we time out a peer. + idleTimeout = 5 * time.Minute + // outgoingQueueLen is the buffer size of the channel which houses // messages to be sent across the wire, requested by objects outside // this struct. @@ -597,10 +600,19 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) { // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { + // We'll stop the timer after a new messages is received, and also + // reset it after we process the next message. + idleTimer := time.AfterFunc(idleTimeout, func() { + err := fmt.Errorf("Peer %s no answer for %s -- disconnecting", + p, idleTimeout) + p.Disconnect(err) + }) + chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() + idleTimer.Stop() if err != nil { peerLog.Infof("unable to read message from %v: %v", p, err) @@ -611,6 +623,7 @@ out: // us to introduce new messages in a forwards // compatible manner. case *lnwire.UnknownMessage: + idleTimer.Reset(idleTimeout) continue // If the error we encountered wasn't just a message we @@ -717,6 +730,8 @@ out: // stream so we can continue processing message. chanStream.AddMsg(nextMsg) } + + idleTimer.Reset(idleTimeout) } p.wg.Done()