robocar-base/vendor/github.com/testcontainers/testcontainers-go/parallel.go

123 lines
2.5 KiB
Go
Raw Normal View History

package testcontainers
import (
"context"
"fmt"
"sync"
)
const (
defaultWorkersCount = 8
)
type ParallelContainerRequest []GenericContainerRequest
// ParallelContainersOptions represents additional options for parallel running
type ParallelContainersOptions struct {
WorkersCount int // count of parallel workers. If field empty(zero), default value will be 'defaultWorkersCount'
}
// ParallelContainersRequestError represents error from parallel request
type ParallelContainersRequestError struct {
Request GenericContainerRequest
Error error
}
type ParallelContainersError struct {
Errors []ParallelContainersRequestError
}
func (gpe ParallelContainersError) Error() string {
return fmt.Sprintf("%v", gpe.Errors)
}
func parallelContainersRunner(
ctx context.Context,
requests <-chan GenericContainerRequest,
errors chan<- ParallelContainersRequestError,
containers chan<- Container,
wg *sync.WaitGroup) {
for req := range requests {
c, err := GenericContainer(ctx, req)
if err != nil {
errors <- ParallelContainersRequestError{
Request: req,
Error: err,
}
continue
}
containers <- c
}
wg.Done()
}
// ParallelContainers creates a generic containers with parameters and run it in parallel mode
func ParallelContainers(ctx context.Context, reqs ParallelContainerRequest, opt ParallelContainersOptions) ([]Container, error) {
if opt.WorkersCount == 0 {
opt.WorkersCount = defaultWorkersCount
}
tasksChanSize := opt.WorkersCount
if tasksChanSize > len(reqs) {
tasksChanSize = len(reqs)
}
tasksChan := make(chan GenericContainerRequest, tasksChanSize)
errsChan := make(chan ParallelContainersRequestError)
resChan := make(chan Container)
waitRes := make(chan struct{})
containers := make([]Container, 0)
errors := make([]ParallelContainersRequestError, 0)
wg := sync.WaitGroup{}
wg.Add(tasksChanSize)
// run workers
for i := 0; i < tasksChanSize; i++ {
go parallelContainersRunner(ctx, tasksChan, errsChan, resChan, &wg)
}
go func() {
for {
select {
case c, ok := <-resChan:
if !ok {
resChan = nil
} else {
containers = append(containers, c)
}
case e, ok := <-errsChan:
if !ok {
errsChan = nil
} else {
errors = append(errors, e)
}
}
if resChan == nil && errsChan == nil {
waitRes <- struct{}{}
break
}
}
}()
for _, req := range reqs {
tasksChan <- req
}
close(tasksChan)
wg.Wait()
close(resChan)
close(errsChan)
<-waitRes
if len(errors) != 0 {
return containers, ParallelContainersError{Errors: errors}
}
return containers, nil
}