peer: add an idle timer to the readHandler
In this commit, we add an idle timer to the readHandler itself. This will serve to slowly prune away inactive TCP connections as a result of remote peer being blocked either upon reading or writing to the socket. Our ping timer interval is 1 minute, so an idle timer interval of 5 minutes seem reasonable.
This commit is contained in:
parent
7a2ce62346
commit
cbdf139696
15
peer.go
15
peer.go
@ -37,6 +37,9 @@ const (
|
|||||||
// pingInterval is the interval at which ping messages are sent.
|
// pingInterval is the interval at which ping messages are sent.
|
||||||
pingInterval = 1 * time.Minute
|
pingInterval = 1 * time.Minute
|
||||||
|
|
||||||
|
// idleTimeout is the duration of inactivity before we time out a peer.
|
||||||
|
idleTimeout = 5 * time.Minute
|
||||||
|
|
||||||
// outgoingQueueLen is the buffer size of the channel which houses
|
// outgoingQueueLen is the buffer size of the channel which houses
|
||||||
// messages to be sent across the wire, requested by objects outside
|
// messages to be sent across the wire, requested by objects outside
|
||||||
// this struct.
|
// this struct.
|
||||||
@ -597,10 +600,19 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) {
|
|||||||
// NOTE: This method MUST be run as a goroutine.
|
// NOTE: This method MUST be run as a goroutine.
|
||||||
func (p *peer) readHandler() {
|
func (p *peer) readHandler() {
|
||||||
|
|
||||||
|
// We'll stop the timer after a new messages is received, and also
|
||||||
|
// reset it after we process the next message.
|
||||||
|
idleTimer := time.AfterFunc(idleTimeout, func() {
|
||||||
|
err := fmt.Errorf("Peer %s no answer for %s -- disconnecting",
|
||||||
|
p, idleTimeout)
|
||||||
|
p.Disconnect(err)
|
||||||
|
})
|
||||||
|
|
||||||
chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream)
|
chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream)
|
||||||
out:
|
out:
|
||||||
for atomic.LoadInt32(&p.disconnect) == 0 {
|
for atomic.LoadInt32(&p.disconnect) == 0 {
|
||||||
nextMsg, err := p.readNextMessage()
|
nextMsg, err := p.readNextMessage()
|
||||||
|
idleTimer.Stop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Infof("unable to read message from %v: %v",
|
peerLog.Infof("unable to read message from %v: %v",
|
||||||
p, err)
|
p, err)
|
||||||
@ -611,6 +623,7 @@ out:
|
|||||||
// us to introduce new messages in a forwards
|
// us to introduce new messages in a forwards
|
||||||
// compatible manner.
|
// compatible manner.
|
||||||
case *lnwire.UnknownMessage:
|
case *lnwire.UnknownMessage:
|
||||||
|
idleTimer.Reset(idleTimeout)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
// If the error we encountered wasn't just a message we
|
// If the error we encountered wasn't just a message we
|
||||||
@ -717,6 +730,8 @@ out:
|
|||||||
// stream so we can continue processing message.
|
// stream so we can continue processing message.
|
||||||
chanStream.AddMsg(nextMsg)
|
chanStream.AddMsg(nextMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
idleTimer.Reset(idleTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.wg.Done()
|
p.wg.Done()
|
||||||
|
Loading…
Reference in New Issue
Block a user