diff --git a/peer.go b/peer.go index df33543c..cead16c6 100644 --- a/peer.go +++ b/peer.go @@ -155,6 +155,10 @@ type peer struct { // channels to the source peer which handled the funding workflow. newChannels chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that + // proxy messages to individual, active links. + activeMsgStreams map[lnwire.ChannelID]*msgStream + // activeChanCloses is a map that keep track of all the active // cooperative channel closures that are active. Any channel closing // messages are directed to one of these active state machines. Once @@ -253,6 +257,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), + activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), + activeChanCloses: make(map[lnwire.ChannelID]*channelCloser), localCloseChanReqs: make(chan *htlcswitch.ChanClose), linkFailures: make(chan linkFailureReport), @@ -990,8 +996,6 @@ func (p *peer) readHandler() { discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() - - chanMsgStreams := make(map[lnwire.ChannelID]*msgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -1081,38 +1085,8 @@ out: } case *lnwire.Error: - key := p.addr.IdentityKey - - switch { - // In the case of an all-zero channel ID we want to - // forward the error to all channels with this peer. - case msg.ChanID == lnwire.ConnectionWideID: - for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) - - // Also marked this channel as failed, - // so we won't try to restart it on - // reconnect with this peer. - p.failedChannels[chanID] = struct{}{} - } - - // If the channel ID for the error message corresponds - // to a pending channel, then the funding manager will - // handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): - p.server.fundingMgr.processFundingError(msg, key) - - // If not we hand the error to the channel link for - // this channel. - default: - isLinkUpdate = true - targetChan = msg.ChanID - - // Also marked this channel as failed, so we - // won't try to restart it on reconnect with - // this peer. - p.failedChannels[targetChan] = struct{}{} - } + isLinkUpdate = p.handleError(msg) + targetChan = msg.ChanID case *lnwire.ChannelReestablish: isLinkUpdate = true @@ -1142,13 +1116,13 @@ out: if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. - chanStream, ok := chanMsgStreams[targetChan] + chanStream, ok := p.activeMsgStreams[targetChan] if !ok { // If a stream hasn't yet been created, then // we'll do so, add it to the map, and finally // start it. chanStream = newChanMsgStream(p, targetChan) - chanMsgStreams[targetChan] = chanStream + p.activeMsgStreams[targetChan] = chanStream chanStream.Start() defer chanStream.Stop() } @@ -1166,6 +1140,42 @@ out: peerLog.Tracef("readHandler for peer %v done", p) } +// handleError processes an error message read from the remote peer. The boolean +// returns indicates whether the message should be delivered to a targeted peer. +// +// NOTE: This method should only be called from within the readHandler. +func (p *peer) handleError(msg *lnwire.Error) bool { + key := p.addr.IdentityKey + + switch { + + // In the case of an all-zero channel ID we want to forward the error to + // all channels with this peer. + case msg.ChanID == lnwire.ConnectionWideID: + for chanID, chanStream := range p.activeMsgStreams { + chanStream.AddMsg(msg) + + // Also marked this channel as failed, so we won't try + // to restart it on reconnect with this peer. + p.failedChannels[chanID] = struct{}{} + } + return false + + // If the channel ID for the error message corresponds to a pending + // channel, then the funding manager will handle the error. + case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): + p.server.fundingMgr.processFundingError(msg, key) + return false + + // If not we hand the error to the channel link for this channel. + default: + // Mark this channel as failed, so we won't try to restart it on + // reconnect with this peer. + p.failedChannels[msg.ChanID] = struct{}{} + return true + } +} + // messageSummary returns a human-readable string that summarizes a // incoming/outgoing message. Not all messages will have a summary, only those // which have additional data that can be informative at a glance.