diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 66c225da..bce629d5 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -32,15 +32,6 @@ const ( expiryGraceDelta = 2 ) -var ( - // ErrInternalLinkFailure is a generic error returned to the remote - // party so as to obfuscate the true failure. - ErrInternalLinkFailure = errors.New("internal link failure") - - // ErrLinkShuttingDown signals that the link is shutting down. - ErrLinkShuttingDown = errors.New("link shutting down") -) - // ForwardingPolicy describes the set of constraints that a given ChannelLink // is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of // constraints will be consulted in order to ensure that adequate fees are @@ -179,6 +170,16 @@ type ChannelLinkConfig struct { // subscribed to new events. PreimageCache contractcourt.WitnessBeacon + // OnChannelFailure is a function closure that we'll call if the + // channel failed for some reason. Depending on the severity of the + // error, the closure potentially must force close this channel and + // disconnect the peer. + // + // NOTE: The method must return in order for the ChannelLink to be able + // to shut down properly. + OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID, + LinkFailureError) + // UpdateContractSignals is a function closure that we'll use to update // outside sub-systems with the latest signals for our inner Lightning // channel. These signals will notify the caller when the channel has @@ -770,7 +771,10 @@ func (l *channelLink) htlcManager() { if err != nil { l.errorf("unable to synchronize channel states: %v", err) if err != ErrLinkShuttingDown { - l.fail(err.Error()) + // TODO(halseth): must be revisted when + // data-loss protection is in. + l.fail(LinkFailureError{code: ErrSyncError}, + err.Error()) } return } @@ -787,8 +791,8 @@ func (l *channelLink) htlcManager() { // replay our forwarding packages to handle any htlcs that can be // processed locally, or need to be forwarded out to the switch. if err := l.resolveFwdPkgs(); err != nil { - l.errorf("unable to resolve fwd pkgs: %v", err) - l.fail(ErrInternalLinkFailure.Error()) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to resolve fwd pkgs: %v", err) return } @@ -876,7 +880,8 @@ out: } if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) break out } @@ -893,7 +898,8 @@ out: // update, waiting for the revocation window to open // up. if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) break out } @@ -1090,8 +1096,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { pkt.destRef, &closedCircuitRef, ); err != nil { - // TODO(roasbeef): broadcast on-chain - l.fail("unable to settle incoming HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle incoming HTLC: %v", err) return } @@ -1159,7 +1165,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // this is a settle request, then initiate an update. if l.batchCounter >= l.cfg.BatchSize || isSettle { if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } } @@ -1177,7 +1184,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // "settle" list in the event that we know the preimage. index, err := l.channel.ReceiveHTLC(msg) if err != nil { - l.fail("unable to handle upstream add HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream add HTLC: %v", err) return } @@ -1188,8 +1196,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { pre := msg.PaymentPreimage idx := msg.ID if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { - // TODO(roasbeef): broadcast on-chain - l.fail("unable to handle upstream settle HTLC: %v", err) + l.fail( + LinkFailureError{ + code: ErrInvalidUpdate, + ForceClose: true, + }, + "unable to handle upstream settle HTLC: %v", err, + ) return } @@ -1243,7 +1256,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // message to the usual HTLC fail message. err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes()) if err != nil { - l.fail("unable to handle upstream fail HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) return } @@ -1251,7 +1265,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { idx := msg.ID err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:]) if err != nil { - l.fail("unable to handle upstream fail HTLC: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "unable to handle upstream fail HTLC: %v", err) return } @@ -1265,28 +1280,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // commitment, then we'll examine the type of error. If // it's an InvalidCommitSigError, then we'll send a // direct error. - // - // TODO(roasbeef): force close chan - var sendErr bool + var sendData []byte switch err.(type) { case *lnwallet.InvalidCommitSigError: - sendErr = true + sendData = []byte(err.Error()) case *lnwallet.InvalidHtlcSigError: - sendErr = true + sendData = []byte(err.Error()) } - if sendErr { - err := l.cfg.Peer.SendMessage(&lnwire.Error{ - ChanID: l.ChanID(), - Data: []byte(err.Error()), - }, true) - if err != nil { - l.errorf("unable to send msg to "+ - "remote peer: %v", err) - } - } - - l.fail("ChannelPoint(%v): unable to accept new "+ - "commitment: %v", l.channel.ChannelPoint(), err) + l.fail( + LinkFailureError{ + code: ErrInvalidCommitment, + ForceClose: true, + SendData: sendData, + }, + "ChannelPoint(%v): unable to accept new "+ + "commitment: %v", + l.channel.ChannelPoint(), err, + ) return } @@ -1335,7 +1345,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // so we'll reply with a signature to provide them with their // version of the latest commitment. if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } @@ -1345,7 +1356,9 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // revocation window. fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg) if err != nil { - l.fail("unable to accept revocation: %v", err) + // TODO(halseth): force close? + l.fail(LinkFailureError{code: ErrInvalidRevocation}, + "unable to accept revocation: %v", err) return } @@ -1354,7 +1367,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { needUpdate := l.processRemoteAdds(fwdPkg, adds) if needUpdate { if err := l.updateCommitTx(); err != nil { - l.fail("unable to update commitment: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to update commitment: %v", err) return } } @@ -1364,7 +1378,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // will fail the channel, if not we will apply the update. fee := lnwallet.SatPerKWeight(msg.FeePerKw) if err := l.channel.ReceiveUpdateFee(fee); err != nil { - l.fail("error receiving fee update: %v", err) + l.fail(LinkFailureError{code: ErrInvalidUpdate}, + "error receiving fee update: %v", err) return } case *lnwire.Error: @@ -1375,7 +1390,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { if isASCII(msg.Data) { errMsg = string(msg.Data) } - l.fail("ChannelPoint(%v): received error from peer: %v", + l.fail(LinkFailureError{code: ErrRemoteError}, + "ChannelPoint(%v): received error from peer: %v", l.channel.ChannelPoint(), errMsg) default: log.Warnf("ChannelPoint(%v): received unknown message of type %T", @@ -1936,8 +1952,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, fwdPkg.ID(), decodeReqs, ) if sphinxErr != nil { - l.errorf("unable to decode hop iterators: %v", sphinxErr) - l.fail(ErrInternalLinkFailure.Error()) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to decode hop iterators: %v", sphinxErr) return false } @@ -2174,7 +2190,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, err = l.channel.SettleHTLC(preimage, pd.HtlcIndex, pd.SourceRef, nil, nil) if err != nil { - l.fail("unable to settle htlc: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle htlc: %v", err) return false } @@ -2182,7 +2199,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // settled with this latest commitment update. err = l.cfg.Registry.SettleInvoice(invoiceHash) if err != nil { - l.fail("unable to settle invoice: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to settle invoice: %v", err) return false } @@ -2312,8 +2330,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, l.ShortChanID(), ) if err != nil { - l.fail("unable to create channel update "+ - "while handling the error: %v", err) + l.fail(LinkFailureError{ + code: ErrInternalError}, + "unable to create channel "+ + "update while handling "+ + "the error: %v", err) return false } @@ -2398,7 +2419,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, if fwdPkg.State == channeldb.FwdStateLockedIn { err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter) if err != nil { - l.fail("unable to set fwd filter: %v", err) + l.fail(LinkFailureError{code: ErrInternalError}, + "unable to set fwd filter: %v", err) return false } } @@ -2503,12 +2525,16 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64, }, false) } -// fail helper function which is used to encapsulate the action necessary for -// proper disconnect. -func (l *channelLink) fail(format string, a ...interface{}) { +// fail is a function which is used to encapsulate the action necessary for +// properly failing the link. It takes a LinkFailureError, which will be passed +// to the OnChannelFailure closure, in order for it to determine if we should +// force close the channel, and if we should send an error message to the +// remote peer. +func (l *channelLink) fail(linkErr LinkFailureError, + format string, a ...interface{}) { reason := errors.Errorf(format, a...) - log.Error(reason) - go l.cfg.Peer.Disconnect(reason) + l.errorf("Failing link: %s", reason) + l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr) } // infof prefixes the channel's identifier before printing to info log.