htlcswitch/link: make fail() call OnChannelFailure with LinkFailureError

Adds a new closure OnChannelFailure to the link config, which is called
when the link fails. This function closure should use the given
LinkFailureError to properly force close the channel, send an error to
the peer, and disconnect the peer.
This commit is contained in:
Johan T. Halseth 2018-05-09 15:49:58 +02:00
parent 92afcbe3f4
commit fbec83699c
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -32,15 +32,6 @@ const (
expiryGraceDelta = 2 expiryGraceDelta = 2
) )
var (
// ErrInternalLinkFailure is a generic error returned to the remote
// party so as to obfuscate the true failure.
ErrInternalLinkFailure = errors.New("internal link failure")
// ErrLinkShuttingDown signals that the link is shutting down.
ErrLinkShuttingDown = errors.New("link shutting down")
)
// ForwardingPolicy describes the set of constraints that a given ChannelLink // ForwardingPolicy describes the set of constraints that a given ChannelLink
// is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of // is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of
// constraints will be consulted in order to ensure that adequate fees are // constraints will be consulted in order to ensure that adequate fees are
@ -179,6 +170,16 @@ type ChannelLinkConfig struct {
// subscribed to new events. // subscribed to new events.
PreimageCache contractcourt.WitnessBeacon PreimageCache contractcourt.WitnessBeacon
// OnChannelFailure is a function closure that we'll call if the
// channel failed for some reason. Depending on the severity of the
// error, the closure potentially must force close this channel and
// disconnect the peer.
//
// NOTE: The method must return in order for the ChannelLink to be able
// to shut down properly.
OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
LinkFailureError)
// UpdateContractSignals is a function closure that we'll use to update // UpdateContractSignals is a function closure that we'll use to update
// outside sub-systems with the latest signals for our inner Lightning // outside sub-systems with the latest signals for our inner Lightning
// channel. These signals will notify the caller when the channel has // channel. These signals will notify the caller when the channel has
@ -770,7 +771,10 @@ func (l *channelLink) htlcManager() {
if err != nil { if err != nil {
l.errorf("unable to synchronize channel states: %v", err) l.errorf("unable to synchronize channel states: %v", err)
if err != ErrLinkShuttingDown { if err != ErrLinkShuttingDown {
l.fail(err.Error()) // TODO(halseth): must be revisted when
// data-loss protection is in.
l.fail(LinkFailureError{code: ErrSyncError},
err.Error())
} }
return return
} }
@ -787,8 +791,8 @@ func (l *channelLink) htlcManager() {
// replay our forwarding packages to handle any htlcs that can be // replay our forwarding packages to handle any htlcs that can be
// processed locally, or need to be forwarded out to the switch. // processed locally, or need to be forwarded out to the switch.
if err := l.resolveFwdPkgs(); err != nil { if err := l.resolveFwdPkgs(); err != nil {
l.errorf("unable to resolve fwd pkgs: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
l.fail(ErrInternalLinkFailure.Error()) "unable to resolve fwd pkgs: %v", err)
return return
} }
@ -876,7 +880,8 @@ out:
} }
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
break out break out
} }
@ -893,7 +898,8 @@ out:
// update, waiting for the revocation window to open // update, waiting for the revocation window to open
// up. // up.
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
break out break out
} }
@ -1090,8 +1096,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
pkt.destRef, pkt.destRef,
&closedCircuitRef, &closedCircuitRef,
); err != nil { ); err != nil {
// TODO(roasbeef): broadcast on-chain l.fail(LinkFailureError{code: ErrInternalError},
l.fail("unable to settle incoming HTLC: %v", err) "unable to settle incoming HTLC: %v", err)
return return
} }
@ -1159,7 +1165,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// this is a settle request, then initiate an update. // this is a settle request, then initiate an update.
if l.batchCounter >= l.cfg.BatchSize || isSettle { if l.batchCounter >= l.cfg.BatchSize || isSettle {
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return return
} }
} }
@ -1177,7 +1184,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// "settle" list in the event that we know the preimage. // "settle" list in the event that we know the preimage.
index, err := l.channel.ReceiveHTLC(msg) index, err := l.channel.ReceiveHTLC(msg)
if err != nil { if err != nil {
l.fail("unable to handle upstream add HTLC: %v", err) l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream add HTLC: %v", err)
return return
} }
@ -1188,8 +1196,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
pre := msg.PaymentPreimage pre := msg.PaymentPreimage
idx := msg.ID idx := msg.ID
if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
// TODO(roasbeef): broadcast on-chain l.fail(
l.fail("unable to handle upstream settle HTLC: %v", err) LinkFailureError{
code: ErrInvalidUpdate,
ForceClose: true,
},
"unable to handle upstream settle HTLC: %v", err,
)
return return
} }
@ -1243,7 +1256,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// message to the usual HTLC fail message. // message to the usual HTLC fail message.
err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
if err != nil { if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err) l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream fail HTLC: %v", err)
return return
} }
@ -1251,7 +1265,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
idx := msg.ID idx := msg.ID
err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
if err != nil { if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err) l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream fail HTLC: %v", err)
return return
} }
@ -1265,28 +1280,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// commitment, then we'll examine the type of error. If // commitment, then we'll examine the type of error. If
// it's an InvalidCommitSigError, then we'll send a // it's an InvalidCommitSigError, then we'll send a
// direct error. // direct error.
// var sendData []byte
// TODO(roasbeef): force close chan
var sendErr bool
switch err.(type) { switch err.(type) {
case *lnwallet.InvalidCommitSigError: case *lnwallet.InvalidCommitSigError:
sendErr = true sendData = []byte(err.Error())
case *lnwallet.InvalidHtlcSigError: case *lnwallet.InvalidHtlcSigError:
sendErr = true sendData = []byte(err.Error())
} }
if sendErr { l.fail(
err := l.cfg.Peer.SendMessage(&lnwire.Error{ LinkFailureError{
ChanID: l.ChanID(), code: ErrInvalidCommitment,
Data: []byte(err.Error()), ForceClose: true,
}, true) SendData: sendData,
if err != nil { },
l.errorf("unable to send msg to "+ "ChannelPoint(%v): unable to accept new "+
"remote peer: %v", err) "commitment: %v",
} l.channel.ChannelPoint(), err,
} )
l.fail("ChannelPoint(%v): unable to accept new "+
"commitment: %v", l.channel.ChannelPoint(), err)
return return
} }
@ -1335,7 +1345,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// so we'll reply with a signature to provide them with their // so we'll reply with a signature to provide them with their
// version of the latest commitment. // version of the latest commitment.
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return return
} }
@ -1345,7 +1356,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// revocation window. // revocation window.
fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg) fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg)
if err != nil { if err != nil {
l.fail("unable to accept revocation: %v", err) // TODO(halseth): force close?
l.fail(LinkFailureError{code: ErrInvalidRevocation},
"unable to accept revocation: %v", err)
return return
} }
@ -1354,7 +1367,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
needUpdate := l.processRemoteAdds(fwdPkg, adds) needUpdate := l.processRemoteAdds(fwdPkg, adds)
if needUpdate { if needUpdate {
if err := l.updateCommitTx(); err != nil { if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return return
} }
} }
@ -1364,7 +1378,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// will fail the channel, if not we will apply the update. // will fail the channel, if not we will apply the update.
fee := lnwallet.SatPerKWeight(msg.FeePerKw) fee := lnwallet.SatPerKWeight(msg.FeePerKw)
if err := l.channel.ReceiveUpdateFee(fee); err != nil { if err := l.channel.ReceiveUpdateFee(fee); err != nil {
l.fail("error receiving fee update: %v", err) l.fail(LinkFailureError{code: ErrInvalidUpdate},
"error receiving fee update: %v", err)
return return
} }
case *lnwire.Error: case *lnwire.Error:
@ -1375,7 +1390,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
if isASCII(msg.Data) { if isASCII(msg.Data) {
errMsg = string(msg.Data) errMsg = string(msg.Data)
} }
l.fail("ChannelPoint(%v): received error from peer: %v", l.fail(LinkFailureError{code: ErrRemoteError},
"ChannelPoint(%v): received error from peer: %v",
l.channel.ChannelPoint(), errMsg) l.channel.ChannelPoint(), errMsg)
default: default:
log.Warnf("ChannelPoint(%v): received unknown message of type %T", log.Warnf("ChannelPoint(%v): received unknown message of type %T",
@ -1936,8 +1952,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
fwdPkg.ID(), decodeReqs, fwdPkg.ID(), decodeReqs,
) )
if sphinxErr != nil { if sphinxErr != nil {
l.errorf("unable to decode hop iterators: %v", sphinxErr) l.fail(LinkFailureError{code: ErrInternalError},
l.fail(ErrInternalLinkFailure.Error()) "unable to decode hop iterators: %v", sphinxErr)
return false return false
} }
@ -2174,7 +2190,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
err = l.channel.SettleHTLC(preimage, err = l.channel.SettleHTLC(preimage,
pd.HtlcIndex, pd.SourceRef, nil, nil) pd.HtlcIndex, pd.SourceRef, nil, nil)
if err != nil { if err != nil {
l.fail("unable to settle htlc: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to settle htlc: %v", err)
return false return false
} }
@ -2182,7 +2199,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// settled with this latest commitment update. // settled with this latest commitment update.
err = l.cfg.Registry.SettleInvoice(invoiceHash) err = l.cfg.Registry.SettleInvoice(invoiceHash)
if err != nil { if err != nil {
l.fail("unable to settle invoice: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to settle invoice: %v", err)
return false return false
} }
@ -2312,8 +2330,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
l.ShortChanID(), l.ShortChanID(),
) )
if err != nil { if err != nil {
l.fail("unable to create channel update "+ l.fail(LinkFailureError{
"while handling the error: %v", err) code: ErrInternalError},
"unable to create channel "+
"update while handling "+
"the error: %v", err)
return false return false
} }
@ -2398,7 +2419,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
if fwdPkg.State == channeldb.FwdStateLockedIn { if fwdPkg.State == channeldb.FwdStateLockedIn {
err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter) err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
if err != nil { if err != nil {
l.fail("unable to set fwd filter: %v", err) l.fail(LinkFailureError{code: ErrInternalError},
"unable to set fwd filter: %v", err)
return false return false
} }
} }
@ -2503,12 +2525,16 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
}, false) }, false)
} }
// fail helper function which is used to encapsulate the action necessary for // fail is a function which is used to encapsulate the action necessary for
// proper disconnect. // properly failing the link. It takes a LinkFailureError, which will be passed
func (l *channelLink) fail(format string, a ...interface{}) { // to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) fail(linkErr LinkFailureError,
format string, a ...interface{}) {
reason := errors.Errorf(format, a...) reason := errors.Errorf(format, a...)
log.Error(reason) l.errorf("Failing link: %s", reason)
go l.cfg.Peer.Disconnect(reason) l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
} }
// infof prefixes the channel's identifier before printing to info log. // infof prefixes the channel's identifier before printing to info log.