utxonursery: test restart behaviour

This commit is contained in:
Joost Jager 2018-09-12 22:23:06 +02:00
parent 83a5ea0c31
commit 9f6fb1e128
No known key found for this signature in database
GPG Key ID: AE6B0D042C8E38D9
3 changed files with 168 additions and 28 deletions

@ -582,10 +582,11 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
} }
s.utxoNursery = newUtxoNursery(&NurseryConfig{ s.utxoNursery = newUtxoNursery(&NurseryConfig{
ChainIO: cc.chainIO, ChainIO: cc.chainIO,
ConfDepth: 1, ConfDepth: 1,
DB: chanDB, FetchClosedChannels: chanDB.FetchClosedChannels,
Estimator: cc.feeEstimator, FetchClosedChannel: chanDB.FetchClosedChannel,
Estimator: cc.feeEstimator,
GenSweepScript: func() ([]byte, error) { GenSweepScript: func() ([]byte, error) {
return newSweepPkScript(cc.wallet) return newSweepPkScript(cc.wallet)
}, },

@ -181,9 +181,15 @@ type NurseryConfig struct {
// determining outputs in the chain as confirmed. // determining outputs in the chain as confirmed.
ConfDepth uint32 ConfDepth uint32
// DB provides access to a user's channels, such that they can be marked // FetchClosedChannels provides access to a user's channels, such that
// fully closed after incubation has concluded. // they can be marked fully closed after incubation has concluded.
DB *channeldb.DB FetchClosedChannels func(pendingOnly bool) (
[]*channeldb.ChannelCloseSummary, error)
// FetchClosedChannel provides access to the close summary to extract a
// height hint from.
FetchClosedChannel func(chanID *wire.OutPoint) (
*channeldb.ChannelCloseSummary, error)
// Estimator is used when crafting sweep transactions to estimate the // Estimator is used when crafting sweep transactions to estimate the
// necessary fee relative to the expected size of the sweep transaction. // necessary fee relative to the expected size of the sweep transaction.
@ -266,7 +272,7 @@ func (u *utxoNursery) Start() error {
// Load any pending close channels, which represents the super set of // Load any pending close channels, which represents the super set of
// all channels that may still be incubating. // all channels that may still be incubating.
pendingCloseChans, err := u.cfg.DB.FetchClosedChannels(true) pendingCloseChans, err := u.cfg.FetchClosedChannels(true)
if err != nil { if err != nil {
newBlockChan.Cancel() newBlockChan.Cancel()
return err return err
@ -346,6 +352,13 @@ func (u *utxoNursery) IncubateOutputs(chanPoint wire.OutPoint,
incomingHtlcs []lnwallet.IncomingHtlcResolution, incomingHtlcs []lnwallet.IncomingHtlcResolution,
broadcastHeight uint32) error { broadcastHeight uint32) error {
// Add to wait group because nursery might shut down during execution of
// this function. Otherwise it could happen that nursery thinks it is
// shut down, but in this function new goroutines were started and stay
// around.
u.wg.Add(1)
defer u.wg.Done()
numHtlcs := len(incomingHtlcs) + len(outgoingHtlcs) numHtlcs := len(incomingHtlcs) + len(outgoingHtlcs)
var ( var (
hasCommit bool hasCommit bool
@ -624,7 +637,7 @@ func (u *utxoNursery) reloadPreschool() error {
chanPoint := kid.OriginChanPoint() chanPoint := kid.OriginChanPoint()
// Load the close summary for this output's channel point. // Load the close summary for this output's channel point.
closeSummary, err := u.cfg.DB.FetchClosedChannel(chanPoint) closeSummary, err := u.cfg.FetchClosedChannel(chanPoint)
if err == channeldb.ErrClosedChannelNotFound { if err == channeldb.ErrClosedChannelNotFound {
// This should never happen since the close summary // This should never happen since the close summary
// should only be removed after the channel has been // should only be removed after the channel has been

@ -395,11 +395,16 @@ func TestBabyOutputSerialization(t *testing.T) {
type nurseryTestContext struct { type nurseryTestContext struct {
nursery *utxoNursery nursery *utxoNursery
notifier *nurseryMockNotifier notifier *nurseryMockNotifier
publishChan chan chainhash.Hash publishChan chan wire.MsgTx
store *nurseryStoreInterceptor store *nurseryStoreInterceptor
restart func() bool
receiveTx func() wire.MsgTx
t *testing.T
} }
func createNurseryTestContext(t *testing.T) *nurseryTestContext { func createNurseryTestContext(t *testing.T,
checkStartStop func(func()) bool) *nurseryTestContext {
// Create a temporary database and connect nurseryStore to it. The // Create a temporary database and connect nurseryStore to it. The
// alternative, mocking nurseryStore, is not chosen because there is // alternative, mocking nurseryStore, is not chosen because there is
// still considerable logic in the store. // still considerable logic in the store.
@ -427,9 +432,18 @@ func createNurseryTestContext(t *testing.T) *nurseryTestContext {
cfg := NurseryConfig{ cfg := NurseryConfig{
Notifier: notifier, Notifier: notifier,
DB: cdb, FetchClosedChannels: func(pendingOnly bool) (
Store: storeIntercepter, []*channeldb.ChannelCloseSummary, error) {
ChainIO: &mockChainIO{}, return []*channeldb.ChannelCloseSummary{}, nil
},
FetchClosedChannel: func(chanID *wire.OutPoint) (
*channeldb.ChannelCloseSummary, error) {
return &channeldb.ChannelCloseSummary{
CloseHeight: 0,
}, nil
},
Store: storeIntercepter,
ChainIO: &mockChainIO{},
GenSweepScript: func() ([]byte, error) { GenSweepScript: func() ([]byte, error) {
return []byte{}, nil return []byte{}, nil
}, },
@ -437,21 +451,62 @@ func createNurseryTestContext(t *testing.T) *nurseryTestContext {
Signer: &nurseryMockSigner{}, Signer: &nurseryMockSigner{},
} }
publishChan := make(chan chainhash.Hash, 1) publishChan := make(chan wire.MsgTx, 1)
cfg.PublishTransaction = func(tx *wire.MsgTx) error { cfg.PublishTransaction = func(tx *wire.MsgTx) error {
t.Logf("Publishing tx %v", tx.TxHash()) t.Logf("Publishing tx %v", tx.TxHash())
publishChan <- tx.TxHash() publishChan <- *tx
return nil return nil
} }
nursery := newUtxoNursery(&cfg) nursery := newUtxoNursery(&cfg)
nursery.Start() nursery.Start()
return &nurseryTestContext{ ctx := &nurseryTestContext{
nursery: nursery, nursery: nursery,
notifier: notifier, notifier: notifier,
store: storeIntercepter, store: storeIntercepter,
publishChan: publishChan, publishChan: publishChan,
t: t,
}
ctx.restart = func() bool {
return checkStartStop(func() {
ctx.nursery.Stop()
// Simulate lnd restart.
ctx.nursery = newUtxoNursery(ctx.nursery.cfg)
ctx.nursery.Start()
})
}
ctx.receiveTx = func() wire.MsgTx {
var tx wire.MsgTx
select {
case tx = <-ctx.publishChan:
return tx
case <-time.After(5 * time.Second):
t.Fatalf("tx not published")
}
return tx
}
// Start with testing an immediate restart.
ctx.restart()
return ctx
}
func (ctx *nurseryTestContext) finish() {
// Add a final restart point in this state
ctx.restart()
ctx.nursery.Stop()
// We should have consumed and asserted all published transactions in
// our unit tests.
select {
case <-ctx.publishChan:
ctx.t.Fatalf("unexpected transactions published")
default:
} }
} }
@ -552,19 +607,73 @@ func assertNurseryReportUnavailable(t *testing.T, nursery *utxoNursery) {
} }
} }
// testRestartLoop runs the specified test multiple times and in every run it
// will attempt to execute a restart action in a different location. This is to
// assert that the unit under test is recovering correctly from restarts.
func testRestartLoop(t *testing.T, test func(*testing.T,
func(func()) bool)) {
// Start with running the test without any restarts (index zero)
restartIdx := 0
for {
currentStartStopIdx := 0
// checkStartStop is called at every point in the test where a
// restart should be exercised. When this function is called as
// many times as the current value of currentStartStopIdx, it
// will execute startStopFunc.
checkStartStop := func(startStopFunc func()) bool {
currentStartStopIdx++
if restartIdx == currentStartStopIdx {
startStopFunc()
return true
}
return false
}
var subTestName string
if restartIdx == 0 {
subTestName = "no_restart"
} else {
subTestName = fmt.Sprintf("restart_%v", restartIdx)
}
t.Run(subTestName,
func(t *testing.T) {
test(t, checkStartStop)
})
// Exit the loop when all restart points have been tested.
if currentStartStopIdx == restartIdx {
return
}
restartIdx++
}
}
func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) { func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) {
ctx := createNurseryTestContext(t) testRestartLoop(t, testNurseryOutgoingHtlcSuccessOnLocal)
}
func testNurseryOutgoingHtlcSuccessOnLocal(t *testing.T,
checkStartStop func(func()) bool) {
ctx := createNurseryTestContext(t, checkStartStop)
outgoingRes := incubateTestOutput(t, ctx.nursery, true) outgoingRes := incubateTestOutput(t, ctx.nursery, true)
ctx.restart()
// Notify arrival of block where HTLC CLTV expires. // Notify arrival of block where HTLC CLTV expires.
ctx.notifier.notifyEpoch(125) ctx.notifier.notifyEpoch(125)
// This should trigger nursery to publish the timeout tx. // This should trigger nursery to publish the timeout tx.
select { ctx.receiveTx()
case <-ctx.publishChan:
case <-time.After(defaultTestTimeout): if ctx.restart() {
t.Fatalf("tx not published") // Restart should retrigger broadcast of timeout tx.
ctx.receiveTx()
} }
// Confirm the timeout tx. This should promote the HTLC to KNDR state. // Confirm the timeout tx. This should promote the HTLC to KNDR state.
@ -580,6 +689,8 @@ func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) {
t.Fatalf("output not promoted to KNDR") t.Fatalf("output not promoted to KNDR")
} }
ctx.restart()
// Notify arrival of block where second level HTLC unlocks. // Notify arrival of block where second level HTLC unlocks.
ctx.notifier.notifyEpoch(128) ctx.notifier.notifyEpoch(128)
@ -588,10 +699,18 @@ func TestNurseryOutgoingHtlcSuccessOnLocal(t *testing.T) {
} }
func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) { func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) {
ctx := createNurseryTestContext(t) testRestartLoop(t, testNurseryOutgoingHtlcSuccessOnRemote)
}
func testNurseryOutgoingHtlcSuccessOnRemote(t *testing.T,
checkStartStop func(func()) bool) {
ctx := createNurseryTestContext(t, checkStartStop)
outgoingRes := incubateTestOutput(t, ctx.nursery, false) outgoingRes := incubateTestOutput(t, ctx.nursery, false)
ctx.restart()
// Notify confirmation of the commitment tx. Is only listened to when // Notify confirmation of the commitment tx. Is only listened to when
// resolving remote commitment tx. // resolving remote commitment tx.
// //
@ -608,6 +727,8 @@ func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) {
t.Fatalf("output not promoted to KNDR") t.Fatalf("output not promoted to KNDR")
} }
ctx.restart()
// Notify arrival of block where HTLC CLTV expires. // Notify arrival of block where HTLC CLTV expires.
ctx.notifier.notifyEpoch(125) ctx.notifier.notifyEpoch(125)
@ -617,11 +738,11 @@ func TestNurseryOutgoingHtlcSuccessOnRemote(t *testing.T) {
func testSweep(t *testing.T, ctx *nurseryTestContext) { func testSweep(t *testing.T, ctx *nurseryTestContext) {
// Wait for nursery to publish the sweep tx. // Wait for nursery to publish the sweep tx.
var sweepTxHash chainhash.Hash sweepTx := ctx.receiveTx()
select {
case sweepTxHash = <-ctx.publishChan: if ctx.restart() {
case <-time.After(defaultTestTimeout): // Restart will trigger rebroadcast of sweep tx.
t.Fatalf("sweep tx not published") sweepTx = ctx.receiveTx()
} }
// Verify stage in nursery report. HTLCs should now both still be in // Verify stage in nursery report. HTLCs should now both still be in
@ -629,6 +750,7 @@ func testSweep(t *testing.T, ctx *nurseryTestContext) {
assertNurseryReport(t, ctx.nursery, 1, 2) assertNurseryReport(t, ctx.nursery, 1, 2)
// Confirm the sweep tx. // Confirm the sweep tx.
sweepTxHash := sweepTx.TxHash()
err := ctx.notifier.confirmTx(&sweepTxHash, 129) err := ctx.notifier.confirmTx(&sweepTxHash, 129)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -641,9 +763,13 @@ func testSweep(t *testing.T, ctx *nurseryTestContext) {
t.Fatalf("output not graduated") t.Fatalf("output not graduated")
} }
ctx.restart()
// As there only was one output to graduate, we expect the channel to be // As there only was one output to graduate, we expect the channel to be
// closed and no report available anymore. // closed and no report available anymore.
assertNurseryReportUnavailable(t, ctx.nursery) assertNurseryReportUnavailable(t, ctx.nursery)
ctx.finish()
} }
type nurseryStoreInterceptor struct { type nurseryStoreInterceptor struct {