diff --git a/breacharbiter.go b/breacharbiter.go index 25a190d3..5af66415 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -34,6 +34,10 @@ var ( // breached contracts. Entries are added to the justice txn bucket just // before broadcasting the sweep txn. justiceTxnBucket = []byte("justice-txn") + + // errBrarShuttingDown is an error returned if the breacharbiter has + // been signalled to exit. + errBrarShuttingDown = errors.New("breacharbiter shutting down") ) // ContractBreachEvent is an event the breachArbiter will receive in case a @@ -302,6 +306,147 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo, bo.outpoint) } +// waitForSpendEvent waits for any of the breached outputs to get spent, and +// mutates the breachInfo to be able to sweep it. This method should be used +// when we fail to publish the justice tx because of a double spend, indicating +// that the counter party has taken one of the breached outputs to the second +// level. The spendNtfns map is a cache used to store registered spend +// subscriptions, in case we must call this method multiple times. +func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, + spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { + + // spend is used to wrap the index of the output that gets spent + // together with the spend details. + type spend struct { + index int + detail *chainntnfs.SpendDetail + } + + // We create a channel the first goroutine that gets a spend event can + // signal. We make it buffered in case multiple spend events come in at + // the same time. + anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) + + // The allSpends channel will be used to pass spend events from all the + // goroutines that detects a spend before they are signalled to exit. + allSpends := make(chan spend, len(breachInfo.breachedOutputs)) + + // exit will be used to signal the goroutines that they can exit. 
+ exit := make(chan struct{}) + var wg sync.WaitGroup + + // We'll now launch a goroutine for each of the HTLC outputs, that will + // signal the moment they detect a spend event. + for i := 0; i < len(breachInfo.breachedOutputs); i++ { + breachedOutput := &breachInfo.breachedOutputs[i] + + // If this isn't an HTLC output, then we can skip it. + if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && + breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { + continue + } + + brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+ + "for ChannelPoint(%v)", breachedOutput.outpoint, + breachInfo.chanPoint) + + // If we have already registered for a notification for this + // output, we'll reuse it. + spendNtfn, ok := spendNtfns[breachedOutput.outpoint] + if !ok { + var err error + spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( + &breachedOutput.outpoint, + breachInfo.breachHeight, true, + ) + if err != nil { + brarLog.Errorf("unable to check for spentness "+ + "of out_point=%v: %v", + breachedOutput.outpoint, err) + + // Registration may have failed if we've been + // instructed to shutdown. If so, return here + // to avoid entering an infinite loop. + select { + case <-b.quit: + return errBrarShuttingDown + default: + continue + } + } + spendNtfns[breachedOutput.outpoint] = spendNtfn + } + + // Launch a goroutine waiting for a spend event. + b.wg.Add(1) + wg.Add(1) + go func(index int, spendEv *chainntnfs.SpendEvent) { + defer b.wg.Done() + defer wg.Done() + + select { + // The output has been taken to the second level! + case sp, ok := <-spendEv.Spend: + if !ok { + return + } + brarLog.Debugf("Detected spend of HTLC(%v) "+ + "for ChannelPoint(%v)", + breachedOutput.outpoint, + breachInfo.chanPoint) + + // First we send the spend event on the + // allSpends channel, such that it can be + // handled after all go routines have exited. 
+ allSpends <- spend{index, sp} + + // Finally we'll signal the anySpend channel + // that a spend was detected, such that the + // other goroutines can be shut down. + anySpend <- struct{}{} + case <-exit: + return + case <-b.quit: + return + } + }(i, spendNtfn) + } + + // We'll wait for any of the outputs to be spent, or that we are + // signalled to exit. + select { + // A goroutine has signalled that a spend occurred. + case <-anySpend: + // Signal for the remaining goroutines to exit. + close(exit) + wg.Wait() + + // At this point all goroutines that can send on the allSpends + // channel have exited. We can therefore safely close the + // channel before ranging over its content. + close(allSpends) + for s := range allSpends { + breachedOutput := &breachInfo.breachedOutputs[s.index] + brarLog.Debugf("Detected second-level spend on "+ + "HTLC(%v) for ChannelPoint(%v)", + breachedOutput.outpoint, breachInfo.chanPoint) + + delete(spendNtfns, breachedOutput.outpoint) + + // In this case we'll morph our initial revoke spend to + // instead point to the second level output, and update + // the sign descriptor in the process. + convertToSecondLevelRevoke( + breachedOutput, breachInfo, s.detail, + ) + } + case <-b.quit: + return errBrarShuttingDown + } + + return nil +} + // exactRetribution is a goroutine which is executed once a contract breach has // been detected by a breachObserver. This function is responsible for // punishing a counterparty for violating the channel contract by sweeping ALL @@ -334,6 +479,11 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ "revoked funds", breachInfo.commitHash) + // We may have to wait for some of the HTLC outputs to be spent to the + // second level before broadcasting the justice tx. We'll store the + // SpendEvents between each attempt to not re-register unnecessarily. 
+ spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) + finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to get finalized txn for"+ @@ -345,77 +495,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, // construct a sweep transaction and write it to disk. This will allow // the breach arbiter to re-register for notifications for the justice // txid. - spendNtfns := make(map[wire.OutPoint]*chainntnfs.SpendEvent) - -secondLevelCheck: +justiceTxBroadcast: if finalTx == nil { - // Before we create the justice tx, we need to check to see if - // any of the active HTLC's on the commitment transactions has - // been spent. In this case, we'll need to go to the second - // level to sweep them before the remote party can. - for i := 0; i < len(breachInfo.breachedOutputs); i++ { - breachedOutput := &breachInfo.breachedOutputs[i] - - // If this isn't an HTLC output, then we can skip it. - if breachedOutput.witnessType != lnwallet.HtlcAcceptedRevoke && - breachedOutput.witnessType != lnwallet.HtlcOfferedRevoke { - continue - } - - brarLog.Debugf("Checking for second-level attempt on "+ - "HTLC(%v) for ChannelPoint(%v)", - breachedOutput.outpoint, breachInfo.chanPoint) - - // Now that we have an HTLC output, we'll quickly check - // to see if it has been spent or not. If we have - // already registered for a notification for this - // output, we'll reuse it. - spendNtfn, ok := spendNtfns[breachedOutput.outpoint] - if !ok { - spendNtfn, err = b.cfg.Notifier.RegisterSpendNtfn( - &breachedOutput.outpoint, - breachInfo.breachHeight, true, - ) - if err != nil { - brarLog.Errorf("unable to check for "+ - "spentness of out_point=%v: %v", - breachedOutput.outpoint, err) - - // Registration may have failed if - // we've been instructed to shutdown. - // If so, return here to avoid entering - // an infinite loop. 
- select { - case <-b.quit: - return - default: - continue - } - } - spendNtfns[breachedOutput.outpoint] = spendNtfn - } - - select { - // The output has been taken to the second level! - case spendDetails, ok := <-spendNtfn.Spend: - if !ok { - return - } - delete(spendNtfns, breachedOutput.outpoint) - - // In this case we'll morph our initial revoke - // spend to instead point to the second level - // output, and update the sign descriptor in - // the process. - convertToSecondLevelRevoke( - breachedOutput, breachInfo, spendDetails, - ) - - // It hasn't been spent so we'll continue. - default: - } - } - // With the breach transaction confirmed, we now create the // justice tx which will claim ALL the funds within the // channel. @@ -443,21 +524,30 @@ secondLevelCheck: // channel's retribution against the cheating counter party. err = b.cfg.PublishTransaction(finalTx) if err != nil { - brarLog.Errorf("unable to broadcast "+ - "justice tx: %v", err) + brarLog.Errorf("unable to broadcast justice tx: %v", err) + if err == lnwallet.ErrDoubleSpend { - brarLog.Infof("Attempting to transfer HTLC revocations " + - "to the second level") + // Broadcasting the transaction failed because of a + // conflict either in the mempool or in chain. We'll + // now create spend subscriptions for all HTLC outputs + // on the commitment transaction that could possibly + // have been spent, and wait for any of them to + // trigger. + brarLog.Infof("Waiting for a spend event before " + + "attempting to craft new justice tx.") finalTx = nil - // Txn publication may fail if we're shutting down. - // If so, return to avoid entering an infinite loop. 
- select { - case <-b.quit: + err := b.waitForSpendEvent(breachInfo, spendNtfns) + if err != nil { + if err != errBrarShuttingDown { + brarLog.Errorf("error waiting for "+ + "spend event: %v", err) + } return - default: - goto secondLevelCheck } + + brarLog.Infof("Attempting another justice tx broadcast") + goto justiceTxBroadcast } } @@ -469,8 +559,8 @@ secondLevelCheck: confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( &justiceTXID, 1, breachConfHeight) if err != nil { - brarLog.Errorf("unable to register for conf for txid: %v", - justiceTXID) + brarLog.Errorf("unable to register for conf for txid(%v): %v", + justiceTXID, err) return } diff --git a/breacharbiter_test.go b/breacharbiter_test.go index b9c00a09..ae617dac 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btclog" "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/keychain" @@ -933,11 +934,10 @@ restartCheck: } } -// TestBreachHandoffSuccess tests that a channel's close observer properly -// delivers retribution information to the breach arbiter in response to a -// breach close. This test verifies correctness in the event that the handoff -// experiences no interruptions. -func TestBreachHandoffSuccess(t *testing.T) { +func initBreachedState(t *testing.T) (*breachArbiter, + *lnwallet.LightningChannel, *lnwallet.LightningChannel, + *lnwallet.LocalForceCloseSummary, chan *ContractBreachEvent, + func(), func()) { // Create a pair of channels using a notifier that allows us to signal // a spend of the funding transaction. Alice's channel will be the on // observing a breach. 
@@ -945,7 +945,6 @@ func TestBreachHandoffSuccess(t *testing.T) { if err != nil { t.Fatalf("unable to create test channels: %v", err) } - defer cleanUpChans() // Instantiate a breach arbiter to handle the breach of alice's channel. contractBreaches := make(chan *ContractBreachEvent) @@ -956,7 +955,6 @@ func TestBreachHandoffSuccess(t *testing.T) { if err != nil { t.Fatalf("unable to initialize test breach arbiter: %v", err) } - defer cleanUpArb() // Send one HTLC to Bob and perform a state transition to lock it in. htlcAmount := lnwire.NewMSatFromSatoshis(20000) @@ -991,6 +989,20 @@ func TestBreachHandoffSuccess(t *testing.T) { t.Fatalf("Can't update the channel state: %v", err) } + return brar, alice, bob, bobClose, contractBreaches, cleanUpChans, + cleanUpArb +} + +// TestBreachHandoffSuccess tests that a channel's close observer properly +// delivers retribution information to the breach arbiter in response to a +// breach close. This test verifies correctness in the event that the handoff +// experiences no interruptions. +func TestBreachHandoffSuccess(t *testing.T) { + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) + defer cleanUpChans() + defer cleanUpArb() + chanPoint := alice.ChanPoint // Signal a spend of the funding transaction and wait for the close @@ -1052,59 +1064,11 @@ func TestBreachHandoffSuccess(t *testing.T) { // arbiter fails to write the information to disk, and that a subsequent attempt // at the handoff succeeds. func TestBreachHandoffFail(t *testing.T) { - // Create a pair of channels using a notifier that allows us to signal - // a spend of the funding transaction. Alice's channel will be the on - // observing a breach. 
- alice, bob, cleanUpChans, err := createInitChannels(1) - if err != nil { - t.Fatalf("unable to create test channels: %v", err) - } + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) defer cleanUpChans() - - // Instantiate a breach arbiter to handle the breach of alice's channel. - contractBreaches := make(chan *ContractBreachEvent) - - brar, cleanUpArb, err := createTestArbiter( - t, contractBreaches, alice.State().Db, - ) - if err != nil { - t.Fatalf("unable to initialize test breach arbiter: %v", err) - } defer cleanUpArb() - // Send one HTLC to Bob and perform a state transition to lock it in. - htlcAmount := lnwire.NewMSatFromSatoshis(20000) - htlc, _ := createHTLC(0, htlcAmount) - if _, err := alice.AddHTLC(htlc, nil); err != nil { - t.Fatalf("alice unable to add htlc: %v", err) - } - if _, err := bob.ReceiveHTLC(htlc); err != nil { - t.Fatalf("bob unable to recv add htlc: %v", err) - } - if err := forceStateTransition(alice, bob); err != nil { - t.Fatalf("Can't update the channel state: %v", err) - } - - // Generate the force close summary at this point in time, this will - // serve as the old state bob will broadcast. - bobClose, err := bob.ForceClose() - if err != nil { - t.Fatalf("unable to force close bob's channel: %v", err) - } - - // Now send another HTLC and perform a state transition, this ensures - // Alice is ahead of the state Bob will broadcast. 
- htlc2, _ := createHTLC(1, htlcAmount) - if _, err := alice.AddHTLC(htlc2, nil); err != nil { - t.Fatalf("alice unable to add htlc: %v", err) - } - if _, err := bob.ReceiveHTLC(htlc2); err != nil { - t.Fatalf("bob unable to recv add htlc: %v", err) - } - if err := forceStateTransition(alice, bob); err != nil { - t.Fatalf("Can't update the channel state: %v", err) - } - // Before alerting Alice of the breach, instruct our failing retribution // store to fail the next database operation, which we expect to write // the information handed off by the channel's close observer. @@ -1139,7 +1103,7 @@ func TestBreachHandoffFail(t *testing.T) { assertNoArbiterBreach(t, brar, chanPoint) assertNotPendingClosed(t, alice) - brar, cleanUpArb, err = createTestArbiter( + brar, cleanUpArb, err := createTestArbiter( t, contractBreaches, alice.State().Db, ) if err != nil { @@ -1186,6 +1150,117 @@ func TestBreachHandoffFail(t *testing.T) { assertArbiterBreach(t, brar, chanPoint) } +// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a +// breached commitment is transferred to a second level spend if the output is +// already spent. +func TestBreachSecondLevelTransfer(t *testing.T) { + brar, alice, _, bobClose, contractBreaches, + cleanUpChans, cleanUpArb := initBreachedState(t) + defer cleanUpChans() + defer cleanUpArb() + + var ( + height = bobClose.ChanSnapshot.CommitHeight + forceCloseTx = bobClose.CloseTx + chanPoint = alice.ChanPoint + publTx = make(chan *wire.MsgTx) + publErr error + ) + + // Make PublishTransaction always return ErrDoubleSpend to begin with. + publErr = lnwallet.ErrDoubleSpend + brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error { + publTx <- tx + return publErr + } + + // Notify the breach arbiter about the breach. 
+ retribution, err := lnwallet.NewBreachRetribution( + alice.State(), height, forceCloseTx, 1) + if err != nil { + t.Fatalf("unable to create breach retribution: %v", err) + } + + breach := &ContractBreachEvent{ + ChanPoint: *chanPoint, + ProcessACK: make(chan error, 1), + BreachRetribution: retribution, + } + contractBreaches <- breach + + // We'll also wait to consume the ACK back from the breach arbiter. + select { + case err := <-breach.ProcessACK: + if err != nil { + t.Fatalf("handoff failed: %v", err) + } + case <-time.After(time.Second * 15): + t.Fatalf("breach arbiter didn't send ack back") + } + + // After exiting, the breach arbiter should have persisted the + // retribution information and the channel should be shown as pending + // force closed. + assertArbiterBreach(t, brar, chanPoint) + + // Notify that the breaching transaction is confirmed, to trigger the + // retribution logic. + notifier := brar.cfg.Notifier.(*mockSpendNotifier) + notifier.confChannel <- &chainntnfs.TxConfirmation{} + + // The breach arbiter should attempt to sweep all outputs on the + // breached commitment. We'll pretend that the HTLC output has been + // spent by the channel counter party's second level tx already. + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() { + t.Fatalf("tx not attempting to spend commitment") + } + + // Find the index of the TxIn spending the HTLC output. + htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint + htlcIn := -1 + for i, txIn := range tx.TxIn { + if txIn.PreviousOutPoint == *htlcOutpoint { + htlcIn = i + } + } + if htlcIn == -1 { + t.Fatalf("htlc in not found") + } + + // Since publishing the transaction failed above, the breach arbiter + // will attempt another second level check. Now notify that the htlc + // output is spent by a second level tx. 
+ secondLvlTx := &wire.MsgTx{ + TxOut: []*wire.TxOut{ + &wire.TxOut{Value: 1}, + }, + } + notifier.Spend(htlcOutpoint, 2, secondLvlTx) + + // Now a transaction attempting to spend from the second level tx + // should be published instead. Let this publish succeed by setting the + // publishing error to nil. + publErr = nil + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The TxIn previously attempting to spend the HTLC outpoint should now + // be spending from the second level tx. + if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() { + t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0]) + } +} + // assertArbiterBreach checks that the breach arbiter has persisted the breach // information for a particular channel. func assertArbiterBreach(t *testing.T, brar *breachArbiter, diff --git a/mock.go b/mock.go index 35154ab9..5caf5136 100644 --- a/mock.go +++ b/mock.go @@ -118,6 +118,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, type mockSpendNotifier struct { *mockNotfier spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail + mtx sync.Mutex } func makeMockSpendNotifier() *mockSpendNotifier { @@ -131,6 +132,8 @@ func makeMockSpendNotifier() *mockSpendNotifier { func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32, _ bool) (*chainntnfs.SpendEvent, error) { + m.mtx.Lock() + defer m.mtx.Unlock() spendChan := make(chan *chainntnfs.SpendDetail) m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) @@ -145,6 +148,8 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, // will include the transaction and height provided by the caller. func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, txn *wire.MsgTx) { + m.mtx.Lock() + defer m.mtx.Unlock() if spendChans, ok := m.spendMap[*outpoint]; ok { delete(m.spendMap, *outpoint)