peer: ensure that any active msgStreams properly exit upon peer D/C

In this commit, we modify the logic within the Stop() method for
msgStream to ensure that the main goroutine properly exits. It has been
observed on running nodes with tens of connections, that if a node is
very flappy, then the node can end up with hundreds of leaked
goroutines.

In order to fix this, we’ll continually signal the msgConsumer to wake
up after the quit channel has been closed. We do this until the
msgConsumer sets a bool indicating that it has exited atomically.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-08 18:41:15 -08:00
parent 52e3d86d12
commit b1fe0c12bf
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

21
peer.go

@ -471,6 +471,8 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
// TODO(conner): use stream handler interface to abstract out stream // TODO(conner): use stream handler interface to abstract out stream
// state/logging // state/logging
type msgStream struct { type msgStream struct {
streamShutdown int32
peer *peer peer *peer
apply func(lnwire.Message) apply func(lnwire.Message)
@ -516,8 +518,12 @@ func (ms *msgStream) Stop() {
close(ms.quit) close(ms.quit)
// Wake up the msgConsumer is we've been signalled to exit. // Now that we've closed the channel, we'll repeatedly signal the msg
ms.msgCond.Signal() // consumer until we've detected that it has exited.
for atomic.LoadInt32(&ms.streamShutdown) == 0 {
ms.msgCond.Signal()
time.Sleep(time.Millisecond * 100)
}
ms.wg.Wait() ms.wg.Wait()
} }
@ -537,12 +543,13 @@ func (ms *msgStream) msgConsumer() {
for len(ms.msgs) == 0 { for len(ms.msgs) == 0 {
ms.msgCond.Wait() ms.msgCond.Wait()
// If we were woke up in order to exit, then we'll do // If we woke up in order to exit, then we'll do so.
// so. Otherwise, we'll check the message queue for any // Otherwise, we'll check the message queue for any new
// new items. // items.
select { select {
case <-ms.quit: case <-ms.quit:
ms.msgCond.L.Unlock() ms.msgCond.L.Unlock()
atomic.StoreInt32(&ms.streamShutdown, 1)
return return
default: default:
} }
@ -586,8 +593,8 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
var chanLink htlcswitch.ChannelLink var chanLink htlcswitch.ChannelLink
return newMsgStream(p, return newMsgStream(p,
fmt.Sprintf("Update stream for ChannelID(%x) created", cid), fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid), fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
func(msg lnwire.Message) { func(msg lnwire.Message) {
_, isChanSycMsg := msg.(*lnwire.ChannelReestablish) _, isChanSycMsg := msg.(*lnwire.ChannelReestablish)