peer: extract error handling logic
This commit is contained in:
parent
2df5a36048
commit
f33a1a61e6
82
peer.go
82
peer.go
@ -155,6 +155,10 @@ type peer struct {
|
||||
// channels to the source peer which handled the funding workflow.
|
||||
newChannels chan *newChannelMsg
|
||||
|
||||
// activeMsgStreams is a map from channel id to the channel streams that
|
||||
// proxy messages to individual, active links.
|
||||
activeMsgStreams map[lnwire.ChannelID]*msgStream
|
||||
|
||||
// activeChanCloses is a map that keep track of all the active
|
||||
// cooperative channel closures that are active. Any channel closing
|
||||
// messages are directed to one of these active state machines. Once
|
||||
@ -253,6 +257,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
|
||||
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
|
||||
newChannels: make(chan *newChannelMsg, 1),
|
||||
|
||||
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
|
||||
|
||||
activeChanCloses: make(map[lnwire.ChannelID]*channelCloser),
|
||||
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
|
||||
linkFailures: make(chan linkFailureReport),
|
||||
@ -990,8 +996,6 @@ func (p *peer) readHandler() {
|
||||
discStream := newDiscMsgStream(p)
|
||||
discStream.Start()
|
||||
defer discStream.Stop()
|
||||
|
||||
chanMsgStreams := make(map[lnwire.ChannelID]*msgStream)
|
||||
out:
|
||||
for atomic.LoadInt32(&p.disconnect) == 0 {
|
||||
nextMsg, err := p.readNextMessage()
|
||||
@ -1081,38 +1085,8 @@ out:
|
||||
}
|
||||
|
||||
case *lnwire.Error:
|
||||
key := p.addr.IdentityKey
|
||||
|
||||
switch {
|
||||
// In the case of an all-zero channel ID we want to
|
||||
// forward the error to all channels with this peer.
|
||||
case msg.ChanID == lnwire.ConnectionWideID:
|
||||
for chanID, chanStream := range chanMsgStreams {
|
||||
chanStream.AddMsg(nextMsg)
|
||||
|
||||
// Also marked this channel as failed,
|
||||
// so we won't try to restart it on
|
||||
// reconnect with this peer.
|
||||
p.failedChannels[chanID] = struct{}{}
|
||||
}
|
||||
|
||||
// If the channel ID for the error message corresponds
|
||||
// to a pending channel, then the funding manager will
|
||||
// handle the error.
|
||||
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
|
||||
p.server.fundingMgr.processFundingError(msg, key)
|
||||
|
||||
// If not we hand the error to the channel link for
|
||||
// this channel.
|
||||
default:
|
||||
isLinkUpdate = true
|
||||
targetChan = msg.ChanID
|
||||
|
||||
// Also marked this channel as failed, so we
|
||||
// won't try to restart it on reconnect with
|
||||
// this peer.
|
||||
p.failedChannels[targetChan] = struct{}{}
|
||||
}
|
||||
isLinkUpdate = p.handleError(msg)
|
||||
targetChan = msg.ChanID
|
||||
|
||||
case *lnwire.ChannelReestablish:
|
||||
isLinkUpdate = true
|
||||
@ -1142,13 +1116,13 @@ out:
|
||||
if isLinkUpdate {
|
||||
// If this is a channel update, then we need to feed it
|
||||
// into the channel's in-order message stream.
|
||||
chanStream, ok := chanMsgStreams[targetChan]
|
||||
chanStream, ok := p.activeMsgStreams[targetChan]
|
||||
if !ok {
|
||||
// If a stream hasn't yet been created, then
|
||||
// we'll do so, add it to the map, and finally
|
||||
// start it.
|
||||
chanStream = newChanMsgStream(p, targetChan)
|
||||
chanMsgStreams[targetChan] = chanStream
|
||||
p.activeMsgStreams[targetChan] = chanStream
|
||||
chanStream.Start()
|
||||
defer chanStream.Stop()
|
||||
}
|
||||
@ -1166,6 +1140,42 @@ out:
|
||||
peerLog.Tracef("readHandler for peer %v done", p)
|
||||
}
|
||||
|
||||
// handleError processes an error message read from the remote peer. The boolean
|
||||
// returns indicates whether the message should be delivered to a targeted peer.
|
||||
//
|
||||
// NOTE: This method should only be called from within the readHandler.
|
||||
func (p *peer) handleError(msg *lnwire.Error) bool {
|
||||
key := p.addr.IdentityKey
|
||||
|
||||
switch {
|
||||
|
||||
// In the case of an all-zero channel ID we want to forward the error to
|
||||
// all channels with this peer.
|
||||
case msg.ChanID == lnwire.ConnectionWideID:
|
||||
for chanID, chanStream := range p.activeMsgStreams {
|
||||
chanStream.AddMsg(msg)
|
||||
|
||||
// Also marked this channel as failed, so we won't try
|
||||
// to restart it on reconnect with this peer.
|
||||
p.failedChannels[chanID] = struct{}{}
|
||||
}
|
||||
return false
|
||||
|
||||
// If the channel ID for the error message corresponds to a pending
|
||||
// channel, then the funding manager will handle the error.
|
||||
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
|
||||
p.server.fundingMgr.processFundingError(msg, key)
|
||||
return false
|
||||
|
||||
// If not we hand the error to the channel link for this channel.
|
||||
default:
|
||||
// Mark this channel as failed, so we won't try to restart it on
|
||||
// reconnect with this peer.
|
||||
p.failedChannels[msg.ChanID] = struct{}{}
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// messageSummary returns a human-readable string that summarizes a
|
||||
// incoming/outgoing message. Not all messages will have a summary, only those
|
||||
// which have additional data that can be informative at a glance.
|
||||
|
Loading…
Reference in New Issue
Block a user