From 28a59362d6ce7ac7d092e67937cfb0a5fb5efd3a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sun, 26 Aug 2018 19:58:41 -0700 Subject: [PATCH] peer: ensure readHandler doesn't block on AddMsg to msgStream In this commit, we add a quit channel to the AddMsg method of the msgStream struct. Before this commit, if the queue was full, the readHandler would block and be unable to exit. We remedy this by leveraging the existing quit channel of the peer as an additional select case within the AddMsg method. --- peer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/peer.go b/peer.go index c82ea843..4b5daa02 100644 --- a/peer.go +++ b/peer.go @@ -804,7 +804,7 @@ func (ms *msgStream) msgConsumer() { // AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. -func (ms *msgStream) AddMsg(msg lnwire.Message) { +func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) { // First, we'll attempt to receive from the producerSema struct. This // acts as a sempahore to prevent us from indefinitely buffering // incoming items from the wire. Either the msg queue isn't full, and @@ -812,6 +812,8 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { // we're signalled to quit, or a slot is freed up. select { case <-ms.producerSema: + case <-quit: + return case <-ms.quit: return } @@ -1020,7 +1022,7 @@ out: // forward the error to all channels with this peer. case msg.ChanID == lnwire.ConnectionWideID: for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) // Also marked this channel as failed, // so we won't try to restart it on @@ -1082,7 +1084,7 @@ out: *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - discStream.AddMsg(msg) + discStream.AddMsg(msg, p.quit) default: peerLog.Errorf("unknown message %v received from peer "+ @@ -1105,7 +1107,7 @@ out: // With the stream obtained, add the message to the // stream so we can continue processing message. - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) } idleTimer.Reset(idleTimeout)