diff --git a/peer.go b/peer.go index 52d1bdd7..b0ee668a 100644 --- a/peer.go +++ b/peer.go @@ -499,24 +499,39 @@ type msgStream struct { mtx sync.Mutex + bufSize uint32 + producerSema chan struct{} + wg sync.WaitGroup quit chan struct{} } // newMsgStream creates a new instance of a chanMsgStream for a particular -// channel identified by its channel ID. -func newMsgStream(p *peer, startMsg, stopMsg string, +// channel identified by its channel ID. bufSize is the max number of messages +// that should be buffered in the internal queue. Callers should set this to a +// sane value that avoids blocking unnecessarily, but doesn't allow an +// unbounded amount of memory to be allocated to buffer incoming messages. +func newMsgStream(p *peer, startMsg, stopMsg string, bufSize uint32, apply func(lnwire.Message)) *msgStream { stream := &msgStream{ - peer: p, - apply: apply, - startMsg: startMsg, - stopMsg: stopMsg, - quit: make(chan struct{}), + peer: p, + apply: apply, + startMsg: startMsg, + stopMsg: stopMsg, + producerSema: make(chan struct{}, bufSize), + quit: make(chan struct{}), } stream.msgCond = sync.NewCond(&stream.mtx) + // Before we return the active stream, we'll populate the producer's + // semaphore channel. We'll use this to ensure that the producer won't + // attempt to allocate memory in the queue for an item until it has + // sufficient extra space. + for i := uint32(0); i < bufSize; i++ { + stream.producerSema <- struct{}{} + } + return stream } @@ -579,13 +594,34 @@ func (ms *msgStream) msgConsumer() { ms.msgCond.L.Unlock() ms.apply(msg) + + // We've just successfully processed an item, so we'll signal + // to the producer that a new slot in the buffer. We'll use + // this to bound the size of the buffer to avoid allowing it to + // grow indefinitely. + select { + case ms.producerSema <- struct{}{}: + case <-ms.quit: + return + } } } // AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. func (ms *msgStream) AddMsg(msg lnwire.Message) { - // First, we'll lock the condition, and add the message to the end of + // First, we'll attempt to receive from the producerSema struct. This + // acts as a sempahore to prevent us from indefinitely buffering + // incoming items from the wire. Either the msg queue isn't full, and + // we'll not block, or the queue is full, and we'll block until either + // we're signalled to quit, or a slot is freed up. + select { + case <-ms.producerSema: + case <-ms.quit: + return + } + + // Next, we'll lock the condition, and add the message to the end of // the message queue. ms.msgCond.L.Lock() ms.msgs = append(ms.msgs, msg) @@ -609,6 +645,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { return newMsgStream(p, fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]), fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), + 1000, func(msg lnwire.Message) { _, isChanSycMsg := msg.(*lnwire.ChannelReestablish) @@ -652,6 +689,7 @@ func newDiscMsgStream(p *peer) *msgStream { return newMsgStream(p, "Update stream for gossiper created", "Update stream for gossiper exited", + 1000, func(msg lnwire.Message) { p.server.authGossiper.ProcessRemoteAnnouncement(msg, p.addr.IdentityKey)