From c3bf8d205433790295256ef7d10b035c2c721868 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 24 Sep 2019 19:19:53 -0700 Subject: [PATCH 1/3] contractcourt: create new channel arb test context struct In this commit, we create a new channel arb test context struct as the current `createTestChannelArbitrator` has several return parameters, and upcoming changes will likely at first glance need to add one or more additional parameters. Rather than extend the existing set of return parameters, we opt to instead create this struct that wraps the existing state. Along the way we add several new utility methods to this context, and use them in the existing tests where applicable: * `AssertStateTransitions` * `AssertState` * `Restart` * `CleanUp` --- contractcourt/channel_arbitrator_test.go | 459 ++++++++++++++--------- 1 file changed, 281 insertions(+), 178 deletions(-) diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index b785b02a..2e31ca13 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -3,12 +3,16 @@ package contractcourt import ( "errors" "fmt" + "io/ioutil" + "os" + "path/filepath" "sync" "testing" "time" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" + "github.com/coreos/bbolt" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/input" @@ -127,6 +131,25 @@ func (b *mockArbitratorLog) WipeHistory() error { return nil } +// testArbLog is a wrapper around an existing (ideally fully concrete +// ArbitratorLog) that lets us intercept certain calls like transitioning to a +// new state. +type testArbLog struct { + ArbitratorLog + + newStates chan ArbitratorState +} + +func (t *testArbLog) CommitState(s ArbitratorState) error { + if err := t.ArbitratorLog.CommitState(s); err != nil { + return err + } + + t.newStates <- s + + return nil +} + type mockChainIO struct{} var _ lnwallet.BlockChainIO = (*mockChainIO)(nil) @@ -148,9 +171,101 @@ func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) return nil, nil } -func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, - chan struct{}, chan []ResolutionMsg, chan *chainntnfs.BlockEpoch, error) { +type chanArbTestCtx struct { + t *testing.T + chanArb *ChannelArbitrator + + cleanUp func() + + resolvedChan chan struct{} + + blockEpochs chan *chainntnfs.BlockEpoch + + incubationRequests chan struct{} + + resolutions chan []ResolutionMsg + + log ArbitratorLog +} + +func (c *chanArbTestCtx) CleanUp() { + if err := c.chanArb.Stop(); err != nil { + c.t.Fatalf("unable to stop chan arb: %v", err) + } + + if c.cleanUp != nil { + c.cleanUp() + } +} + +// AssertStateTransitions asserts that the state machine steps through the +// passed states in order. +func (c *chanArbTestCtx) AssertStateTransitions(expectedStates ...ArbitratorState) { + c.t.Helper() + + var newStatesChan chan ArbitratorState + switch log := c.log.(type) { + case *mockArbitratorLog: + newStatesChan = log.newStates + + case *testArbLog: + newStatesChan = log.newStates + + default: + c.t.Fatalf("unable to assert state transitions with %T", log) + } + + for _, exp := range expectedStates { + var state ArbitratorState + select { + case state = <-newStatesChan: + case <-time.After(5 * time.Second): + c.t.Fatalf("new state not received") + } + + if state != exp { + c.t.Fatalf("expected new state %v, got %v", exp, state) + } + } +} + +// AssertState checks that the ChannelArbitrator is in the state we expect it +// to be. +func (c *chanArbTestCtx) AssertState(expected ArbitratorState) { + if c.chanArb.state != expected { + c.t.Fatalf("expected state %v, was %v", expected, c.chanArb.state) + } +} + +// Restart simulates a clean restart of the channel arbitrator, forcing it to +// walk through it's recovery logic. If this function returns nil, then a +// restart was successful. Note that the restart process keeps the log in +// place, in order to simulate proper persistence of the log. The caller can +// optionally provide a restart closure which will be executed before the +// resolver is started again, but after it is created. +func (c *chanArbTestCtx) Restart(restartClosure func(*chanArbTestCtx)) (*chanArbTestCtx, error) { + if err := c.chanArb.Stop(); err != nil { + return nil, err + } + + newCtx, err := createTestChannelArbitrator(c.t, c.log) + if err != nil { + return nil, err + } + + if restartClosure != nil { + restartClosure(newCtx) + } + + if err := newCtx.chanArb.Start(); err != nil { + return nil, err + } + + return newCtx, nil +} + +func createTestChannelArbitrator(t *testing.T, log ArbitratorLog) (*chanArbTestCtx, error) { blockEpochs := make(chan *chainntnfs.BlockEpoch) blockEpoch := &chainntnfs.BlockEpochEvent{ Epochs: blockEpochs, @@ -167,6 +282,7 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, } resolutionChan := make(chan []ResolutionMsg, 1) + incubateChan := make(chan struct{}) chainIO := &mockChainIO{} chainArbCfg := ChainArbitratorConfig{ @@ -188,6 +304,8 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, IncubateOutputs: func(wire.OutPoint, *lnwallet.CommitOutputResolution, *lnwallet.OutgoingHtlcResolution, *lnwallet.IncomingHtlcResolution, uint32) error { + + incubateChan <- struct{}{} return nil }, } @@ -224,17 +342,49 @@ func createTestChannelArbitrator(log ArbitratorLog) (*ChannelArbitrator, ChainEvents: chanEvents, } - htlcSets := make(map[HtlcSetKey]htlcSet) - return NewChannelArbitrator(arbCfg, htlcSets, log), resolvedChan, - resolutionChan, blockEpochs, nil -} + var cleanUp func() + if log == nil { + dbDir, err := ioutil.TempDir("", "chanArb") + if err != nil { + return nil, err + } + dbPath := filepath.Join(dbDir, "testdb") + db, err := bbolt.Open(dbPath, 0600, nil) + if err != nil { + return nil, err + } -// assertState checks that the ChannelArbitrator is in the state we expect it -// to be. -func assertState(t *testing.T, c *ChannelArbitrator, expected ArbitratorState) { - if c.state != expected { - t.Fatalf("expected state %v, was %v", expected, c.state) + backingLog, err := newBoltArbitratorLog( + db, arbCfg, chainhash.Hash{}, chanPoint, + ) + if err != nil { + return nil, err + } + cleanUp = func() { + db.Close() + os.RemoveAll(dbDir) + } + + log = &testArbLog{ + ArbitratorLog: backingLog, + newStates: make(chan ArbitratorState), + } } + + htlcSets := make(map[HtlcSetKey]htlcSet) + + chanArb := NewChannelArbitrator(arbCfg, htlcSets, log) + + return &chanArbTestCtx{ + t: t, + chanArb: chanArb, + cleanUp: cleanUp, + resolvedChan: resolvedChan, + resolutions: resolutionChan, + blockEpochs: blockEpochs, + log: log, + incubationRequests: incubateChan, + }, nil } // TestChannelArbitratorCooperativeClose tests that the ChannelArbitertor @@ -246,22 +396,26 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } - if err := chanArb.Start(); err != nil { + if err := chanArbCtx.chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } - defer chanArb.Stop() + defer func() { + if err := chanArbCtx.chanArb.Stop(); err != nil { + t.Fatalf("unable to stop chan arb: %v", err) + } + }() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // We set up a channel to detect when MarkChannelClosed is called. closeInfos := make(chan *channeldb.ChannelCloseSummary) - chanArb.cfg.MarkChannelClosed = func( + chanArbCtx.chanArb.cfg.MarkChannelClosed = func( closeInfo *channeldb.ChannelCloseSummary) error { closeInfos <- closeInfo return nil @@ -272,7 +426,7 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) { closeInfo := &CooperativeCloseInfo{ &channeldb.ChannelCloseSummary{}, } - chanArb.cfg.ChainEvents.CooperativeClosure <- closeInfo + chanArbCtx.chanArb.cfg.ChainEvents.CooperativeClosure <- closeInfo select { case c := <-closeInfos: @@ -285,31 +439,13 @@ func TestChannelArbitratorCooperativeClose(t *testing.T) { // It should mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") } } -func assertStateTransitions(t *testing.T, newStates <-chan ArbitratorState, - expectedStates ...ArbitratorState) { - t.Helper() - - for _, exp := range expectedStates { - var state ArbitratorState - select { - case state = <-newStates: - case <-time.After(5 * time.Second): - t.Fatalf("new state not received") - } - - if state != exp { - t.Fatalf("expected new state %v, got %v", exp, state) - } - } -} - // TestChannelArbitratorRemoteForceClose checks that the ChannelArbitrator goes // through the expected states if a remote force close is observed in the // chain. @@ -319,10 +455,11 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -330,7 +467,7 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) { defer chanArb.Stop() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Send a remote force close event. commitSpend := &chainntnfs.SpendDetail{ @@ -351,13 +488,13 @@ func TestChannelArbitratorRemoteForceClose(t *testing.T) { // It should transition StateDefault -> StateContractClosed -> // StateFullyResolved. - assertStateTransitions( - t, log.newStates, StateContractClosed, StateFullyResolved, + chanArbCtx.AssertStateTransitions( + StateContractClosed, StateFullyResolved, ) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -373,10 +510,11 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -384,7 +522,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { defer chanArb.Stop() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // We create a channel we can use to pause the ChannelArbitrator at the // point where it broadcasts the close tx, and check its state. @@ -411,7 +549,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { } // It should transition to StateBroadcastCommit. - assertStateTransitions(t, log.newStates, StateBroadcastCommit) + chanArbCtx.AssertStateTransitions(StateBroadcastCommit) // When it is broadcasting the force close, its state should be // StateBroadcastCommit. @@ -426,7 +564,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { // After broadcasting, transition should be to // StateCommitmentBroadcasted. - assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted) + chanArbCtx.AssertStateTransitions(StateCommitmentBroadcasted) select { case <-respChan: @@ -445,7 +583,7 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { // After broadcasting the close tx, it should be in state // StateCommitmentBroadcasted. - assertState(t, chanArb, StateCommitmentBroadcasted) + chanArbCtx.AssertState(StateCommitmentBroadcasted) // Now notify about the local force close getting confirmed. chanArb.cfg.ChainEvents.LocalUnilateralClosure <- &LocalUnilateralCloseInfo{ @@ -458,12 +596,11 @@ func TestChannelArbitratorLocalForceClose(t *testing.T) { } // It should transition StateContractClosed -> StateFullyResolved. - assertStateTransitions(t, log.newStates, StateContractClosed, - StateFullyResolved) + chanArbCtx.AssertStateTransitions(StateContractClosed, StateFullyResolved) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -479,10 +616,11 @@ func TestChannelArbitratorBreachClose(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -494,19 +632,19 @@ func TestChannelArbitratorBreachClose(t *testing.T) { }() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Send a breach close event. chanArb.cfg.ChainEvents.ContractBreach <- &lnwallet.BreachRetribution{} // It should transition StateDefault -> StateFullyResolved. - assertStateTransitions( - t, log.newStates, StateFullyResolved, + chanArbCtx.AssertStateTransitions( + StateFullyResolved, ) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -523,14 +661,15 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { resolvers: make(map[ContractResolver]struct{}), } - chanArb, resolved, resolutions, _, err := createTestChannelArbitrator( - arbLog, + chanArbCtx, err := createTestChannelArbitrator( + t, arbLog, ) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } incubateChan := make(chan struct{}) + chanArb := chanArbCtx.chanArb chanArb.cfg.IncubateOutputs = func(_ wire.OutPoint, _ *lnwallet.CommitOutputResolution, _ *lnwallet.OutgoingHtlcResolution, @@ -599,8 +738,8 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { // The force close request should trigger broadcast of the commitment // transaction. - assertStateTransitions( - t, arbLog.newStates, StateBroadcastCommit, + chanArbCtx.AssertStateTransitions( + StateBroadcastCommit, StateCommitmentBroadcasted, ) select { @@ -675,15 +814,15 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { }, } - assertStateTransitions( - t, arbLog.newStates, StateContractClosed, + chanArbCtx.AssertStateTransitions( + StateContractClosed, StateWaitingFullResolution, ) // We expect an immediate resolution message for the outgoing dust htlc. // It is not resolvable on-chain. select { - case msgs := <-resolutions: + case msgs := <-chanArbCtx.resolutions: if len(msgs) != 1 { t.Fatalf("expected 1 message, instead got %v", len(msgs)) } @@ -723,7 +862,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { // Finally, we should also receive a resolution message instructing the // switch to cancel back the HTLC. select { - case msgs := <-resolutions: + case msgs := <-chanArbCtx.resolutions: if len(msgs) != 1 { t.Fatalf("expected 1 message, instead got %v", len(msgs)) } @@ -740,7 +879,7 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { // to the second level. Channel arbitrator should still not be marked // as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: t.Fatalf("channel resolved prematurely") default: } @@ -749,9 +888,9 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { notifier.spendChan <- &chainntnfs.SpendDetail{SpendingTx: closeTx} // At this point channel should be marked as resolved. - assertStateTransitions(t, arbLog.newStates, StateFullyResolved) + chanArbCtx.AssertStateTransitions(StateFullyResolved) select { - case <-resolved: + case <-chanArbCtx.resolvedChan: case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") } @@ -766,10 +905,11 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -777,7 +917,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { defer chanArb.Stop() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Create a channel we can use to assert the state when it publishes // the close tx. @@ -804,7 +944,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { } // It should transition to StateBroadcastCommit. - assertStateTransitions(t, log.newStates, StateBroadcastCommit) + chanArbCtx.AssertStateTransitions(StateBroadcastCommit) // We expect it to be in state StateBroadcastCommit when publishing // the force close. @@ -819,7 +959,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { // After broadcasting, transition should be to // StateCommitmentBroadcasted. - assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted) + chanArbCtx.AssertStateTransitions(StateCommitmentBroadcasted) // Wait for a response to the force close. select { @@ -838,7 +978,7 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { } // The state should be StateCommitmentBroadcasted. - assertState(t, chanArb, StateCommitmentBroadcasted) + chanArbCtx.AssertState(StateCommitmentBroadcasted) // Now notify about the _REMOTE_ commitment getting confirmed. commitSpend := &chainntnfs.SpendDetail{ @@ -853,12 +993,11 @@ func TestChannelArbitratorLocalForceCloseRemoteConfirmed(t *testing.T) { } // It should transition StateContractClosed -> StateFullyResolved. - assertStateTransitions(t, log.newStates, StateContractClosed, - StateFullyResolved) + chanArbCtx.AssertStateTransitions(StateContractClosed, StateFullyResolved) // It should resolve. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(15 * time.Second): t.Fatalf("contract was not resolved") @@ -875,10 +1014,11 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) @@ -886,7 +1026,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { defer chanArb.Stop() // It should start out in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Return ErrDoubleSpend when attempting to publish the tx. stateChan := make(chan ArbitratorState) @@ -912,7 +1052,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { } // It should transition to StateBroadcastCommit. - assertStateTransitions(t, log.newStates, StateBroadcastCommit) + chanArbCtx.AssertStateTransitions(StateBroadcastCommit) // We expect it to be in state StateBroadcastCommit when publishing // the force close. @@ -927,7 +1067,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { // After broadcasting, transition should be to // StateCommitmentBroadcasted. - assertStateTransitions(t, log.newStates, StateCommitmentBroadcasted) + chanArbCtx.AssertStateTransitions(StateCommitmentBroadcasted) // Wait for a response to the force close. select { @@ -946,7 +1086,7 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { } // The state should be StateCommitmentBroadcasted. - assertState(t, chanArb, StateCommitmentBroadcasted) + chanArbCtx.AssertState(StateCommitmentBroadcasted) // Now notify about the _REMOTE_ commitment getting confirmed. commitSpend := &chainntnfs.SpendDetail{ @@ -961,12 +1101,11 @@ func TestChannelArbitratorLocalForceDoubleSpend(t *testing.T) { } // It should transition StateContractClosed -> StateFullyResolved. - assertStateTransitions(t, log.newStates, StateContractClosed, - StateFullyResolved) + chanArbCtx.AssertStateTransitions(StateContractClosed, StateFullyResolved) // It should resolve. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(15 * time.Second): t.Fatalf("contract was not resolved") @@ -983,17 +1122,18 @@ func TestChannelArbitratorPersistence(t *testing.T) { failLog: true, } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } // It should start in StateDefault. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Send a remote force close event. commitSpend := &chainntnfs.SpendDetail{ @@ -1014,20 +1154,17 @@ func TestChannelArbitratorPersistence(t *testing.T) { if log.state != StateDefault { t.Fatalf("expected to stay in StateDefault") } - chanArb.Stop() - // Create a new arbitrator with the same log. - chanArb, resolved, _, _, err = createTestChannelArbitrator(log) + // Restart the channel arb, this'll use the same long and prior + // context. + chanArbCtx, err = chanArbCtx.Restart(nil) if err != nil { - t.Fatalf("unable to create ChannelArbitrator: %v", err) - } - - if err := chanArb.Start(); err != nil { - t.Fatalf("unable to start ChannelArbitrator: %v", err) + t.Fatalf("unable to restart channel arb: %v", err) } + chanArb = chanArbCtx.chanArb // Again, it should start up in the default state. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Now we make the log succeed writing the resolutions, but fail when // attempting to close the channel. @@ -1047,20 +1184,16 @@ func TestChannelArbitratorPersistence(t *testing.T) { if log.state != StateDefault { t.Fatalf("expected to stay in StateDefault") } - chanArb.Stop() - // Create yet another arbitrator with the same log. - chanArb, resolved, _, _, err = createTestChannelArbitrator(log) + // Restart once again to simulate yet another restart. + chanArbCtx, err = chanArbCtx.Restart(nil) if err != nil { - t.Fatalf("unable to create ChannelArbitrator: %v", err) - } - - if err := chanArb.Start(); err != nil { - t.Fatalf("unable to start ChannelArbitrator: %v", err) + t.Fatalf("unable to restart channel arb: %v", err) } + chanArb = chanArbCtx.chanArb // Starts out in StateDefault. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // Now make fetching the resolutions fail. log.failFetch = fmt.Errorf("intentional fetch failure") @@ -1070,9 +1203,7 @@ func TestChannelArbitratorPersistence(t *testing.T) { // Since logging the resolutions and closing the channel now succeeds, // it should advance to StateContractClosed. - assertStateTransitions( - t, log.newStates, StateContractClosed, - ) + chanArbCtx.AssertStateTransitions(StateContractClosed) // It should not advance further, however, as fetching resolutions // failed. @@ -1084,24 +1215,18 @@ func TestChannelArbitratorPersistence(t *testing.T) { // Create a new arbitrator, and now make fetching resolutions succeed. log.failFetch = nil - chanArb, resolved, _, _, err = createTestChannelArbitrator(log) + chanArbCtx, err = chanArbCtx.Restart(nil) if err != nil { - t.Fatalf("unable to create ChannelArbitrator: %v", err) + t.Fatalf("unable to restart channel arb: %v", err) } - - if err := chanArb.Start(); err != nil { - t.Fatalf("unable to start ChannelArbitrator: %v", err) - } - defer chanArb.Stop() + defer chanArbCtx.CleanUp() // Finally it should advance to StateFullyResolved. - assertStateTransitions( - t, log.newStates, StateFullyResolved, - ) + chanArbCtx.AssertStateTransitions(StateFullyResolved) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -1119,17 +1244,18 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { newStates: make(chan ArbitratorState, 5), } - chanArb, _, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } // It should start in StateDefault. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) // We start by attempting a local force close. We'll return an // unexpected publication error, causing the state machine to halt. @@ -1157,7 +1283,7 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { } // It should transition to StateBroadcastCommit. - assertStateTransitions(t, log.newStates, StateBroadcastCommit) + chanArbCtx.AssertStateTransitions(StateBroadcastCommit) // We expect it to be in state StateBroadcastCommit when attempting // the force close. @@ -1181,43 +1307,25 @@ func TestChannelArbitratorForceCloseBreachedChannel(t *testing.T) { t.Fatalf("no response received") } - // Stop the channel abitrator. - if err := chanArb.Stop(); err != nil { - t.Fatal(err) - } - // We mimic that the channel is breached while the channel arbitrator // is down. This means that on restart it will be started with a // pending close channel, of type BreachClose. - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err = chanArbCtx.Restart(func(c *chanArbTestCtx) { + c.chanArb.cfg.IsPendingClose = true + c.chanArb.cfg.ClosingHeight = 100 + c.chanArb.cfg.CloseType = channeldb.BreachClose + }) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } - - chanArb.cfg.IsPendingClose = true - chanArb.cfg.ClosingHeight = 100 - chanArb.cfg.CloseType = channeldb.BreachClose - - // Start the channel abitrator again, and make sure it goes straight to - // state fully resolved, as in case of breach there is nothing to - // handle. - if err := chanArb.Start(); err != nil { - t.Fatalf("unable to start ChannelArbitrator: %v", err) - } - defer func() { - if err := chanArb.Stop(); err != nil { - t.Fatal(err) - } - }() + defer chanArbCtx.CleanUp() // Finally it should advance to StateFullyResolved. - assertStateTransitions( - t, log.newStates, StateFullyResolved, - ) + chanArbCtx.AssertStateTransitions(StateFullyResolved) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -1286,6 +1394,8 @@ func TestChannelArbitratorCommitFailure(t *testing.T) { } for _, test := range testCases { + test := test + log := &mockArbitratorLog{ state: StateDefault, newStates: make(chan ArbitratorState, 5), @@ -1296,17 +1406,18 @@ func TestChannelArbitratorCommitFailure(t *testing.T) { failCommitState: test.expectedStates[0], } - chanArb, resolved, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } // It should start in StateDefault. - assertState(t, chanArb, StateDefault) + chanArbCtx.AssertState(StateDefault) closed := make(chan struct{}) chanArb.cfg.MarkChannelClosed = func( @@ -1336,30 +1447,23 @@ func TestChannelArbitratorCommitFailure(t *testing.T) { // Start the arbitrator again, with IsPendingClose reporting // the channel closed in the database. - chanArb, resolved, _, _, err = createTestChannelArbitrator(log) + log.failCommit = false + chanArbCtx, err = chanArbCtx.Restart(func(c *chanArbTestCtx) { + c.chanArb.cfg.IsPendingClose = true + c.chanArb.cfg.ClosingHeight = 100 + c.chanArb.cfg.CloseType = test.closeType + }) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } - log.failCommit = false - - chanArb.cfg.IsPendingClose = true - chanArb.cfg.ClosingHeight = 100 - chanArb.cfg.CloseType = test.closeType - - if err := chanArb.Start(); err != nil { - t.Fatalf("unable to start ChannelArbitrator: %v", err) - } - // Since the channel is marked closed in the database, it // should advance to the expected states. - assertStateTransitions( - t, log.newStates, test.expectedStates..., - ) + chanArbCtx.AssertStateTransitions(test.expectedStates...) // It should also mark the channel as resolved. select { - case <-resolved: + case <-chanArbCtx.resolvedChan: // Expected. case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") @@ -1382,11 +1486,12 @@ func TestChannelArbitratorEmptyResolutions(t *testing.T) { failFetch: errNoResolutions, } - chanArb, _, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb chanArb.cfg.IsPendingClose = true chanArb.cfg.ClosingHeight = 100 chanArb.cfg.CloseType = channeldb.RemoteForceClose @@ -1397,9 +1502,7 @@ func TestChannelArbitratorEmptyResolutions(t *testing.T) { // It should not advance its state beyond StateContractClosed, since // fetching resolutions fails. - assertStateTransitions( - t, log.newStates, StateContractClosed, - ) + chanArbCtx.AssertStateTransitions(StateContractClosed) // It should not advance further, however, as fetching resolutions // failed. @@ -1420,10 +1523,11 @@ func TestChannelArbitratorAlreadyForceClosed(t *testing.T) { log := &mockArbitratorLog{ state: StateCommitmentBroadcasted, } - chanArb, _, _, _, err := createTestChannelArbitrator(log) + chanArbCtx, err := createTestChannelArbitrator(t, log) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } @@ -1515,12 +1619,13 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { resolvers: make(map[ContractResolver]struct{}), } - chanArb, _, resolutions, blockEpochs, err := createTestChannelArbitrator( - arbLog, + chanArbCtx, err := createTestChannelArbitrator( + t, arbLog, ) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } + chanArb := chanArbCtx.chanArb if err := chanArb.Start(); err != nil { t.Fatalf("unable to start ChannelArbitrator: %v", err) } @@ -1568,7 +1673,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // now mine a block (height 5), which is 5 blocks away // (our grace delta) from the expiry of that HTLC. case testCase.htlcExpired: - blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} + chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5} // Otherwise, we'll just trigger a regular force close // request. @@ -1584,8 +1689,8 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // determined that it needs to go to chain in order to // block off the redemption path so it can cancel the // incoming HTLC. - assertStateTransitions( - t, arbLog.newStates, StateBroadcastCommit, + chanArbCtx.AssertStateTransitions( + StateBroadcastCommit, StateCommitmentBroadcasted, ) @@ -1646,15 +1751,15 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // The channel arb should now transition to waiting // until the HTLCs have been fully resolved. - assertStateTransitions( - t, arbLog.newStates, StateContractClosed, + chanArbCtx.AssertStateTransitions( + StateContractClosed, StateWaitingFullResolution, ) // Now that we've sent this signal, we should have that // HTLC be cancelled back immediately. select { - case msgs := <-resolutions: + case msgs := <-chanArbCtx.resolutions: if len(msgs) != 1 { t.Fatalf("expected 1 message, "+ "instead got %v", len(msgs)) @@ -1672,10 +1777,8 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) { // so instead, we'll mine another block which'll cause // it to re-examine its state and realize there're no // more HTLCs. - blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} - assertStateTransitions( - t, arbLog.newStates, StateFullyResolved, - ) + chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6} + chanArbCtx.AssertStateTransitions(StateFullyResolved) }) } } From d0df5a4dddf0d2b1f253369430bb349942a97d8d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 6 Sep 2019 18:40:27 -0700 Subject: [PATCH 2/3] contractcourt: supplement resolvers with confirmed commit set HTLCs In this commit, we fix an existing bug in the package, causing resolutions to be restarted without their required supplementary information. This can happen if a distinct HTLC set gets confirmed compared to the HTLCs that we may have had our commitment at time of close. Due to this bug, on restart certain HTLCS would be rejected as they would present their state to the invoice registry, but be rejected due to checks such as amount value. To fix this, we'll now pass in the set of confirmed HTLCs into the resolvers when we re-launch them, giving us access to all the information we need to supplement the HTLCS. We also add a new test that ensures that the proper fields of a resolver are set after a restart. --- contractcourt/channel_arbitrator.go | 43 +++++------ contractcourt/channel_arbitrator_test.go | 92 +++++++++++++++--------- 2 files changed, 83 insertions(+), 52 deletions(-) diff --git a/contractcourt/channel_arbitrator.go b/contractcourt/channel_arbitrator.go index fbad1465..b01262ab 100644 --- a/contractcourt/channel_arbitrator.go +++ b/contractcourt/channel_arbitrator.go @@ -415,7 +415,19 @@ func (c *ChannelArbitrator) Start() error { if startingState == StateWaitingFullResolution && nextState == StateWaitingFullResolution { - if err := c.relaunchResolvers(); err != nil { + // In order to relaunch the resolvers, we'll need to fetch the + // set of HTLCs that were present in the commitment transaction + // at the time it was confirmed. commitSet.ConfCommitKey can't + // be nil at this point since we're in + // StateWaitingFullResolution. We can only be in + // StateWaitingFullResolution after we've transitioned from + // StateContractClosed which can only be triggered by the local + // or remote close trigger. This trigger is only fired when we + // receive a chain event from the chain watcher than the + // commitment has been confirmed on chain, and before we + // advance our state step, we call InsertConfirmedCommitSet. + confCommitSet := commitSet.HtlcSets[*commitSet.ConfCommitKey] + if err := c.relaunchResolvers(confCommitSet); err != nil { c.cfg.BlockEpochs.Cancel() return err } @@ -431,7 +443,7 @@ func (c *ChannelArbitrator) Start() error { // starting the ChannelArbitrator. This information should ideally be stored in // the database, so this only serves as a intermediate work-around to prevent a // migration. -func (c *ChannelArbitrator) relaunchResolvers() error { +func (c *ChannelArbitrator) relaunchResolvers(confirmedHTLCs []channeldb.HTLC) error { // We'll now query our log to see if there are any active // unresolved contracts. If this is the case, then we'll // relaunch all contract resolvers. @@ -456,31 +468,22 @@ func (c *ChannelArbitrator) relaunchResolvers() error { // to prevent a db migration. We use all available htlc sets here in // order to ensure we have complete coverage. htlcMap := make(map[wire.OutPoint]*channeldb.HTLC) - for _, htlcs := range c.activeHTLCs { - for _, htlc := range htlcs.incomingHTLCs { - htlc := htlc - outpoint := wire.OutPoint{ - Hash: commitHash, - Index: uint32(htlc.OutputIndex), - } - htlcMap[outpoint] = &htlc - } - - for _, htlc := range htlcs.outgoingHTLCs { - htlc := htlc - outpoint := wire.OutPoint{ - Hash: commitHash, - Index: uint32(htlc.OutputIndex), - } - htlcMap[outpoint] = &htlc + for _, htlc := range confirmedHTLCs { + htlc := htlc + outpoint := wire.OutPoint{ + Hash: commitHash, + Index: uint32(htlc.OutputIndex), } + htlcMap[outpoint] = &htlc } log.Infof("ChannelArbitrator(%v): relaunching %v contract "+ "resolvers", c.cfg.ChanPoint, len(unresolvedContracts)) for _, resolver := range unresolvedContracts { - c.supplementResolver(resolver, htlcMap) + if err := c.supplementResolver(resolver, htlcMap); err != nil { + return err + } } c.launchResolvers(unresolvedContracts) diff --git a/contractcourt/channel_arbitrator_test.go b/contractcourt/channel_arbitrator_test.go index 2e31ca13..5f676d0d 100644 --- a/contractcourt/channel_arbitrator_test.go +++ b/contractcourt/channel_arbitrator_test.go @@ -655,30 +655,15 @@ func TestChannelArbitratorBreachClose(t *testing.T) { // ChannelArbitrator goes through the expected states in case we request it to // force close a channel that still has an HTLC pending. func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { - arbLog := &mockArbitratorLog{ - state: StateDefault, - newStates: make(chan ArbitratorState, 5), - resolvers: make(map[ContractResolver]struct{}), - } - - chanArbCtx, err := createTestChannelArbitrator( - t, arbLog, - ) + // We create a new test context for this channel arb, notice that we + // pass in a nil ArbitratorLog which means that a default one backed by + // a real DB will be created. We need this for our test as we want to + // test proper restart recovery and resolver population. + chanArbCtx, err := createTestChannelArbitrator(t, nil) if err != nil { t.Fatalf("unable to create ChannelArbitrator: %v", err) } - - incubateChan := make(chan struct{}) chanArb := chanArbCtx.chanArb - chanArb.cfg.IncubateOutputs = func(_ wire.OutPoint, - _ *lnwallet.CommitOutputResolution, - _ *lnwallet.OutgoingHtlcResolution, - _ *lnwallet.IncomingHtlcResolution, _ uint32) error { - - incubateChan <- struct{}{} - - return nil - } chanArb.cfg.PreimageDB = newMockWitnessBeacon() chanArb.cfg.Registry = &mockRegistry{} @@ -697,9 +682,10 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { chanArb.UpdateContractSignals(signals) // Add HTLC to channel arbitrator. + htlcAmt := 10000 htlc := channeldb.HTLC{ Incoming: false, - Amt: 10000, + Amt: lnwire.MilliSatoshi(htlcAmt), HtlcIndex: 99, } @@ -775,8 +761,8 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { Index: 0, } - // Set up the outgoing resolution. Populate SignedTimeoutTx because - // our commitment transaction got confirmed. + // Set up the outgoing resolution. Populate SignedTimeoutTx because our + // commitment transaction got confirmed. outgoingRes := lnwallet.OutgoingHtlcResolution{ Expiry: 10, SweepSignDesc: input.SignDescriptor{ @@ -835,29 +821,71 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { t.Fatalf("resolution msgs not sent") } + // We'll grab the old notifier here as our resolvers are still holding + // a reference to this instance, and a new one will be created when we + // restart the channel arb below. + oldNotifier := chanArb.cfg.Notifier.(*mockNotifier) + + // At this point, in order to simulate a restart, we'll re-create the + // channel arbitrator. We do this to ensure that all information + // required to properly resolve this HTLC are populated. + if err := chanArb.Stop(); err != nil { + t.Fatalf("unable to stop chan arb: %v", err) + } + + // We'll no re-create the resolver, notice that we use the existing + // arbLog so it carries over the same on-disk state. + chanArbCtxNew, err := chanArbCtx.Restart(nil) + if err != nil { + t.Fatalf("unable to create ChannelArbitrator: %v", err) + } + chanArb = chanArbCtxNew.chanArb + defer chanArbCtxNew.CleanUp() + + // Post restart, it should be the case that our resolver was properly + // supplemented, and we only have a single resolver in the final set. + if len(chanArb.activeResolvers) != 1 { + t.Fatalf("expected single resolver, instead got: %v", + len(chanArb.activeResolvers)) + } + + // We'll now examine the in-memory state of the active resolvers to + // ensure t hey were populated properly. + resolver := chanArb.activeResolvers[0] + outgoingResolver, ok := resolver.(*htlcOutgoingContestResolver) + if !ok { + t.Fatalf("expected outgoing contest resolver, got %vT", + resolver) + } + + // The resolver should have its htlcAmt field populated as it. + if int64(outgoingResolver.htlcAmt) != int64(htlcAmt) { + t.Fatalf("wrong htlc amount: expected %v, got %v,", + htlcAmt, int64(outgoingResolver.htlcAmt)) + } + // htlcOutgoingContestResolver is now active and waiting for the HTLC to // expire. It should not yet have passed it on for incubation. select { - case <-incubateChan: + case <-chanArbCtx.incubationRequests: t.Fatalf("contract should not be incubated yet") default: } // Send a notification that the expiry height has been reached. - notifier := chanArb.cfg.Notifier.(*mockNotifier) - notifier.epochChan <- &chainntnfs.BlockEpoch{Height: 10} + oldNotifier.epochChan <- &chainntnfs.BlockEpoch{Height: 10} // htlcOutgoingContestResolver is now transforming into a // htlcTimeoutResolver and should send the contract off for incubation. select { - case <-incubateChan: + case <-chanArbCtx.incubationRequests: case <-time.After(5 * time.Second): t.Fatalf("no response received") } // Notify resolver that the HTLC output of the commitment has been // spent. - notifier.spendChan <- &chainntnfs.SpendDetail{SpendingTx: closeTx} + oldNotifier.spendChan <- &chainntnfs.SpendDetail{SpendingTx: closeTx} // Finally, we should also receive a resolution message instructing the // switch to cancel back the HTLC. @@ -879,18 +907,18 @@ func TestChannelArbitratorLocalForceClosePendingHtlc(t *testing.T) { // to the second level. Channel arbitrator should still not be marked // as resolved. select { - case <-chanArbCtx.resolvedChan: + case <-chanArbCtxNew.resolvedChan: t.Fatalf("channel resolved prematurely") default: } // Notify resolver that the second level transaction is spent. - notifier.spendChan <- &chainntnfs.SpendDetail{SpendingTx: closeTx} + oldNotifier.spendChan <- &chainntnfs.SpendDetail{SpendingTx: closeTx} // At this point channel should be marked as resolved. - chanArbCtx.AssertStateTransitions(StateFullyResolved) + chanArbCtxNew.AssertStateTransitions(StateFullyResolved) select { - case <-chanArbCtx.resolvedChan: + case <-chanArbCtxNew.resolvedChan: case <-time.After(5 * time.Second): t.Fatalf("contract was not resolved") } From 16941d7aad0336ca4587ed9596f8493bd9561521 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 16 Sep 2019 20:46:01 -0700 Subject: [PATCH 3/3] build: nerf linter --- .golangci.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.golangci.yml b/.golangci.yml index e00edaa1..ca3e6881 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -45,6 +45,16 @@ linters: # trigger funlen problems that we may not want to solve at that time. - funlen + # Disable for now as we haven't yet tuned the sensitivity to our codebase + # yet. Enabling by default for example, would also force new contributors to + # potentially extensively refactor code, when they want to smaller change to + # land. + - gocyclo + + # Instances of table driven tests that don't pre-allocate shouldn't trigger + # the linter. + - prealloc + issues: # Only show newly introduced problems. new-from-rev: 01f696afce2f9c0d4ed854edefa3846891d01d8a