htlcswitch/link: add pending commit ticker for stall detection

This commit adds a PendingCommitTicker to the link config, which allows
us to control how quickly we fail the link if the commitment dance
stalls. Now that the mailbox has the ability to cancel packets, when the
link fails it will reset the mailbox packets on exit, forcing a
reevaluation of the HTLCs against their mailbox expiries.
This commit is contained in:
Conner Fromknecht 2020-04-14 10:51:30 -07:00
parent a8977651cc
commit ec1b8d874d
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
5 changed files with 117 additions and 8 deletions

@ -223,6 +223,11 @@ type ChannelLinkConfig struct {
// syncing. // syncing.
FwdPkgGCTicker ticker.Ticker FwdPkgGCTicker ticker.Ticker
// PendingCommitTicker is a ticker that allows the link to determine if
// a locally initiated commitment dance gets stuck waiting for the
// remote party to revoke.
PendingCommitTicker ticker.Ticker
// BatchSize is the max size of a batch of updates done to the link // BatchSize is the max size of a batch of updates done to the link
// before we do a state update. // before we do a state update.
BatchSize uint32 BatchSize uint32
@ -1098,6 +1103,11 @@ func (l *channelLink) htlcManager() {
return return
} }
case <-l.cfg.PendingCommitTicker.Ticks():
l.fail(LinkFailureError{code: ErrRemoteUnresponsive},
"unable to complete dance")
return
// A message from the switch was just received. This indicates // A message from the switch was just received. This indicates
// that the link is an intermediate hop in a multi-hop HTLC // that the link is an intermediate hop in a multi-hop HTLC
// circuit. // circuit.
@ -1934,6 +1944,8 @@ func (l *channelLink) updateCommitTx() error {
theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment() theirCommitSig, htlcSigs, pendingHTLCs, err := l.channel.SignNextCommitment()
if err == lnwallet.ErrNoWindow { if err == lnwallet.ErrNoWindow {
l.cfg.PendingCommitTicker.Resume()
l.log.Tracef("revocation window exhausted, unable to send: "+ l.log.Tracef("revocation window exhausted, unable to send: "+
"%v, pend_updates=%v, dangling_closes%v", "%v, pend_updates=%v, dangling_closes%v",
l.channel.PendingLocalUpdateCount(), l.channel.PendingLocalUpdateCount(),
@ -1953,6 +1965,8 @@ func (l *channelLink) updateCommitTx() error {
return err return err
} }
l.cfg.PendingCommitTicker.Pause()
// The remote party now has a new pending commitment, so we'll update // The remote party now has a new pending commitment, so we'll update
// the contract court to be aware of this new set (the prior old remote // the contract court to be aware of this new set (the prior old remote
// pending). // pending).

@ -1699,10 +1699,11 @@ func newSingleLinkTestHarness(chanAmt, chanReserve btcutil.Amount) (
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
Registry: invoiceRegistry, Registry: invoiceRegistry,
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker, BatchTicker: bticker,
FwdPkgGCTicker: ticker.NewForce(15 * time.Second), FwdPkgGCTicker: ticker.NewForce(15 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
@ -4203,10 +4204,11 @@ func (h *persistentLinkHarness) restartLink(
UpdateContractSignals: func(*contractcourt.ContractSignals) error { UpdateContractSignals: func(*contractcourt.ContractSignals) error {
return nil return nil
}, },
Registry: h.coreLink.cfg.Registry, Registry: h.coreLink.cfg.Registry,
ChainEvents: &contractcourt.ChainEventSubscription{}, ChainEvents: &contractcourt.ChainEventSubscription{},
BatchTicker: bticker, BatchTicker: bticker,
FwdPkgGCTicker: ticker.New(5 * time.Second), FwdPkgGCTicker: ticker.New(5 * time.Second),
PendingCommitTicker: ticker.New(time.Minute),
// Make the BatchSize and Min/MaxFeeUpdateTimeout large enough // Make the BatchSize and Min/MaxFeeUpdateTimeout large enough
// to not trigger commit updates automatically during tests. // to not trigger commit updates automatically during tests.
BatchSize: 10000, BatchSize: 10000,
@ -6134,6 +6136,91 @@ func TestChannelLinkReceiveEmptySig(t *testing.T) {
aliceLink.Stop() aliceLink.Stop()
} }
// TestPendingCommitTicker tests that a link will fail itself after a timeout if
// the commitment dance stalls out.
func TestPendingCommitTicker(t *testing.T) {
t.Parallel()
const chanAmt = btcutil.SatoshiPerBitcoin * 5
const chanReserve = btcutil.SatoshiPerBitcoin * 1
aliceLink, bobChannel, batchTicker, start, cleanUp, _, err :=
newSingleLinkTestHarness(chanAmt, chanReserve)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
var (
coreLink = aliceLink.(*channelLink)
aliceMsgs = coreLink.cfg.Peer.(*mockPeer).sentMsgs
)
coreLink.cfg.PendingCommitTicker = ticker.NewForce(time.Millisecond)
linkErrs := make(chan LinkFailureError)
coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
_ lnwire.ShortChannelID, linkErr LinkFailureError) {
linkErrs <- linkErr
}
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
defer cleanUp()
ctx := linkTestContext{
t: t,
aliceLink: aliceLink,
bobChannel: bobChannel,
aliceMsgs: aliceMsgs,
}
// Send an HTLC from Alice to Bob, and signal the batch ticker to signa
// a commitment.
htlc, _ := generateHtlcAndInvoice(t, 0)
ctx.sendHtlcAliceToBob(0, htlc)
ctx.receiveHtlcAliceToBob()
batchTicker <- time.Now()
select {
case msg := <-aliceMsgs:
if _, ok := msg.(*lnwire.CommitSig); !ok {
t.Fatalf("expected CommitSig, got: %T", msg)
}
case <-time.After(time.Second):
t.Fatalf("alice did not send commit sig")
}
// Check that Alice hasn't failed.
select {
case linkErr := <-linkErrs:
t.Fatalf("link failed unexpectedly: %v", linkErr)
case <-time.After(50 * time.Millisecond):
}
// Without completing the dance, send another HTLC from Alice to Bob.
// Since the revocation window has been exhausted, we should see the
// link fail itself immediately due to the low pending commit timeout.
// In production this would be much longer, e.g. a minute.
htlc, _ = generateHtlcAndInvoice(t, 1)
ctx.sendHtlcAliceToBob(1, htlc)
ctx.receiveHtlcAliceToBob()
batchTicker <- time.Now()
// Assert that we get the expected link failure from Alice.
select {
case linkErr := <-linkErrs:
if linkErr.code != ErrRemoteUnresponsive {
t.Fatalf("error code mismatch, "+
"want: ErrRemoteUnresponsive, got: %v",
linkErr.code)
}
case <-time.After(time.Second):
t.Fatalf("did not receive failure")
}
}
// assertFailureCode asserts that an error is of type ClearTextError and that // assertFailureCode asserts that an error is of type ClearTextError and that
// the failure code is as expected. // the failure code is as expected.
func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) { func assertFailureCode(t *testing.T, err error, code lnwire.FailCode) {

@ -20,6 +20,10 @@ const (
// to fail the link. // to fail the link.
ErrRemoteError ErrRemoteError
// ErrRemoteUnresponsive indicates that our peer took too long to
// complete a commitment dance.
ErrRemoteUnresponsive
// ErrSyncError indicates that we failed synchronizing the state of the // ErrSyncError indicates that we failed synchronizing the state of the
// channel with our peer. // channel with our peer.
ErrSyncError ErrSyncError
@ -71,6 +75,8 @@ func (e LinkFailureError) Error() string {
return "internal error" return "internal error"
case ErrRemoteError: case ErrRemoteError:
return "remote error" return "remote error"
case ErrRemoteUnresponsive:
return "remote unresponsive"
case ErrSyncError: case ErrSyncError:
return "sync error" return "sync error"
case ErrInvalidUpdate: case ErrInvalidUpdate:

@ -1167,6 +1167,7 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer,
BatchSize: 10, BatchSize: 10,
BatchTicker: ticker.NewForce(testBatchTimeout), BatchTicker: ticker.NewForce(testBatchTimeout),
FwdPkgGCTicker: ticker.NewForce(fwdPkgTimeout), FwdPkgGCTicker: ticker.NewForce(fwdPkgTimeout),
PendingCommitTicker: ticker.NewForce(time.Minute),
MinFeeUpdateTimeout: minFeeUpdateTimeout, MinFeeUpdateTimeout: minFeeUpdateTimeout,
MaxFeeUpdateTimeout: maxFeeUpdateTimeout, MaxFeeUpdateTimeout: maxFeeUpdateTimeout,
OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {}, OnChannelFailure: func(lnwire.ChannelID, lnwire.ShortChannelID, LinkFailureError) {},

@ -669,6 +669,7 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
SyncStates: syncStates, SyncStates: syncStates,
BatchTicker: ticker.New(50 * time.Millisecond), BatchTicker: ticker.New(50 * time.Millisecond),
FwdPkgGCTicker: ticker.New(time.Minute), FwdPkgGCTicker: ticker.New(time.Minute),
PendingCommitTicker: ticker.New(time.Minute),
BatchSize: 10, BatchSize: 10,
UnsafeReplay: cfg.UnsafeReplay, UnsafeReplay: cfg.UnsafeReplay,
MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout, MinFeeUpdateTimeout: htlcswitch.DefaultMinLinkFeeUpdateTimeout,