From 3641beb00212a9eb527b61fe78e004aed601c3f5 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 Mar 2019 19:21:03 -0700 Subject: [PATCH 1/4] breacharbiter: capitalize "unable to" error msgs --- breacharbiter.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 42207c89..25d3fda1 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -161,7 +161,7 @@ func (b *breachArbiter) Start() error { // retribution store. closedChans, err := b.cfg.DB.FetchClosedChannels(false) if err != nil { - brarLog.Errorf("unable to fetch closing channels: %v", err) + brarLog.Errorf("Unable to fetch closing channels: %v", err) return err } @@ -181,7 +181,7 @@ func (b *breachArbiter) Start() error { chanPoint := &chanSummary.ChanPoint if _, ok := breachRetInfos[*chanPoint]; ok { if err := b.cfg.Store.Remove(chanPoint); err != nil { - brarLog.Errorf("unable to remove closed "+ + brarLog.Errorf("Unable to remove closed "+ "chanid=%v from breach arbiter: %v", chanPoint, err) return err @@ -203,7 +203,7 @@ func (b *breachArbiter) Start() error { &breachTXID, breachScript, 1, retInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf updates "+ + brarLog.Errorf("Unable to register for conf updates "+ "for txid: %v, err: %v", breachTXID, err) return err } @@ -366,8 +366,8 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, breachInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to check for spentness "+ - "of out_point=%v: %v", + brarLog.Errorf("Unable to check for spentness "+ + "of outpoint=%v: %v", breachedOutput.outpoint, err) // Registration may have failed if we've been @@ -492,7 +492,7 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to get finalized txn for"+ + brarLog.Errorf("Unable to get finalized txn for"+ "chanid=%v: %v", &breachInfo.chanPoint, err) return } @@ -508,7 +508,7 @@ justiceTxBroadcast: // channel. finalTx, err = b.createJusticeTx(breachInfo) if err != nil { - brarLog.Errorf("unable to create justice tx: %v", err) + brarLog.Errorf("Unable to create justice tx: %v", err) return } @@ -516,7 +516,7 @@ justiceTxBroadcast: // attempt to broadcast. err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx) if err != nil { - brarLog.Errorf("unable to finalize justice tx for "+ + brarLog.Errorf("Unable to finalize justice tx for "+ "chanid=%v: %v", &breachInfo.chanPoint, err) return } @@ -530,7 +530,7 @@ justiceTxBroadcast: // channel's retribution against the cheating counter party. err = b.cfg.PublishTransaction(finalTx) if err != nil { - brarLog.Errorf("unable to broadcast justice tx: %v", err) + brarLog.Errorf("Unable to broadcast justice tx: %v", err) if err == lnwallet.ErrDoubleSpend { // Broadcasting the transaction failed because of a @@ -567,7 +567,7 @@ justiceTxBroadcast: &justiceTXID, justiceScript, 1, breachConfHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf for txid(%v): %v", + brarLog.Errorf("Unable to register for conf for txid(%v): %v", justiceTXID, err) return } @@ -605,7 +605,7 @@ justiceTxBroadcast: // With the channel closed, mark it in the database as such. err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to mark chan as closed: %v", err) + brarLog.Errorf("Unable to mark chan as closed: %v", err) return } @@ -613,7 +613,7 @@ justiceTxBroadcast: // retribution info from the database. err = b.cfg.Store.Remove(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to remove retribution "+ + brarLog.Errorf("Unable to remove retribution "+ "from the db: %v", err) } @@ -672,7 +672,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { breached, err := b.cfg.Store.IsBreached(&chanPoint) if err != nil { b.Unlock() - brarLog.Errorf("unable to check breach info in DB: %v", err) + brarLog.Errorf("Unable to check breach info in DB: %v", err) select { case breachEvent.ProcessACK <- err: @@ -703,7 +703,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { err = b.cfg.Store.Add(retInfo) b.Unlock() if err != nil { - brarLog.Errorf("unable to persist retribution "+ + brarLog.Errorf("Unable to persist retribution "+ "info to db: %v", err) } @@ -733,7 +733,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { breachTXID, breachScript, 1, retInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf updates for "+ + brarLog.Errorf("Unable to register for conf updates for "+ "txid: %v, err: %v", breachTXID, err) return } @@ -1286,7 +1286,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { // to remove a finalized retribution state that is not already // stored in the db. if retBucket == nil { - return errors.New("unable to remove retribution " + + return errors.New("Unable to remove retribution " + "because the retribution bucket doesn't exist.") } From 997aa3ecf042a89b86e382b09cb548c1eb09cbda Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 Mar 2019 19:22:35 -0700 Subject: [PATCH 2/4] breacharbiter: detect all spends, add terminal states This commit modifies the breach arbiter to monitor all breached inputs for spends and remove them from the set of inputs to be swept if they are spent to a terminal state. Prior, we would only watch for spends on htlcs that may need to transition and sweep the corresponding second-level htlc. With these changes, we will no monitor commitment outputs for spends, as well as spends from the second level htlcs themselves. If either of these is detected, we remove them from the set of inputs to sweep via the justice transaction because there is nothing the breach arbiter can do. This functionality will be needed when adding watchtower support, as the breach arbiter must detect the case when the tower sweeps on behalf of the user and stop pursuing the sweep itself. In addition, this now properly handles the potential case where somehow the remote party is able to sweep the their commitment or second-level htlc to their wallet, and prevent the breach arbiter from trying to sweep the outputs as it would now. Note that in the latter event, the internal accounting may still be incorrect, as it is assumed that all breached funds return to the victim. However, these issues will deferred and fixed at a later date, as the more crucial aspect is that the breach arbiter doesn't blow up as a result of towers sweeping channels. --- breacharbiter.go | 144 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 105 insertions(+), 39 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 25d3fda1..be5e1b36 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "sync" "sync/atomic" @@ -320,6 +321,8 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo, func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { + inputs := breachInfo.breachedOutputs + // spend is used to wrap the index of the output that gets spent // together with the spend details. type spend struct { @@ -330,11 +333,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // We create a channel the first goroutine that gets a spend event can // signal. We make it buffered in case multiple spend events come in at // the same time. - anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) + anySpend := make(chan struct{}, len(inputs)) // The allSpends channel will be used to pass spend events from all the // goroutines that detects a spend before they are signalled to exit. - allSpends := make(chan spend, len(breachInfo.breachedOutputs)) + allSpends := make(chan spend, len(inputs)) // exit will be used to signal the goroutines that they can exit. exit := make(chan struct{}) @@ -342,17 +345,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // We'll now launch a goroutine for each of the HTLC outputs, that will // signal the moment they detect a spend event. - for i := 0; i < len(breachInfo.breachedOutputs); i++ { - breachedOutput := &breachInfo.breachedOutputs[i] + for i := range inputs { + breachedOutput := &inputs[i] - // If this isn't an HTLC output, then we can skip it. - if breachedOutput.witnessType != input.HtlcAcceptedRevoke && - breachedOutput.witnessType != input.HtlcOfferedRevoke { - continue - } - - brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+ - "for ChannelPoint(%v)", breachedOutput.outpoint, + brarLog.Infof("Checking spend from %v(%v) for ChannelPoint(%v)", + breachedOutput.witnessType, breachedOutput.outpoint, breachInfo.chanPoint) // If we have already registered for a notification for this @@ -396,9 +393,12 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, if !ok { return } - brarLog.Debugf("Detected spend of HTLC(%v) "+ - "for ChannelPoint(%v)", - breachedOutput.outpoint, + + brarLog.Infof("Detected spend on %s(%v) by "+ + "txid(%v) for ChannelPoint(%v)", + inputs[index].witnessType, + inputs[index].outpoint, + sp.SpenderTxHash, breachInfo.chanPoint) // First we send the spend event on the @@ -431,21 +431,58 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // channel have exited. We can therefore safely close the // channel before ranging over its content. close(allSpends) - for s := range allSpends { - breachedOutput := &breachInfo.breachedOutputs[s.index] - brarLog.Debugf("Detected second-level spend on "+ - "HTLC(%v) for ChannelPoint(%v)", - breachedOutput.outpoint, breachInfo.chanPoint) + doneOutputs := make(map[int]struct{}) + for s := range allSpends { + breachedOutput := &inputs[s.index] delete(spendNtfns, breachedOutput.outpoint) - // In this case we'll morph our initial revoke spend to - // instead point to the second level output, and update - // the sign descriptor in the process. - convertToSecondLevelRevoke( - breachedOutput, breachInfo, s.detail, - ) + switch breachedOutput.witnessType { + case input.HtlcAcceptedRevoke: + fallthrough + case input.HtlcOfferedRevoke: + brarLog.Infof("Spend on second-level"+ + "%s(%v) for ChannelPoint(%v) "+ + "transitions to second-level output", + breachedOutput.witnessType, + breachedOutput.outpoint, + breachInfo.chanPoint) + + // In this case we'll morph our initial revoke + // spend to instead point to the second level + // output, and update the sign descriptor in the + // process. + convertToSecondLevelRevoke( + breachedOutput, breachInfo, s.detail, + ) + + continue + } + + brarLog.Infof("Spend on %s(%v) for ChannelPoint(%v) "+ + "transitions output to terminal state, "+ + "removing input from justice transaction", + breachedOutput.witnessType, + breachedOutput.outpoint, breachInfo.chanPoint) + + doneOutputs[s.index] = struct{}{} } + + // Filter the inputs for which we can no longer proceed. + var nextIndex int + for i := range inputs { + if _, ok := doneOutputs[i]; ok { + continue + } + + inputs[nextIndex] = inputs[i] + nextIndex++ + } + + // Update our remaining set of outputs before continuing with + // another attempt at publication. + breachInfo.breachedOutputs = inputs[:nextIndex] + case <-b.quit: return errBrarShuttingDown } @@ -552,7 +589,24 @@ justiceTxBroadcast: return } - brarLog.Infof("Attempting another justice tx broadcast") + if len(breachInfo.breachedOutputs) == 0 { + brarLog.Debugf("No more outputs to sweep for "+ + "breach, marking ChannelPoint(%v) "+ + "fully resolved", breachInfo.chanPoint) + + err = b.cleanupBreach(&breachInfo.chanPoint) + if err != nil { + brarLog.Errorf("Failed to cleanup "+ + "breached ChannelPoint(%v): %v", + breachInfo.chanPoint, err) + } + return + } + + brarLog.Infof("Attempting another justice tx "+ + "with %d inputs", + len(breachInfo.breachedOutputs)) + goto justiceTxBroadcast } } @@ -602,19 +656,11 @@ justiceTxBroadcast: "have been claimed", breachInfo.chanPoint, revokedFunds, totalFunds) - // With the channel closed, mark it in the database as such. - err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) + err = b.cleanupBreach(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("Unable to mark chan as closed: %v", err) - return - } - - // Justice has been carried out; we can safely delete the - // retribution info from the database. - err = b.cfg.Store.Remove(&breachInfo.chanPoint) - if err != nil { - brarLog.Errorf("Unable to remove retribution "+ - "from the db: %v", err) + brarLog.Errorf("Failed to cleanup breached "+ + "ChannelPoint(%v): %v", breachInfo.chanPoint, + err) } // TODO(roasbeef): add peer to blacklist? @@ -628,6 +674,26 @@ justiceTxBroadcast: } } +// cleanupBreach marks the given channel point as fully resolved and removes the +// retribution for that the channel from the retribution store. +func (b *breachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error { + // With the channel closed, mark it in the database as such. + err := b.cfg.DB.MarkChanFullyClosed(chanPoint) + if err != nil { + return fmt.Errorf("unable to mark chan as closed: %v", err) + } + + // Justice has been carried out; we can safely delete the retribution + // info from the database. + err = b.cfg.Store.Remove(chanPoint) + if err != nil { + return fmt.Errorf("unable to remove retribution from db: %v", + err) + } + + return nil +} + // handleBreachHandoff handles a new breach event, by writing it to disk, then // notifies the breachArbiter contract observer goroutine that a channel's // contract has been breached by the prior counterparty. Once notified the From 6f06c304219fc6c8351c553238ca350ce7f062fc Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 Mar 2019 19:22:47 -0700 Subject: [PATCH 3/4] mock: support late registration of spend ntfns --- mock.go | 46 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/mock.go b/mock.go index 1eccbea6..6c1f3fb6 100644 --- a/mock.go +++ b/mock.go @@ -123,6 +123,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, type mockSpendNotifier struct { *mockNotfier spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail + spends map[wire.OutPoint]*chainntnfs.SpendDetail mtx sync.Mutex } @@ -132,6 +133,7 @@ func makeMockSpendNotifier() *mockSpendNotifier { confChannel: make(chan *chainntnfs.TxConfirmation), }, spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail), + spends: make(map[wire.OutPoint]*chainntnfs.SpendDetail), } } @@ -140,8 +142,22 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, m.mtx.Lock() defer m.mtx.Unlock() - spendChan := make(chan *chainntnfs.SpendDetail) - m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + spendChan := make(chan *chainntnfs.SpendDetail, 1) + if detail, ok := m.spends[*outpoint]; ok { + // Deliver spend immediately if details are already known. + spendChan <- &chainntnfs.SpendDetail{ + SpentOutPoint: detail.SpentOutPoint, + SpendingHeight: detail.SpendingHeight, + SpendingTx: detail.SpendingTx, + SpenderTxHash: detail.SpenderTxHash, + SpenderInputIndex: detail.SpenderInputIndex, + } + } else { + // Otherwise, queue the notification for delivery if the spend + // is ever received. + m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + } + return &chainntnfs.SpendEvent{ Spend: spendChan, Cancel: func() { @@ -156,16 +172,30 @@ func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, m.mtx.Lock() defer m.mtx.Unlock() + txnHash := txn.TxHash() + details := &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpendingHeight: height, + SpendingTx: txn, + SpenderTxHash: &txnHash, + SpenderInputIndex: outpoint.Index, + } + + // Cache details in case of late registration. + if _, ok := m.spends[*outpoint]; !ok { + m.spends[*outpoint] = details + } + + // Deliver any backlogged spend notifications. if spendChans, ok := m.spendMap[*outpoint]; ok { delete(m.spendMap, *outpoint) for _, spendChan := range spendChans { - txnHash := txn.TxHash() spendChan <- &chainntnfs.SpendDetail{ - SpentOutPoint: outpoint, - SpendingHeight: height, - SpendingTx: txn, - SpenderTxHash: &txnHash, - SpenderInputIndex: outpoint.Index, + SpentOutPoint: details.SpentOutPoint, + SpendingHeight: details.SpendingHeight, + SpendingTx: details.SpendingTx, + SpenderTxHash: details.SpenderTxHash, + SpenderInputIndex: details.SpenderInputIndex, } } } From 9523b420bc4aac00974f285a1e5ca93a9524b8db Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 19 Mar 2019 19:22:59 -0700 Subject: [PATCH 4/4] breacharbiter_test: add table-driven breach spend tests --- breacharbiter_test.go | 370 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 333 insertions(+), 37 deletions(-) diff --git a/breacharbiter_test.go b/breacharbiter_test.go index dfd215b4..b3ae1693 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -29,6 +29,7 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" @@ -1156,10 +1157,162 @@ func TestBreachHandoffFail(t *testing.T) { assertArbiterBreach(t, brar, chanPoint) } -// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a -// breached commitment is transferred to a second level spend if the output is -// already spent. -func TestBreachSecondLevelTransfer(t *testing.T) { +type publAssertion func(*testing.T, map[wire.OutPoint]*wire.MsgTx, + chan *wire.MsgTx) + +type breachTest struct { + name string + + // spend2ndLevel requests that second level htlcs be spent *again*, as + // if by a remote party or watchtower. The outpoint of the second level + // htlc is in effect "readded" to the set of inputs. + spend2ndLevel bool + + // sendFinalConf informs the test to send a confirmation for the justice + // transaction before asserting the arbiter is cleaned up. + sendFinalConf bool + + // whenNonZeroInputs is called after spending an input but there are + // further inputs to spend in the test. + whenNonZeroInputs publAssertion + + // whenZeroInputs is called after spending an input but there are no + // further inputs to spend in the test. + whenZeroInputs publAssertion +} + +var ( + // commitSpendTx is used to spend commitment outputs. + commitSpendTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 500000000}, + }, + } + // htlc2ndLevlTx is used to transition an htlc output on the commitment + // transaction to a second level htlc. + htlc2ndLevlTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 20000}, + }, + } + // htlcSpendTx is used to spend from a second level htlc. + htlcSpendTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 10000}, + }, + } +) + +var breachTests = []breachTest{ + { + name: "all spends", + spend2ndLevel: true, + whenNonZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The justice transaction should have thee same number + // of inputs as we are tracking in the test. + if len(tx.TxIn) != len(inputs) { + t.Fatalf("expected justice txn to have %d "+ + "inputs, found %d", len(inputs), + len(tx.TxIn)) + } + + // Ensure that each input exists on the justice + // transaction. + for in := range inputs { + findInputIndex(t, in, tx) + } + + }, + whenZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + // Sanity check to ensure the brar doesn't try to + // broadcast another sweep, since all outputs have been + // spent externally. + select { + case <-publTx: + t.Fatalf("tx published unexpectedly") + case <-time.After(50 * time.Millisecond): + } + }, + }, + { + name: "commit spends, second level sweep", + spend2ndLevel: false, + sendFinalConf: true, + whenNonZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + select { + case <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + }, + whenZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + // Now a transaction attempting to spend from the second + // level tx should be published instead. Let this + // publish succeed by setting the publishing error to + // nil. + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The commitment outputs should be gone, and there + // should only be a single htlc spend. + if len(tx.TxIn) != 1 { + t.Fatalf("expect 1 htlc output, found %d "+ + "outputs", len(tx.TxIn)) + } + + // The remaining TxIn previously attempting to spend + // the HTLC outpoint should now be spending from the + // second level tx. + // + // NOTE: Commitment outputs and htlc sweeps are spent + // with a different transactions (and thus txids), + // ensuring we aren't mistaking this for a different + // output type. + onlyInput := tx.TxIn[0].PreviousOutPoint.Hash + if onlyInput != htlc2ndLevlTx.TxHash() { + t.Fatalf("tx not attempting to spend second "+ + "level tx, %v", tx.TxIn[0]) + } + }, + }, +} + +// TestBreachSpends checks the behavior of the breach arbiter in response to +// spend events on a channels outputs by asserting that it properly removes or +// modifies the inputs from the justice txn. +func TestBreachSpends(t *testing.T) { + for _, test := range breachTests { + tc := test + t.Run(tc.name, func(t *testing.T) { + testBreachSpends(t, tc) + }) + } +} + +func testBreachSpends(t *testing.T, test breachTest) { brar, alice, _, bobClose, contractBreaches, cleanUpChans, cleanUpArb := initBreachedState(t) defer cleanUpChans() @@ -1171,12 +1324,16 @@ func TestBreachSecondLevelTransfer(t *testing.T) { chanPoint = alice.ChanPoint publTx = make(chan *wire.MsgTx) publErr error + publMtx sync.Mutex ) // Make PublishTransaction always return ErrDoubleSpend to begin with. publErr = lnwallet.ErrDoubleSpend brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error { publTx <- tx + + publMtx.Lock() + defer publMtx.Unlock() return publErr } @@ -1204,11 +1361,32 @@ func TestBreachSecondLevelTransfer(t *testing.T) { t.Fatalf("breach arbiter didn't send ack back") } + state := alice.State() + err = state.CloseChannel(&channeldb.ChannelCloseSummary{ + ChanPoint: state.FundingOutpoint, + ChainHash: state.ChainHash, + RemotePub: state.IdentityPub, + CloseType: channeldb.BreachClose, + Capacity: state.Capacity, + IsPending: true, + ShortChanID: state.ShortChanID(), + RemoteCurrentRevocation: state.RemoteCurrentRevocation, + RemoteNextRevocation: state.RemoteNextRevocation, + LocalChanConfig: state.LocalChanCfg, + }) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + // After exiting, the breach arbiter should have persisted the // retribution information and the channel should be shown as pending // force closed. assertArbiterBreach(t, brar, chanPoint) + // Assert that the database sees the channel as pending close, otherwise + // the breach arbiter won't be able to fully close it. + assertPendingClosed(t, alice) + // Notify that the breaching transaction is confirmed, to trigger the // retribution logic. notifier := brar.cfg.Notifier.(*mockSpendNotifier) @@ -1224,47 +1402,93 @@ func TestBreachSecondLevelTransfer(t *testing.T) { t.Fatalf("tx was not published") } - if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() { - t.Fatalf("tx not attempting to spend commitment") - } - - // Find the index of the TxIn spending the HTLC output. - htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint - htlcIn := -1 - for i, txIn := range tx.TxIn { - if txIn.PreviousOutPoint == *htlcOutpoint { - htlcIn = i + // All outputs should initially spend from the force closed txn. + forceTxID := forceCloseTx.TxHash() + for _, txIn := range tx.TxIn { + if txIn.PreviousOutPoint.Hash != forceTxID { + t.Fatalf("og justice tx not spending commitment") } } - if htlcIn == -1 { - t.Fatalf("htlc in not found") + + localOutpoint := retribution.LocalOutpoint + remoteOutpoint := retribution.RemoteOutpoint + htlcOutpoint := retribution.HtlcRetributions[0].OutPoint + + // Construct a map from outpoint on the force close to the transaction + // we want it to be spent by. As the test progresses, this map will be + // updated to contain only the set of commitment or second level + // outpoints that remain to be spent. + inputs := map[wire.OutPoint]*wire.MsgTx{ + htlcOutpoint: htlc2ndLevlTx, + localOutpoint: commitSpendTx, + remoteOutpoint: commitSpendTx, } - // Since publishing the transaction failed above, the breach arbiter - // will attempt another second level check. Now notify that the htlc - // output is spent by a second level tx. - secondLvlTx := &wire.MsgTx{ - TxOut: []*wire.TxOut{ - {Value: 1}, - }, - } - notifier.Spend(htlcOutpoint, 2, secondLvlTx) + // Until no more inputs to spend remain, deliver the spend events and + // process the assertions prescribed by the test case. + for len(inputs) > 0 { + var ( + op wire.OutPoint + spendTx *wire.MsgTx + ) - // Now a transaction attempting to spend from the second level tx - // should be published instead. Let this publish succeed by setting the - // publishing error to nil. - publErr = nil - select { - case tx = <-publTx: - case <-time.After(5 * time.Second): - t.Fatalf("tx was not published") + // Pick an outpoint at random from the set of inputs. + for op, spendTx = range inputs { + delete(inputs, op) + break + } + + // Deliver the spend notification for the chosen transaction. + notifier.Spend(&op, 2, spendTx) + + // When the second layer transfer is detected, add back the + // outpoint of the second layer tx so that we can spend it + // again. Only do so if the test requests this behavior. + spendTxID := spendTx.TxHash() + if test.spend2ndLevel && spendTxID == htlc2ndLevlTx.TxHash() { + // Create the second level outpoint that will be spent, + // the index is always zero for these 1-in-1-out txns. + spendOp := wire.OutPoint{Hash: spendTxID} + inputs[spendOp] = htlcSpendTx + } + + if len(inputs) > 0 { + test.whenNonZeroInputs(t, inputs, publTx) + } else { + // Reset the publishing error so that any publication, + // made by the breach arbiter, if any, will succeed. + publMtx.Lock() + publErr = nil + publMtx.Unlock() + test.whenZeroInputs(t, inputs, publTx) + } } - // The TxIn previously attempting to spend the HTLC outpoint should now - // be spending from the second level tx. - if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() { - t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0]) + // Deliver confirmation of sweep if the test expects it. + if test.sendFinalConf { + notifier.confChannel <- &chainntnfs.TxConfirmation{} } + + // Assert that the channel is fully resolved. + assertBrarCleanup(t, brar, alice.ChanPoint, alice.State().Db) +} + +// findInputIndex returns the index of the input that spends from the given +// outpoint. This method fails if the outpoint is not found. +func findInputIndex(t *testing.T, op wire.OutPoint, tx *wire.MsgTx) int { + t.Helper() + + inputIdx := -1 + for i, txIn := range tx.TxIn { + if txIn.PreviousOutPoint == op { + inputIdx = i + } + } + if inputIdx == -1 { + t.Fatalf("input %v in not found", op) + } + + return inputIdx } // assertArbiterBreach checks that the breach arbiter has persisted the breach @@ -1272,6 +1496,8 @@ func TestBreachSecondLevelTransfer(t *testing.T) { func assertArbiterBreach(t *testing.T, brar *breachArbiter, chanPoint *wire.OutPoint) { + t.Helper() + isBreached, err := brar.IsBreached(chanPoint) if err != nil { t.Fatalf("unable to determine if channel is "+ @@ -1290,6 +1516,8 @@ func assertArbiterBreach(t *testing.T, brar *breachArbiter, func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, chanPoint *wire.OutPoint) { + t.Helper() + isBreached, err := brar.IsBreached(chanPoint) if err != nil { t.Fatalf("unable to determine if channel is "+ @@ -1302,9 +1530,77 @@ func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, } } +// assertBrarCleanup blocks until the given channel point has been removed the +// retribution store and the channel is fully closed in the database. +func assertBrarCleanup(t *testing.T, brar *breachArbiter, + chanPoint *wire.OutPoint, db *channeldb.DB) { + + t.Helper() + + err := lntest.WaitNoError(func() error { + isBreached, err := brar.IsBreached(chanPoint) + if err != nil { + return err + } + + if isBreached { + return fmt.Errorf("channel %v still breached", + chanPoint) + } + + closedChans, err := db.FetchClosedChannels(false) + if err != nil { + return err + } + + for _, channel := range closedChans { + switch { + // Wrong channel. + case channel.ChanPoint != *chanPoint: + continue + + // Right channel, fully closed! + case !channel.IsPending: + return nil + } + + // Still pending. + return fmt.Errorf("channel %v still pending "+ + "close", chanPoint) + } + + return fmt.Errorf("channel %v not closed", chanPoint) + + }, time.Second) + if err != nil { + t.Fatalf(err.Error()) + } +} + +// assertPendingClosed checks that the channel has been marked pending closed in +// the channel database. +func assertPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + t.Helper() + + closedChans, err := c.State().Db.FetchClosedChannels(true) + if err != nil { + t.Fatalf("unable to load pending closed channels: %v", err) + } + + for _, chanSummary := range closedChans { + if chanSummary.ChanPoint == *c.ChanPoint { + return + } + } + + t.Fatalf("channel %v was not marked pending closed", c.ChanPoint) +} + // assertNotPendingClosed checks that the channel has not been marked pending // closed in the channel database. func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + t.Helper() + closedChans, err := c.State().Db.FetchClosedChannels(true) if err != nil { t.Fatalf("unable to load pending closed channels: %v", err)