Merge pull request #2347 from joostjager/sweeper-mock

utxonursery: mock sweeper in tests
This commit is contained in:
Olaoluwa Osuntokun 2018-12-19 17:29:35 -08:00 committed by GitHub
commit b5c3a37c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 84 additions and 86 deletions

@ -636,7 +636,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
Notifier: cc.chainNotifier,
PublishTransaction: cc.wallet.PublishTransaction,
Store: utxnStore,
Sweeper: s.sweeper,
SweepInput: s.sweeper.SweepInput,
})
// Construct a closure that wraps the htlcswitch's CloseLink method.

@ -2,8 +2,6 @@ package sweep
import (
"fmt"
"os"
"runtime/pprof"
"sync"
"testing"
"time"
@ -58,6 +56,8 @@ func NewMockNotifier(t *testing.T) *MockNotifier {
// NotifyEpoch simulates a new epoch arriving.
func (m *MockNotifier) NotifyEpoch(height int32) {
m.t.Helper()
for epochChan, chanHeight := range m.epochChan {
// Only send notifications if the height is greater than the
// height the caller passed into the register call.
@ -72,8 +72,6 @@ func (m *MockNotifier) NotifyEpoch(height int32) {
Height: height,
}:
case <-time.After(defaultTestTimeout):
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
m.t.Fatal("epoch event not consumed")
}
}

@ -195,9 +195,8 @@ type NurseryConfig struct {
// maintained about the utxo nursery's incubating outputs.
Store NurseryStore
// Sweeper provides functionality to generate sweep transactions.
// Nursery uses this to sweep final outputs back into the wallet.
Sweeper *sweep.UtxoSweeper
// Sweep sweeps an input back to the wallet.
SweepInput func(input sweep.Input) (chan sweep.Result, error)
}
// utxoNursery is a system dedicated to incubating time-locked outputs created
@ -803,7 +802,7 @@ func (u *utxoNursery) sweepMatureOutputs(classHeight uint32,
// passed in with disastruous consequences.
local := output
resultChan, err := u.cfg.Sweeper.SweepInput(&local)
resultChan, err := u.cfg.SweepInput(&local)
if err != nil {
return err
}

@ -12,6 +12,7 @@ import (
"os"
"reflect"
"runtime/pprof"
"sync"
"testing"
"time"
@ -402,7 +403,7 @@ type nurseryTestContext struct {
store *nurseryStoreInterceptor
restart func() bool
receiveTx func() wire.MsgTx
sweeper *sweep.UtxoSweeper
sweeper *mockSweeper
timeoutChan chan chan time.Time
t *testing.T
}
@ -448,33 +449,7 @@ func createNurseryTestContext(t *testing.T,
bestHeight: 0,
}
sweeperStore := sweep.NewMockSweeperStore()
sweeperCfg := &sweep.UtxoSweeperConfig{
GenSweepScript: func() ([]byte, error) {
return []byte{}, nil
},
Estimator: &lnwallet.StaticFeeEstimator{},
Signer: &nurseryMockSigner{},
Notifier: notifier,
PublishTransaction: func(tx *wire.MsgTx) error {
return publishFunc(tx, "sweeper")
},
NewBatchTimer: func() <-chan time.Time {
c := make(chan time.Time, 1)
timeoutChan <- c
return c
},
ChainIO: chainIO,
Store: sweeperStore,
MaxInputsPerTx: 10,
MaxSweepAttempts: 5,
NextAttemptDeltaFunc: func(int) int32 { return 1 },
}
sweeper := sweep.New(sweeperCfg)
sweeper.Start()
sweeper := newMockSweeper(t)
nurseryCfg := NurseryConfig{
Notifier: notifier,
@ -488,9 +463,9 @@ func createNurseryTestContext(t *testing.T,
CloseHeight: 0,
}, nil
},
Store: storeIntercepter,
ChainIO: chainIO,
Sweeper: sweeper,
Store: storeIntercepter,
ChainIO: chainIO,
SweepInput: sweeper.sweepInput,
PublishTransaction: func(tx *wire.MsgTx) error {
return publishFunc(tx, "nursery")
},
@ -530,33 +505,11 @@ func createNurseryTestContext(t *testing.T,
// Simulate lnd restart.
ctx.nursery.Stop()
// Also restart sweeper to test behaviour as one unit.
//
// TODO(joostjager): Mock sweeper to test nursery in
// isolation.
ctx.sweeper.Stop()
// Find out if there is a last tx stored. If so, we
// expect it to be republished on startup.
hasLastTx, err := sweeperCfg.Store.GetLastPublishedTx()
if err != nil {
t.Fatal(err)
}
// Restart sweeper.
ctx.sweeper = sweep.New(sweeperCfg)
ctx.sweeper.Start()
// Receive last tx if expected.
if hasLastTx != nil {
utxnLog.Debugf("Expecting republish")
ctx.receiveTx()
} else {
utxnLog.Debugf("Expecting no republish")
}
ctx.sweeper = newMockSweeper(t)
/// Restart nursery.
nurseryCfg.Sweeper = ctx.sweeper
nurseryCfg.SweepInput = ctx.sweeper.sweepInput
ctx.nursery = newUtxoNursery(&nurseryCfg)
ctx.nursery.Start()
@ -570,6 +523,8 @@ func createNurseryTestContext(t *testing.T,
}
func (ctx *nurseryTestContext) notifyEpoch(height int32) {
ctx.t.Helper()
ctx.chainIO.bestHeight = height
ctx.notifier.NotifyEpoch(height)
}
@ -629,8 +584,6 @@ func (ctx *nurseryTestContext) finish() {
if len(activeHeights) > 0 {
ctx.t.Fatalf("Expected height index to be empty")
}
ctx.sweeper.Stop()
}
func createOutgoingRes(onLocalCommitment bool) *lnwallet.OutgoingHtlcResolution {
@ -955,20 +908,17 @@ func testSweep(t *testing.T, ctx *nurseryTestContext,
afterPublishAssert func()) {
// Wait for nursery to publish the sweep tx.
ctx.tick()
sweepTx := ctx.receiveTx()
ctx.sweeper.expectSweep()
if ctx.restart() {
// Nursery reoffers its input. Sweeper needs a tick to create the sweep
// tx.
ctx.tick()
ctx.receiveTx()
// Nursery reoffers its input after a restart.
ctx.sweeper.expectSweep()
}
afterPublishAssert()
// Confirm the sweep tx.
ctx.notifier.SpendOutpoint(sweepTx.TxIn[0].PreviousOutPoint, sweepTx)
ctx.sweeper.sweepAll()
// Wait for output to be promoted in store to GRAD.
select {
@ -985,19 +935,6 @@ func testSweep(t *testing.T, ctx *nurseryTestContext,
assertNurseryReportUnavailable(t, ctx.nursery)
}
func (ctx *nurseryTestContext) tick() {
select {
case c := <-ctx.timeoutChan:
select {
case c <- time.Time{}:
case <-time.After(defaultTestTimeout):
ctx.t.Fatal("tick timeout - tick not consumed")
}
case <-time.After(defaultTestTimeout):
ctx.t.Fatal("tick timeout - no new timer created")
}
}
type nurseryStoreInterceptor struct {
ns NurseryStore
@ -1100,3 +1037,67 @@ func (m *nurseryMockSigner) ComputeInputScript(tx *wire.MsgTx,
return &lnwallet.InputScript{}, nil
}
type mockSweeper struct {
lock sync.Mutex
resultChans map[wire.OutPoint]chan sweep.Result
t *testing.T
sweepChan chan sweep.Input
}
func newMockSweeper(t *testing.T) *mockSweeper {
return &mockSweeper{
resultChans: make(map[wire.OutPoint]chan sweep.Result),
sweepChan: make(chan sweep.Input, 1),
t: t,
}
}
func (s *mockSweeper) sweepInput(input sweep.Input) (chan sweep.Result, error) {
utxnLog.Debugf("mockSweeper sweepInput called for %v", *input.OutPoint())
select {
case s.sweepChan <- input:
case <-time.After(defaultTestTimeout):
s.t.Fatal("signal result timeout")
}
s.lock.Lock()
defer s.lock.Unlock()
c := make(chan sweep.Result, 1)
s.resultChans[*input.OutPoint()] = c
return c, nil
}
func (s *mockSweeper) expectSweep() {
s.t.Helper()
select {
case <-s.sweepChan:
case <-time.After(defaultTestTimeout):
s.t.Fatal("signal result timeout")
}
}
func (s *mockSweeper) sweepAll() {
s.t.Helper()
s.lock.Lock()
currentChans := s.resultChans
s.resultChans = make(map[wire.OutPoint]chan sweep.Result)
s.lock.Unlock()
for o, c := range currentChans {
utxnLog.Debugf("mockSweeper signal swept for %v", o)
select {
case c <- sweep.Result{}:
case <-time.After(defaultTestTimeout):
s.t.Fatal("signal result timeout")
}
}
}