From c47408119e03259c372045e767e8055f3088531c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 31 Jul 2017 21:25:46 -0700 Subject: [PATCH] peer: abandon the prior activeChanStreams scheme in favor of chanMsgStream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes a bug that existed in the prior scheme we used to synchronize between the funding manager and the peer’s readHandler. Previously, it was possible for messages to be re-ordered before the reached the target ChannelLink. This would result in commitment failures as the state machine assumes a strict in-order message delivery. This would be manifested due to the goroutine that was launched in the case of a pending channel funding. The new approach using the chanMsgStream is much simpler, and easier to read. It should also be a bit snappier, as we’ll no longer at times create a goroutine for each message. --- peer.go | 61 ++++++++++++++++++--------------------------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/peer.go b/peer.go index 66170fa8..a9a01005 100644 --- a/peer.go +++ b/peer.go @@ -506,9 +506,7 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { - var activeChanMtx sync.Mutex - activeChanStreams := make(map[lnwire.ChannelID]struct{}) - + chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -603,52 +601,31 @@ out: } if isChanUpdate { - sendUpdate := func() { - // Dispatch the commitment update message to the proper - // active goroutine dedicated to this channel. - link, err := p.server.htlcSwitch.GetLink(targetChan) - if err != nil { - peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", targetChan, p) - return - } - link.HandleChannelUpdate(nextMsg) + // If this is a channel update, then we need to feed it + // into the channel's in-order message stream. + chanStream, ok := chanMsgStreams[targetChan] + if !ok { + // If a stream hasn't yet been created, then + // we'll do so, add it to the map, and finally + // start it. + chanStream = newChanMsgStream(p.server.fundingMgr, + p.server.htlcSwitch, p, targetChan) + chanMsgStreams[targetChan] = chanStream + chanStream.Start() } - // Check the map of active channel streams, if this map - // has an entry, then this means the channel is fully - // open. In this case, we can send the channel update - // directly without any further waiting. - activeChanMtx.Lock() - _, ok := activeChanStreams[targetChan] - activeChanMtx.Unlock() - if ok { - sendUpdate() - continue - } - - // Otherwise, we'll launch a goroutine to synchronize - // the processing of this message, with the opening of - // the channel as marked by the funding manage. - go func() { - // Block until the channel is marked open. - p.server.fundingMgr.waitUntilChannelOpen(targetChan) - - // Once the channel is open, we'll mark the - // stream as active and send the update to the - // channel. Marking the stream lets us take the - // fast path above, skipping the check to the - // funding manager. - activeChanMtx.Lock() - activeChanStreams[targetChan] = struct{}{} - sendUpdate() - activeChanMtx.Unlock() - }() + // With the stream obtained, add the message to the + // stream so we can continue processing message. + chanStream.AddMsg(nextMsg) } } p.Disconnect(errors.New("read handler closed")) + for _, chanStream := range chanMsgStreams { + chanStream.Stop() + } + p.wg.Done() peerLog.Tracef("readHandler for peer %v done", p) }