Merge pull request #2765 from cfromknecht/brar-handle-tower-sweep

breacharbiter: watch for spends on all inputs until reaching terminal states
This commit is contained in:
Olaoluwa Osuntokun 2019-03-21 20:10:24 -07:00 committed by GitHub
commit 5ee40df54c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 490 additions and 98 deletions

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"io" "io"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -161,7 +162,7 @@ func (b *breachArbiter) Start() error {
// retribution store. // retribution store.
closedChans, err := b.cfg.DB.FetchClosedChannels(false) closedChans, err := b.cfg.DB.FetchClosedChannels(false)
if err != nil { if err != nil {
brarLog.Errorf("unable to fetch closing channels: %v", err) brarLog.Errorf("Unable to fetch closing channels: %v", err)
return err return err
} }
@ -181,7 +182,7 @@ func (b *breachArbiter) Start() error {
chanPoint := &chanSummary.ChanPoint chanPoint := &chanSummary.ChanPoint
if _, ok := breachRetInfos[*chanPoint]; ok { if _, ok := breachRetInfos[*chanPoint]; ok {
if err := b.cfg.Store.Remove(chanPoint); err != nil { if err := b.cfg.Store.Remove(chanPoint); err != nil {
brarLog.Errorf("unable to remove closed "+ brarLog.Errorf("Unable to remove closed "+
"chanid=%v from breach arbiter: %v", "chanid=%v from breach arbiter: %v",
chanPoint, err) chanPoint, err)
return err return err
@ -203,7 +204,7 @@ func (b *breachArbiter) Start() error {
&breachTXID, breachScript, 1, retInfo.breachHeight, &breachTXID, breachScript, 1, retInfo.breachHeight,
) )
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf updates "+ brarLog.Errorf("Unable to register for conf updates "+
"for txid: %v, err: %v", breachTXID, err) "for txid: %v, err: %v", breachTXID, err)
return err return err
} }
@ -320,6 +321,8 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo,
func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error {
inputs := breachInfo.breachedOutputs
// spend is used to wrap the index of the output that gets spent // spend is used to wrap the index of the output that gets spent
// together with the spend details. // together with the spend details.
type spend struct { type spend struct {
@ -330,11 +333,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
// We create a channel the first goroutine that gets a spend event can // We create a channel the first goroutine that gets a spend event can
// signal. We make it buffered in case multiple spend events come in at // signal. We make it buffered in case multiple spend events come in at
// the same time. // the same time.
anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) anySpend := make(chan struct{}, len(inputs))
// The allSpends channel will be used to pass spend events from all the // The allSpends channel will be used to pass spend events from all the
// goroutines that detects a spend before they are signalled to exit. // goroutines that detects a spend before they are signalled to exit.
allSpends := make(chan spend, len(breachInfo.breachedOutputs)) allSpends := make(chan spend, len(inputs))
// exit will be used to signal the goroutines that they can exit. // exit will be used to signal the goroutines that they can exit.
exit := make(chan struct{}) exit := make(chan struct{})
@ -342,17 +345,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
// We'll now launch a goroutine for each of the HTLC outputs, that will // We'll now launch a goroutine for each of the HTLC outputs, that will
// signal the moment they detect a spend event. // signal the moment they detect a spend event.
for i := 0; i < len(breachInfo.breachedOutputs); i++ { for i := range inputs {
breachedOutput := &breachInfo.breachedOutputs[i] breachedOutput := &inputs[i]
// If this isn't an HTLC output, then we can skip it. brarLog.Infof("Checking spend from %v(%v) for ChannelPoint(%v)",
if breachedOutput.witnessType != input.HtlcAcceptedRevoke && breachedOutput.witnessType, breachedOutput.outpoint,
breachedOutput.witnessType != input.HtlcOfferedRevoke {
continue
}
brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+
"for ChannelPoint(%v)", breachedOutput.outpoint,
breachInfo.chanPoint) breachInfo.chanPoint)
// If we have already registered for a notification for this // If we have already registered for a notification for this
@ -366,8 +363,8 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
breachInfo.breachHeight, breachInfo.breachHeight,
) )
if err != nil { if err != nil {
brarLog.Errorf("unable to check for spentness "+ brarLog.Errorf("Unable to check for spentness "+
"of out_point=%v: %v", "of outpoint=%v: %v",
breachedOutput.outpoint, err) breachedOutput.outpoint, err)
// Registration may have failed if we've been // Registration may have failed if we've been
@ -396,9 +393,12 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
if !ok { if !ok {
return return
} }
brarLog.Debugf("Detected spend of HTLC(%v) "+
"for ChannelPoint(%v)", brarLog.Infof("Detected spend on %s(%v) by "+
breachedOutput.outpoint, "txid(%v) for ChannelPoint(%v)",
inputs[index].witnessType,
inputs[index].outpoint,
sp.SpenderTxHash,
breachInfo.chanPoint) breachInfo.chanPoint)
// First we send the spend event on the // First we send the spend event on the
@ -431,21 +431,58 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo,
// channel have exited. We can therefore safely close the // channel have exited. We can therefore safely close the
// channel before ranging over its content. // channel before ranging over its content.
close(allSpends) close(allSpends)
for s := range allSpends {
breachedOutput := &breachInfo.breachedOutputs[s.index]
brarLog.Debugf("Detected second-level spend on "+
"HTLC(%v) for ChannelPoint(%v)",
breachedOutput.outpoint, breachInfo.chanPoint)
doneOutputs := make(map[int]struct{})
for s := range allSpends {
breachedOutput := &inputs[s.index]
delete(spendNtfns, breachedOutput.outpoint) delete(spendNtfns, breachedOutput.outpoint)
// In this case we'll morph our initial revoke spend to switch breachedOutput.witnessType {
// instead point to the second level output, and update case input.HtlcAcceptedRevoke:
// the sign descriptor in the process. fallthrough
convertToSecondLevelRevoke( case input.HtlcOfferedRevoke:
breachedOutput, breachInfo, s.detail, brarLog.Infof("Spend on second-level"+
) "%s(%v) for ChannelPoint(%v) "+
"transitions to second-level output",
breachedOutput.witnessType,
breachedOutput.outpoint,
breachInfo.chanPoint)
// In this case we'll morph our initial revoke
// spend to instead point to the second level
// output, and update the sign descriptor in the
// process.
convertToSecondLevelRevoke(
breachedOutput, breachInfo, s.detail,
)
continue
}
brarLog.Infof("Spend on %s(%v) for ChannelPoint(%v) "+
"transitions output to terminal state, "+
"removing input from justice transaction",
breachedOutput.witnessType,
breachedOutput.outpoint, breachInfo.chanPoint)
doneOutputs[s.index] = struct{}{}
} }
// Filter the inputs for which we can no longer proceed.
var nextIndex int
for i := range inputs {
if _, ok := doneOutputs[i]; ok {
continue
}
inputs[nextIndex] = inputs[i]
nextIndex++
}
// Update our remaining set of outputs before continuing with
// another attempt at publication.
breachInfo.breachedOutputs = inputs[:nextIndex]
case <-b.quit: case <-b.quit:
return errBrarShuttingDown return errBrarShuttingDown
} }
@ -492,7 +529,7 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint)
if err != nil { if err != nil {
brarLog.Errorf("unable to get finalized txn for"+ brarLog.Errorf("Unable to get finalized txn for"+
"chanid=%v: %v", &breachInfo.chanPoint, err) "chanid=%v: %v", &breachInfo.chanPoint, err)
return return
} }
@ -508,7 +545,7 @@ justiceTxBroadcast:
// channel. // channel.
finalTx, err = b.createJusticeTx(breachInfo) finalTx, err = b.createJusticeTx(breachInfo)
if err != nil { if err != nil {
brarLog.Errorf("unable to create justice tx: %v", err) brarLog.Errorf("Unable to create justice tx: %v", err)
return return
} }
@ -516,7 +553,7 @@ justiceTxBroadcast:
// attempt to broadcast. // attempt to broadcast.
err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx) err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx)
if err != nil { if err != nil {
brarLog.Errorf("unable to finalize justice tx for "+ brarLog.Errorf("Unable to finalize justice tx for "+
"chanid=%v: %v", &breachInfo.chanPoint, err) "chanid=%v: %v", &breachInfo.chanPoint, err)
return return
} }
@ -530,7 +567,7 @@ justiceTxBroadcast:
// channel's retribution against the cheating counter party. // channel's retribution against the cheating counter party.
err = b.cfg.PublishTransaction(finalTx) err = b.cfg.PublishTransaction(finalTx)
if err != nil { if err != nil {
brarLog.Errorf("unable to broadcast justice tx: %v", err) brarLog.Errorf("Unable to broadcast justice tx: %v", err)
if err == lnwallet.ErrDoubleSpend { if err == lnwallet.ErrDoubleSpend {
// Broadcasting the transaction failed because of a // Broadcasting the transaction failed because of a
@ -552,7 +589,24 @@ justiceTxBroadcast:
return return
} }
brarLog.Infof("Attempting another justice tx broadcast") if len(breachInfo.breachedOutputs) == 0 {
brarLog.Debugf("No more outputs to sweep for "+
"breach, marking ChannelPoint(%v) "+
"fully resolved", breachInfo.chanPoint)
err = b.cleanupBreach(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("Failed to cleanup "+
"breached ChannelPoint(%v): %v",
breachInfo.chanPoint, err)
}
return
}
brarLog.Infof("Attempting another justice tx "+
"with %d inputs",
len(breachInfo.breachedOutputs))
goto justiceTxBroadcast goto justiceTxBroadcast
} }
} }
@ -567,7 +621,7 @@ justiceTxBroadcast:
&justiceTXID, justiceScript, 1, breachConfHeight, &justiceTXID, justiceScript, 1, breachConfHeight,
) )
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf for txid(%v): %v", brarLog.Errorf("Unable to register for conf for txid(%v): %v",
justiceTXID, err) justiceTXID, err)
return return
} }
@ -602,19 +656,11 @@ justiceTxBroadcast:
"have been claimed", breachInfo.chanPoint, "have been claimed", breachInfo.chanPoint,
revokedFunds, totalFunds) revokedFunds, totalFunds)
// With the channel closed, mark it in the database as such. err = b.cleanupBreach(&breachInfo.chanPoint)
err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint)
if err != nil { if err != nil {
brarLog.Errorf("unable to mark chan as closed: %v", err) brarLog.Errorf("Failed to cleanup breached "+
return "ChannelPoint(%v): %v", breachInfo.chanPoint,
} err)
// Justice has been carried out; we can safely delete the
// retribution info from the database.
err = b.cfg.Store.Remove(&breachInfo.chanPoint)
if err != nil {
brarLog.Errorf("unable to remove retribution "+
"from the db: %v", err)
} }
// TODO(roasbeef): add peer to blacklist? // TODO(roasbeef): add peer to blacklist?
@ -628,6 +674,26 @@ justiceTxBroadcast:
} }
} }
// cleanupBreach marks the given channel point as fully resolved and removes the
// retribution for that the channel from the retribution store.
func (b *breachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error {
// With the channel closed, mark it in the database as such.
err := b.cfg.DB.MarkChanFullyClosed(chanPoint)
if err != nil {
return fmt.Errorf("unable to mark chan as closed: %v", err)
}
// Justice has been carried out; we can safely delete the retribution
// info from the database.
err = b.cfg.Store.Remove(chanPoint)
if err != nil {
return fmt.Errorf("unable to remove retribution from db: %v",
err)
}
return nil
}
// handleBreachHandoff handles a new breach event, by writing it to disk, then // handleBreachHandoff handles a new breach event, by writing it to disk, then
// notifies the breachArbiter contract observer goroutine that a channel's // notifies the breachArbiter contract observer goroutine that a channel's
// contract has been breached by the prior counterparty. Once notified the // contract has been breached by the prior counterparty. Once notified the
@ -672,7 +738,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
breached, err := b.cfg.Store.IsBreached(&chanPoint) breached, err := b.cfg.Store.IsBreached(&chanPoint)
if err != nil { if err != nil {
b.Unlock() b.Unlock()
brarLog.Errorf("unable to check breach info in DB: %v", err) brarLog.Errorf("Unable to check breach info in DB: %v", err)
select { select {
case breachEvent.ProcessACK <- err: case breachEvent.ProcessACK <- err:
@ -703,7 +769,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
err = b.cfg.Store.Add(retInfo) err = b.cfg.Store.Add(retInfo)
b.Unlock() b.Unlock()
if err != nil { if err != nil {
brarLog.Errorf("unable to persist retribution "+ brarLog.Errorf("Unable to persist retribution "+
"info to db: %v", err) "info to db: %v", err)
} }
@ -733,7 +799,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) {
breachTXID, breachScript, 1, retInfo.breachHeight, breachTXID, breachScript, 1, retInfo.breachHeight,
) )
if err != nil { if err != nil {
brarLog.Errorf("unable to register for conf updates for "+ brarLog.Errorf("Unable to register for conf updates for "+
"txid: %v, err: %v", breachTXID, err) "txid: %v, err: %v", breachTXID, err)
return return
} }
@ -1286,7 +1352,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error {
// to remove a finalized retribution state that is not already // to remove a finalized retribution state that is not already
// stored in the db. // stored in the db.
if retBucket == nil { if retBucket == nil {
return errors.New("unable to remove retribution " + return errors.New("Unable to remove retribution " +
"because the retribution bucket doesn't exist.") "because the retribution bucket doesn't exist.")
} }

@ -29,6 +29,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/shachain"
@ -1156,10 +1157,162 @@ func TestBreachHandoffFail(t *testing.T) {
assertArbiterBreach(t, brar, chanPoint) assertArbiterBreach(t, brar, chanPoint)
} }
// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a type publAssertion func(*testing.T, map[wire.OutPoint]*wire.MsgTx,
// breached commitment is transferred to a second level spend if the output is chan *wire.MsgTx)
// already spent.
func TestBreachSecondLevelTransfer(t *testing.T) { type breachTest struct {
name string
// spend2ndLevel requests that second level htlcs be spent *again*, as
// if by a remote party or watchtower. The outpoint of the second level
// htlc is in effect "readded" to the set of inputs.
spend2ndLevel bool
// sendFinalConf informs the test to send a confirmation for the justice
// transaction before asserting the arbiter is cleaned up.
sendFinalConf bool
// whenNonZeroInputs is called after spending an input but there are
// further inputs to spend in the test.
whenNonZeroInputs publAssertion
// whenZeroInputs is called after spending an input but there are no
// further inputs to spend in the test.
whenZeroInputs publAssertion
}
var (
// commitSpendTx is used to spend commitment outputs.
commitSpendTx = &wire.MsgTx{
TxOut: []*wire.TxOut{
{Value: 500000000},
},
}
// htlc2ndLevlTx is used to transition an htlc output on the commitment
// transaction to a second level htlc.
htlc2ndLevlTx = &wire.MsgTx{
TxOut: []*wire.TxOut{
{Value: 20000},
},
}
// htlcSpendTx is used to spend from a second level htlc.
htlcSpendTx = &wire.MsgTx{
TxOut: []*wire.TxOut{
{Value: 10000},
},
}
)
var breachTests = []breachTest{
{
name: "all spends",
spend2ndLevel: true,
whenNonZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]*wire.MsgTx,
publTx chan *wire.MsgTx) {
var tx *wire.MsgTx
select {
case tx = <-publTx:
case <-time.After(5 * time.Second):
t.Fatalf("tx was not published")
}
// The justice transaction should have thee same number
// of inputs as we are tracking in the test.
if len(tx.TxIn) != len(inputs) {
t.Fatalf("expected justice txn to have %d "+
"inputs, found %d", len(inputs),
len(tx.TxIn))
}
// Ensure that each input exists on the justice
// transaction.
for in := range inputs {
findInputIndex(t, in, tx)
}
},
whenZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]*wire.MsgTx,
publTx chan *wire.MsgTx) {
// Sanity check to ensure the brar doesn't try to
// broadcast another sweep, since all outputs have been
// spent externally.
select {
case <-publTx:
t.Fatalf("tx published unexpectedly")
case <-time.After(50 * time.Millisecond):
}
},
},
{
name: "commit spends, second level sweep",
spend2ndLevel: false,
sendFinalConf: true,
whenNonZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]*wire.MsgTx,
publTx chan *wire.MsgTx) {
select {
case <-publTx:
case <-time.After(5 * time.Second):
t.Fatalf("tx was not published")
}
},
whenZeroInputs: func(t *testing.T,
inputs map[wire.OutPoint]*wire.MsgTx,
publTx chan *wire.MsgTx) {
// Now a transaction attempting to spend from the second
// level tx should be published instead. Let this
// publish succeed by setting the publishing error to
// nil.
var tx *wire.MsgTx
select {
case tx = <-publTx:
case <-time.After(5 * time.Second):
t.Fatalf("tx was not published")
}
// The commitment outputs should be gone, and there
// should only be a single htlc spend.
if len(tx.TxIn) != 1 {
t.Fatalf("expect 1 htlc output, found %d "+
"outputs", len(tx.TxIn))
}
// The remaining TxIn previously attempting to spend
// the HTLC outpoint should now be spending from the
// second level tx.
//
// NOTE: Commitment outputs and htlc sweeps are spent
// with a different transactions (and thus txids),
// ensuring we aren't mistaking this for a different
// output type.
onlyInput := tx.TxIn[0].PreviousOutPoint.Hash
if onlyInput != htlc2ndLevlTx.TxHash() {
t.Fatalf("tx not attempting to spend second "+
"level tx, %v", tx.TxIn[0])
}
},
},
}
// TestBreachSpends checks the behavior of the breach arbiter in response to
// spend events on a channels outputs by asserting that it properly removes or
// modifies the inputs from the justice txn.
func TestBreachSpends(t *testing.T) {
for _, test := range breachTests {
tc := test
t.Run(tc.name, func(t *testing.T) {
testBreachSpends(t, tc)
})
}
}
func testBreachSpends(t *testing.T, test breachTest) {
brar, alice, _, bobClose, contractBreaches, brar, alice, _, bobClose, contractBreaches,
cleanUpChans, cleanUpArb := initBreachedState(t) cleanUpChans, cleanUpArb := initBreachedState(t)
defer cleanUpChans() defer cleanUpChans()
@ -1171,12 +1324,16 @@ func TestBreachSecondLevelTransfer(t *testing.T) {
chanPoint = alice.ChanPoint chanPoint = alice.ChanPoint
publTx = make(chan *wire.MsgTx) publTx = make(chan *wire.MsgTx)
publErr error publErr error
publMtx sync.Mutex
) )
// Make PublishTransaction always return ErrDoubleSpend to begin with. // Make PublishTransaction always return ErrDoubleSpend to begin with.
publErr = lnwallet.ErrDoubleSpend publErr = lnwallet.ErrDoubleSpend
brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error { brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error {
publTx <- tx publTx <- tx
publMtx.Lock()
defer publMtx.Unlock()
return publErr return publErr
} }
@ -1204,11 +1361,32 @@ func TestBreachSecondLevelTransfer(t *testing.T) {
t.Fatalf("breach arbiter didn't send ack back") t.Fatalf("breach arbiter didn't send ack back")
} }
state := alice.State()
err = state.CloseChannel(&channeldb.ChannelCloseSummary{
ChanPoint: state.FundingOutpoint,
ChainHash: state.ChainHash,
RemotePub: state.IdentityPub,
CloseType: channeldb.BreachClose,
Capacity: state.Capacity,
IsPending: true,
ShortChanID: state.ShortChanID(),
RemoteCurrentRevocation: state.RemoteCurrentRevocation,
RemoteNextRevocation: state.RemoteNextRevocation,
LocalChanConfig: state.LocalChanCfg,
})
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// After exiting, the breach arbiter should have persisted the // After exiting, the breach arbiter should have persisted the
// retribution information and the channel should be shown as pending // retribution information and the channel should be shown as pending
// force closed. // force closed.
assertArbiterBreach(t, brar, chanPoint) assertArbiterBreach(t, brar, chanPoint)
// Assert that the database sees the channel as pending close, otherwise
// the breach arbiter won't be able to fully close it.
assertPendingClosed(t, alice)
// Notify that the breaching transaction is confirmed, to trigger the // Notify that the breaching transaction is confirmed, to trigger the
// retribution logic. // retribution logic.
notifier := brar.cfg.Notifier.(*mockSpendNotifier) notifier := brar.cfg.Notifier.(*mockSpendNotifier)
@ -1224,47 +1402,93 @@ func TestBreachSecondLevelTransfer(t *testing.T) {
t.Fatalf("tx was not published") t.Fatalf("tx was not published")
} }
if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() { // All outputs should initially spend from the force closed txn.
t.Fatalf("tx not attempting to spend commitment") forceTxID := forceCloseTx.TxHash()
} for _, txIn := range tx.TxIn {
if txIn.PreviousOutPoint.Hash != forceTxID {
// Find the index of the TxIn spending the HTLC output. t.Fatalf("og justice tx not spending commitment")
htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint
htlcIn := -1
for i, txIn := range tx.TxIn {
if txIn.PreviousOutPoint == *htlcOutpoint {
htlcIn = i
} }
} }
if htlcIn == -1 {
t.Fatalf("htlc in not found") localOutpoint := retribution.LocalOutpoint
remoteOutpoint := retribution.RemoteOutpoint
htlcOutpoint := retribution.HtlcRetributions[0].OutPoint
// Construct a map from outpoint on the force close to the transaction
// we want it to be spent by. As the test progresses, this map will be
// updated to contain only the set of commitment or second level
// outpoints that remain to be spent.
inputs := map[wire.OutPoint]*wire.MsgTx{
htlcOutpoint: htlc2ndLevlTx,
localOutpoint: commitSpendTx,
remoteOutpoint: commitSpendTx,
} }
// Since publishing the transaction failed above, the breach arbiter // Until no more inputs to spend remain, deliver the spend events and
// will attempt another second level check. Now notify that the htlc // process the assertions prescribed by the test case.
// output is spent by a second level tx. for len(inputs) > 0 {
secondLvlTx := &wire.MsgTx{ var (
TxOut: []*wire.TxOut{ op wire.OutPoint
{Value: 1}, spendTx *wire.MsgTx
}, )
}
notifier.Spend(htlcOutpoint, 2, secondLvlTx)
// Now a transaction attempting to spend from the second level tx // Pick an outpoint at random from the set of inputs.
// should be published instead. Let this publish succeed by setting the for op, spendTx = range inputs {
// publishing error to nil. delete(inputs, op)
publErr = nil break
select { }
case tx = <-publTx:
case <-time.After(5 * time.Second): // Deliver the spend notification for the chosen transaction.
t.Fatalf("tx was not published") notifier.Spend(&op, 2, spendTx)
// When the second layer transfer is detected, add back the
// outpoint of the second layer tx so that we can spend it
// again. Only do so if the test requests this behavior.
spendTxID := spendTx.TxHash()
if test.spend2ndLevel && spendTxID == htlc2ndLevlTx.TxHash() {
// Create the second level outpoint that will be spent,
// the index is always zero for these 1-in-1-out txns.
spendOp := wire.OutPoint{Hash: spendTxID}
inputs[spendOp] = htlcSpendTx
}
if len(inputs) > 0 {
test.whenNonZeroInputs(t, inputs, publTx)
} else {
// Reset the publishing error so that any publication,
// made by the breach arbiter, if any, will succeed.
publMtx.Lock()
publErr = nil
publMtx.Unlock()
test.whenZeroInputs(t, inputs, publTx)
}
} }
// The TxIn previously attempting to spend the HTLC outpoint should now // Deliver confirmation of sweep if the test expects it.
// be spending from the second level tx. if test.sendFinalConf {
if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() { notifier.confChannel <- &chainntnfs.TxConfirmation{}
t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0])
} }
// Assert that the channel is fully resolved.
assertBrarCleanup(t, brar, alice.ChanPoint, alice.State().Db)
}
// findInputIndex returns the index of the input that spends from the given
// outpoint. This method fails if the outpoint is not found.
func findInputIndex(t *testing.T, op wire.OutPoint, tx *wire.MsgTx) int {
t.Helper()
inputIdx := -1
for i, txIn := range tx.TxIn {
if txIn.PreviousOutPoint == op {
inputIdx = i
}
}
if inputIdx == -1 {
t.Fatalf("input %v in not found", op)
}
return inputIdx
} }
// assertArbiterBreach checks that the breach arbiter has persisted the breach // assertArbiterBreach checks that the breach arbiter has persisted the breach
@ -1272,6 +1496,8 @@ func TestBreachSecondLevelTransfer(t *testing.T) {
func assertArbiterBreach(t *testing.T, brar *breachArbiter, func assertArbiterBreach(t *testing.T, brar *breachArbiter,
chanPoint *wire.OutPoint) { chanPoint *wire.OutPoint) {
t.Helper()
isBreached, err := brar.IsBreached(chanPoint) isBreached, err := brar.IsBreached(chanPoint)
if err != nil { if err != nil {
t.Fatalf("unable to determine if channel is "+ t.Fatalf("unable to determine if channel is "+
@ -1290,6 +1516,8 @@ func assertArbiterBreach(t *testing.T, brar *breachArbiter,
func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, func assertNoArbiterBreach(t *testing.T, brar *breachArbiter,
chanPoint *wire.OutPoint) { chanPoint *wire.OutPoint) {
t.Helper()
isBreached, err := brar.IsBreached(chanPoint) isBreached, err := brar.IsBreached(chanPoint)
if err != nil { if err != nil {
t.Fatalf("unable to determine if channel is "+ t.Fatalf("unable to determine if channel is "+
@ -1302,9 +1530,77 @@ func assertNoArbiterBreach(t *testing.T, brar *breachArbiter,
} }
} }
// assertBrarCleanup blocks until the given channel point has been removed the
// retribution store and the channel is fully closed in the database.
func assertBrarCleanup(t *testing.T, brar *breachArbiter,
chanPoint *wire.OutPoint, db *channeldb.DB) {
t.Helper()
err := lntest.WaitNoError(func() error {
isBreached, err := brar.IsBreached(chanPoint)
if err != nil {
return err
}
if isBreached {
return fmt.Errorf("channel %v still breached",
chanPoint)
}
closedChans, err := db.FetchClosedChannels(false)
if err != nil {
return err
}
for _, channel := range closedChans {
switch {
// Wrong channel.
case channel.ChanPoint != *chanPoint:
continue
// Right channel, fully closed!
case !channel.IsPending:
return nil
}
// Still pending.
return fmt.Errorf("channel %v still pending "+
"close", chanPoint)
}
return fmt.Errorf("channel %v not closed", chanPoint)
}, time.Second)
if err != nil {
t.Fatalf(err.Error())
}
}
// assertPendingClosed checks that the channel has been marked pending closed in
// the channel database.
func assertPendingClosed(t *testing.T, c *lnwallet.LightningChannel) {
t.Helper()
closedChans, err := c.State().Db.FetchClosedChannels(true)
if err != nil {
t.Fatalf("unable to load pending closed channels: %v", err)
}
for _, chanSummary := range closedChans {
if chanSummary.ChanPoint == *c.ChanPoint {
return
}
}
t.Fatalf("channel %v was not marked pending closed", c.ChanPoint)
}
// assertNotPendingClosed checks that the channel has not been marked pending // assertNotPendingClosed checks that the channel has not been marked pending
// closed in the channel database. // closed in the channel database.
func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) {
t.Helper()
closedChans, err := c.State().Db.FetchClosedChannels(true) closedChans, err := c.State().Db.FetchClosedChannels(true)
if err != nil { if err != nil {
t.Fatalf("unable to load pending closed channels: %v", err) t.Fatalf("unable to load pending closed channels: %v", err)

46
mock.go

@ -123,6 +123,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
type mockSpendNotifier struct { type mockSpendNotifier struct {
*mockNotfier *mockNotfier
spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail
spends map[wire.OutPoint]*chainntnfs.SpendDetail
mtx sync.Mutex mtx sync.Mutex
} }
@ -132,6 +133,7 @@ func makeMockSpendNotifier() *mockSpendNotifier {
confChannel: make(chan *chainntnfs.TxConfirmation), confChannel: make(chan *chainntnfs.TxConfirmation),
}, },
spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail), spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail),
spends: make(map[wire.OutPoint]*chainntnfs.SpendDetail),
} }
} }
@ -140,8 +142,22 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
spendChan := make(chan *chainntnfs.SpendDetail) spendChan := make(chan *chainntnfs.SpendDetail, 1)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) if detail, ok := m.spends[*outpoint]; ok {
// Deliver spend immediately if details are already known.
spendChan <- &chainntnfs.SpendDetail{
SpentOutPoint: detail.SpentOutPoint,
SpendingHeight: detail.SpendingHeight,
SpendingTx: detail.SpendingTx,
SpenderTxHash: detail.SpenderTxHash,
SpenderInputIndex: detail.SpenderInputIndex,
}
} else {
// Otherwise, queue the notification for delivery if the spend
// is ever received.
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)
}
return &chainntnfs.SpendEvent{ return &chainntnfs.SpendEvent{
Spend: spendChan, Spend: spendChan,
Cancel: func() { Cancel: func() {
@ -156,16 +172,30 @@ func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
m.mtx.Lock() m.mtx.Lock()
defer m.mtx.Unlock() defer m.mtx.Unlock()
txnHash := txn.TxHash()
details := &chainntnfs.SpendDetail{
SpentOutPoint: outpoint,
SpendingHeight: height,
SpendingTx: txn,
SpenderTxHash: &txnHash,
SpenderInputIndex: outpoint.Index,
}
// Cache details in case of late registration.
if _, ok := m.spends[*outpoint]; !ok {
m.spends[*outpoint] = details
}
// Deliver any backlogged spend notifications.
if spendChans, ok := m.spendMap[*outpoint]; ok { if spendChans, ok := m.spendMap[*outpoint]; ok {
delete(m.spendMap, *outpoint) delete(m.spendMap, *outpoint)
for _, spendChan := range spendChans { for _, spendChan := range spendChans {
txnHash := txn.TxHash()
spendChan <- &chainntnfs.SpendDetail{ spendChan <- &chainntnfs.SpendDetail{
SpentOutPoint: outpoint, SpentOutPoint: details.SpentOutPoint,
SpendingHeight: height, SpendingHeight: details.SpendingHeight,
SpendingTx: txn, SpendingTx: details.SpendingTx,
SpenderTxHash: &txnHash, SpenderTxHash: details.SpenderTxHash,
SpenderInputIndex: outpoint.Index, SpenderInputIndex: details.SpenderInputIndex,
} }
} }
} }