diff --git a/breacharbiter.go b/breacharbiter.go index fdd5cd16..78a4b802 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -538,17 +538,14 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, defer b.wg.Done() // TODO(roasbeef): state needs to be checkpointed here - var breachConfHeight uint32 select { - case breachConf, ok := <-confChan.Confirmed: + case _, ok := <-confChan.Confirmed: // If the second value is !ok, then the channel has been closed // signifying a daemon shutdown, so we exit. if !ok { return } - breachConfHeight = breachConf.BlockHeight - // Otherwise, if this is a real confirmation notification, then // we fall through to complete our duty. case <-b.quit: @@ -570,6 +567,10 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, return } + // Compute both the total value of funds being swept and the + // amount of funds that were revoked from the counter party. + var totalFunds, revokedFunds btcutil.Amount + // If this retribution has not been finalized before, we will first // construct a sweep transaction and write it to disk. This will allow // the breach arbiter to re-register for notifications for the justice @@ -605,30 +606,49 @@ justiceTxBroadcast: err = b.cfg.PublishTransaction(finalTx, label) if err != nil { brarLog.Errorf("Unable to broadcast justice tx: %v", err) + } - if err == lnwallet.ErrDoubleSpend { - // Broadcasting the transaction failed because of a - // conflict either in the mempool or in chain. We'll - // now create spend subscriptions for all HTLC outputs - // on the commitment transaction that could possibly - // have been spent, and wait for any of them to - // trigger. - brarLog.Infof("Waiting for a spend event before " + - "attempting to craft new justice tx.") - finalTx = nil + // Regardless of publication succeeded or not, we now wait for any of + // the inputs to be spent. If any input got spent by the remote, we + // must recreate our justice transaction. + var ( + spendChan = make(chan []spend, 1) + errChan = make(chan error, 1) + wg sync.WaitGroup + ) - spends, err := b.waitForSpendEvent( - breachInfo, spendNtfns, - ) - if err != nil { - if err != errBrarShuttingDown { - brarLog.Errorf("error waiting for "+ - "spend event: %v", err) - } - return + wg.Add(1) + go func() { + defer wg.Done() + + spends, err := b.waitForSpendEvent(breachInfo, spendNtfns) + if err != nil { + errChan <- err + return + } + spendChan <- spends + }() + +Loop: + for { + select { + case spends := <-spendChan: + // Print the funds swept by the txs. + for _, s := range spends { + tx := s.detail.SpendingTx + t, r := countRevokedFunds(breachInfo, tx) + totalFunds += t + revokedFunds += r } + brarLog.Infof("Justice for ChannelPoint(%v) has "+ + "been served, %v revoked funds (%v total) "+ + "have been claimed", breachInfo.chanPoint, + revokedFunds, totalFunds) + + // Update the breach info with the new spends. updateBreachInfo(breachInfo, spends) + if len(breachInfo.breachedOutputs) == 0 { brarLog.Debugf("No more outputs to sweep for "+ "breach, marking ChannelPoint(%v) "+ @@ -640,63 +660,36 @@ justiceTxBroadcast: "breached ChannelPoint(%v): %v", breachInfo.chanPoint, err) } - return + + // TODO(roasbeef): add peer to blacklist? + + // TODO(roasbeef): close other active channels with offending + // peer + break Loop } + finalTx = nil brarLog.Infof("Attempting another justice tx "+ "with %d inputs", len(breachInfo.breachedOutputs)) + wg.Wait() goto justiceTxBroadcast + + case err := <-errChan: + if err != errBrarShuttingDown { + brarLog.Errorf("error waiting for "+ + "spend event: %v", err) + } + break Loop + + case <-b.quit: + break Loop } } - // As a conclusionary step, we register for a notification to be - // dispatched once the justice tx is confirmed. After confirmation we - // notify the caller that initiated the retribution workflow that the - // deed has been done. - justiceTXID := finalTx.TxHash() - justiceScript := finalTx.TxOut[0].PkScript - confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( - &justiceTXID, justiceScript, 1, breachConfHeight, - ) - if err != nil { - brarLog.Errorf("Unable to register for conf for txid(%v): %v", - justiceTXID, err) - return - } - - select { - case _, ok := <-confChan.Confirmed: - if !ok { - return - } - - // Compute both the total value of funds being swept and the - // amount of funds that were revoked from the counter party. - totalFunds, revokedFunds := countRevokedFunds(breachInfo, finalTx) - - brarLog.Infof("Justice for ChannelPoint(%v) has "+ - "been served, %v revoked funds (%v total) "+ - "have been claimed", breachInfo.chanPoint, - revokedFunds, totalFunds) - - err = b.cleanupBreach(&breachInfo.chanPoint) - if err != nil { - brarLog.Errorf("Failed to cleanup breached "+ - "ChannelPoint(%v): %v", breachInfo.chanPoint, - err) - } - - // TODO(roasbeef): add peer to blacklist? - - // TODO(roasbeef): close other active channels with offending - // peer - return - - case <-b.quit: - return - } + // Wait for our go routine to exit. + wg.Wait() } // countRevokedFunds counts the total and revoked funds swept by our justice diff --git a/breacharbiter_test.go b/breacharbiter_test.go index d7818fe6..b84775cd 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -1222,7 +1222,7 @@ func TestBreachHandoffFail(t *testing.T) { } type publAssertion func(*testing.T, map[wire.OutPoint]struct{}, - chan *wire.MsgTx, chainhash.Hash) + chan *wire.MsgTx, chainhash.Hash) *wire.MsgTx type breachTest struct { name string @@ -1361,7 +1361,7 @@ var breachTests = []breachTest{ spend2ndLevel: true, whenNonZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, - publTx chan *wire.MsgTx, _ chainhash.Hash) { + publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx { var tx *wire.MsgTx select { @@ -1384,10 +1384,11 @@ var breachTests = []breachTest{ findInputIndex(t, in, tx) } + return tx }, whenZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, - publTx chan *wire.MsgTx, _ chainhash.Hash) { + publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx { // Sanity check to ensure the brar doesn't try to // broadcast another sweep, since all outputs have been @@ -1397,6 +1398,8 @@ var breachTests = []breachTest{ t.Fatalf("tx published unexpectedly") case <-time.After(50 * time.Millisecond): } + + return nil }, }, { @@ -1405,7 +1408,7 @@ var breachTests = []breachTest{ sendFinalConf: true, whenNonZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, - publTx chan *wire.MsgTx, _ chainhash.Hash) { + publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx { var tx *wire.MsgTx select { @@ -1428,11 +1431,12 @@ var breachTests = []breachTest{ findInputIndex(t, in, tx) } + return tx }, whenZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, publTx chan *wire.MsgTx, - htlc2ndLevlTxHash chainhash.Hash) { + htlc2ndLevlTxHash chainhash.Hash) *wire.MsgTx { // Now a transaction attempting to spend from the second // level tx should be published instead. Let this @@ -1465,6 +1469,8 @@ var breachTests = []breachTest{ t.Fatalf("tx not attempting to spend second "+ "level tx, %v", tx.TxIn[0]) } + + return tx }, }, { // nolint: dupl @@ -1474,7 +1480,7 @@ var breachTests = []breachTest{ sweepHtlc: true, whenNonZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, - publTx chan *wire.MsgTx, _ chainhash.Hash) { + publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx { var tx *wire.MsgTx select { @@ -1496,10 +1502,12 @@ var breachTests = []breachTest{ for in := range inputs { findInputIndex(t, in, tx) } + + return tx }, whenZeroInputs: func(t *testing.T, inputs map[wire.OutPoint]struct{}, - publTx chan *wire.MsgTx, _ chainhash.Hash) { + publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx { // Sanity check to ensure the brar doesn't try to // broadcast another sweep, since all outputs have been @@ -1509,6 +1517,8 @@ var breachTests = []breachTest{ t.Fatalf("tx published unexpectedly") case <-time.After(50 * time.Millisecond): } + + return nil }, }, } @@ -1567,7 +1577,11 @@ func testBreachSpends(t *testing.T, test breachTest) { }, BreachRetribution: retribution, } - contractBreaches <- breach + select { + case contractBreaches <- breach: + case <-time.After(15 * time.Second): + t.Fatalf("breach not delivered") + } // We'll also wait to consume the ACK back from the breach arbiter. select { @@ -1608,7 +1622,12 @@ func testBreachSpends(t *testing.T, test breachTest) { // Notify that the breaching transaction is confirmed, to trigger the // retribution logic. notifier := brar.cfg.Notifier.(*mock.SpendNotifier) - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + + select { + case notifier.ConfChan <- &chainntnfs.TxConfirmation{}: + case <-time.After(15 * time.Second): + t.Fatalf("conf not delivered") + } // The breach arbiter should attempt to sweep all outputs on the // breached commitment. We'll pretend that the HTLC output has been @@ -1666,6 +1685,7 @@ func testBreachSpends(t *testing.T, test breachTest) { // Until no more inputs to spend remain, deliver the spend events and // process the assertions prescribed by the test case. + var justiceTx *wire.MsgTx for len(spentBy) > 0 { var ( op wire.OutPoint @@ -1705,20 +1725,25 @@ func testBreachSpends(t *testing.T, test breachTest) { } if len(spentBy) > 0 { - test.whenNonZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) + justiceTx = test.whenNonZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) } else { // Reset the publishing error so that any publication, // made by the breach arbiter, if any, will succeed. publMtx.Lock() publErr = nil publMtx.Unlock() - test.whenZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) + justiceTx = test.whenZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) } } - // Deliver confirmation of sweep if the test expects it. + // Deliver confirmation of sweep if the test expects it. Since we are + // looking for the final justice tx to confirme, we deliver a spend of + // all its inputs. if test.sendFinalConf { - notifier.ConfChan <- &chainntnfs.TxConfirmation{} + for _, txin := range justiceTx.TxIn { + op := txin.PreviousOutPoint + notifier.Spend(&op, 3, justiceTx) + } } // Assert that the channel is fully resolved.