peer: abandon the prior activeChanStreams scheme in favor of chanMsgStream

This commit fixes a bug that existed in the prior scheme we used to
synchronize between the funding manager and the peer’s readHandler.
Previously, it was possible for messages to be re-ordered before the
reached the target ChannelLink. This would result in commitment
failures as the state machine assumes a strict in-order message
delivery. This would be manifested due to the goroutine that was
launched in the case of a pending channel funding.

The new approach using the chanMsgStream is much simpler, and easier to
read. It should also be a bit snappier, as we’ll no longer at times
create a goroutine for each message.
This commit is contained in:
Olaoluwa Osuntokun 2017-07-31 21:25:46 -07:00
parent 5e4b368348
commit c47408119e
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

61
peer.go

@ -506,9 +506,7 @@ func (c *chanMsgStream) AddMsg(msg lnwire.Message) {
// //
// NOTE: This method MUST be run as a goroutine. // NOTE: This method MUST be run as a goroutine.
func (p *peer) readHandler() { func (p *peer) readHandler() {
var activeChanMtx sync.Mutex chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream)
activeChanStreams := make(map[lnwire.ChannelID]struct{})
out: out:
for atomic.LoadInt32(&p.disconnect) == 0 { for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage() nextMsg, err := p.readNextMessage()
@ -603,52 +601,31 @@ out:
} }
if isChanUpdate { if isChanUpdate {
sendUpdate := func() { // If this is a channel update, then we need to feed it
// Dispatch the commitment update message to the proper // into the channel's in-order message stream.
// active goroutine dedicated to this channel. chanStream, ok := chanMsgStreams[targetChan]
link, err := p.server.htlcSwitch.GetLink(targetChan) if !ok {
if err != nil { // If a stream hasn't yet been created, then
peerLog.Errorf("recv'd update for unknown "+ // we'll do so, add it to the map, and finally
"channel %v from %v", targetChan, p) // start it.
return chanStream = newChanMsgStream(p.server.fundingMgr,
} p.server.htlcSwitch, p, targetChan)
link.HandleChannelUpdate(nextMsg) chanMsgStreams[targetChan] = chanStream
chanStream.Start()
} }
// Check the map of active channel streams, if this map // With the stream obtained, add the message to the
// has an entry, then this means the channel is fully // stream so we can continue processing message.
// open. In this case, we can send the channel update chanStream.AddMsg(nextMsg)
// directly without any further waiting.
activeChanMtx.Lock()
_, ok := activeChanStreams[targetChan]
activeChanMtx.Unlock()
if ok {
sendUpdate()
continue
}
// Otherwise, we'll launch a goroutine to synchronize
// the processing of this message, with the opening of
// the channel as marked by the funding manage.
go func() {
// Block until the channel is marked open.
p.server.fundingMgr.waitUntilChannelOpen(targetChan)
// Once the channel is open, we'll mark the
// stream as active and send the update to the
// channel. Marking the stream lets us take the
// fast path above, skipping the check to the
// funding manager.
activeChanMtx.Lock()
activeChanStreams[targetChan] = struct{}{}
sendUpdate()
activeChanMtx.Unlock()
}()
} }
} }
p.Disconnect(errors.New("read handler closed")) p.Disconnect(errors.New("read handler closed"))
for _, chanStream := range chanMsgStreams {
chanStream.Stop()
}
p.wg.Done() p.wg.Done()
peerLog.Tracef("readHandler for peer %v done", p) peerLog.Tracef("readHandler for peer %v done", p)
} }