Merge pull request #823 from Roasbeef/chan-stream-buf-limit
peer: modify the msgStream to not buffer messages off the wire indefi…
This commit is contained in:
commit
53045450ad
54
peer.go
54
peer.go
@ -499,24 +499,39 @@ type msgStream struct {
|
|||||||
|
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
|
|
||||||
|
bufSize uint32
|
||||||
|
producerSema chan struct{}
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMsgStream creates a new instance of a chanMsgStream for a particular
|
// newMsgStream creates a new instance of a chanMsgStream for a particular
|
||||||
// channel identified by its channel ID.
|
// channel identified by its channel ID. bufSize is the max number of messages
|
||||||
func newMsgStream(p *peer, startMsg, stopMsg string,
|
// that should be buffered in the internal queue. Callers should set this to a
|
||||||
|
// sane value that avoids blocking unnecessarily, but doesn't allow an
|
||||||
|
// unbounded amount of memory to be allocated to buffer incoming messages.
|
||||||
|
func newMsgStream(p *peer, startMsg, stopMsg string, bufSize uint32,
|
||||||
apply func(lnwire.Message)) *msgStream {
|
apply func(lnwire.Message)) *msgStream {
|
||||||
|
|
||||||
stream := &msgStream{
|
stream := &msgStream{
|
||||||
peer: p,
|
peer: p,
|
||||||
apply: apply,
|
apply: apply,
|
||||||
startMsg: startMsg,
|
startMsg: startMsg,
|
||||||
stopMsg: stopMsg,
|
stopMsg: stopMsg,
|
||||||
quit: make(chan struct{}),
|
producerSema: make(chan struct{}, bufSize),
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
stream.msgCond = sync.NewCond(&stream.mtx)
|
stream.msgCond = sync.NewCond(&stream.mtx)
|
||||||
|
|
||||||
|
// Before we return the active stream, we'll populate the producer's
|
||||||
|
// semaphore channel. We'll use this to ensure that the producer won't
|
||||||
|
// attempt to allocate memory in the queue for an item until it has
|
||||||
|
// sufficient extra space.
|
||||||
|
for i := uint32(0); i < bufSize; i++ {
|
||||||
|
stream.producerSema <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -579,13 +594,34 @@ func (ms *msgStream) msgConsumer() {
|
|||||||
ms.msgCond.L.Unlock()
|
ms.msgCond.L.Unlock()
|
||||||
|
|
||||||
ms.apply(msg)
|
ms.apply(msg)
|
||||||
|
|
||||||
|
// We've just successfully processed an item, so we'll signal
|
||||||
|
// to the producer that a new slot in the buffer. We'll use
|
||||||
|
// this to bound the size of the buffer to avoid allowing it to
|
||||||
|
// grow indefinitely.
|
||||||
|
select {
|
||||||
|
case ms.producerSema <- struct{}{}:
|
||||||
|
case <-ms.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddMsg adds a new message to the msgStream. This function is safe for
|
// AddMsg adds a new message to the msgStream. This function is safe for
|
||||||
// concurrent access.
|
// concurrent access.
|
||||||
func (ms *msgStream) AddMsg(msg lnwire.Message) {
|
func (ms *msgStream) AddMsg(msg lnwire.Message) {
|
||||||
// First, we'll lock the condition, and add the message to the end of
|
// First, we'll attempt to receive from the producerSema struct. This
|
||||||
|
// acts as a sempahore to prevent us from indefinitely buffering
|
||||||
|
// incoming items from the wire. Either the msg queue isn't full, and
|
||||||
|
// we'll not block, or the queue is full, and we'll block until either
|
||||||
|
// we're signalled to quit, or a slot is freed up.
|
||||||
|
select {
|
||||||
|
case <-ms.producerSema:
|
||||||
|
case <-ms.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next, we'll lock the condition, and add the message to the end of
|
||||||
// the message queue.
|
// the message queue.
|
||||||
ms.msgCond.L.Lock()
|
ms.msgCond.L.Lock()
|
||||||
ms.msgs = append(ms.msgs, msg)
|
ms.msgs = append(ms.msgs, msg)
|
||||||
@ -609,6 +645,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
|
|||||||
return newMsgStream(p,
|
return newMsgStream(p,
|
||||||
fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
|
fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
|
||||||
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
|
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
|
||||||
|
1000,
|
||||||
func(msg lnwire.Message) {
|
func(msg lnwire.Message) {
|
||||||
_, isChanSycMsg := msg.(*lnwire.ChannelReestablish)
|
_, isChanSycMsg := msg.(*lnwire.ChannelReestablish)
|
||||||
|
|
||||||
@ -652,6 +689,7 @@ func newDiscMsgStream(p *peer) *msgStream {
|
|||||||
return newMsgStream(p,
|
return newMsgStream(p,
|
||||||
"Update stream for gossiper created",
|
"Update stream for gossiper created",
|
||||||
"Update stream for gossiper exited",
|
"Update stream for gossiper exited",
|
||||||
|
1000,
|
||||||
func(msg lnwire.Message) {
|
func(msg lnwire.Message) {
|
||||||
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
|
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
|
||||||
p.addr.IdentityKey)
|
p.addr.IdentityKey)
|
||||||
|
Loading…
Reference in New Issue
Block a user