peer: ensure readHandler doesn't block on AddMsg to msgStream

In this commit, we add a quit channel to the AddMsg method of the
msgStream struct. Before this commit, if the queue was full, the
readHandler would block and be unable to exit. We remedy this by
leveraging the existing quit channel of the peer as an additional select
case within the AddMsg method.
This commit is contained in:
Olaoluwa Osuntokun 2018-08-26 19:58:41 -07:00 committed by Conner Fromknecht
parent a5b9279ca1
commit 28a59362d6
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

10
peer.go

@ -804,7 +804,7 @@ func (ms *msgStream) msgConsumer() {
// AddMsg adds a new message to the msgStream. This function is safe for // AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access. // concurrent access.
func (ms *msgStream) AddMsg(msg lnwire.Message) { func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) {
// First, we'll attempt to receive from the producerSema struct. This // First, we'll attempt to receive from the producerSema struct. This
// acts as a sempahore to prevent us from indefinitely buffering // acts as a sempahore to prevent us from indefinitely buffering
// incoming items from the wire. Either the msg queue isn't full, and // incoming items from the wire. Either the msg queue isn't full, and
@ -812,6 +812,8 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) {
// we're signalled to quit, or a slot is freed up. // we're signalled to quit, or a slot is freed up.
select { select {
case <-ms.producerSema: case <-ms.producerSema:
case <-quit:
return
case <-ms.quit: case <-ms.quit:
return return
} }
@ -1020,7 +1022,7 @@ out:
// forward the error to all channels with this peer. // forward the error to all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID: case msg.ChanID == lnwire.ConnectionWideID:
for chanID, chanStream := range chanMsgStreams { for chanID, chanStream := range chanMsgStreams {
chanStream.AddMsg(nextMsg) chanStream.AddMsg(nextMsg, p.quit)
// Also marked this channel as failed, // Also marked this channel as failed,
// so we won't try to restart it on // so we won't try to restart it on
@ -1082,7 +1084,7 @@ out:
*lnwire.ReplyChannelRange, *lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd: *lnwire.ReplyShortChanIDsEnd:
discStream.AddMsg(msg) discStream.AddMsg(msg, p.quit)
default: default:
peerLog.Errorf("unknown message %v received from peer "+ peerLog.Errorf("unknown message %v received from peer "+
@ -1105,7 +1107,7 @@ out:
// With the stream obtained, add the message to the // With the stream obtained, add the message to the
// stream so we can continue processing message. // stream so we can continue processing message.
chanStream.AddMsg(nextMsg) chanStream.AddMsg(nextMsg, p.quit)
} }
idleTimer.Reset(idleTimeout) idleTimer.Reset(idleTimeout)