breacharbiter: replace justice tx conf check with spend check

Since we want to potentially broadcast multiple versions of the justice
TX, instead of waiting for confirmation of a specific TXID, we instead
wait for the breached outputs to be spent.
This commit is contained in:
Johan T. Halseth 2021-04-20 15:42:23 +02:00
parent c3b2791158
commit 3be9b74694
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
2 changed files with 101 additions and 83 deletions

@ -538,17 +538,14 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
defer b.wg.Done() defer b.wg.Done()
// TODO(roasbeef): state needs to be checkpointed here // TODO(roasbeef): state needs to be checkpointed here
var breachConfHeight uint32
select { select {
case breachConf, ok := <-confChan.Confirmed: case _, ok := <-confChan.Confirmed:
// If the second value is !ok, then the channel has been closed // If the second value is !ok, then the channel has been closed
// signifying a daemon shutdown, so we exit. // signifying a daemon shutdown, so we exit.
if !ok { if !ok {
return return
} }
breachConfHeight = breachConf.BlockHeight
// Otherwise, if this is a real confirmation notification, then // Otherwise, if this is a real confirmation notification, then
// we fall through to complete our duty. // we fall through to complete our duty.
case <-b.quit: case <-b.quit:
@ -570,6 +567,10 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
return return
} }
// Compute both the total value of funds being swept and the
// amount of funds that were revoked from the counter party.
var totalFunds, revokedFunds btcutil.Amount
// If this retribution has not been finalized before, we will first // If this retribution has not been finalized before, we will first
// construct a sweep transaction and write it to disk. This will allow // construct a sweep transaction and write it to disk. This will allow
// the breach arbiter to re-register for notifications for the justice // the breach arbiter to re-register for notifications for the justice
@ -605,30 +606,49 @@ justiceTxBroadcast:
err = b.cfg.PublishTransaction(finalTx, label) err = b.cfg.PublishTransaction(finalTx, label)
if err != nil { if err != nil {
brarLog.Errorf("Unable to broadcast justice tx: %v", err) brarLog.Errorf("Unable to broadcast justice tx: %v", err)
}
if err == lnwallet.ErrDoubleSpend { // Regardless of publication succeeded or not, we now wait for any of
// Broadcasting the transaction failed because of a // the inputs to be spent. If any input got spent by the remote, we
// conflict either in the mempool or in chain. We'll // must recreate our justice transaction.
// now create spend subscriptions for all HTLC outputs var (
// on the commitment transaction that could possibly spendChan = make(chan []spend, 1)
// have been spent, and wait for any of them to errChan = make(chan error, 1)
// trigger. wg sync.WaitGroup
brarLog.Infof("Waiting for a spend event before " + )
"attempting to craft new justice tx.")
finalTx = nil
spends, err := b.waitForSpendEvent( wg.Add(1)
breachInfo, spendNtfns, go func() {
) defer wg.Done()
if err != nil {
if err != errBrarShuttingDown { spends, err := b.waitForSpendEvent(breachInfo, spendNtfns)
brarLog.Errorf("error waiting for "+ if err != nil {
"spend event: %v", err) errChan <- err
} return
return }
spendChan <- spends
}()
Loop:
for {
select {
case spends := <-spendChan:
// Print the funds swept by the txs.
for _, s := range spends {
tx := s.detail.SpendingTx
t, r := countRevokedFunds(breachInfo, tx)
totalFunds += t
revokedFunds += r
} }
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
// Update the breach info with the new spends.
updateBreachInfo(breachInfo, spends) updateBreachInfo(breachInfo, spends)
if len(breachInfo.breachedOutputs) == 0 { if len(breachInfo.breachedOutputs) == 0 {
brarLog.Debugf("No more outputs to sweep for "+ brarLog.Debugf("No more outputs to sweep for "+
"breach, marking ChannelPoint(%v) "+ "breach, marking ChannelPoint(%v) "+
@ -640,63 +660,36 @@ justiceTxBroadcast:
"breached ChannelPoint(%v): %v", "breached ChannelPoint(%v): %v",
breachInfo.chanPoint, err) breachInfo.chanPoint, err)
} }
return
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending
// peer
break Loop
} }
finalTx = nil
brarLog.Infof("Attempting another justice tx "+ brarLog.Infof("Attempting another justice tx "+
"with %d inputs", "with %d inputs",
len(breachInfo.breachedOutputs)) len(breachInfo.breachedOutputs))
wg.Wait()
goto justiceTxBroadcast goto justiceTxBroadcast
case err := <-errChan:
if err != errBrarShuttingDown {
brarLog.Errorf("error waiting for "+
"spend event: %v", err)
}
break Loop
case <-b.quit:
break Loop
} }
} }
// As a conclusionary step, we register for a notification to be // Wait for our go routine to exit.
// dispatched once the justice tx is confirmed. After confirmation we wg.Wait()
// notify the caller that initiated the retribution workflow that the
// deed has been done.
justiceTXID := finalTx.TxHash()
justiceScript := finalTx.TxOut[0].PkScript
confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn(
&justiceTXID, justiceScript, 1, breachConfHeight,
)
if err != nil {
brarLog.Errorf("Unable to register for conf for txid(%v): %v",
justiceTXID, err)
return
}
select {
case _, ok := <-confChan.Confirmed:
if !ok {
return
}
// Compute both the total value of funds being swept and the
// amount of funds that were revoked from the counter party.
totalFunds, revokedFunds := countRevokedFunds(breachInfo, finalTx)
brarLog.Infof("Justice for ChannelPoint(%v) has "+
"been served, %v revoked funds (%v total) "+
"have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds)
err = b.cleanupBreach(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("Failed to cleanup breached "+
"ChannelPoint(%v): %v", breachInfo.chanPoint,
err)
}
// TODO(roasbeef): add peer to blacklist?
// TODO(roasbeef): close other active channels with offending
// peer
return
case <-b.quit:
return
}
} }
// countRevokedFunds counts the total and revoked funds swept by our justice // countRevokedFunds counts the total and revoked funds swept by our justice

