From 04a56e7286dd4e9f43ccb31bb8b87d9162bd828d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 1 Nov 2017 15:50:55 -0700 Subject: [PATCH] peer: add async queue for gossiper msgs This commit refactors the core logic of the chanMsgStream to support an additional stream that is used to asynchronously queue for in-order delivery to the authenticated gossiper. The channel streams are slightly adapted to use the more flexible primitive. We may look to refactor this using more isolated interfaces, but for now this provides a minimal change to resolving known flakes. --- peer.go | 183 +++++++++++++++++++++++++++++++++----------------------- 1 file changed, 108 insertions(+), 75 deletions(-) diff --git a/peer.go b/peer.go index 5305ff03..2057b3cc 100644 --- a/peer.go +++ b/peer.go @@ -473,41 +473,39 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { return nextMsg, nil } -// chanMsgStream implements a goroutine-safe, in-order stream of messages to be -// delivered to an active channel. These messages MUST be in order due to the -// nature of the lightning channel commitment state machine. We utilize -// additional synchronization with the fundingManager to ensure we don't -// attempt to dispatch a message to a channel before it is fully active. -type chanMsgStream struct { - fundingMgr *fundingManager - htlcSwitch *htlcswitch.Switch - - cid lnwire.ChannelID - +// msgStream implements a goroutine-safe, in-order stream of messages to be +// delivered via closure to a receiver. These messages MUST be in order due to +// the nature of the lightning channel commitment and gossiper state machines. +// TODO(conner): use stream handler interface to abstract out stream +// state/logging +type msgStream struct { peer *peer + apply func(lnwire.Message) + + startMsg string + stopMsg string + msgCond *sync.Cond msgs []lnwire.Message - chanLink htlcswitch.ChannelLink - mtx sync.Mutex wg sync.WaitGroup quit chan struct{} } -// newChanMsgStream creates a new instance of a chanMsgStream for a particular +// newMsgStream creates a new instance of a chanMsgStream for a particular // channel identified by its channel ID. -func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, - c lnwire.ChannelID) *chanMsgStream { +func newMsgStream(p *peer, startMsg, stopMsg string, + apply func(lnwire.Message)) *msgStream { - stream := &chanMsgStream{ - fundingMgr: f, - htlcSwitch: h, - peer: p, - cid: c, - quit: make(chan struct{}), + stream := &msgStream{ + peer: p, + apply: apply, + startMsg: startMsg, + stopMsg: stopMsg, + quit: make(chan struct{}), } stream.msgCond = sync.NewCond(&stream.mtx) @@ -515,45 +513,44 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, } // Start starts the chanMsgStream. -func (c *chanMsgStream) Start() { - c.wg.Add(1) - go c.msgConsumer() +func (ms *msgStream) Start() { + ms.wg.Add(1) + go ms.msgConsumer() } // Stop stops the chanMsgStream. -func (c *chanMsgStream) Stop() { +func (ms *msgStream) Stop() { // TODO(roasbeef): signal too? - close(c.quit) + close(ms.quit) // Wake up the msgConsumer is we've been signalled to exit. - c.msgCond.Signal() + ms.msgCond.Signal() - c.wg.Wait() + ms.wg.Wait() } // msgConsumer is the main goroutine that streams messages from the peer's // readHandler directly to the target channel. -func (c *chanMsgStream) msgConsumer() { - defer c.wg.Done() +func (ms *msgStream) msgConsumer() { + defer ms.wg.Done() + defer peerLog.Tracef(ms.stopMsg) - peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) + peerLog.Tracef(ms.startMsg) for { // First, we'll check our condition. If the queue of messages // is empty, then we'll wait until a new item is added. - c.msgCond.L.Lock() - for len(c.msgs) == 0 { - c.msgCond.Wait() + ms.msgCond.L.Lock() + for len(ms.msgs) == 0 { + ms.msgCond.Wait() // If we were woke up in order to exit, then we'll do // so. Otherwise, we'll check the message queue for any // new items. select { - case <-c.quit: - peerLog.Tracef("Update stream for "+ - "ChannelID(%x) exiting", c.cid[:]) - c.msgCond.L.Unlock() + case <-ms.quit: + ms.msgCond.L.Unlock() return default: } @@ -562,48 +559,81 @@ func (c *chanMsgStream) msgConsumer() { // Grab the message off the front of the queue, shifting the // slice's reference down one in order to remove the message // from the queue. - msg := c.msgs[0] - c.msgs[0] = nil // Set to nil to prevent GC leak. - c.msgs = c.msgs[1:] + msg := ms.msgs[0] + ms.msgs[0] = nil // Set to nil to prevent GC leak. + ms.msgs = ms.msgs[1:] - c.msgCond.L.Unlock() + ms.msgCond.L.Unlock() - // We'll send a message to the funding manager and wait iff an - // active funding process for this channel hasn't yet - // completed. We do this in order to account for the following - // scenario: we send the funding locked message to the other - // side, they immediately send a channel update message, but we - // haven't yet sent the channel to the channelManager. - c.fundingMgr.waitUntilChannelOpen(c.cid) - - // Dispatch the commitment update message to the proper active - // goroutine dedicated to this channel. - if c.chanLink == nil { - link, err := c.htlcSwitch.GetLink(c.cid) - if err != nil { - peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", c.cid, c.peer) - continue - } - c.chanLink = link - } - - c.chanLink.HandleChannelUpdate(msg) + ms.apply(msg) } } -// AddMsg adds a new message to the chanMsgStream. This function is safe for +// AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. -func (c *chanMsgStream) AddMsg(msg lnwire.Message) { +func (ms *msgStream) AddMsg(msg lnwire.Message) { // First, we'll lock the condition, and add the message to the end of // the message queue. - c.msgCond.L.Lock() - c.msgs = append(c.msgs, msg) - c.msgCond.L.Unlock() + ms.msgCond.L.Lock() + ms.msgs = append(ms.msgs, msg) + ms.msgCond.L.Unlock() // With the message added, we signal to the msgConsumer that there are // additional messages to consume. - c.msgCond.Signal() + ms.msgCond.Signal() +} + +// newChanMsgStream is used to create a msgStream between the peer and +// particular channel link in the htlcswitch. We utilize additional +// synchronization with the fundingManager to ensure we don't attempt to +// dispatch a message to a channel before it is fully active. A reference to the +// channel this stream forwards to his held in scope to prevent unnecessary +// lookups. +func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { + + var chanLink htlcswitch.ChannelLink + + return newMsgStream(p, + fmt.Sprintf("Update stream for ChannelID(%x) created", cid), + fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid), + func(msg lnwire.Message) { + // We'll send a message to the funding manager and wait iff an + // active funding process for this channel hasn't yet completed. + // We do this in order to account for the following scenario: we + // send the funding locked message to the other side, they + // immediately send a channel update message, but we haven't yet + // sent the channel to the channelManager. + p.server.fundingMgr.waitUntilChannelOpen(cid) + + // Dispatch the commitment update message to the proper active + // goroutine dedicated to this channel. + if chanLink == nil { + link, err := p.server.htlcSwitch.GetLink(cid) + if err != nil { + peerLog.Errorf("recv'd update for unknown "+ + "channel %v from %v", cid, p) + return + } + chanLink = link + } + + chanLink.HandleChannelUpdate(msg) + }, + ) +} + +// newDiscMsgStream is used to setup a msgStream between the peer and the +// authenticated gossiper. This stream should be used to forward all remote +// channel announcements. +func newDiscMsgStream(p *peer) *msgStream { + return newMsgStream(p, + "Update stream for gossiper created", + "Update stream for gossiper exited", + func(msg lnwire.Message) { + p.server.authGossiper.ProcessRemoteAnnouncement(msg, + p.addr.IdentityKey) + }, + ) } // readHandler is responsible for reading messages off the wire in series, then @@ -620,7 +650,11 @@ func (p *peer) readHandler() { p.Disconnect(err) }) - chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) + discStream := newDiscMsgStream(p) + discStream.Start() + defer discStream.Stop() + + chanMsgStreams := make(map[lnwire.ChannelID]*msgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -717,8 +751,8 @@ out: *lnwire.NodeAnnouncement, *lnwire.AnnounceSignatures: - p.server.authGossiper.ProcessRemoteAnnouncement(msg, - p.addr.IdentityKey) + discStream.AddMsg(msg) + default: peerLog.Errorf("unknown message %v received from peer "+ "%v", uint16(msg.MsgType()), p) @@ -732,8 +766,7 @@ out: // If a stream hasn't yet been created, then // we'll do so, add it to the map, and finally // start it. - chanStream = newChanMsgStream(p.server.fundingMgr, - p.server.htlcSwitch, p, targetChan) + chanStream = newChanMsgStream(p, targetChan) chanMsgStreams[targetChan] = chanStream chanStream.Start() }