pool/worker_test: add tests for concrete Worker pools
This commit is contained in:
parent
32339a92d3
commit
ce1bd4be2c
353
pool/worker_test.go
Normal file
353
pool/worker_test.go
Normal file
@ -0,0 +1,353 @@
|
||||
package pool_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
crand "crypto/rand"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/buffer"
|
||||
"github.com/lightningnetwork/lnd/pool"
|
||||
)
|
||||
|
||||
type workerPoolTest struct {
|
||||
name string
|
||||
newPool func() interface{}
|
||||
numWorkers int
|
||||
}
|
||||
|
||||
// TestConcreteWorkerPools asserts the behavior of any concrete implementations
|
||||
// of worker pools provided by the pool package. Currently this tests the
|
||||
// pool.Read and pool.Write instances.
|
||||
func TestConcreteWorkerPools(t *testing.T) {
|
||||
const (
|
||||
gcInterval = time.Second
|
||||
expiryInterval = 250 * time.Millisecond
|
||||
numWorkers = 5
|
||||
workerTimeout = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
tests := []workerPoolTest{
|
||||
{
|
||||
name: "write pool",
|
||||
newPool: func() interface{} {
|
||||
bp := pool.NewWriteBuffer(
|
||||
gcInterval, expiryInterval,
|
||||
)
|
||||
|
||||
return pool.NewWrite(
|
||||
bp, numWorkers, workerTimeout,
|
||||
)
|
||||
},
|
||||
numWorkers: numWorkers,
|
||||
},
|
||||
{
|
||||
name: "read pool",
|
||||
newPool: func() interface{} {
|
||||
bp := pool.NewReadBuffer(
|
||||
gcInterval, expiryInterval,
|
||||
)
|
||||
|
||||
return pool.NewRead(
|
||||
bp, numWorkers, workerTimeout,
|
||||
)
|
||||
},
|
||||
numWorkers: numWorkers,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
testWorkerPool(t, test)
|
||||
}
|
||||
}
|
||||
|
||||
func testWorkerPool(t *testing.T, test workerPoolTest) {
|
||||
t.Run(test.name+" non blocking", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
p := test.newPool()
|
||||
startGeneric(t, p)
|
||||
defer stopGeneric(t, p)
|
||||
|
||||
submitNonblockingGeneric(t, p, test.numWorkers)
|
||||
})
|
||||
|
||||
t.Run(test.name+" blocking", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
p := test.newPool()
|
||||
startGeneric(t, p)
|
||||
defer stopGeneric(t, p)
|
||||
|
||||
submitBlockingGeneric(t, p, test.numWorkers)
|
||||
})
|
||||
|
||||
t.Run(test.name+" partial blocking", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
p := test.newPool()
|
||||
startGeneric(t, p)
|
||||
defer stopGeneric(t, p)
|
||||
|
||||
submitPartialBlockingGeneric(t, p, test.numWorkers)
|
||||
})
|
||||
}
|
||||
|
||||
// submitNonblockingGeneric asserts that queueing tasks to the worker pool and
|
||||
// allowing them all to unblock simultaneously results in all of the tasks being
|
||||
// completed in a timely manner.
|
||||
func submitNonblockingGeneric(t *testing.T, p interface{}, nWorkers int) {
|
||||
// We'll submit 2*nWorkers tasks that will all be unblocked
|
||||
// simultaneously.
|
||||
nUnblocked := 2 * nWorkers
|
||||
|
||||
// First we'll queue all of the tasks for the pool.
|
||||
errChan := make(chan error)
|
||||
semChan := make(chan struct{})
|
||||
for i := 0; i < nUnblocked; i++ {
|
||||
go func() { errChan <- submitGeneric(p, semChan) }()
|
||||
}
|
||||
|
||||
// Since we haven't signaled the semaphore, none of the them should
|
||||
// complete.
|
||||
pullNothing(t, errChan)
|
||||
|
||||
// Now, unblock them all simultaneously. All of the tasks should then be
|
||||
// processed in parallel. Afterward, no more errors should come through.
|
||||
close(semChan)
|
||||
pullParllel(t, nUnblocked, errChan)
|
||||
pullNothing(t, errChan)
|
||||
}
|
||||
|
||||
// submitBlockingGeneric asserts that submitting blocking tasks to the pool and
|
||||
// unblocking each sequentially results in a single task being processed at a
|
||||
// time.
|
||||
func submitBlockingGeneric(t *testing.T, p interface{}, nWorkers int) {
|
||||
// We'll submit 2*nWorkers tasks that will be unblocked sequentially.
|
||||
nBlocked := 2 * nWorkers
|
||||
|
||||
// First, queue all of the blocking tasks for the pool.
|
||||
errChan := make(chan error)
|
||||
semChan := make(chan struct{})
|
||||
for i := 0; i < nBlocked; i++ {
|
||||
go func() { errChan <- submitGeneric(p, semChan) }()
|
||||
}
|
||||
|
||||
// Since we haven't signaled the semaphore, none of them should
|
||||
// complete.
|
||||
pullNothing(t, errChan)
|
||||
|
||||
// Now, pull each blocking task sequentially from the pool. Afterwards,
|
||||
// no more errors should come through.
|
||||
pullSequntial(t, nBlocked, errChan, semChan)
|
||||
pullNothing(t, errChan)
|
||||
|
||||
}
|
||||
|
||||
// submitPartialBlockingGeneric tests that so long as one worker is not blocked,
|
||||
// any other non-blocking submitted tasks can still be processed.
|
||||
func submitPartialBlockingGeneric(t *testing.T, p interface{}, nWorkers int) {
|
||||
// We'll submit nWorkers-1 tasks that will be initially blocked, the
|
||||
// remainder will all be unblocked simultaneously. After the unblocked
|
||||
// tasks have finished, we will sequentially unblock the nWorkers-1
|
||||
// tasks that were first submitted.
|
||||
nBlocked := nWorkers - 1
|
||||
nUnblocked := 2*nWorkers - nBlocked
|
||||
|
||||
// First, submit all of the blocking tasks to the pool.
|
||||
errChan := make(chan error)
|
||||
semChan := make(chan struct{})
|
||||
for i := 0; i < nBlocked; i++ {
|
||||
go func() { errChan <- submitGeneric(p, semChan) }()
|
||||
}
|
||||
|
||||
// Since these are all blocked, no errors should be returned yet.
|
||||
pullNothing(t, errChan)
|
||||
|
||||
// Now, add all of the non-blocking task to the pool.
|
||||
semChanNB := make(chan struct{})
|
||||
for i := 0; i < nUnblocked; i++ {
|
||||
go func() { errChan <- submitGeneric(p, semChanNB) }()
|
||||
}
|
||||
|
||||
// Since we haven't unblocked the second batch, we again expect no tasks
|
||||
// to finish.
|
||||
pullNothing(t, errChan)
|
||||
|
||||
// Now, unblock the unblocked task and pull all of them. After they have
|
||||
// been pulled, we should see no more tasks.
|
||||
close(semChanNB)
|
||||
pullParllel(t, nUnblocked, errChan)
|
||||
pullNothing(t, errChan)
|
||||
|
||||
// Finally, unblock each the blocked tasks we added initially, and
|
||||
// assert that no further errors come through.
|
||||
pullSequntial(t, nBlocked, errChan, semChan)
|
||||
pullNothing(t, errChan)
|
||||
}
|
||||
|
||||
func pullNothing(t *testing.T, errChan chan error) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
t.Fatalf("received unexpected error before semaphore "+
|
||||
"release: %v", err)
|
||||
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
func pullParllel(t *testing.T, n int, errChan chan error) {
|
||||
t.Helper()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("task %d was not processed in time", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pullSequntial(t *testing.T, n int, errChan chan error, semChan chan struct{}) {
|
||||
t.Helper()
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
// Signal for another task to unblock.
|
||||
select {
|
||||
case semChan <- struct{}{}:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("task %d was not unblocked", i)
|
||||
}
|
||||
|
||||
// Wait for the error to arrive, we expect it to be non-nil.
|
||||
select {
|
||||
case err := <-errChan:
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("task %d was not processed in time", i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startGeneric(t *testing.T, p interface{}) {
|
||||
t.Helper()
|
||||
|
||||
var err error
|
||||
switch pp := p.(type) {
|
||||
case *pool.Write:
|
||||
err = pp.Start()
|
||||
|
||||
case *pool.Read:
|
||||
err = pp.Start()
|
||||
|
||||
default:
|
||||
t.Fatalf("unknown worker pool type: %T", p)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unable to start worker pool: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func stopGeneric(t *testing.T, p interface{}) {
|
||||
t.Helper()
|
||||
|
||||
var err error
|
||||
switch pp := p.(type) {
|
||||
case *pool.Write:
|
||||
err = pp.Stop()
|
||||
|
||||
case *pool.Read:
|
||||
err = pp.Stop()
|
||||
|
||||
default:
|
||||
t.Fatalf("unknown worker pool type: %T", p)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unable to stop worker pool: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func submitGeneric(p interface{}, sem <-chan struct{}) error {
|
||||
var err error
|
||||
switch pp := p.(type) {
|
||||
case *pool.Write:
|
||||
err = pp.Submit(func(buf *bytes.Buffer) error {
|
||||
// Verify that the provided buffer has been reset to be
|
||||
// zero length.
|
||||
if buf.Len() != 0 {
|
||||
return fmt.Errorf("buf should be length zero, "+
|
||||
"instead has length %d", buf.Len())
|
||||
}
|
||||
|
||||
// Verify that the capacity of the buffer has the
|
||||
// correct underlying size of a buffer.WriteSize.
|
||||
if buf.Cap() != buffer.WriteSize {
|
||||
return fmt.Errorf("buf should have capacity "+
|
||||
"%d, instead has capacity %d",
|
||||
buffer.WriteSize, buf.Cap())
|
||||
}
|
||||
|
||||
// Sample some random bytes that we'll use to dirty the
|
||||
// buffer.
|
||||
b := make([]byte, rand.Intn(buf.Cap()))
|
||||
_, err := io.ReadFull(crand.Reader, b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write the random bytes the buffer.
|
||||
_, err = buf.Write(b)
|
||||
|
||||
// Wait until this task is signaled to exit.
|
||||
<-sem
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
case *pool.Read:
|
||||
err = pp.Submit(func(buf *buffer.Read) error {
|
||||
// Assert that all of the bytes in the provided array
|
||||
// are zero, indicating that the buffer was reset
|
||||
// between uses.
|
||||
for i := range buf[:] {
|
||||
if buf[i] != 0x00 {
|
||||
return fmt.Errorf("byte %d of "+
|
||||
"buffer.Read should be "+
|
||||
"0, instead is %d", i, buf[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Sample some random bytes to read into the buffer.
|
||||
_, err := io.ReadFull(crand.Reader, buf[:])
|
||||
|
||||
// Wait until this task is signaled to exit.
|
||||
<-sem
|
||||
|
||||
return err
|
||||
|
||||
})
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown worker pool type: %T", p)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to submit task: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user