peer: fix deadlock bug, block readHandler when waiting for chan open
This commit fixes a deadlock bug within the readHandler of the peer. Previously, once a channel was pending opening, _no_ other message would be processed by the readHandler as it would be blocked waiting for the channel to open. On testnet this would be manifsted as a node locking up, until the channel was detected as being open. We fix this bug by tracking which channel streams are active. If a channel stream is active, then we can send the update directly to it. Otherwise, we launch a goroutine that’ll block until the channel is open, then in a synchronized manner, update the channel stream as being active and send the update to the channel.
This commit is contained in:
parent
60c0cebfd5
commit
193936374a
56
peer.go
56
peer.go
@ -394,6 +394,9 @@ func (p *peer) readNextMessage() (lnwire.Message, []byte, error) {
|
||||
//
|
||||
// NOTE: This method MUST be run as a goroutine.
|
||||
func (p *peer) readHandler() {
|
||||
var activeChanMtx sync.Mutex
|
||||
activeChanStreams := make(map[lnwire.ChannelID]struct{})
|
||||
|
||||
out:
|
||||
for atomic.LoadInt32(&p.disconnect) == 0 {
|
||||
nextMsg, _, err := p.readNextMessage()
|
||||
@ -481,18 +484,51 @@ out:
|
||||
}
|
||||
|
||||
if isChanUpdate {
|
||||
p.server.fundingMgr.waitUntilChannelOpen(targetChan)
|
||||
// Dispatch the commitment update message to the proper
|
||||
// active goroutine dedicated to this channel.
|
||||
p.htlcManMtx.Lock()
|
||||
channel, ok := p.htlcManagers[targetChan]
|
||||
p.htlcManMtx.Unlock()
|
||||
if !ok {
|
||||
peerLog.Errorf("recv'd update for unknown "+
|
||||
"channel %v from %v", targetChan, p)
|
||||
sendUpdate := func() {
|
||||
// Dispatch the commitment update message to
|
||||
// the proper active goroutine dedicated to
|
||||
// this channel.
|
||||
p.htlcManMtx.RLock()
|
||||
channel, ok := p.htlcManagers[targetChan]
|
||||
p.htlcManMtx.RUnlock()
|
||||
if !ok {
|
||||
peerLog.Errorf("recv'd update for unknown "+
|
||||
"channel %v from %v", targetChan, p)
|
||||
return
|
||||
}
|
||||
|
||||
channel <- nextMsg
|
||||
}
|
||||
|
||||
// Check the map of active channel streams, if this map
|
||||
// has an entry, then this means the channel is fully
|
||||
// open. In this case, we can send the channel update
|
||||
// directly without any further waiting.
|
||||
activeChanMtx.Lock()
|
||||
_, ok := activeChanStreams[targetChan]
|
||||
activeChanMtx.Unlock()
|
||||
if ok {
|
||||
sendUpdate()
|
||||
continue
|
||||
}
|
||||
channel <- nextMsg
|
||||
|
||||
// Otherwise, we'll launch a goroutine to synchronize
|
||||
// the processing of this message, with the opening of
|
||||
// the channel as marked by the funding manage.
|
||||
go func() {
|
||||
// Block until the channel is marked open.
|
||||
p.server.fundingMgr.waitUntilChannelOpen(targetChan)
|
||||
|
||||
// Once the channel is open, we'll mark the
|
||||
// stream as active and send the update to the
|
||||
// channel. Marking the stream lets us take the
|
||||
// fast path above, skipping the check to the
|
||||
// funding manager.
|
||||
activeChanMtx.Lock()
|
||||
activeChanStreams[targetChan] = struct{}{}
|
||||
sendUpdate()
|
||||
activeChanMtx.Unlock()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user