peer: ensure link failures are processed in peer life cycle

This commit is contained in:
Conner Fromknecht 2018-08-30 16:54:53 -07:00
parent 2f1b024679
commit 48dc38d9f9
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7

160
peer.go

@ -148,6 +148,10 @@ type peer struct {
// a particular channel are sent over. // a particular channel are sent over.
localCloseChanReqs chan *htlcswitch.ChanClose localCloseChanReqs chan *htlcswitch.ChanClose
// linkFailures receives all reported channel failures from the switch,
// and instructs the channelManager to clean remaining channel state.
linkFailures chan linkFailureReport
// chanCloseMsgs is a channel that any message related to channel // chanCloseMsgs is a channel that any message related to channel
// closures are sent over. This includes lnwire.Shutdown message as // closures are sent over. This includes lnwire.Shutdown message as
// well as lnwire.ClosingSigned messages. // well as lnwire.ClosingSigned messages.
@ -216,6 +220,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
activeChanCloses: make(map[lnwire.ChannelID]*channelCloser), activeChanCloses: make(map[lnwire.ChannelID]*channelCloser),
localCloseChanReqs: make(chan *htlcswitch.ChanClose), localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
chanCloseMsgs: make(chan *closeMsg), chanCloseMsgs: make(chan *closeMsg),
failedChannels: make(map[lnwire.ChannelID]struct{}), failedChannels: make(map[lnwire.ChannelID]struct{}),
@ -481,80 +486,18 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
shortChanID lnwire.ShortChannelID, shortChanID lnwire.ShortChannelID,
linkErr htlcswitch.LinkFailureError) { linkErr htlcswitch.LinkFailureError) {
failure := linkFailureReport{
chanPoint: *chanPoint,
chanID: chanID,
shortChanID: shortChanID,
linkErr: linkErr,
}
select { select {
// If the server is already exiting, then none of the actions case p.linkFailures <- failure:
// below can finish exiting, so we'll exit early as well. case <-p.quit:
case <-p.server.quit: case <-p.server.quit:
return
default:
} }
// The link has notified us about a failure. We launch a go
// routine to stop the link, disconnect the peer and optionally
// force close the channel. We must launch a goroutine since we
// must let OnChannelFailure return in order for the link to
// completely stop in the call to RemoveLink.
p.server.wg.Add(1)
go func() {
defer p.server.wg.Done()
// We begin by removing the link from the switch, such
// that it won't be attempted used for any more
// updates.
// TODO(halseth): should introduce a way to atomically
// stop/pause the link and cancel back any adds in its
// mailboxes such that we can safely force close
// without the link being added again and updates being
// applied.
p.server.htlcSwitch.RemoveLink(chanID)
// If the error encountered was severe enough, we'll
// now force close the channel.
if linkErr.ForceClose {
peerLog.Warnf("Force closing link(%v)",
shortChanID)
closeTx, err := p.server.chainArb.ForceCloseContract(*chanPoint)
if err != nil {
peerLog.Errorf("unable to force close "+
"link(%v): %v", shortChanID,
err)
} else {
peerLog.Infof("channel(%v) force "+
"closed with txid %v",
shortChanID, closeTx.TxHash())
}
}
// Send an error to the peer, why we failed the
// channel.
if linkErr.ShouldSendToPeer() {
// If SendData is set, send it to the peer. If
// not, we'll use the standard error messages
// in the payload. We only include sendData in
// the cases where the error data does not
// contain sensitive information.
data := []byte(linkErr.Error())
if linkErr.SendData != nil {
data = linkErr.SendData
}
err := p.SendMessage(true, &lnwire.Error{
ChanID: chanID,
Data: data,
})
if err != nil {
peerLog.Errorf("unable to send msg to "+
"remote peer: %v", err)
}
}
// Initiate disconnection.
// TODO(halseth): consider not disconnecting the peer,
// as we might still have other active channels with
// the same peer.
p.Disconnect(linkErr)
}()
} }
linkCfg := htlcswitch.ChannelLinkConfig{ linkCfg := htlcswitch.ChannelLinkConfig{
@ -1625,6 +1568,12 @@ out:
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
p.handleLocalCloseReq(req) p.handleLocalCloseReq(req)
// We've received a link failure from a link that was added to
// the switch. This will initiate the teardown of the link, and
// initiate any on-chain closures if necessary.
case failure := <-p.linkFailures:
p.handleLinkFailure(failure)
// We've received a new cooperative channel closure related // We've received a new cooperative channel closure related
// message from the remote peer, we'll use this message to // message from the remote peer, we'll use this message to
// advance the chan closer state machine. // advance the chan closer state machine.
@ -1873,6 +1822,75 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
} }
} }
// linkFailureReport is sent to the channelManager whenever a link that was
// added to the switch reports a link failure, and is forced to exit. The report
// houses the necessary information to cleanup the channel state, send back the
// error message, and force close if necessary.
type linkFailureReport struct {
chanPoint wire.OutPoint
chanID lnwire.ChannelID
shortChanID lnwire.ShortChannelID
linkErr htlcswitch.LinkFailureError
}
// handleLinkFailure processes a link failure report when a link in the switch
// fails. It handles facilitates removal of all channel state within the peer,
// force closing the channel depending on severity, and sending the error
// message back to the remote party.
func (p *peer) handleLinkFailure(failure linkFailureReport) {
// We begin by wiping the link, which will remove it from the switch,
// such that it won't be attempted used for any more updates.
//
// TODO(halseth): should introduce a way to atomically stop/pause the
// link and cancel back any adds in its mailboxes such that we can
// safely force close without the link being added again and updates
// being applied.
if err := p.WipeChannel(&failure.chanPoint); err != nil {
peerLog.Errorf("Unable to wipe link for chanpoint=%v",
failure.chanPoint)
return
}
// If the error encountered was severe enough, we'll now force close the
// channel to prevent readding it to the switch in the future.
if failure.linkErr.ForceClose {
peerLog.Warnf("Force closing link(%v)",
failure.shortChanID)
closeTx, err := p.server.chainArb.ForceCloseContract(
failure.chanPoint,
)
if err != nil {
peerLog.Errorf("unable to force close "+
"link(%v): %v", failure.shortChanID, err)
} else {
peerLog.Infof("channel(%v) force "+
"closed with txid %v",
failure.shortChanID, closeTx.TxHash())
}
}
// Send an error to the peer, why we failed the channel.
if failure.linkErr.ShouldSendToPeer() {
// If SendData is set, send it to the peer. If not, we'll use
// the standard error messages in the payload. We only include
// sendData in the cases where the error data does not contain
// sensitive information.
data := []byte(failure.linkErr.Error())
if failure.linkErr.SendData != nil {
data = failure.linkErr.SendData
}
err := p.SendMessage(true, &lnwire.Error{
ChanID: failure.chanID,
Data: data,
})
if err != nil {
peerLog.Errorf("unable to send msg to "+
"remote peer: %v", err)
}
}
}
// finalizeChanClosure performs the final clean up steps once the cooperative // finalizeChanClosure performs the final clean up steps once the cooperative
// closure transaction has been fully broadcast. The finalized closing state // closure transaction has been fully broadcast. The finalized closing state
// machine should be passed in. Once the transaction has been sufficiently // machine should be passed in. Once the transaction has been sufficiently