From b1fe0c12bf8b9a4b9d5e2073af5f260672eab965 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 8 Jan 2018 18:41:15 -0800 Subject: [PATCH] peer: ensure that any active msgStreams properly exit upon peer D/C MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we modify the logic within the Stop() method for msgStream to ensure that the main goroutine properly exits. It has been observed on running nodes with tens of connections, that if a node is very flappy, then the node can end up with hundreds of leaked goroutines. In order to fix this, we’ll continually signal the msgConsumer to wake up after the quit channel has been closed. We do this until the msgConsumer sets a bool indicating that it has exited atomically. --- peer.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/peer.go b/peer.go index 37f87e75..3742a162 100644 --- a/peer.go +++ b/peer.go @@ -471,6 +471,8 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { // TODO(conner): use stream handler interface to abstract out stream // state/logging type msgStream struct { + streamShutdown int32 + peer *peer apply func(lnwire.Message) @@ -516,8 +518,12 @@ func (ms *msgStream) Stop() { close(ms.quit) - // Wake up the msgConsumer is we've been signalled to exit. - ms.msgCond.Signal() + // Now that we've closed the channel, we'll repeatedly signal the msg + // consumer until we've detected that it has exited. + for atomic.LoadInt32(&ms.streamShutdown) == 0 { + ms.msgCond.Signal() + time.Sleep(time.Millisecond * 100) + } ms.wg.Wait() } @@ -537,12 +543,13 @@ func (ms *msgStream) msgConsumer() { for len(ms.msgs) == 0 { ms.msgCond.Wait() - // If we were woke up in order to exit, then we'll do - // so. Otherwise, we'll check the message queue for any - // new items. + // If we woke up in order to exit, then we'll do so. + // Otherwise, we'll check the message queue for any new + // items. select { case <-ms.quit: ms.msgCond.L.Unlock() + atomic.StoreInt32(&ms.streamShutdown, 1) return default: } @@ -586,8 +593,8 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { var chanLink htlcswitch.ChannelLink return newMsgStream(p, - fmt.Sprintf("Update stream for ChannelID(%x) created", cid), - fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid), + fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]), + fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), func(msg lnwire.Message) { _, isChanSycMsg := msg.(*lnwire.ChannelReestablish)