From db0ec1241299c6213ad6d63cc3f4b9c0551785e5 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 15 Feb 2021 13:31:08 +0100 Subject: [PATCH] breacharbiter: broadcast "splitted" justice tx if spend all not confirming In case 4 block passes without our justice tx confirming, we'll "split" it up, and separately sweep the commitment outs, and HTLC outs. --- breacharbiter.go | 95 +++++++++++++++++++++- breacharbiter_test.go | 184 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 278 insertions(+), 1 deletion(-) diff --git a/breacharbiter.go b/breacharbiter.go index dfd6452d..0766f9f7 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -25,6 +25,21 @@ import ( "github.com/lightningnetwork/lnd/lnwallet/chainfee" ) +const ( + // justiceTxConfTarget is the number of blocks we'll use as a + // confirmation target when creating the justice transaction. We'll + // choose an aggressive target, since we want to be sure it confirms + // quickly. + justiceTxConfTarget = 2 + + // blocksPassedSplitPublish is the number of blocks without + // confirmation of the justice tx we'll wait before starting to publish + // smaller variants of the justice tx. We do this to mitigate an attack + // the channel peer can do by pinning the HTLC outputs of the + // commitment with low-fee HTLC transactions. + blocksPassedSplitPublish = 4 +) + var ( // retributionBucket stores retribution state on disk between detecting // a contract breach, broadcasting a justice transaction that sweeps the @@ -608,8 +623,20 @@ justiceTxBroadcast: spendChan <- spends }() + // We'll also register for block notifications, such that in case our + // justice tx doesn't confirm within a reasonable timeframe, we can + // start to more aggressively sweep the time sensitive outputs. + newBlockChan, err := b.cfg.Notifier.RegisterBlockEpochNtfn(nil) + if err != nil { + brarLog.Errorf("Unable to register for block notifications: %v", + err) + return + } + defer newBlockChan.Cancel() + Loop: for { + select { case spends := <-spendChan: // Print the funds swept by the txs. @@ -654,6 +681,72 @@ Loop: wg.Wait() goto justiceTxBroadcast + // On every new block, we check whether we should republish the + // transactions. + case epoch, ok := <-newBlockChan.Epochs: + if !ok { + return + } + + // If less than four blocks have passed since the + // breach confirmed, we'll continue waiting. It was + // published with a 2-block fee estimate, so it's not + // unexpected that four blocks without confirmation can + // pass. + splitHeight := breachInfo.breachHeight + + blocksPassedSplitPublish + if uint32(epoch.Height) < splitHeight { + continue Loop + } + + brarLog.Warnf("Block height %v arrived without "+ + "justice tx confirming (breached at "+ + "height %v), splitting justice tx.", + epoch.Height, breachInfo.breachHeight) + + // Otherwise we'll attempt to publish the two separate + // justice transactions that sweeps the commitment + // outputs and the HTLC outputs separately. This is to + // mitigate the case where our "spend all" justice TX + // doesn't propagate because the HTLC outputs have been + // pinned by low fee HTLC txs. + label := labels.MakeLabel( + labels.LabelTypeJusticeTransaction, nil, + ) + if justiceTxs.spendCommitOuts != nil { + tx := justiceTxs.spendCommitOuts + + brarLog.Debugf("Broadcasting justice tx "+ + "spending commitment outs: %v", + newLogClosure(func() string { + return spew.Sdump(tx) + })) + + err = b.cfg.PublishTransaction(tx, label) + if err != nil { + brarLog.Warnf("Unable to broadcast "+ + "commit out spending justice "+ + "tx: %v", err) + } + } + + if justiceTxs.spendHTLCs != nil { + tx := justiceTxs.spendHTLCs + + brarLog.Debugf("Broadcasting justice tx "+ + "spending HTLC outs: %v", + newLogClosure(func() string { + return spew.Sdump(tx) + })) + + err = b.cfg.PublishTransaction(tx, label) + if err != nil { + brarLog.Warnf("Unable to broadcast "+ + "HTLC out spending justice "+ + "tx: %v", err) + } + } + case err := <-errChan: if err != errBrarShuttingDown { brarLog.Errorf("error waiting for "+ @@ -1224,7 +1317,7 @@ func (b *breachArbiter) sweepSpendableOutputsTxn(txWeight int64, // We'll actually attempt to target inclusion within the next two // blocks as we'd like to sweep these funds back into our wallet ASAP. - feePerKw, err := b.cfg.Estimator.EstimateFeePerKW(2) + feePerKw, err := b.cfg.Estimator.EstimateFeePerKW(justiceTxConfTarget) if err != nil { return nil, err } diff --git a/breacharbiter_test.go b/breacharbiter_test.go index e8deb82a..358a826a 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1798,6 +1798,190 @@ func testBreachSpends(t *testing.T, test breachTest) { assertBrarCleanup(t, brar, alice.ChanPoint, alice.State().Db) } +// TestBreachDelayedJusticeConfirmation tests that the breach arbiter will +// "split" the justice tx in case the first justice tx doesn't confirm within +// a reasonable time. +func TestBreachDelayedJusticeConfirmation(t *testing.T) { + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) + defer cleanUpChans() + defer cleanUpArb() + + var ( + height = bobClose.ChanSnapshot.CommitHeight + blockHeight = int32(10) + forceCloseTx = bobClose.CloseTx + chanPoint = alice.ChanPoint + publTx = make(chan *wire.MsgTx) + ) + + // Make PublishTransaction always return succeed. + brar.cfg.PublishTransaction = func(tx *wire.MsgTx, _ string) error { + publTx <- tx + return nil + } + + // Notify the breach arbiter about the breach. + retribution, err := lnwallet.NewBreachRetribution( + alice.State(), height, uint32(blockHeight), + ) + if err != nil { + t.Fatalf("unable to create breach retribution: %v", err) + } + + processACK := make(chan error, 1) + breach := &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: func(brarErr error) { + processACK <- brarErr + }, + BreachRetribution: retribution, + } + + select { + case contractBreaches <- breach: + case <-time.After(15 * time.Second): + t.Fatalf("breach not delivered") + } + + // We'll also wait to consume the ACK back from the breach arbiter. + select { + case err := <-processACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } + case <-time.After(time.Second * 15): + t.Fatalf("breach arbiter didn't send ack back") + } + + state := alice.State() + err = state.CloseChannel(&channeldb.ChannelCloseSummary{ + ChanPoint: state.FundingOutpoint, + ChainHash: state.ChainHash, + RemotePub: state.IdentityPub, + CloseType: channeldb.BreachClose, + Capacity: state.Capacity, + IsPending: true, + ShortChanID: state.ShortChanID(), + RemoteCurrentRevocation: state.RemoteCurrentRevocation, + RemoteNextRevocation: state.RemoteNextRevocation, + LocalChanConfig: state.LocalChanCfg, + }) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + + // After exiting, the breach arbiter should have persisted the + // retribution information and the channel should be shown as pending + // force closed. + assertArbiterBreach(t, brar, chanPoint) + + // Assert that the database sees the channel as pending close, otherwise + // the breach arbiter won't be able to fully close it. + assertPendingClosed(t, alice) + + // Notify that the breaching transaction is confirmed, to trigger the + // retribution logic. + notifier := brar.cfg.Notifier.(*mock.SpendNotifier) + + select { + case notifier.ConfChan <- &chainntnfs.TxConfirmation{}: + case <-time.After(15 * time.Second): + t.Fatalf("conf not delivered") + } + + // The breach arbiter should attempt to sweep all outputs on the + // breached commitment. + var justiceTx *wire.MsgTx + select { + case justiceTx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + require.Len(t, justiceTx.TxIn, 3) + + // All outputs should initially spend from the force closed txn. + forceTxID := forceCloseTx.TxHash() + for _, txIn := range justiceTx.TxIn { + if txIn.PreviousOutPoint.Hash != forceTxID { + t.Fatalf("og justice tx not spending commitment") + } + } + + // Now we'll pretend some blocks pass without the justice tx + // confirming. + for i := int32(0); i <= 3; i++ { + notifier.EpochChan <- &chainntnfs.BlockEpoch{ + Height: blockHeight + i, + } + + // On every epoch, check that no new tx is published. + select { + case <-publTx: + t.Fatalf("tx was published") + case <-time.After(20 * time.Millisecond): + } + } + + // Now mine another block without the justice tx confirming. This + // should lead to the breacharbiter publishing the split justice tx + // variants. + notifier.EpochChan <- &chainntnfs.BlockEpoch{ + Height: blockHeight + 4, + } + + var ( + splits []*wire.MsgTx + spending = make(map[wire.OutPoint]struct{}) + maxIndex = uint32(len(forceCloseTx.TxOut)) - 1 + ) + for i := 0; i < 2; i++ { + + var tx *wire.MsgTx + select { + case tx = <-publTx: + splits = append(splits, tx) + + case <-time.After(5 * time.Second): + t.Fatalf("tx not published") + } + + // Check that every input is from the breached tx and that + // there are no duplicates. + for _, in := range tx.TxIn { + op := in.PreviousOutPoint + _, ok := spending[op] + if ok { + t.Fatal("already spent") + } + + if op.Hash != forceTxID || op.Index > maxIndex { + t.Fatalf("not spending breach") + } + + spending[op] = struct{}{} + } + } + + // All the inputs from the original justice transaction should have + // been spent by the 2 splits. + require.Len(t, spending, len(justiceTx.TxIn)) + require.Len(t, splits, 2) + + // Finally notify that they confirm, making the breach arbiter clean + // up. + for _, tx := range splits { + for _, in := range tx.TxIn { + op := &in.PreviousOutPoint + notifier.Spend(op, blockHeight+5, tx) + } + } + + // Assert that the channel is fully resolved. + assertBrarCleanup(t, brar, alice.ChanPoint, alice.State().Db) +} + // findInputIndex returns the index of the input that spends from the given // outpoint. This method fails if the outpoint is not found. func findInputIndex(t *testing.T, op wire.OutPoint, tx *wire.MsgTx) int {