peer: add async queue for gossiper msgs

This commit refactors the core logic of the
  chanMsgStream to support an additional stream
  that is used to asynchronously queue for in-order
  delivery to the authenticated gossiper. The channel
  streams are slightly adapted to use the more flexible
  primitive. We may look to refactor this using more
  isolated interfaces, but for now this provides a
  minimal change to resolving known flakes.
This commit is contained in:
Conner Fromknecht 2017-11-01 15:50:55 -07:00
parent 8a9cf9af16
commit 04a56e7286
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

183
peer.go

@ -473,41 +473,39 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
return nextMsg, nil
}
// chanMsgStream implements a goroutine-safe, in-order stream of messages to be
// delivered to an active channel. These messages MUST be in order due to the
// nature of the lightning channel commitment state machine. We utilize
// additional synchronization with the fundingManager to ensure we don't
// attempt to dispatch a message to a channel before it is fully active.
type chanMsgStream struct {
fundingMgr *fundingManager
htlcSwitch *htlcswitch.Switch
cid lnwire.ChannelID
// msgStream implements a goroutine-safe, in-order stream of messages to be
// delivered via closure to a receiver. These messages MUST be in order due to
// the nature of the lightning channel commitment and gossiper state machines.
// TODO(conner): use stream handler interface to abstract out stream
// state/logging
type msgStream struct {
peer *peer
apply func(lnwire.Message)
startMsg string
stopMsg string
msgCond *sync.Cond
msgs []lnwire.Message
chanLink htlcswitch.ChannelLink
mtx sync.Mutex
wg sync.WaitGroup
quit chan struct{}
}
// newChanMsgStream creates a new instance of a chanMsgStream for a particular
// newMsgStream creates a new instance of a chanMsgStream for a particular
// channel identified by its channel ID.
func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer,
c lnwire.ChannelID) *chanMsgStream {
func newMsgStream(p *peer, startMsg, stopMsg string,
apply func(lnwire.Message)) *msgStream {
stream := &chanMsgStream{
fundingMgr: f,
htlcSwitch: h,
peer: p,
cid: c,
quit: make(chan struct{}),
stream := &msgStream{
peer: p,
apply: apply,
startMsg: startMsg,
stopMsg: stopMsg,
quit: make(chan struct{}),
}
stream.msgCond = sync.NewCond(&stream.mtx)
@ -515,45 +513,44 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer,
}
// Start starts the chanMsgStream.
func (c *chanMsgStream) Start() {
c.wg.Add(1)
go c.msgConsumer()
func (ms *msgStream) Start() {
ms.wg.Add(1)
go ms.msgConsumer()
}
// Stop stops the chanMsgStream.
func (c *chanMsgStream) Stop() {
func (ms *msgStream) Stop() {
// TODO(roasbeef): signal too?
close(c.quit)
close(ms.quit)
// Wake up the msgConsumer is we've been signalled to exit.
c.msgCond.Signal()
ms.msgCond.Signal()
c.wg.Wait()
ms.wg.Wait()
}
// msgConsumer is the main goroutine that streams messages from the peer's
// readHandler directly to the target channel.
func (c *chanMsgStream) msgConsumer() {
defer c.wg.Done()
func (ms *msgStream) msgConsumer() {
defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg)
peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:])
peerLog.Tracef(ms.startMsg)
for {
// First, we'll check our condition. If the queue of messages
// is empty, then we'll wait until a new item is added.
c.msgCond.L.Lock()
for len(c.msgs) == 0 {
c.msgCond.Wait()
ms.msgCond.L.Lock()
for len(ms.msgs) == 0 {
ms.msgCond.Wait()
// If we were woke up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any
// new items.
select {
case <-c.quit:
peerLog.Tracef("Update stream for "+
"ChannelID(%x) exiting", c.cid[:])
c.msgCond.L.Unlock()
case <-ms.quit:
ms.msgCond.L.Unlock()
return
default:
}
@ -562,48 +559,81 @@ func (c *chanMsgStream) msgConsumer() {
// Grab the message off the front of the queue, shifting the
// slice's reference down one in order to remove the message
// from the queue.
msg := c.msgs[0]
c.msgs[0] = nil // Set to nil to prevent GC leak.
c.msgs = c.msgs[1:]
msg := ms.msgs[0]
ms.msgs[0] = nil // Set to nil to prevent GC leak.
ms.msgs = ms.msgs[1:]
c.msgCond.L.Unlock()
ms.msgCond.L.Unlock()
// We'll send a message to the funding manager and wait iff an
// active funding process for this channel hasn't yet
// completed. We do this in order to account for the following
// scenario: we send the funding locked message to the other
// side, they immediately send a channel update message, but we
// haven't yet sent the channel to the channelManager.
c.fundingMgr.waitUntilChannelOpen(c.cid)
// Dispatch the commitment update message to the proper active
// goroutine dedicated to this channel.
if c.chanLink == nil {
link, err := c.htlcSwitch.GetLink(c.cid)
if err != nil {
peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", c.cid, c.peer)
continue
}
c.chanLink = link
}
c.chanLink.HandleChannelUpdate(msg)
ms.apply(msg)
}
}
// AddMsg adds a new message to the chanMsgStream. This function is safe for
// AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access.
func (c *chanMsgStream) AddMsg(msg lnwire.Message) {
func (ms *msgStream) AddMsg(msg lnwire.Message) {
// First, we'll lock the condition, and add the message to the end of
// the message queue.
c.msgCond.L.Lock()
c.msgs = append(c.msgs, msg)
c.msgCond.L.Unlock()
ms.msgCond.L.Lock()
ms.msgs = append(ms.msgs, msg)
ms.msgCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are
// additional messages to consume.
c.msgCond.Signal()
ms.msgCond.Signal()
}
// newChanMsgStream is used to create a msgStream between the peer and
// particular channel link in the htlcswitch. We utilize additional
// synchronization with the fundingManager to ensure we don't attempt to
// dispatch a message to a channel before it is fully active. A reference to the
// channel this stream forwards to his held in scope to prevent unnecessary
// lookups.
func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
var chanLink htlcswitch.ChannelLink
return newMsgStream(p,
fmt.Sprintf("Update stream for ChannelID(%x) created", cid),
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid),
func(msg lnwire.Message) {
// We'll send a message to the funding manager and wait iff an
// active funding process for this channel hasn't yet completed.
// We do this in order to account for the following scenario: we
// send the funding locked message to the other side, they
// immediately send a channel update message, but we haven't yet
// sent the channel to the channelManager.
p.server.fundingMgr.waitUntilChannelOpen(cid)
// Dispatch the commitment update message to the proper active
// goroutine dedicated to this channel.
if chanLink == nil {
link, err := p.server.htlcSwitch.GetLink(cid)
if err != nil {
peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", cid, p)
return
}
chanLink = link
}
chanLink.HandleChannelUpdate(msg)
},
)
}
// newDiscMsgStream is used to setup a msgStream between the peer and the
// authenticated gossiper. This stream should be used to forward all remote
// channel announcements.
func newDiscMsgStream(p *peer) *msgStream {
return newMsgStream(p,
"Update stream for gossiper created",
"Update stream for gossiper exited",
func(msg lnwire.Message) {
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
p.addr.IdentityKey)
},
)
}
// readHandler is responsible for reading messages off the wire in series, then
@ -620,7 +650,11 @@ func (p *peer) readHandler() {
p.Disconnect(err)
})
chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream)
discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
chanMsgStreams := make(map[lnwire.ChannelID]*msgStream)
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage()
@ -717,8 +751,8 @@ out:
*lnwire.NodeAnnouncement,
*lnwire.AnnounceSignatures:
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
p.addr.IdentityKey)
discStream.AddMsg(msg)
default:
peerLog.Errorf("unknown message %v received from peer "+
"%v", uint16(msg.MsgType()), p)
@ -732,8 +766,7 @@ out:
// If a stream hasn't yet been created, then
// we'll do so, add it to the map, and finally
// start it.
chanStream = newChanMsgStream(p.server.fundingMgr,
p.server.htlcSwitch, p, targetChan)
chanStream = newChanMsgStream(p, targetChan)
chanMsgStreams[targetChan] = chanStream
chanStream.Start()
}