diff --git a/server.go b/server.go index 4c9ce4c0..4c710354 100644 --- a/server.go +++ b/server.go @@ -582,10 +582,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, } s.utxoNursery = newUtxoNursery(&NurseryConfig{ - ChainIO: cc.chainIO, - ConfDepth: 1, - DB: chanDB, - Estimator: cc.feeEstimator, + ChainIO: cc.chainIO, + ConfDepth: 1, + FetchClosedChannels: chanDB.FetchClosedChannels, + FetchClosedChannel: chanDB.FetchClosedChannel, + Estimator: cc.feeEstimator, GenSweepScript: func() ([]byte, error) { return newSweepPkScript(cc.wallet) }, diff --git a/utxonursery.go b/utxonursery.go index 5c801dce..d889a3f8 100644 --- a/utxonursery.go +++ b/utxonursery.go @@ -181,9 +181,15 @@ type NurseryConfig struct { // determining outputs in the chain as confirmed. ConfDepth uint32 - // DB provides access to a user's channels, such that they can be marked - // fully closed after incubation has concluded. - DB *channeldb.DB + // FetchClosedChannels returns the close summaries of a user's closed + // channels; Start passes pendingOnly=true to load those still pending. + FetchClosedChannels func(pendingOnly bool) ( + []*channeldb.ChannelCloseSummary, error) + + // FetchClosedChannel provides access to the close summary to extract a + // height hint from. + FetchClosedChannel func(chanID *wire.OutPoint) ( + *channeldb.ChannelCloseSummary, error) // Estimator is used when crafting sweep transactions to estimate the // necessary fee relative to the expected size of the sweep transaction. @@ -266,7 +272,7 @@ func (u *utxoNursery) Start() error { // Load any pending close channels, which represents the super set of // all channels that may still be incubating. 
- pendingCloseChans, err := u.cfg.DB.FetchClosedChannels(true) + pendingCloseChans, err := u.cfg.FetchClosedChannels(true) if err != nil { newBlockChan.Cancel() return err @@ -346,6 +352,13 @@ func (u *utxoNursery) IncubateOutputs(chanPoint wire.OutPoint, incomingHtlcs []lnwallet.IncomingHtlcResolution, broadcastHeight uint32) error { + // Add to wait group because nursery might shut down during execution of + // this function. Otherwise it could happen that nursery thinks it is + // shut down, but in this function new goroutines were started and stay + // around. + u.wg.Add(1) + defer u.wg.Done() + numHtlcs := len(incomingHtlcs) + len(outgoingHtlcs) var ( hasCommit bool @@ -624,7 +637,7 @@ func (u *utxoNursery) reloadPreschool() error { chanPoint := kid.OriginChanPoint() // Load the close summary for this output's channel point. - closeSummary, err := u.cfg.DB.FetchClosedChannel(chanPoint) + closeSummary, err := u.cfg.FetchClosedChannel(chanPoint) if err == channeldb.ErrClosedChannelNotFound { // This should never happen since the close summary // should only be removed after the channel has been diff --git a/utxonursery_test.go b/utxonursery_test.go index 02b2d3a0..a98f3324 100644 --- a/utxonursery_test.go +++ b/utxonursery_test.go @@ -395,11 +395,16 @@ func TestBabyOutputSerialization(t *testing.T) { type nurseryTestContext struct { nursery *utxoNursery notifier *nurseryMockNotifier - publishChan chan chainhash.Hash + publishChan chan wire.MsgTx store *nurseryStoreInterceptor + restart func() bool + receiveTx func() wire.MsgTx + t *testing.T } -func createNurseryTestContext(t *testing.T) *nurseryTestContext { +func createNurseryTestContext(t *testing.T, + checkStartStop func(func()) bool) *nurseryTestContext { + // Create a temporary database and connect nurseryStore to it. The // alternative, mocking nurseryStore, is not chosen because there is // still considerable logic in the store. 
@@ -427,9 +432,18 @@ func createNurseryTestContext(t *testing.T) *nurseryTestContext { cfg := NurseryConfig{ Notifier: notifier, - DB: cdb, - Store: storeIntercepter, - ChainIO: &mockChainIO{}, + FetchClosedChannels: func(pendingOnly bool) ( + []*channeldb.ChannelCloseSummary, error) { + return []*channeldb.ChannelCloseSummary{}, nil + }, + FetchClosedChannel: func(chanID *wire.OutPoint) ( + *channeldb.ChannelCloseSummary, error) { + return &channeldb.ChannelCloseSummary{ + CloseHeight: 0, + }, nil + }, + Store: storeIntercepter, + ChainIO: &mockChainIO{}, GenSweepScript: func() ([]byte, error) { return []byte{}, nil }, @@ -437,21 +451,62 @@ func createNurseryTestContext(t *testing.T) *nurseryTestContext { Signer: &nurseryMockSigner{}, } - publishChan := make(chan chainhash.Hash, 1) + publishChan := make(chan wire.MsgTx, 1) cfg.PublishTransaction = func(tx *wire.MsgTx) error { t.Logf("Publishing tx %v", tx.TxHash()) - publishChan <- tx.TxHash() + publishChan <- *tx return nil } nursery := newUtxoNursery(&cfg) nursery.Start() - return &nurseryTestContext{ + ctx := &nurseryTestContext{ nursery: nursery, notifier: notifier, store: storeIntercepter, publishChan: publishChan, + t: t, + } + + ctx.restart = func() bool { + return checkStartStop(func() { + ctx.nursery.Stop() + // Simulate lnd restart. + ctx.nursery = newUtxoNursery(ctx.nursery.cfg) + ctx.nursery.Start() + }) + } + + ctx.receiveTx = func() wire.MsgTx { + var tx wire.MsgTx + select { + case tx = <-ctx.publishChan: + return tx + case <-time.After(5 * time.Second): + t.Fatalf("tx not published") + } + return tx + } + + // Start with testing an immediate restart. + ctx.restart() + + return ctx +} + +func (ctx *nurseryTestContext) finish() { + // Add a final restart point in this state + ctx.restart() + + ctx.nursery.Stop() + + // We should have consumed and asserted all published transactions in + // our unit tests. 
+ select { case <-ctx.publishChan: ctx.t.Fatalf("unexpected transactions published") default: } } @@ -552,19 +607,73 @@ func assertNurseryReportUnavailable(t *testing.T, nursery *utxoNursery) { } } +// testRestartLoop runs the specified test multiple times and in every run it +// will attempt to execute a restart action in a different location. This is to +// assert that the unit under test is recovering correctly from restarts. +func testRestartLoop(t *testing.T, test func(*testing.T, + func(func()) bool)) { + + // Start with running the test without any restarts (index zero) + restartIdx := 0 + + for { + currentStartStopIdx := 0 + + // checkStartStop is called at every point in the test where a + // restart should be exercised. When this function has been called + // as many times as the current value of restartIdx, it will + // execute startStopFunc. + checkStartStop := func(startStopFunc func()) bool { + currentStartStopIdx++ + if restartIdx == currentStartStopIdx { + startStopFunc() + + return true + } + return false + } + + var subTestName string + if restartIdx == 0 { + subTestName = "no_restart" + } else { + subTestName = fmt.Sprintf("restart_%v", restartIdx) + } + t.Run(subTestName, + func(t *testing.T) { + test(t, checkStartStop) + }) + + // Exit the loop when all restart points have been tested. + if currentStartStopIdx == restartIdx { + return + } + restartIdx++ + } +} + func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) { - ctx := createNurseryTestContext(t) + testRestartLoop(t, testNurseryOutgoingHtlcSuccessOnLocal) +} + +func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T, + checkStartStop func(func()) bool) { + + ctx := createNurseryTestContext(t, checkStartStop) outgoingRes := incubateTestOutput(t, ctx.nursery, true) + ctx.restart() + // Notify arrival of block where HTLC CLTV expires. ctx.notifier.notifyEpoch(125) // This should trigger nursery to publish the timeout tx. 
- select { - case <-ctx.publishChan: - case <-time.After(defaultTestTimeout): - t.Fatalf("tx not published") + ctx.receiveTx() + + if ctx.restart() { + // Restart should retrigger broadcast of timeout tx. + ctx.receiveTx() } // Confirm the timeout tx. This should promote the HTLC to KNDR state. @@ -580,6 +689,8 @@ func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) { t.Fatalf("output not promoted to KNDR") } + ctx.restart() + // Notify arrival of block where second level HTLC unlocks. ctx.notifier.notifyEpoch(128) @@ -588,10 +699,18 @@ func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) { } func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) { - ctx := createNurseryTestContext(t) + testRestartLoop(t, testNurseryOutgoingHtlcSuccessOnRemote) +} + +func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T, + checkStartStop func(func()) bool) { + + ctx := createNurseryTestContext(t, checkStartStop) outgoingRes := incubateTestOutput(t, ctx.nursery, false) + ctx.restart() + // Notify confirmation of the commitment tx. Is only listened to when // resolving remote commitment tx. // @@ -608,6 +727,8 @@ func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) { t.Fatalf("output not promoted to KNDR") } + ctx.restart() + // Notify arrival of block where HTLC CLTV expires. ctx.notifier.notifyEpoch(125) @@ -617,11 +738,11 @@ func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) { func testSweep(t *testing.T, ctx *nurseryTestContext) { // Wait for nursery to publish the sweep tx. - var sweepTxHash chainhash.Hash - select { - case sweepTxHash = <-ctx.publishChan: - case <-time.After(defaultTestTimeout): - t.Fatalf("sweep tx not published") + sweepTx := ctx.receiveTx() + + if ctx.restart() { + // Restart will trigger rebroadcast of sweep tx. + sweepTx = ctx.receiveTx() } // Verify stage in nursery report. 
HTLCs should now both still be in @@ -629,6 +750,7 @@ func testSweep(t *testing.T, ctx *nurseryTestContext) { assertNurseryReport(t, ctx.nursery, 1, 2) // Confirm the sweep tx. + sweepTxHash := sweepTx.TxHash() err := ctx.notifier.confirmTx(&sweepTxHash, 129) if err != nil { t.Fatal(err) @@ -641,9 +763,13 @@ func testSweep(t *testing.T, ctx *nurseryTestContext) { t.Fatalf("output not graduated") } + ctx.restart() + // As there only was one output to graduate, we expect the channel to be // closed and no report available anymore. assertNurseryReportUnavailable(t, ctx.nursery) + + ctx.finish() } type nurseryStoreInterceptor struct {