From ce1bd4be2ce645a12c28f5ea94ca39e985f01cc9 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 21 Feb 2019 20:10:40 -0800 Subject: [PATCH] pool/worker_test: add tests for concrete Worker pools --- pool/worker_test.go | 353 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 353 insertions(+) create mode 100644 pool/worker_test.go diff --git a/pool/worker_test.go b/pool/worker_test.go new file mode 100644 index 00000000..ee23e7a5 --- /dev/null +++ b/pool/worker_test.go @@ -0,0 +1,353 @@ +package pool_test + +import ( + "bytes" + crand "crypto/rand" + "fmt" + "io" + "math/rand" + "testing" + "time" + + "github.com/lightningnetwork/lnd/buffer" + "github.com/lightningnetwork/lnd/pool" +) + +type workerPoolTest struct { + name string + newPool func() interface{} + numWorkers int +} + +// TestConcreteWorkerPools asserts the behavior of any concrete implementations +// of worker pools provided by the pool package. Currently this tests the +// pool.Read and pool.Write instances. +func TestConcreteWorkerPools(t *testing.T) { + const ( + gcInterval = time.Second + expiryInterval = 250 * time.Millisecond + numWorkers = 5 + workerTimeout = 500 * time.Millisecond + ) + + tests := []workerPoolTest{ + { + name: "write pool", + newPool: func() interface{} { + bp := pool.NewWriteBuffer( + gcInterval, expiryInterval, + ) + + return pool.NewWrite( + bp, numWorkers, workerTimeout, + ) + }, + numWorkers: numWorkers, + }, + { + name: "read pool", + newPool: func() interface{} { + bp := pool.NewReadBuffer( + gcInterval, expiryInterval, + ) + + return pool.NewRead( + bp, numWorkers, workerTimeout, + ) + }, + numWorkers: numWorkers, + }, + } + + for _, test := range tests { + testWorkerPool(t, test) + } +} + +func testWorkerPool(t *testing.T, test workerPoolTest) { + t.Run(test.name+" non blocking", func(t *testing.T) { + t.Parallel() + + p := test.newPool() + startGeneric(t, p) + defer stopGeneric(t, p) + + submitNonblockingGeneric(t, p, test.numWorkers) + }) + + t.Run(test.name+" blocking", func(t *testing.T) { + t.Parallel() + + p := test.newPool() + startGeneric(t, p) + defer stopGeneric(t, p) + + submitBlockingGeneric(t, p, test.numWorkers) + }) + + t.Run(test.name+" partial blocking", func(t *testing.T) { + t.Parallel() + + p := test.newPool() + startGeneric(t, p) + defer stopGeneric(t, p) + + submitPartialBlockingGeneric(t, p, test.numWorkers) + }) +} + +// submitNonblockingGeneric asserts that queueing tasks to the worker pool and +// allowing them all to unblock simultaneously results in all of the tasks being +// completed in a timely manner. +func submitNonblockingGeneric(t *testing.T, p interface{}, nWorkers int) { + // We'll submit 2*nWorkers tasks that will all be unblocked + // simultaneously. + nUnblocked := 2 * nWorkers + + // First we'll queue all of the tasks for the pool. + errChan := make(chan error) + semChan := make(chan struct{}) + for i := 0; i < nUnblocked; i++ { + go func() { errChan <- submitGeneric(p, semChan) }() + } + + // Since we haven't signaled the semaphore, none of the them should + // complete. + pullNothing(t, errChan) + + // Now, unblock them all simultaneously. All of the tasks should then be + // processed in parallel. Afterward, no more errors should come through. + close(semChan) + pullParllel(t, nUnblocked, errChan) + pullNothing(t, errChan) +} + +// submitBlockingGeneric asserts that submitting blocking tasks to the pool and +// unblocking each sequentially results in a single task being processed at a +// time. +func submitBlockingGeneric(t *testing.T, p interface{}, nWorkers int) { + // We'll submit 2*nWorkers tasks that will be unblocked sequentially. + nBlocked := 2 * nWorkers + + // First, queue all of the blocking tasks for the pool. + errChan := make(chan error) + semChan := make(chan struct{}) + for i := 0; i < nBlocked; i++ { + go func() { errChan <- submitGeneric(p, semChan) }() + } + + // Since we haven't signaled the semaphore, none of them should + // complete. + pullNothing(t, errChan) + + // Now, pull each blocking task sequentially from the pool. Afterwards, + // no more errors should come through. + pullSequntial(t, nBlocked, errChan, semChan) + pullNothing(t, errChan) + +} + +// submitPartialBlockingGeneric tests that so long as one worker is not blocked, +// any other non-blocking submitted tasks can still be processed. +func submitPartialBlockingGeneric(t *testing.T, p interface{}, nWorkers int) { + // We'll submit nWorkers-1 tasks that will be initially blocked, the + // remainder will all be unblocked simultaneously. After the unblocked + // tasks have finished, we will sequentially unblock the nWorkers-1 + // tasks that were first submitted. + nBlocked := nWorkers - 1 + nUnblocked := 2*nWorkers - nBlocked + + // First, submit all of the blocking tasks to the pool. + errChan := make(chan error) + semChan := make(chan struct{}) + for i := 0; i < nBlocked; i++ { + go func() { errChan <- submitGeneric(p, semChan) }() + } + + // Since these are all blocked, no errors should be returned yet. + pullNothing(t, errChan) + + // Now, add all of the non-blocking task to the pool. + semChanNB := make(chan struct{}) + for i := 0; i < nUnblocked; i++ { + go func() { errChan <- submitGeneric(p, semChanNB) }() + } + + // Since we haven't unblocked the second batch, we again expect no tasks + // to finish. + pullNothing(t, errChan) + + // Now, unblock the unblocked task and pull all of them. After they have + // been pulled, we should see no more tasks. + close(semChanNB) + pullParllel(t, nUnblocked, errChan) + pullNothing(t, errChan) + + // Finally, unblock each the blocked tasks we added initially, and + // assert that no further errors come through. + pullSequntial(t, nBlocked, errChan, semChan) + pullNothing(t, errChan) +} + +func pullNothing(t *testing.T, errChan chan error) { + t.Helper() + + select { + case err := <-errChan: + t.Fatalf("received unexpected error before semaphore "+ + "release: %v", err) + + case <-time.After(time.Second): + } +} + +func pullParllel(t *testing.T, n int, errChan chan error) { + t.Helper() + + for i := 0; i < n; i++ { + select { + case err := <-errChan: + if err != nil { + t.Fatal(err) + } + + case <-time.After(time.Second): + t.Fatalf("task %d was not processed in time", i) + } + } +} + +func pullSequntial(t *testing.T, n int, errChan chan error, semChan chan struct{}) { + t.Helper() + + for i := 0; i < n; i++ { + // Signal for another task to unblock. + select { + case semChan <- struct{}{}: + case <-time.After(time.Second): + t.Fatalf("task %d was not unblocked", i) + } + + // Wait for the error to arrive, we expect it to be non-nil. + select { + case err := <-errChan: + if err != nil { + t.Fatal(err) + } + + case <-time.After(time.Second): + t.Fatalf("task %d was not processed in time", i) + } + } +} + +func startGeneric(t *testing.T, p interface{}) { + t.Helper() + + var err error + switch pp := p.(type) { + case *pool.Write: + err = pp.Start() + + case *pool.Read: + err = pp.Start() + + default: + t.Fatalf("unknown worker pool type: %T", p) + } + + if err != nil { + t.Fatalf("unable to start worker pool: %v", err) + } +} + +func stopGeneric(t *testing.T, p interface{}) { + t.Helper() + + var err error + switch pp := p.(type) { + case *pool.Write: + err = pp.Stop() + + case *pool.Read: + err = pp.Stop() + + default: + t.Fatalf("unknown worker pool type: %T", p) + } + + if err != nil { + t.Fatalf("unable to stop worker pool: %v", err) + } +} + +func submitGeneric(p interface{}, sem <-chan struct{}) error { + var err error + switch pp := p.(type) { + case *pool.Write: + err = pp.Submit(func(buf *bytes.Buffer) error { + // Verify that the provided buffer has been reset to be + // zero length. + if buf.Len() != 0 { + return fmt.Errorf("buf should be length zero, "+ + "instead has length %d", buf.Len()) + } + + // Verify that the capacity of the buffer has the + // correct underlying size of a buffer.WriteSize. + if buf.Cap() != buffer.WriteSize { + return fmt.Errorf("buf should have capacity "+ + "%d, instead has capacity %d", + buffer.WriteSize, buf.Cap()) + } + + // Sample some random bytes that we'll use to dirty the + // buffer. + b := make([]byte, rand.Intn(buf.Cap())) + _, err := io.ReadFull(crand.Reader, b) + if err != nil { + return err + } + + // Write the random bytes the buffer. + _, err = buf.Write(b) + + // Wait until this task is signaled to exit. + <-sem + + return err + }) + + case *pool.Read: + err = pp.Submit(func(buf *buffer.Read) error { + // Assert that all of the bytes in the provided array + // are zero, indicating that the buffer was reset + // between uses. + for i := range buf[:] { + if buf[i] != 0x00 { + return fmt.Errorf("byte %d of "+ + "buffer.Read should be "+ + "0, instead is %d", i, buf[i]) + } + } + + // Sample some random bytes to read into the buffer. + _, err := io.ReadFull(crand.Reader, buf[:]) + + // Wait until this task is signaled to exit. + <-sem + + return err + + }) + + default: + return fmt.Errorf("unknown worker pool type: %T", p) + } + + if err != nil { + return fmt.Errorf("unable to submit task: %v", err) + } + + return nil +}