diff --git a/peer.go b/peer.go index c82ea843..4b5daa02 100644 --- a/peer.go +++ b/peer.go @@ -804,7 +804,7 @@ func (ms *msgStream) msgConsumer() { // AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. -func (ms *msgStream) AddMsg(msg lnwire.Message) { +func (ms *msgStream) AddMsg(msg lnwire.Message, quit chan struct{}) { // First, we'll attempt to receive from the producerSema struct. This // acts as a sempahore to prevent us from indefinitely buffering // incoming items from the wire. Either the msg queue isn't full, and @@ -812,6 +812,8 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { // we're signalled to quit, or a slot is freed up. select { case <-ms.producerSema: + case <-quit: + return case <-ms.quit: return } @@ -1020,7 +1022,7 @@ out: // forward the error to all channels with this peer. case msg.ChanID == lnwire.ConnectionWideID: for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) // Also marked this channel as failed, // so we won't try to restart it on @@ -1082,7 +1084,7 @@ out: *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - discStream.AddMsg(msg) + discStream.AddMsg(msg, p.quit) default: peerLog.Errorf("unknown message %v received from peer "+ @@ -1105,7 +1107,7 @@ out: // With the stream obtained, add the message to the // stream so we can continue processing message. - chanStream.AddMsg(nextMsg) + chanStream.AddMsg(nextMsg, p.quit) } idleTimer.Reset(idleTimeout)