diff --git a/breacharbiter.go b/breacharbiter.go index 42207c89..be5e1b36 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" "sync" "sync/atomic" @@ -161,7 +162,7 @@ func (b *breachArbiter) Start() error { // retribution store. closedChans, err := b.cfg.DB.FetchClosedChannels(false) if err != nil { - brarLog.Errorf("unable to fetch closing channels: %v", err) + brarLog.Errorf("Unable to fetch closing channels: %v", err) return err } @@ -181,7 +182,7 @@ func (b *breachArbiter) Start() error { chanPoint := &chanSummary.ChanPoint if _, ok := breachRetInfos[*chanPoint]; ok { if err := b.cfg.Store.Remove(chanPoint); err != nil { - brarLog.Errorf("unable to remove closed "+ + brarLog.Errorf("Unable to remove closed "+ "chanid=%v from breach arbiter: %v", chanPoint, err) return err @@ -203,7 +204,7 @@ func (b *breachArbiter) Start() error { &breachTXID, breachScript, 1, retInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf updates "+ + brarLog.Errorf("Unable to register for conf updates "+ "for txid: %v, err: %v", breachTXID, err) return err } @@ -320,6 +321,8 @@ func convertToSecondLevelRevoke(bo *breachedOutput, breachInfo *retributionInfo, func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, spendNtfns map[wire.OutPoint]*chainntnfs.SpendEvent) error { + inputs := breachInfo.breachedOutputs + // spend is used to wrap the index of the output that gets spent // together with the spend details. type spend struct { @@ -330,11 +333,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // We create a channel the first goroutine that gets a spend event can // signal. We make it buffered in case multiple spend events come in at // the same time. - anySpend := make(chan struct{}, len(breachInfo.breachedOutputs)) + anySpend := make(chan struct{}, len(inputs)) // The allSpends channel will be used to pass spend events from all the // goroutines that detects a spend before they are signalled to exit. - allSpends := make(chan spend, len(breachInfo.breachedOutputs)) + allSpends := make(chan spend, len(inputs)) // exit will be used to signal the goroutines that they can exit. exit := make(chan struct{}) @@ -342,17 +345,11 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // We'll now launch a goroutine for each of the HTLC outputs, that will // signal the moment they detect a spend event. - for i := 0; i < len(breachInfo.breachedOutputs); i++ { - breachedOutput := &breachInfo.breachedOutputs[i] + for i := range inputs { + breachedOutput := &inputs[i] - // If this isn't an HTLC output, then we can skip it. - if breachedOutput.witnessType != input.HtlcAcceptedRevoke && - breachedOutput.witnessType != input.HtlcOfferedRevoke { - continue - } - - brarLog.Debugf("Checking for second-level attempt on HTLC(%v) "+ - "for ChannelPoint(%v)", breachedOutput.outpoint, + brarLog.Infof("Checking spend from %v(%v) for ChannelPoint(%v)", + breachedOutput.witnessType, breachedOutput.outpoint, breachInfo.chanPoint) // If we have already registered for a notification for this @@ -366,8 +363,8 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, breachInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to check for spentness "+ - "of out_point=%v: %v", + brarLog.Errorf("Unable to check for spentness "+ + "of outpoint=%v: %v", breachedOutput.outpoint, err) // Registration may have failed if we've been @@ -396,9 +393,12 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, if !ok { return } - brarLog.Debugf("Detected spend of HTLC(%v) "+ - "for ChannelPoint(%v)", - breachedOutput.outpoint, + + brarLog.Infof("Detected spend on %s(%v) by "+ + "txid(%v) for ChannelPoint(%v)", + inputs[index].witnessType, + inputs[index].outpoint, + sp.SpenderTxHash, breachInfo.chanPoint) // First we send the spend event on the @@ -431,21 +431,58 @@ func (b *breachArbiter) waitForSpendEvent(breachInfo *retributionInfo, // channel have exited. We can therefore safely close the // channel before ranging over its content. close(allSpends) - for s := range allSpends { - breachedOutput := &breachInfo.breachedOutputs[s.index] - brarLog.Debugf("Detected second-level spend on "+ - "HTLC(%v) for ChannelPoint(%v)", - breachedOutput.outpoint, breachInfo.chanPoint) + doneOutputs := make(map[int]struct{}) + for s := range allSpends { + breachedOutput := &inputs[s.index] delete(spendNtfns, breachedOutput.outpoint) - // In this case we'll morph our initial revoke spend to - // instead point to the second level output, and update - // the sign descriptor in the process. - convertToSecondLevelRevoke( - breachedOutput, breachInfo, s.detail, - ) + switch breachedOutput.witnessType { + case input.HtlcAcceptedRevoke: + fallthrough + case input.HtlcOfferedRevoke: + brarLog.Infof("Spend on second-level"+ + "%s(%v) for ChannelPoint(%v) "+ + "transitions to second-level output", + breachedOutput.witnessType, + breachedOutput.outpoint, + breachInfo.chanPoint) + + // In this case we'll morph our initial revoke + // spend to instead point to the second level + // output, and update the sign descriptor in the + // process. + convertToSecondLevelRevoke( + breachedOutput, breachInfo, s.detail, + ) + + continue + } + + brarLog.Infof("Spend on %s(%v) for ChannelPoint(%v) "+ + "transitions output to terminal state, "+ + "removing input from justice transaction", + breachedOutput.witnessType, + breachedOutput.outpoint, breachInfo.chanPoint) + + doneOutputs[s.index] = struct{}{} } + + // Filter the inputs for which we can no longer proceed. + var nextIndex int + for i := range inputs { + if _, ok := doneOutputs[i]; ok { + continue + } + + inputs[nextIndex] = inputs[i] + nextIndex++ + } + + // Update our remaining set of outputs before continuing with + // another attempt at publication. + breachInfo.breachedOutputs = inputs[:nextIndex] + case <-b.quit: return errBrarShuttingDown } @@ -492,7 +529,7 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to get finalized txn for"+ + brarLog.Errorf("Unable to get finalized txn for"+ "chanid=%v: %v", &breachInfo.chanPoint, err) return } @@ -508,7 +545,7 @@ justiceTxBroadcast: // channel. finalTx, err = b.createJusticeTx(breachInfo) if err != nil { - brarLog.Errorf("unable to create justice tx: %v", err) + brarLog.Errorf("Unable to create justice tx: %v", err) return } @@ -516,7 +553,7 @@ justiceTxBroadcast: // attempt to broadcast. err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx) if err != nil { - brarLog.Errorf("unable to finalize justice tx for "+ + brarLog.Errorf("Unable to finalize justice tx for "+ "chanid=%v: %v", &breachInfo.chanPoint, err) return } @@ -530,7 +567,7 @@ justiceTxBroadcast: // channel's retribution against the cheating counter party. err = b.cfg.PublishTransaction(finalTx) if err != nil { - brarLog.Errorf("unable to broadcast justice tx: %v", err) + brarLog.Errorf("Unable to broadcast justice tx: %v", err) if err == lnwallet.ErrDoubleSpend { // Broadcasting the transaction failed because of a @@ -552,7 +589,24 @@ justiceTxBroadcast: return } - brarLog.Infof("Attempting another justice tx broadcast") + if len(breachInfo.breachedOutputs) == 0 { + brarLog.Debugf("No more outputs to sweep for "+ + "breach, marking ChannelPoint(%v) "+ + "fully resolved", breachInfo.chanPoint) + + err = b.cleanupBreach(&breachInfo.chanPoint) + if err != nil { + brarLog.Errorf("Failed to cleanup "+ + "breached ChannelPoint(%v): %v", + breachInfo.chanPoint, err) + } + return + } + + brarLog.Infof("Attempting another justice tx "+ + "with %d inputs", + len(breachInfo.breachedOutputs)) + goto justiceTxBroadcast } } @@ -567,7 +621,7 @@ justiceTxBroadcast: &justiceTXID, justiceScript, 1, breachConfHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf for txid(%v): %v", + brarLog.Errorf("Unable to register for conf for txid(%v): %v", justiceTXID, err) return } @@ -602,19 +656,11 @@ justiceTxBroadcast: "have been claimed", breachInfo.chanPoint, revokedFunds, totalFunds) - // With the channel closed, mark it in the database as such. - err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) + err = b.cleanupBreach(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to mark chan as closed: %v", err) - return - } - - // Justice has been carried out; we can safely delete the - // retribution info from the database. - err = b.cfg.Store.Remove(&breachInfo.chanPoint) - if err != nil { - brarLog.Errorf("unable to remove retribution "+ - "from the db: %v", err) + brarLog.Errorf("Failed to cleanup breached "+ + "ChannelPoint(%v): %v", breachInfo.chanPoint, + err) } // TODO(roasbeef): add peer to blacklist? @@ -628,6 +674,26 @@ justiceTxBroadcast: } } +// cleanupBreach marks the given channel point as fully resolved and removes the +// retribution for that the channel from the retribution store. +func (b *breachArbiter) cleanupBreach(chanPoint *wire.OutPoint) error { + // With the channel closed, mark it in the database as such. + err := b.cfg.DB.MarkChanFullyClosed(chanPoint) + if err != nil { + return fmt.Errorf("unable to mark chan as closed: %v", err) + } + + // Justice has been carried out; we can safely delete the retribution + // info from the database. + err = b.cfg.Store.Remove(chanPoint) + if err != nil { + return fmt.Errorf("unable to remove retribution from db: %v", + err) + } + + return nil +} + // handleBreachHandoff handles a new breach event, by writing it to disk, then // notifies the breachArbiter contract observer goroutine that a channel's // contract has been breached by the prior counterparty. Once notified the @@ -672,7 +738,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { breached, err := b.cfg.Store.IsBreached(&chanPoint) if err != nil { b.Unlock() - brarLog.Errorf("unable to check breach info in DB: %v", err) + brarLog.Errorf("Unable to check breach info in DB: %v", err) select { case breachEvent.ProcessACK <- err: @@ -703,7 +769,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { err = b.cfg.Store.Add(retInfo) b.Unlock() if err != nil { - brarLog.Errorf("unable to persist retribution "+ + brarLog.Errorf("Unable to persist retribution "+ "info to db: %v", err) } @@ -733,7 +799,7 @@ func (b *breachArbiter) handleBreachHandoff(breachEvent *ContractBreachEvent) { breachTXID, breachScript, 1, retInfo.breachHeight, ) if err != nil { - brarLog.Errorf("unable to register for conf updates for "+ + brarLog.Errorf("Unable to register for conf updates for "+ "txid: %v, err: %v", breachTXID, err) return } @@ -1286,7 +1352,7 @@ func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { // to remove a finalized retribution state that is not already // stored in the db. if retBucket == nil { - return errors.New("unable to remove retribution " + + return errors.New("Unable to remove retribution " + "because the retribution bucket doesn't exist.") } diff --git a/breacharbiter_test.go b/breacharbiter_test.go index dfd215b4..b3ae1693 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -29,6 +29,7 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/shachain" @@ -1156,10 +1157,162 @@ func TestBreachHandoffFail(t *testing.T) { assertArbiterBreach(t, brar, chanPoint) } -// TestBreachSecondLevelTransfer tests that sweep of a HTLC output on a -// breached commitment is transferred to a second level spend if the output is -// already spent. -func TestBreachSecondLevelTransfer(t *testing.T) { +type publAssertion func(*testing.T, map[wire.OutPoint]*wire.MsgTx, + chan *wire.MsgTx) + +type breachTest struct { + name string + + // spend2ndLevel requests that second level htlcs be spent *again*, as + // if by a remote party or watchtower. The outpoint of the second level + // htlc is in effect "readded" to the set of inputs. + spend2ndLevel bool + + // sendFinalConf informs the test to send a confirmation for the justice + // transaction before asserting the arbiter is cleaned up. + sendFinalConf bool + + // whenNonZeroInputs is called after spending an input but there are + // further inputs to spend in the test. + whenNonZeroInputs publAssertion + + // whenZeroInputs is called after spending an input but there are no + // further inputs to spend in the test. + whenZeroInputs publAssertion +} + +var ( + // commitSpendTx is used to spend commitment outputs. + commitSpendTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 500000000}, + }, + } + // htlc2ndLevlTx is used to transition an htlc output on the commitment + // transaction to a second level htlc. + htlc2ndLevlTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 20000}, + }, + } + // htlcSpendTx is used to spend from a second level htlc. + htlcSpendTx = &wire.MsgTx{ + TxOut: []*wire.TxOut{ + {Value: 10000}, + }, + } +) + +var breachTests = []breachTest{ + { + name: "all spends", + spend2ndLevel: true, + whenNonZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The justice transaction should have thee same number + // of inputs as we are tracking in the test. + if len(tx.TxIn) != len(inputs) { + t.Fatalf("expected justice txn to have %d "+ + "inputs, found %d", len(inputs), + len(tx.TxIn)) + } + + // Ensure that each input exists on the justice + // transaction. + for in := range inputs { + findInputIndex(t, in, tx) + } + + }, + whenZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + // Sanity check to ensure the brar doesn't try to + // broadcast another sweep, since all outputs have been + // spent externally. + select { + case <-publTx: + t.Fatalf("tx published unexpectedly") + case <-time.After(50 * time.Millisecond): + } + }, + }, + { + name: "commit spends, second level sweep", + spend2ndLevel: false, + sendFinalConf: true, + whenNonZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + select { + case <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + }, + whenZeroInputs: func(t *testing.T, + inputs map[wire.OutPoint]*wire.MsgTx, + publTx chan *wire.MsgTx) { + + // Now a transaction attempting to spend from the second + // level tx should be published instead. Let this + // publish succeed by setting the publishing error to + // nil. + var tx *wire.MsgTx + select { + case tx = <-publTx: + case <-time.After(5 * time.Second): + t.Fatalf("tx was not published") + } + + // The commitment outputs should be gone, and there + // should only be a single htlc spend. + if len(tx.TxIn) != 1 { + t.Fatalf("expect 1 htlc output, found %d "+ + "outputs", len(tx.TxIn)) + } + + // The remaining TxIn previously attempting to spend + // the HTLC outpoint should now be spending from the + // second level tx. + // + // NOTE: Commitment outputs and htlc sweeps are spent + // with a different transactions (and thus txids), + // ensuring we aren't mistaking this for a different + // output type. + onlyInput := tx.TxIn[0].PreviousOutPoint.Hash + if onlyInput != htlc2ndLevlTx.TxHash() { + t.Fatalf("tx not attempting to spend second "+ + "level tx, %v", tx.TxIn[0]) + } + }, + }, +} + +// TestBreachSpends checks the behavior of the breach arbiter in response to +// spend events on a channels outputs by asserting that it properly removes or +// modifies the inputs from the justice txn. +func TestBreachSpends(t *testing.T) { + for _, test := range breachTests { + tc := test + t.Run(tc.name, func(t *testing.T) { + testBreachSpends(t, tc) + }) + } +} + +func testBreachSpends(t *testing.T, test breachTest) { brar, alice, _, bobClose, contractBreaches, cleanUpChans, cleanUpArb := initBreachedState(t) defer cleanUpChans() @@ -1171,12 +1324,16 @@ func TestBreachSecondLevelTransfer(t *testing.T) { chanPoint = alice.ChanPoint publTx = make(chan *wire.MsgTx) publErr error + publMtx sync.Mutex ) // Make PublishTransaction always return ErrDoubleSpend to begin with. publErr = lnwallet.ErrDoubleSpend brar.cfg.PublishTransaction = func(tx *wire.MsgTx) error { publTx <- tx + + publMtx.Lock() + defer publMtx.Unlock() return publErr } @@ -1204,11 +1361,32 @@ func TestBreachSecondLevelTransfer(t *testing.T) { t.Fatalf("breach arbiter didn't send ack back") } + state := alice.State() + err = state.CloseChannel(&channeldb.ChannelCloseSummary{ + ChanPoint: state.FundingOutpoint, + ChainHash: state.ChainHash, + RemotePub: state.IdentityPub, + CloseType: channeldb.BreachClose, + Capacity: state.Capacity, + IsPending: true, + ShortChanID: state.ShortChanID(), + RemoteCurrentRevocation: state.RemoteCurrentRevocation, + RemoteNextRevocation: state.RemoteNextRevocation, + LocalChanConfig: state.LocalChanCfg, + }) + if err != nil { + t.Fatalf("unable to close channel: %v", err) + } + // After exiting, the breach arbiter should have persisted the // retribution information and the channel should be shown as pending // force closed. assertArbiterBreach(t, brar, chanPoint) + // Assert that the database sees the channel as pending close, otherwise + // the breach arbiter won't be able to fully close it. + assertPendingClosed(t, alice) + // Notify that the breaching transaction is confirmed, to trigger the // retribution logic. notifier := brar.cfg.Notifier.(*mockSpendNotifier) @@ -1224,47 +1402,93 @@ func TestBreachSecondLevelTransfer(t *testing.T) { t.Fatalf("tx was not published") } - if tx.TxIn[0].PreviousOutPoint.Hash != forceCloseTx.TxHash() { - t.Fatalf("tx not attempting to spend commitment") - } - - // Find the index of the TxIn spending the HTLC output. - htlcOutpoint := &retribution.HtlcRetributions[0].OutPoint - htlcIn := -1 - for i, txIn := range tx.TxIn { - if txIn.PreviousOutPoint == *htlcOutpoint { - htlcIn = i + // All outputs should initially spend from the force closed txn. + forceTxID := forceCloseTx.TxHash() + for _, txIn := range tx.TxIn { + if txIn.PreviousOutPoint.Hash != forceTxID { + t.Fatalf("og justice tx not spending commitment") } } - if htlcIn == -1 { - t.Fatalf("htlc in not found") + + localOutpoint := retribution.LocalOutpoint + remoteOutpoint := retribution.RemoteOutpoint + htlcOutpoint := retribution.HtlcRetributions[0].OutPoint + + // Construct a map from outpoint on the force close to the transaction + // we want it to be spent by. As the test progresses, this map will be + // updated to contain only the set of commitment or second level + // outpoints that remain to be spent. + inputs := map[wire.OutPoint]*wire.MsgTx{ + htlcOutpoint: htlc2ndLevlTx, + localOutpoint: commitSpendTx, + remoteOutpoint: commitSpendTx, } - // Since publishing the transaction failed above, the breach arbiter - // will attempt another second level check. Now notify that the htlc - // output is spent by a second level tx. - secondLvlTx := &wire.MsgTx{ - TxOut: []*wire.TxOut{ - {Value: 1}, - }, - } - notifier.Spend(htlcOutpoint, 2, secondLvlTx) + // Until no more inputs to spend remain, deliver the spend events and + // process the assertions prescribed by the test case. + for len(inputs) > 0 { + var ( + op wire.OutPoint + spendTx *wire.MsgTx + ) - // Now a transaction attempting to spend from the second level tx - // should be published instead. Let this publish succeed by setting the - // publishing error to nil. - publErr = nil - select { - case tx = <-publTx: - case <-time.After(5 * time.Second): - t.Fatalf("tx was not published") + // Pick an outpoint at random from the set of inputs. + for op, spendTx = range inputs { + delete(inputs, op) + break + } + + // Deliver the spend notification for the chosen transaction. + notifier.Spend(&op, 2, spendTx) + + // When the second layer transfer is detected, add back the + // outpoint of the second layer tx so that we can spend it + // again. Only do so if the test requests this behavior. + spendTxID := spendTx.TxHash() + if test.spend2ndLevel && spendTxID == htlc2ndLevlTx.TxHash() { + // Create the second level outpoint that will be spent, + // the index is always zero for these 1-in-1-out txns. + spendOp := wire.OutPoint{Hash: spendTxID} + inputs[spendOp] = htlcSpendTx + } + + if len(inputs) > 0 { + test.whenNonZeroInputs(t, inputs, publTx) + } else { + // Reset the publishing error so that any publication, + // made by the breach arbiter, if any, will succeed. + publMtx.Lock() + publErr = nil + publMtx.Unlock() + test.whenZeroInputs(t, inputs, publTx) + } } - // The TxIn previously attempting to spend the HTLC outpoint should now - // be spending from the second level tx. - if tx.TxIn[htlcIn].PreviousOutPoint.Hash != secondLvlTx.TxHash() { - t.Fatalf("tx not attempting to spend second level tx, %v", tx.TxIn[0]) + // Deliver confirmation of sweep if the test expects it. + if test.sendFinalConf { + notifier.confChannel <- &chainntnfs.TxConfirmation{} } + + // Assert that the channel is fully resolved. + assertBrarCleanup(t, brar, alice.ChanPoint, alice.State().Db) +} + +// findInputIndex returns the index of the input that spends from the given +// outpoint. This method fails if the outpoint is not found. +func findInputIndex(t *testing.T, op wire.OutPoint, tx *wire.MsgTx) int { + t.Helper() + + inputIdx := -1 + for i, txIn := range tx.TxIn { + if txIn.PreviousOutPoint == op { + inputIdx = i + } + } + if inputIdx == -1 { + t.Fatalf("input %v in not found", op) + } + + return inputIdx } // assertArbiterBreach checks that the breach arbiter has persisted the breach @@ -1272,6 +1496,8 @@ func TestBreachSecondLevelTransfer(t *testing.T) { func assertArbiterBreach(t *testing.T, brar *breachArbiter, chanPoint *wire.OutPoint) { + t.Helper() + isBreached, err := brar.IsBreached(chanPoint) if err != nil { t.Fatalf("unable to determine if channel is "+ @@ -1290,6 +1516,8 @@ func assertArbiterBreach(t *testing.T, brar *breachArbiter, func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, chanPoint *wire.OutPoint) { + t.Helper() + isBreached, err := brar.IsBreached(chanPoint) if err != nil { t.Fatalf("unable to determine if channel is "+ @@ -1302,9 +1530,77 @@ func assertNoArbiterBreach(t *testing.T, brar *breachArbiter, } } +// assertBrarCleanup blocks until the given channel point has been removed the +// retribution store and the channel is fully closed in the database. +func assertBrarCleanup(t *testing.T, brar *breachArbiter, + chanPoint *wire.OutPoint, db *channeldb.DB) { + + t.Helper() + + err := lntest.WaitNoError(func() error { + isBreached, err := brar.IsBreached(chanPoint) + if err != nil { + return err + } + + if isBreached { + return fmt.Errorf("channel %v still breached", + chanPoint) + } + + closedChans, err := db.FetchClosedChannels(false) + if err != nil { + return err + } + + for _, channel := range closedChans { + switch { + // Wrong channel. + case channel.ChanPoint != *chanPoint: + continue + + // Right channel, fully closed! + case !channel.IsPending: + return nil + } + + // Still pending. + return fmt.Errorf("channel %v still pending "+ + "close", chanPoint) + } + + return fmt.Errorf("channel %v not closed", chanPoint) + + }, time.Second) + if err != nil { + t.Fatalf(err.Error()) + } +} + +// assertPendingClosed checks that the channel has been marked pending closed in +// the channel database. +func assertPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + t.Helper() + + closedChans, err := c.State().Db.FetchClosedChannels(true) + if err != nil { + t.Fatalf("unable to load pending closed channels: %v", err) + } + + for _, chanSummary := range closedChans { + if chanSummary.ChanPoint == *c.ChanPoint { + return + } + } + + t.Fatalf("channel %v was not marked pending closed", c.ChanPoint) +} + // assertNotPendingClosed checks that the channel has not been marked pending // closed in the channel database. func assertNotPendingClosed(t *testing.T, c *lnwallet.LightningChannel) { + t.Helper() + closedChans, err := c.State().Db.FetchClosedChannels(true) if err != nil { t.Fatalf("unable to load pending closed channels: %v", err) diff --git a/mock.go b/mock.go index 1eccbea6..6c1f3fb6 100644 --- a/mock.go +++ b/mock.go @@ -123,6 +123,7 @@ func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte, type mockSpendNotifier struct { *mockNotfier spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail + spends map[wire.OutPoint]*chainntnfs.SpendDetail mtx sync.Mutex } @@ -132,6 +133,7 @@ func makeMockSpendNotifier() *mockSpendNotifier { confChannel: make(chan *chainntnfs.TxConfirmation), }, spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail), + spends: make(map[wire.OutPoint]*chainntnfs.SpendDetail), } } @@ -140,8 +142,22 @@ func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, m.mtx.Lock() defer m.mtx.Unlock() - spendChan := make(chan *chainntnfs.SpendDetail) - m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + spendChan := make(chan *chainntnfs.SpendDetail, 1) + if detail, ok := m.spends[*outpoint]; ok { + // Deliver spend immediately if details are already known. + spendChan <- &chainntnfs.SpendDetail{ + SpentOutPoint: detail.SpentOutPoint, + SpendingHeight: detail.SpendingHeight, + SpendingTx: detail.SpendingTx, + SpenderTxHash: detail.SpenderTxHash, + SpenderInputIndex: detail.SpenderInputIndex, + } + } else { + // Otherwise, queue the notification for delivery if the spend + // is ever received. + m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan) + } + return &chainntnfs.SpendEvent{ Spend: spendChan, Cancel: func() { @@ -156,16 +172,30 @@ func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32, m.mtx.Lock() defer m.mtx.Unlock() + txnHash := txn.TxHash() + details := &chainntnfs.SpendDetail{ + SpentOutPoint: outpoint, + SpendingHeight: height, + SpendingTx: txn, + SpenderTxHash: &txnHash, + SpenderInputIndex: outpoint.Index, + } + + // Cache details in case of late registration. + if _, ok := m.spends[*outpoint]; !ok { + m.spends[*outpoint] = details + } + + // Deliver any backlogged spend notifications. if spendChans, ok := m.spendMap[*outpoint]; ok { delete(m.spendMap, *outpoint) for _, spendChan := range spendChans { - txnHash := txn.TxHash() spendChan <- &chainntnfs.SpendDetail{ - SpentOutPoint: outpoint, - SpendingHeight: height, - SpendingTx: txn, - SpenderTxHash: &txnHash, - SpenderInputIndex: outpoint.Index, + SpentOutPoint: details.SpentOutPoint, + SpendingHeight: details.SpendingHeight, + SpendingTx: details.SpendingTx, + SpenderTxHash: details.SpenderTxHash, + SpenderInputIndex: details.SpenderInputIndex, } } }