Merge pull request #1213 from halseth/fail-channel

[htlcswitch] Optionally force close channel on link failure
Olaoluwa Osuntokun 2018-05-25 17:32:05 -07:00 committed by GitHub
commit dd2859dc61
11 changed files with 864 additions and 166 deletions

@ -85,8 +85,10 @@ type ChannelArbitratorConfig struct {
// ForceCloseChan should force close the contract that this attendant
// is watching over. We'll use this when we decide that we need to go
// to chain. The returned summary contains all items needed to
// eventually resolve all outputs on chain.
// to chain. It should in addition tell the switch to remove the
// corresponding link, such that we won't accept any new updates. The
// returned summary contains all items needed to eventually resolve all
// outputs on chain.
ForceCloseChan func() (*lnwallet.LocalForceCloseSummary, error)
// MarkCommitmentBroadcasted should mark the channel as the commitment
@ -434,9 +436,10 @@ func (c *ChannelArbitrator) stateStep(triggerHeight uint32,
// Now that we have all the actions decided for the set of
// HTLC's, we'll broadcast the commitment transaction, and
// signal the link to exit.
//
// TODO(roasbeef): need to report to switch that channel is
// inactive, should close link
// We'll tell the switch that it should remove the link for
// this channel, in addition to fetching the force close
// summary needed to close this channel on chain.
closeSummary, err := c.cfg.ForceCloseChan()
if err != nil {
log.Errorf("ChannelArbitrator(%v): unable to "+

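To make the updated ForceCloseChan contract concrete, here is a minimal sketch of how a caller could satisfy it: ask the switch to remove the link first, so no new updates are accepted, and only then broadcast the commitment. The closeChannel parameter is a hypothetical stand-in for the wallet's force close machinery; the real wiring lives in the chain arbitrator setup.

package sketch // illustration only, not part of the diff

import (
	"github.com/lightningnetwork/lnd/contractcourt"
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
)

// newArbitratorCfg builds a config whose ForceCloseChan removes the
// link from the switch before going to chain, per the comment above.
func newArbitratorCfg(sw *htlcswitch.Switch, chanID lnwire.ChannelID,
	closeChannel func() (*lnwallet.LocalForceCloseSummary, error),
) contractcourt.ChannelArbitratorConfig {

	return contractcourt.ChannelArbitratorConfig{
		ForceCloseChan: func() (*lnwallet.LocalForceCloseSummary, error) {
			// Remove the link so we won't accept any new updates.
			if err := sw.RemoveLink(chanID); err != nil {
				return nil, err
			}

			// Broadcast the commitment; the returned summary
			// contains everything needed to resolve the outputs
			// on chain.
			return closeChannel()
		},
	}
}
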
@ -21,6 +21,8 @@ type Config struct {
FailOutgoing bool `long:"fail-outgoing" description:"Instructs the node to drop outgoing FAILs before applying them to the channel state"`
Commit bool `long:"commit" description:"Instructs the node to add HTLCs to its local commitment state and to open circuits for any ADDs, but abort before committing the changes"`
BogusSettle bool `long:"bogus-settle" description:"Instructs the node to settle back any incoming HTLC with a bogus preimage"`
}
// Mask extracts the flags specified in the configuration, composing a Mask from
@ -52,6 +54,9 @@ func (c *Config) Mask() Mask {
if c.Commit {
flags = append(flags, Commit)
}
if c.BogusSettle {
flags = append(flags, BogusSettle)
}
// NOTE: The value returned here will only honor the configuration if
// the debug build flag is present. In production, this method always

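As a quick illustration of how these flags flow from configuration to the link, a minimal sketch using only the Config, Mask, Active, and Warning APIs that appear in this diff:

package main

import (
	"fmt"

	"github.com/lightningnetwork/lnd/htlcswitch/hodl"
)

func main() {
	// Compose a Mask from the configuration. As the NOTE above says,
	// the flags are only honored when the debug build flag is present.
	cfg := hodl.Config{BogusSettle: true}
	mask := cfg.Mask()

	if mask.Active(hodl.BogusSettle) {
		// Prints: "will settle HTLC with bogus preimage".
		fmt.Println(hodl.BogusSettle.Warning())
	}
}
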
@ -51,6 +51,10 @@ const (
// Commit drops all HTLC after any outgoing circuits have been
// opened, but before the in-memory commitment state is persisted.
Commit
// BogusSettle attempts to settle back any incoming HTLC for which we
// are the exit node with a bogus preimage.
BogusSettle
)
// String returns a human-readable identifier for a given Flag.
@ -72,6 +76,8 @@ func (f Flag) String() string {
return "FailOutgoing"
case Commit:
return "Commit"
case BogusSettle:
return "BogusSettle"
default:
return "UnknownHodlFlag"
}
@ -98,6 +104,8 @@ func (f Flag) Warning() string {
msg = "will not update channel state with downstream FAIL"
case Commit:
msg = "will not commit pending channel updates"
case BogusSettle:
msg = "will settle HTLC with bogus preimage"
default:
msg = "incorrect hodl flag usage"
}

@ -67,6 +67,7 @@ var hodlMaskTests = []struct {
hodl.SettleOutgoing,
hodl.FailOutgoing,
hodl.Commit,
hodl.BogusSettle,
),
flags: map[hodl.Flag]struct{}{
hodl.ExitSettle: {},
@ -77,6 +78,7 @@ var hodlMaskTests = []struct {
hodl.SettleOutgoing: {},
hodl.FailOutgoing: {},
hodl.Commit: {},
hodl.BogusSettle: {},
},
},
}

@ -130,10 +130,6 @@ type Peer interface {
// PubKey returns the serialized public key of the source peer.
PubKey() [33]byte
// Disconnect disconnects with peer if we have error which we can't
// properly handle.
Disconnect(reason error)
}
// ForwardingLog is an interface that represents a time series database which

@ -32,15 +32,6 @@ const (
expiryGraceDelta = 2
)
var (
// ErrInternalLinkFailure is a generic error returned to the remote
// party so as to obfuscate the true failure.
ErrInternalLinkFailure = errors.New("internal link failure")
// ErrLinkShuttingDown signals that the link is shutting down.
ErrLinkShuttingDown = errors.New("link shutting down")
)
// ForwardingPolicy describes the set of constraints that a given ChannelLink
// is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of
// constraints will be consulted in order to ensure that adequate fees are
@ -179,6 +170,16 @@ type ChannelLinkConfig struct {
// subscribed to new events.
PreimageCache contractcourt.WitnessBeacon
// OnChannelFailure is a function closure that we'll call if the
// channel fails for some reason. Depending on the severity of the
// error, the closure may force close the channel and disconnect the
// peer.
//
// NOTE: The method must return in order for the ChannelLink to be able
// to shut down properly.
OnChannelFailure func(lnwire.ChannelID, lnwire.ShortChannelID,
LinkFailureError)
// UpdateContractSignals is a function closure that we'll use to update
// outside sub-systems with the latest signals for our inner Lightning
// channel. These signals will notify the caller when the channel has
@ -253,6 +254,10 @@ type channelLink struct {
started int32
shutdown int32
// failed should be set to true in case a link error happens, making
// sure we don't process any more updates.
failed bool
// batchCounter is the number of updates received from the remote
// side, but not yet included in a commitment transaction, plus the
// current number of settles that have been sent, but not yet committed
@ -694,6 +699,14 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
fwdPkg.Source, fwdPkg.Height, fwdPkg.Adds,
)
needUpdate = l.processRemoteAdds(fwdPkg, adds)
// If the link failed while processing the adds, we must
// return to ensure we won't attempt to update the state
// further.
if l.failed {
return false, fmt.Errorf("link failed while " +
"processing remote adds")
}
}
return needUpdate, nil
@ -770,7 +783,10 @@ func (l *channelLink) htlcManager() {
if err != nil {
l.errorf("unable to synchronize channel states: %v", err)
if err != ErrLinkShuttingDown {
l.fail(err.Error())
// TODO(halseth): must be revisited when
// data-loss protection is in.
l.fail(LinkFailureError{code: ErrSyncError},
err.Error())
}
return
}
@ -787,8 +803,8 @@ func (l *channelLink) htlcManager() {
// replay our forwarding packages to handle any htlcs that can be
// processed locally, or need to be forwarded out to the switch.
if err := l.resolveFwdPkgs(); err != nil {
l.errorf("unable to resolve fwd pkgs: %v", err)
l.fail(ErrInternalLinkFailure.Error())
l.fail(LinkFailureError{code: ErrInternalError},
"unable to resolve fwd pkgs: %v", err)
return
}
@ -801,9 +817,16 @@ func (l *channelLink) htlcManager() {
batchTick := l.cfg.BatchTicker.Start()
defer l.cfg.BatchTicker.Stop()
// TODO(roasbeef): fail chan in case of protocol violation
out:
for {
// We must always check if we failed at some point processing
// the last update before processing the next.
if l.failed {
l.errorf("link failed, exiting htlcManager")
break out
}
select {
// A new block has arrived, we'll check the network fee to see
@ -876,7 +899,8 @@ out:
}
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
break out
}
@ -893,7 +917,8 @@ out:
// update, waiting for the revocation window to open
// up.
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
break out
}
@ -1090,8 +1115,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
pkt.destRef,
&closedCircuitRef,
); err != nil {
// TODO(roasbeef): broadcast on-chain
l.fail("unable to settle incoming HTLC: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to settle incoming HTLC: %v", err)
return
}
@ -1159,7 +1184,8 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
// this is a settle request, then initiate an update.
if l.batchCounter >= l.cfg.BatchSize || isSettle {
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return
}
}
@ -1177,7 +1203,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// "settle" list in the event that we know the preimage.
index, err := l.channel.ReceiveHTLC(msg)
if err != nil {
l.fail("unable to handle upstream add HTLC: %v", err)
l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream add HTLC: %v", err)
return
}
@ -1188,8 +1215,13 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
pre := msg.PaymentPreimage
idx := msg.ID
if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
// TODO(roasbeef): broadcast on-chain
l.fail("unable to handle upstream settle HTLC: %v", err)
l.fail(
LinkFailureError{
code: ErrInvalidUpdate,
ForceClose: true,
},
"unable to handle upstream settle HTLC: %v", err,
)
return
}
@ -1243,7 +1275,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// message to the usual HTLC fail message.
err := l.channel.ReceiveFailHTLC(msg.ID, b.Bytes())
if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err)
l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream fail HTLC: %v", err)
return
}
@ -1251,7 +1284,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
idx := msg.ID
err := l.channel.ReceiveFailHTLC(idx, msg.Reason[:])
if err != nil {
l.fail("unable to handle upstream fail HTLC: %v", err)
l.fail(LinkFailureError{code: ErrInvalidUpdate},
"unable to handle upstream fail HTLC: %v", err)
return
}
@ -1265,28 +1299,23 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// commitment, then we'll examine the type of error. If
// it's an InvalidCommitSigError, then we'll send a
// direct error.
//
// TODO(roasbeef): force close chan
var sendErr bool
var sendData []byte
switch err.(type) {
case *lnwallet.InvalidCommitSigError:
sendErr = true
sendData = []byte(err.Error())
case *lnwallet.InvalidHtlcSigError:
sendErr = true
sendData = []byte(err.Error())
}
if sendErr {
err := l.cfg.Peer.SendMessage(&lnwire.Error{
ChanID: l.ChanID(),
Data: []byte(err.Error()),
}, true)
if err != nil {
l.errorf("unable to send msg to "+
"remote peer: %v", err)
}
}
l.fail("ChannelPoint(%v): unable to accept new "+
"commitment: %v", l.channel.ChannelPoint(), err)
l.fail(
LinkFailureError{
code: ErrInvalidCommitment,
ForceClose: true,
SendData: sendData,
},
"ChannelPoint(%v): unable to accept new "+
"commitment: %v",
l.channel.ChannelPoint(), err,
)
return
}
@ -1335,7 +1364,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// so we'll reply with a signature to provide them with their
// version of the latest commitment.
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return
}
@ -1345,16 +1375,26 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// revocation window.
fwdPkg, adds, settleFails, err := l.channel.ReceiveRevocation(msg)
if err != nil {
l.fail("unable to accept revocation: %v", err)
// TODO(halseth): force close?
l.fail(LinkFailureError{code: ErrInvalidRevocation},
"unable to accept revocation: %v", err)
return
}
l.processRemoteSettleFails(fwdPkg, settleFails)
needUpdate := l.processRemoteAdds(fwdPkg, adds)
// If the link failed while processing the adds, we must
// return to ensure we won't attempt to update the state
// further.
if l.failed {
return
}
if needUpdate {
if err := l.updateCommitTx(); err != nil {
l.fail("unable to update commitment: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to update commitment: %v", err)
return
}
}
@ -1364,7 +1404,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// will fail the channel, if not we will apply the update.
fee := lnwallet.SatPerKWeight(msg.FeePerKw)
if err := l.channel.ReceiveUpdateFee(fee); err != nil {
l.fail("error receiving fee update: %v", err)
l.fail(LinkFailureError{code: ErrInvalidUpdate},
"error receiving fee update: %v", err)
return
}
case *lnwire.Error:
@ -1375,7 +1416,8 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
if isASCII(msg.Data) {
errMsg = string(msg.Data)
}
l.fail("ChannelPoint(%v): received error from peer: %v",
l.fail(LinkFailureError{code: ErrRemoteError},
"ChannelPoint(%v): received error from peer: %v",
l.channel.ChannelPoint(), errMsg)
default:
log.Warnf("ChannelPoint(%v): received unknown message of type %T",
@ -1936,8 +1978,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
fwdPkg.ID(), decodeReqs,
)
if sphinxErr != nil {
l.errorf("unable to decode hop iterators: %v", sphinxErr)
l.fail(ErrInternalLinkFailure.Error())
l.fail(LinkFailureError{code: ErrInternalError},
"unable to decode hop iterators: %v", sphinxErr)
return false
}
@ -2174,7 +2216,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
err = l.channel.SettleHTLC(preimage,
pd.HtlcIndex, pd.SourceRef, nil, nil)
if err != nil {
l.fail("unable to settle htlc: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to settle htlc: %v", err)
return false
}
@ -2182,12 +2225,23 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
// settled with this latest commitment update.
err = l.cfg.Registry.SettleInvoice(invoiceHash)
if err != nil {
l.fail("unable to settle invoice: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to settle invoice: %v", err)
return false
}
l.infof("settling %x as exit hop", pd.RHash)
// If the link is in hodl.BogusSettle mode, replace the
// preimage with a fake one before sending it to the
// peer.
if l.cfg.DebugHTLC &&
l.cfg.HodlMask.Active(hodl.BogusSettle) {
l.warnf(hodl.BogusSettle.Warning())
preimage = [32]byte{}
copy(preimage[:], bytes.Repeat([]byte{2}, 32))
}
// The HTLC was successfully settled locally; send a
// notification about it to the remote peer.
l.cfg.Peer.SendMessage(&lnwire.UpdateFulfillHTLC{
@ -2312,8 +2366,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
l.ShortChanID(),
)
if err != nil {
l.fail("unable to create channel update "+
"while handling the error: %v", err)
l.fail(LinkFailureError{
code: ErrInternalError},
"unable to create channel "+
"update while handling "+
"the error: %v", err)
return false
}
@ -2398,7 +2455,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
if fwdPkg.State == channeldb.FwdStateLockedIn {
err := l.channel.SetFwdFilter(fwdPkg.Height, fwdPkg.FwdFilter)
if err != nil {
l.fail("unable to set fwd filter: %v", err)
l.fail(LinkFailureError{code: ErrInternalError},
"unable to set fwd filter: %v", err)
return false
}
}
@ -2503,12 +2561,28 @@ func (l *channelLink) sendMalformedHTLCError(htlcIndex uint64,
}, false)
}
// fail helper function which is used to encapsulate the action necessary for
// proper disconnect.
func (l *channelLink) fail(format string, a ...interface{}) {
// fail is a function which is used to encapsulate the action necessary for
// properly failing the link. It takes a LinkFailureError, which will be passed
// to the OnChannelFailure closure, in order for it to determine if we should
// force close the channel, and if we should send an error message to the
// remote peer.
func (l *channelLink) fail(linkErr LinkFailureError,
format string, a ...interface{}) {
reason := errors.Errorf(format, a...)
log.Error(reason)
go l.cfg.Peer.Disconnect(reason)
// Return if we have already notified about a failure.
if l.failed {
l.warnf("Ignoring link failure (%v), as link already failed",
reason)
return
}
l.errorf("Failing link: %s", reason)
// Set failed, such that we won't process any more updates, and notify
// the peer about the failure.
l.failed = true
l.cfg.OnChannelFailure(l.ChanID(), l.ShortChanID(), linkErr)
}
// infof prefixes the channel's identifier before printing to info log.

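Since fail hands its LinkFailureError to the OnChannelFailure closure, a caller can observe link failures with a small callback. The sketch below mirrors the pattern the tests in this diff use; the channel-based delivery is illustrative, not the production wiring:

package sketch // illustration only

import (
	"github.com/lightningnetwork/lnd/htlcswitch"
	"github.com/lightningnetwork/lnd/lnwire"
)

// observeFailures returns an OnChannelFailure callback along with a
// channel that delivers the observed failures. Per the config docs,
// the callback must return for the link to shut down properly, hence
// the buffered, non-blocking send.
func observeFailures() (func(lnwire.ChannelID, lnwire.ShortChannelID,
	htlcswitch.LinkFailureError), <-chan htlcswitch.LinkFailureError) {

	linkErrors := make(chan htlcswitch.LinkFailureError, 1)
	cb := func(_ lnwire.ChannelID, _ lnwire.ShortChannelID,
		linkErr htlcswitch.LinkFailureError) {

		select {
		case linkErrors <- linkErr:
		default: // A failure was already recorded; drop this one.
		}
	}

	return cb, linkErrors
}
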
@ -15,6 +15,7 @@ import (
"math"
"github.com/coreos/bbolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -1390,11 +1391,17 @@ func TestChannelLinkSingleHopMessageOrdering(t *testing.T) {
type mockPeer struct {
sync.Mutex
disconnected bool
sentMsgs chan lnwire.Message
quit chan struct{}
}
var _ Peer = (*mockPeer)(nil)
func (m *mockPeer) SendMessage(msg lnwire.Message, sync bool) error {
if m.disconnected {
return fmt.Errorf("disconnected")
}
select {
case m.sentMsgs <- msg:
case <-m.quit:
@ -1408,18 +1415,16 @@ func (m *mockPeer) WipeChannel(*wire.OutPoint) error {
func (m *mockPeer) PubKey() [33]byte {
return [33]byte{}
}
func (m *mockPeer) Disconnect(reason error) {
}
var _ Peer = (*mockPeer)(nil)
func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func(),
chanRestoreFunc, error) {
ChannelLink, *lnwallet.LightningChannel, chan time.Time, func() error,
func(), chanRestoreFunc, error) {
var chanIDBytes [8]byte
if _, err := io.ReadFull(rand.Reader, chanIDBytes[:]); err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, err
}
chanID := lnwire.NewShortChanIDFromInt(
@ -1430,7 +1435,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
chanReserve, chanReserve, chanID,
)
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, err
}
var (
@ -1461,7 +1466,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
aliceDb := aliceChannel.State().Db
aliceSwitch, err := initSwitchWithDB(aliceDb)
if err != nil {
return nil, nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, nil, err
}
t := make(chan time.Time)
@ -1479,6 +1484,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
},
FetchLastChannelUpdate: mockGetChanUpdateMessage,
PreimageCache: pCache,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {
},
UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil
},
@ -1494,8 +1501,8 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
const startingHeight = 100
aliceLink := NewChannelLink(aliceCfg, aliceChannel, startingHeight)
if err := aliceSwitch.AddLink(aliceLink); err != nil {
return nil, nil, nil, nil, nil, err
start := func() error {
return aliceSwitch.AddLink(aliceLink)
}
go func() {
for {
@ -1514,7 +1521,7 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
defer bobChannel.Stop()
}
return aliceLink, bobChannel, t, cleanUp, restore, nil
return aliceLink, bobChannel, t, start, cleanUp, restore, nil
}
func assertLinkBandwidth(t *testing.T, link ChannelLink,
@ -1686,13 +1693,17 @@ func TestChannelLinkBandwidthConsistency(t *testing.T) {
// We'll start the test by creating a single instance of
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, tmr, cleanUp, _, err :=
aliceLink, bobChannel, tmr, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
carolChanID = lnwire.NewShortChanIDFromInt(3)
mockBlob [lnwire.OnionPacketSize]byte
@ -2104,13 +2115,17 @@ func TestChannelLinkBandwidthConsistencyOverflow(t *testing.T) {
var mockBlob [lnwire.OnionPacketSize]byte
const chanAmt = btcutil.SatoshiPerBitcoin * 5
aliceLink, bobChannel, batchTick, cleanUp, _, err :=
aliceLink, bobChannel, batchTick, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
defaultCommitFee = coreLink.channel.StateSnapshot().CommitFee
@ -2353,13 +2368,17 @@ func TestChannelLinkTrimCircuitsPending(t *testing.T) {
// We'll start by creating a new link with our chanAmt (5 BTC). We will
// only be testing Alice's behavior, so the reference to Bob's channel
// state is unnecessary.
aliceLink, _, batchTicker, cleanUp, restore, err :=
aliceLink, _, batchTicker, start, cleanUp, restore, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore)
// Compute the static fees that will be used to determine the
@ -2619,13 +2638,17 @@ func TestChannelLinkTrimCircuitsNoCommit(t *testing.T) {
// We'll start by creating a new link with our chanAmt (5 BTC). We will
// only be testing Alice's behavior, so the reference to Bob's channel
// state is unnecessary.
aliceLink, _, batchTicker, cleanUp, restore, err :=
aliceLink, _, batchTicker, start, cleanUp, restore, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
alice := newPersistentLinkHarness(t, aliceLink, batchTicker, restore)
// We'll put Alice into hodl.Commit mode, such that the circuits for any
@ -2876,13 +2899,17 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) {
// channel reserve.
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, batchTimer, cleanUp, _, err :=
aliceLink, bobChannel, batchTimer, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
mockBlob [lnwire.OnionPacketSize]byte
coreLink = aliceLink.(*channelLink)
@ -2991,13 +3018,17 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) {
// should therefore be 0.
const bobChanAmt = btcutil.SatoshiPerBitcoin * 1
const bobChanReserve = btcutil.SatoshiPerBitcoin * 1.5
bobLink, _, _, bobCleanUp, _, err :=
bobLink, _, _, start, bobCleanUp, _, err :=
newSingleLinkTestHarness(bobChanAmt, bobChanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer bobCleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
// Make sure bandwidth is reported as 0.
assertLinkBandwidth(t, bobLink, 0)
}
@ -4070,13 +4101,17 @@ func TestChannelLinkNoMoreUpdates(t *testing.T) {
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, _, cleanUp, _, err :=
aliceLink, bobChannel, _, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
@ -4158,13 +4193,17 @@ func TestChannelLinkWaitForRevocation(t *testing.T) {
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, _, cleanUp, _, err :=
aliceLink, bobChannel, _, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
defer cleanUp()
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
@ -4256,3 +4295,223 @@ func TestChannelLinkWaitForRevocation(t *testing.T) {
case <-time.After(50 * time.Millisecond):
}
}
type mockPackager struct {
failLoadFwdPkgs bool
}
func (*mockPackager) AddFwdPkg(tx *bolt.Tx, fwdPkg *channeldb.FwdPkg) error {
return nil
}
func (*mockPackager) SetFwdFilter(tx *bolt.Tx, height uint64,
fwdFilter *channeldb.PkgFilter) error {
return nil
}
func (*mockPackager) AckAddHtlcs(tx *bolt.Tx,
addRefs ...channeldb.AddRef) error {
return nil
}
func (m *mockPackager) LoadFwdPkgs(tx *bolt.Tx) ([]*channeldb.FwdPkg, error) {
if m.failLoadFwdPkgs {
return nil, fmt.Errorf("failing LoadFwdPkgs")
}
return nil, nil
}
func (*mockPackager) RemovePkg(tx *bolt.Tx, height uint64) error {
return nil
}
func (*mockPackager) AckSettleFails(tx *bolt.Tx,
settleFailRefs ...channeldb.SettleFailRef) error {
return nil
}
// TestChannelLinkFail tests that we will fail the link, and force
// close the channel in certain situations.
func TestChannelLinkFail(t *testing.T) {
t.Parallel()
testCases := []struct {
// options is used to set up mocks and configure the link
// before it is started.
options func(*channelLink)
// link test is used to execute the given test on the channel
// link after it is started.
linkTest func(*testing.T, *channelLink, *lnwallet.LightningChannel)
// shouldForceClose indicates whether we expect the link to
// force close the channel in response to the actions performed
// during the linkTest.
shouldForceClose bool
}{
{
// Test that we don't force close if syncing states
// fails at startup.
func(c *channelLink) {
c.cfg.SyncStates = true
// Make the syncChanStateCall fail by making
// the SendMessage call fail.
c.cfg.Peer.(*mockPeer).disconnected = true
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
// Should fail at startup.
},
false,
},
{
// Test that we don't force close the channel if
// resolving forward packages fails at startup.
func(c *channelLink) {
// We make the call to resolveFwdPkgs fail by
// making the underlying forwarder fail.
pkg := &mockPackager{
failLoadFwdPkgs: true,
}
c.channel.State().Packager = pkg
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
// Should fail at startup.
},
false,
},
{
// Test that we force close the channel if we receive
// an invalid Settle message.
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
// Receive an htlc settle for an htlc that was
// never added.
htlcSettle := &lnwire.UpdateFulfillHTLC{
ID: 0,
PaymentPreimage: [32]byte{},
}
c.HandleChannelUpdate(htlcSettle)
},
true,
},
{
// Test that we force close the channel if we receive
// an invalid CommitSig, not containing enough HTLC
// sigs.
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, remoteChannel, 0)
sendHtlcBobToAlice(t, c, remoteChannel, htlc1)
// Sign a commitment that will include a
// signature for the HTLC just sent.
sig, htlcSigs, err :=
remoteChannel.SignNextCommitment()
if err != nil {
t.Fatalf("error signing commitment: %v",
err)
}
// Remove the HTLC sig, such that the commit
// sig will be invalid.
commitSig := &lnwire.CommitSig{
CommitSig: sig,
HtlcSigs: htlcSigs[1:],
}
c.HandleChannelUpdate(commitSig)
},
true,
},
{
// Test that we force close the channel if we receive
// an invalid CommitSig, where the sig itself is
// corrupted.
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, remoteChannel, 0)
sendHtlcBobToAlice(t, c, remoteChannel, htlc1)
// Sign a commitment that will include a
// signature for the HTLC just sent.
sig, htlcSigs, err :=
remoteChannel.SignNextCommitment()
if err != nil {
t.Fatalf("error signing commitment: %v",
err)
}
// Flip a bit on the signature, rendering it
// invalid.
sig[19] ^= 1
commitSig := &lnwire.CommitSig{
CommitSig: sig,
HtlcSigs: htlcSigs,
}
c.HandleChannelUpdate(commitSig)
},
true,
},
}
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = 0
// Execute each test case.
for i, test := range testCases {
link, remoteChannel, _, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
coreLink := link.(*channelLink)
// Set up a channel used to check whether the link failure
// force closed the channel.
linkErrors := make(chan LinkFailureError, 1)
coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
_ lnwire.ShortChannelID, linkErr LinkFailureError) {
linkErrors <- linkErr
}
// Set up the link before starting it.
test.options(coreLink)
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
// Execute the test case.
test.linkTest(t, coreLink, remoteChannel)
// Currently we expect all test cases to lead to a link error.
var linkErr LinkFailureError
select {
case linkErr = <-linkErrors:
case <-time.After(10 * time.Second):
t.Fatalf("%d) Alice did not fail"+
"channel", i)
}
// If we expect the link to force close the channel in this
// case, check that it happens. If not, make sure it does not
// happen.
if test.shouldForceClose != linkErr.ForceClose {
t.Fatalf("%d) Expected Alice to force close(%v), "+
"instead got(%v)", i, test.shouldForceClose,
linkErr.ForceClose)
}
// Clean up before starting next test case.
cleanUp()
}
}

htlcswitch/linkfailure.go Normal file

@ -0,0 +1,95 @@
package htlcswitch
import "github.com/go-errors/errors"
var (
// ErrLinkShuttingDown signals that the link is shutting down.
ErrLinkShuttingDown = errors.New("link shutting down")
)
// errorCode encodes the possible types of errors that will make us fail the
// current link.
type errorCode uint8
const (
// ErrInternalError indicates that something internal in the link
// failed. In this case we will send a generic error to our peer.
ErrInternalError errorCode = iota
// ErrRemoteError indicates that our peer sent an error, prompting us
// to fail the link.
ErrRemoteError
// ErrSyncError indicates that we failed synchronizing the state of the
// channel with our peer.
ErrSyncError
// ErrInvalidUpdate indicates that the peer sent us an invalid update.
ErrInvalidUpdate
// ErrInvalidCommitment indicates that the remote peer sent us an
// invalid commitment signature.
ErrInvalidCommitment
// ErrInvalidRevocation indicates that the remote peer sent us an
// invalid revocation message.
ErrInvalidRevocation
)
// LinkFailureError encapsulates an error that will make us fail the current
// link. It contains the information needed to determine if we should
// force close the channel in the process, and if any error data should be sent
// to the peer.
type LinkFailureError struct {
// code is the type of error this LinkFailureError encapsulates.
code errorCode
// ForceClose indicates whether we should force close the channel
// because of this error.
ForceClose bool
// SendData is a byte slice that will be sent to the peer. If nil, a
// generic error will be sent.
SendData []byte
}
// A compile time check to ensure LinkFailureError implements the error
// interface.
var _ error = (*LinkFailureError)(nil)
// Error returns a generic error for the LinkFailureError.
//
// NOTE: Part of the error interface.
func (e LinkFailureError) Error() string {
switch e.code {
case ErrInternalError:
return "internal error"
case ErrRemoteError:
return "remote error"
case ErrSyncError:
return "sync error"
case ErrInvalidUpdate:
return "invalid update"
case ErrInvalidCommitment:
return "invalid commitment"
case ErrInvalidRevocation:
return "invalid revocation"
default:
return "unknown error"
}
}
// ShouldSendToPeer indicates whether we should send an error to the peer if
// the link fails with this LinkFailureError.
func (e LinkFailureError) ShouldSendToPeer() bool {
switch e.code {
// If the failure is a result of the peer sending us an error, we don't
// have to respond with one.
case ErrRemoteError:
return false
// In all other cases we will attempt to send our peer an error message.
default:
return true
}
}

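To make the semantics of the new type concrete, a small sketch, written as if it lived alongside linkfailure.go since the code field is unexported:

package htlcswitch

// exampleFailures shows how the error code drives behavior: an invalid
// commitment both force closes the channel and ships the detailed
// signature error to the peer, while an error the peer sent us is
// never echoed back.
func exampleFailures(sigErr error) {
	commitErr := LinkFailureError{
		code:       ErrInvalidCommitment,
		ForceClose: true,
		SendData:   []byte(sigErr.Error()),
	}
	_ = commitErr.Error()            // "invalid commitment"
	_ = commitErr.ShouldSendToPeer() // true

	remoteErr := LinkFailureError{code: ErrRemoteError}
	_ = remoteErr.ShouldSendToPeer() // false
}
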
@ -506,12 +506,6 @@ func (s *mockServer) PubKey() [33]byte {
return s.id
}
func (s *mockServer) Disconnect(reason error) {
fmt.Printf("server %v disconnected due to %v\n", s.name, reason)
s.t.Fatalf("server %v was disconnected: %v", s.name, reason)
}
func (s *mockServer) WipeChannel(*wire.OutPoint) error {
return nil
}

@ -4068,6 +4068,199 @@ func waitForNTxsInMempool(miner *rpcclient.Client, n int,
}
}
// testFailingChannel tests that we will fail the channel by force closing it
// in the case where a counterparty tries to settle an HTLC with the wrong
// preimage.
func testFailingChannel(net *lntest.NetworkHarness, t *harnessTest) {
ctxb := context.Background()
const (
timeout = time.Duration(time.Second * 10)
chanAmt = maxFundingAmount
paymentAmt = 10000
defaultCSV = 4
)
// We'll introduce Carol, which will settle any incoming invoice with a
// totally unrelated preimage.
carol, err := net.NewNode("Carol",
[]string{"--debughtlc", "--hodl.bogus-settle"})
if err != nil {
t.Fatalf("unable to create new nodes: %v", err)
}
// Let Alice connect and open a channel to Carol.
if err := net.ConnectNodes(ctxb, net.Alice, carol); err != nil {
t.Fatalf("unable to connect alice to carol: %v", err)
}
ctxt, _ := context.WithTimeout(ctxb, timeout)
chanPoint := openChannelAndAssert(
ctxt, t, net, net.Alice, carol, chanAmt, 0, false,
)
// With the channel open, we'll create an invoice for Carol that Alice
// will attempt to pay.
preimage := bytes.Repeat([]byte{byte(192)}, 32)
invoice := &lnrpc.Invoice{
Memo: "testing",
RPreimage: preimage,
Value: paymentAmt,
}
resp, err := carol.AddInvoice(ctxb, invoice)
if err != nil {
t.Fatalf("unable to add invoice: %v", err)
}
carolPayReqs := []string{resp.PaymentRequest}
// Wait for Alice to receive the channel edge from the funding manager.
ctxt, _ = context.WithTimeout(ctxb, timeout)
err = net.Alice.WaitForNetworkChannelOpen(ctxt, chanPoint)
if err != nil {
t.Fatalf("alice didn't see the alice->carol channel before "+
"timeout: %v", err)
}
// Send the payment from Alice to Carol. We expect Carol to attempt to
// settle this payment with the wrong preimage.
err = completePaymentRequests(ctxb, net.Alice, carolPayReqs, false)
if err != nil {
t.Fatalf("unable to send payments: %v", err)
}
// Since Alice detects that Carol is trying to trick her by providing a
// fake preimage, she should fail and force close the channel.
var predErr error
err = lntest.WaitPredicate(func() bool {
pendingChansRequest := &lnrpc.PendingChannelsRequest{}
pendingChanResp, err := net.Alice.PendingChannels(ctxb,
pendingChansRequest)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
n := len(pendingChanResp.WaitingCloseChannels)
if n != 1 {
predErr = fmt.Errorf("Expected to find %d channels "+
"waiting close, found %d", 1, n)
return false
}
return true
}, time.Second*15)
if err != nil {
t.Fatalf("%v", predErr)
}
// Mine a block to confirm the broadcasted commitment.
block := mineBlocks(t, net, 1)[0]
if len(block.Transactions) != 2 {
t.Fatalf("transaction wasn't mined")
}
// The channel should now show up as force closed both for Alice and
// Carol.
err = lntest.WaitPredicate(func() bool {
pendingChansRequest := &lnrpc.PendingChannelsRequest{}
pendingChanResp, err := net.Alice.PendingChannels(ctxb,
pendingChansRequest)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
n := len(pendingChanResp.WaitingCloseChannels)
if n != 0 {
predErr = fmt.Errorf("Expected to find %d channels "+
"waiting close, found %d", 0, n)
return false
}
n = len(pendingChanResp.PendingForceClosingChannels)
if n != 1 {
predErr = fmt.Errorf("expected to find %d channel "+
"pending force close, found %d", 1, n)
return false
}
return true
}, time.Second*15)
if err != nil {
t.Fatalf("%v", predErr)
}
err = lntest.WaitPredicate(func() bool {
pendingChansRequest := &lnrpc.PendingChannelsRequest{}
pendingChanResp, err := carol.PendingChannels(ctxb,
pendingChansRequest)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
n := len(pendingChanResp.PendingForceClosingChannels)
if n != 1 {
predErr = fmt.Errorf("expected to find %d channel "+
"pending force close, found %d", 1, n)
return false
}
return true
}, time.Second*15)
if err != nil {
t.Fatalf("%v", predErr)
}
// Carol will use the correct preimage to resolve the HTLC on-chain.
_, err = waitForTxInMempool(net.Miner.Node, 5*time.Second)
if err != nil {
t.Fatalf("unable to find Bob's breach tx in mempool: %v", err)
}
// Mine enough blocks for Alice to sweep her funds from the force
// closed channel.
_, err = net.Miner.Node.Generate(defaultCSV)
if err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
// Wait for the sweeping tx to be broadcast.
_, err = waitForTxInMempool(net.Miner.Node, 5*time.Second)
if err != nil {
t.Fatalf("unable to find Bob's breach tx in mempool: %v", err)
}
// Mine the sweep.
_, err = net.Miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate blocks: %v", err)
}
// No pending channels should be left.
err = lntest.WaitPredicate(func() bool {
pendingChansRequest := &lnrpc.PendingChannelsRequest{}
pendingChanResp, err := net.Alice.PendingChannels(ctxb,
pendingChansRequest)
if err != nil {
predErr = fmt.Errorf("unable to query for pending "+
"channels: %v", err)
return false
}
n := len(pendingChanResp.PendingForceClosingChannels)
if n != 0 {
predErr = fmt.Errorf("expected to find %d channel "+
"pending force close, found %d", 0, n)
return false
}
return true
}, time.Second*15)
if err != nil {
t.Fatalf("%v", predErr)
}
// Finally, shutdown the node we created for the duration of the tests,
// only leaving the two seed nodes (Alice and Bob) within our test
// network.
if err := net.ShutdownNode(carol); err != nil {
t.Fatalf("unable to shutdown carol: %v", err)
}
}
// testRevokedCloseRetribution tests that Alice is able to carry out
// retribution in the event that she fails immediately after detecting Bob's
// breach txn in the mempool.
@ -9183,6 +9376,10 @@ var testsCases = []*testCase{
name: "revoked uncooperative close retribution",
test: testRevokedCloseRetribution,
},
{
name: "failing link",
test: testFailingChannel,
},
{
name: "revoked uncooperative close retribution zero value remote output",
test: testRevokedCloseRetributionZeroValueRemoteOutput,

peer.go

@ -411,6 +411,104 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
lnChan.Stop()
return err
}
// Create the link and add it to the switch.
err = p.addLink(chanPoint, lnChan, forwardingPolicy, blockEpoch,
chainEvents, currentHeight, true)
if err != nil {
lnChan.Stop()
return err
}
}
return nil
}
// addLink creates and adds a new link from the specified channel.
func (p *peer) addLink(chanPoint *wire.OutPoint,
lnChan *lnwallet.LightningChannel,
forwardingPolicy *htlcswitch.ForwardingPolicy,
blockEpoch *chainntnfs.BlockEpochEvent,
chainEvents *contractcourt.ChainEventSubscription,
currentHeight int32, syncStates bool) error {
// onChannelFailure will be called by the link in case the channel
// fails for some reason.
onChannelFailure := func(chanID lnwire.ChannelID,
shortChanID lnwire.ShortChannelID,
linkErr htlcswitch.LinkFailureError) {
// The link has notified us about a failure. We launch a
// goroutine to stop the link, disconnect the peer and
// optionally force close the channel. We must launch a
// goroutine since we must let OnChannelFailure return in
// order for the link to completely stop in the call to
// RemoveLink.
p.wg.Add(1)
go func() {
defer p.wg.Done()
// We begin by removing the link from the switch, such
// that it won't be used for any more updates.
// TODO(halseth): should introduce a way to atomically
// stop/pause the link and cancel back any adds in its
// mailboxes such that we can safely force close
// without the link being added again and updates being
// applied.
err := p.server.htlcSwitch.RemoveLink(chanID)
if err != nil {
peerLog.Errorf("unable to stop link(%v): %v",
shortChanID, err)
}
// If the error encountered was severe enough, we'll
// now force close the channel.
if linkErr.ForceClose {
peerLog.Warnf("Force closing link(%v)",
shortChanID)
closeTx, err := p.server.chainArb.ForceCloseContract(*chanPoint)
if err != nil {
peerLog.Errorf("unable to force close "+
"link(%v): %v", shortChanID,
err)
} else {
peerLog.Infof("channel(%v) force "+
"closed with txid %v",
shortChanID, closeTx.TxHash())
}
}
// Send an error to the peer explaining why we failed the
// channel.
if linkErr.ShouldSendToPeer() {
// If SendData is set, send it to the peer. If
// not, we'll use the standard error messages
// in the payload. We only include sendData in
// the cases where the error data does not
// contain sensitive information.
data := []byte(linkErr.Error())
if linkErr.SendData != nil {
data = linkErr.SendData
}
err := p.SendMessage(&lnwire.Error{
ChanID: chanID,
Data: data,
}, true)
if err != nil {
peerLog.Errorf("unable to send msg to "+
"remote peer: %v", err)
}
}
// Initiate disconnection.
// TODO(halseth): consider not disconnecting the peer,
// as we might still have other active channels with
// the same peer.
p.Disconnect(linkErr)
}()
}
linkCfg := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
@ -434,7 +532,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
*chanPoint, signals,
)
},
SyncStates: true,
OnChannelFailure: onChannelFailure,
SyncStates: syncStates,
BatchTicker: htlcswitch.NewBatchTicker(
time.NewTicker(50 * time.Millisecond)),
FwdPkgGCTicker: htlcswitch.NewBatchTicker(
@ -445,13 +544,10 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
link := htlcswitch.NewChannelLink(linkCfg, lnChan,
uint32(currentHeight))
if err := p.server.htlcSwitch.AddLink(link); err != nil {
lnChan.Stop()
return err
}
}
return nil
// With the channel link created, we'll now notify the htlc switch so
// this channel can be used to dispatch local payments and also
// passively forward payments.
return p.server.htlcSwitch.AddLink(link)
}
// WaitForDisconnect waits until the peer has disconnected. A peer may be
@ -1387,46 +1483,15 @@ out:
"events: %v", err)
continue
}
linkConfig := htlcswitch.ChannelLinkConfig{
Peer: p,
DecodeHopIterators: p.server.sphinx.DecodeHopIterators,
ExtractErrorEncrypter: p.server.sphinx.ExtractErrorEncrypter,
FetchLastChannelUpdate: fetchLastChanUpdate(
p.server, p.PubKey(),
),
DebugHTLC: cfg.DebugHTLC,
HodlMask: cfg.Hodl.Mask(),
Registry: p.server.invoices,
Switch: p.server.htlcSwitch,
Circuits: p.server.htlcSwitch.CircuitModifier(),
ForwardPackets: p.server.htlcSwitch.ForwardPackets,
FwrdingPolicy: p.server.cc.routingPolicy,
FeeEstimator: p.server.cc.feeEstimator,
BlockEpochs: blockEpoch,
PreimageCache: p.server.witnessBeacon,
ChainEvents: chainEvents,
UpdateContractSignals: func(signals *contractcourt.ContractSignals) error {
return p.server.chainArb.UpdateContractSignals(
*chanPoint, signals,
)
},
SyncStates: false,
BatchTicker: htlcswitch.NewBatchTicker(
time.NewTicker(50 * time.Millisecond)),
FwdPkgGCTicker: htlcswitch.NewBatchTicker(
time.NewTicker(time.Minute)),
BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay,
}
link := htlcswitch.NewChannelLink(linkConfig, newChan,
uint32(currentHeight))
// With the channel link created, we'll now notify the
// htlc switch so this channel can be used to dispatch
// local payments and also passively forward payments.
if err := p.server.htlcSwitch.AddLink(link); err != nil {
// Create the link and add it to the switch.
err = p.addLink(chanPoint, newChan,
&p.server.cc.routingPolicy, blockEpoch,
chainEvents, currentHeight, false)
if err != nil {
peerLog.Errorf("can't register new channel "+
"link(%v) with NodeKey(%x)", chanPoint, p.PubKey())
"link(%v) with NodeKey(%x)", chanPoint,
p.PubKey())
}
close(newChanReq.done)