@ -1222,7 +1222,7 @@ func TestBreachHandoffFail(t *testing.T) {
} }
type publAssertion func(*testing.T, map[wire.OutPoint]struct{}, type publAssertion func(*testing.T, map[wire.OutPoint]struct{},
chan *wire.MsgTx, chainhash.Hash) chan *wire.MsgTx, chainhash.Hash) *wire.MsgTx
type breachTest struct { type breachTest struct {
name string name string
@ -1361,7 +1361,7 @@ var breachTests = []breachTest{
spend2ndLevel: true, spend2ndLevel: true,
whenNonZeroInputs: func(t *testing.T, whenNonZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, _ chainhash.Hash) { publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx {
var tx *wire.MsgTx var tx *wire.MsgTx
select { select {
@ -1384,10 +1384,11 @@ var breachTests = []breachTest{
findInputIndex(t, in, tx) findInputIndex(t, in, tx)
} }
return tx
}, },
whenZeroInputs: func(t *testing.T, whenZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, _ chainhash.Hash) { publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx {
// Sanity check to ensure the brar doesn't try to // Sanity check to ensure the brar doesn't try to
// broadcast another sweep, since all outputs have been // broadcast another sweep, since all outputs have been
@ -1397,6 +1398,8 @@ var breachTests = []breachTest{
t.Fatalf("tx published unexpectedly") t.Fatalf("tx published unexpectedly")
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
} }
return nil
}, },
}, },
{ {
@ -1405,7 +1408,7 @@ var breachTests = []breachTest{
sendFinalConf: true, sendFinalConf: true,
whenNonZeroInputs: func(t *testing.T, whenNonZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, _ chainhash.Hash) { publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx {
var tx *wire.MsgTx var tx *wire.MsgTx
select { select {
@ -1428,11 +1431,12 @@ var breachTests = []breachTest{
findInputIndex(t, in, tx) findInputIndex(t, in, tx)
} }
return tx
}, },
whenZeroInputs: func(t *testing.T, whenZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, publTx chan *wire.MsgTx,
htlc2ndLevlTxHash chainhash.Hash) { htlc2ndLevlTxHash chainhash.Hash) *wire.MsgTx {
// Now a transaction attempting to spend from the second // Now a transaction attempting to spend from the second
// level tx should be published instead. Let this // level tx should be published instead. Let this
@ -1465,6 +1469,8 @@ var breachTests = []breachTest{
t.Fatalf("tx not attempting to spend second "+ t.Fatalf("tx not attempting to spend second "+
"level tx, %v", tx.TxIn[0]) "level tx, %v", tx.TxIn[0])
} }
return tx
}, },
}, },
{ // nolint: dupl { // nolint: dupl
@ -1474,7 +1480,7 @@ var breachTests = []breachTest{
sweepHtlc: true, sweepHtlc: true,
whenNonZeroInputs: func(t *testing.T, whenNonZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, _ chainhash.Hash) { publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx {
var tx *wire.MsgTx var tx *wire.MsgTx
select { select {
@ -1496,10 +1502,12 @@ var breachTests = []breachTest{
for in := range inputs { for in := range inputs {
findInputIndex(t, in, tx) findInputIndex(t, in, tx)
} }
return tx
}, },
whenZeroInputs: func(t *testing.T, whenZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]struct{}, inputs map[wire.OutPoint]struct{},
publTx chan *wire.MsgTx, _ chainhash.Hash) { publTx chan *wire.MsgTx, _ chainhash.Hash) *wire.MsgTx {
// Sanity check to ensure the brar doesn't try to // Sanity check to ensure the brar doesn't try to
// broadcast another sweep, since all outputs have been // broadcast another sweep, since all outputs have been
@ -1509,6 +1517,8 @@ var breachTests = []breachTest{
t.Fatalf("tx published unexpectedly") t.Fatalf("tx published unexpectedly")
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
} }
return nil
}, },
}, },
} }
@ -1567,7 +1577,11 @@ func testBreachSpends(t *testing.T, test breachTest) {
}, },
BreachRetribution: retribution, BreachRetribution: retribution,
} }
contractBreaches <- breach select {
case contractBreaches <- breach:
case <-time.After(15 * time.Second):
t.Fatalf("breach not delivered")
}
// We'll also wait to consume the ACK back from the breach arbiter. // We'll also wait to consume the ACK back from the breach arbiter.
select { select {
@ -1608,7 +1622,12 @@ func testBreachSpends(t *testing.T, test breachTest) {
// Notify that the breaching transaction is confirmed, to trigger the // Notify that the breaching transaction is confirmed, to trigger the
// retribution logic. // retribution logic.
notifier := brar.cfg.Notifier.(*mock.SpendNotifier) notifier := brar.cfg.Notifier.(*mock.SpendNotifier)
notifier.ConfChan <- &chainntnfs.TxConfirmation{}
select {
case notifier.ConfChan <- &chainntnfs.TxConfirmation{}:
case <-time.After(15 * time.Second):
t.Fatalf("conf not delivered")
}
// The breach arbiter should attempt to sweep all outputs on the // The breach arbiter should attempt to sweep all outputs on the
// breached commitment. We'll pretend that the HTLC output has been // breached commitment. We'll pretend that the HTLC output has been
@ -1666,6 +1685,7 @@ func testBreachSpends(t *testing.T, test breachTest) {
// Until no more inputs to spend remain, deliver the spend events and // Until no more inputs to spend remain, deliver the spend events and
// process the assertions prescribed by the test case. // process the assertions prescribed by the test case.
var justiceTx *wire.MsgTx
for len(spentBy) > 0 { for len(spentBy) > 0 {
var ( var (
op wire.OutPoint op wire.OutPoint
@ -1705,20 +1725,25 @@ func testBreachSpends(t *testing.T, test breachTest) {
} }
if len(spentBy) > 0 { if len(spentBy) > 0 {
test.whenNonZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) justiceTx = test.whenNonZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash())
} else { } else {
// Reset the publishing error so that any publication, // Reset the publishing error so that any publication,
// made by the breach arbiter, if any, will succeed. // made by the breach arbiter, if any, will succeed.
publMtx.Lock() publMtx.Lock()
publErr = nil publErr = nil
publMtx.Unlock() publMtx.Unlock()
test.whenZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash()) justiceTx = test.whenZeroInputs(t, inputsToSweep, publTx, htlc2ndLevlTx.TxHash())
} }
} }
// Deliver confirmation of sweep if the test expects it. // Deliver confirmation of sweep if the test expects it. Since we are
// looking for the final justice tx to confirme, we deliver a spend of
// all its inputs.
if test.sendFinalConf { if test.sendFinalConf {
notifier.ConfChan <- &chainntnfs.TxConfirmation{} for _, txin := range justiceTx.TxIn {
op := txin.PreviousOutPoint
notifier.Spend(&op, 3, justiceTx)
}
} }
// Assert that the channel is fully resolved. // Assert that the channel is fully resolved.