robocar-base/vendor/github.com/testcontainers/testcontainers-go/docker.go

904 lines
22 KiB
Go

package testcontainers
import (
"archive/tar"
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/cenkalti/backoff"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/testcontainers/testcontainers-go/wait"
)
// Implement interfaces
var _ Container = (*DockerContainer)(nil)
const (
Bridge = "bridge" // Bridge network name (as well as driver)
ReaperDefault = "reaper_default" // Default network name when bridge is not available
)
// DockerContainer represents a container started using Docker
type DockerContainer struct {
// Container ID from Docker
ID string
WaitingFor wait.Strategy
Image string
// Cache to retrieve container information without re-fetching them from dockerd
raw *types.ContainerJSON
provider *DockerProvider
sessionID uuid.UUID
terminationSignal chan bool
skipReaper bool
consumers []LogConsumer
stopProducer chan bool
}
func (c *DockerContainer) GetContainerID() string {
return c.ID
}
// Endpoint gets proto://host:port string for the first exposed port
// Will returns just host:port if proto is ""
func (c *DockerContainer) Endpoint(ctx context.Context, proto string) (string, error) {
ports, err := c.Ports(ctx)
if err != nil {
return "", err
}
// get first port
var firstPort nat.Port
for p := range ports {
firstPort = p
break
}
return c.PortEndpoint(ctx, firstPort, proto)
}
// PortEndpoint gets proto://host:port string for the given exposed port
// Will returns just host:port if proto is ""
func (c *DockerContainer) PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) {
host, err := c.Host(ctx)
if err != nil {
return "", err
}
outerPort, err := c.MappedPort(ctx, port)
if err != nil {
return "", err
}
protoFull := ""
if proto != "" {
protoFull = fmt.Sprintf("%s://", proto)
}
return fmt.Sprintf("%s%s:%s", protoFull, host, outerPort.Port()), nil
}
// Host gets host (ip or name) of the docker daemon where the container port is exposed
// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
// You can use the "TC_HOST" env variable to set this yourself
func (c *DockerContainer) Host(ctx context.Context) (string, error) {
host, err := c.provider.daemonHost(ctx)
if err != nil {
return "", err
}
return host, nil
}
// MappedPort gets externally mapped port for a container port
func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return "", err
}
if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
return port, nil
}
ports, err := c.Ports(ctx)
if err != nil {
return "", err
}
for k, p := range ports {
if k.Port() != port.Port() {
continue
}
if port.Proto() != "" && k.Proto() != port.Proto() {
continue
}
return nat.NewPort(k.Proto(), p[0].HostPort)
}
return "", errors.New("port not found")
}
// Ports gets the exposed ports for the container.
func (c *DockerContainer) Ports(ctx context.Context) (nat.PortMap, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return nil, err
}
return inspect.NetworkSettings.Ports, nil
}
// SessionID gets the current session id
func (c *DockerContainer) SessionID() string {
return c.sessionID.String()
}
// Start will start an already created container
func (c *DockerContainer) Start(ctx context.Context) error {
shortID := c.ID[:12]
log.Printf("Starting container id: %s image: %s", shortID, c.Image)
if err := c.provider.client.ContainerStart(ctx, c.ID, types.ContainerStartOptions{}); err != nil {
return err
}
// if a Wait Strategy has been specified, wait before returning
if c.WaitingFor != nil {
log.Printf("Waiting for container id %s image: %s", shortID, c.Image)
if err := c.WaitingFor.WaitUntilReady(ctx, c); err != nil {
return err
}
}
log.Printf("Container is ready id: %s image: %s", shortID, c.Image)
return nil
}
// Terminate is used to kill the container. It is usually triggered by as defer function.
func (c *DockerContainer) Terminate(ctx context.Context) error {
err := c.provider.client.ContainerRemove(ctx, c.GetContainerID(), types.ContainerRemoveOptions{
RemoveVolumes: true,
Force: true,
})
if err == nil {
c.sessionID = uuid.UUID{}
c.raw = nil
}
return err
}
func (c *DockerContainer) inspectContainer(ctx context.Context) (*types.ContainerJSON, error) {
if c.raw != nil {
return c.raw, nil
}
inspect, err := c.provider.client.ContainerInspect(ctx, c.ID)
if err != nil {
return nil, err
}
c.raw = &inspect
return c.raw, nil
}
// Logs will fetch both STDOUT and STDERR from the current container. Returns a
// ReadCloser and leaves it up to the caller to extract what it wants.
func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
options := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
}
return c.provider.client.ContainerLogs(ctx, c.ID, options)
}
// FollowOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
if c.consumers == nil {
c.consumers = []LogConsumer{
consumer,
}
} else {
c.consumers = append(c.consumers, consumer)
}
}
// Name gets the name of the container.
func (c *DockerContainer) Name(ctx context.Context) (string, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return "", err
}
return inspect.Name, nil
}
// Networks gets the names of the networks the container is attached to.
func (c *DockerContainer) Networks(ctx context.Context) ([]string, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return []string{}, err
}
networks := inspect.NetworkSettings.Networks
n := []string{}
for k := range networks {
n = append(n, k)
}
return n, nil
}
// ContainerIP gets the IP address of the primary network within the container.
func (c *DockerContainer) ContainerIP(ctx context.Context) (string, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return "", err
}
return inspect.NetworkSettings.IPAddress, nil
}
// NetworkAliases gets the aliases of the container for the networks it is attached to.
func (c *DockerContainer) NetworkAliases(ctx context.Context) (map[string][]string, error) {
inspect, err := c.inspectContainer(ctx)
if err != nil {
return map[string][]string{}, err
}
networks := inspect.NetworkSettings.Networks
a := map[string][]string{}
for k := range networks {
a[k] = networks[k].Aliases
}
return a, nil
}
func (c *DockerContainer) Exec(ctx context.Context, cmd []string) (int, error) {
cli := c.provider.client
response, err := cli.ContainerExecCreate(ctx, c.ID, types.ExecConfig{
Cmd: cmd,
Detach: false,
})
if err != nil {
return 0, err
}
err = cli.ContainerExecStart(ctx, response.ID, types.ExecStartCheck{
Detach: false,
})
if err != nil {
return 0, err
}
var exitCode int
for {
execResp, err := cli.ContainerExecInspect(ctx, response.ID)
if err != nil {
return 0, err
}
if !execResp.Running {
exitCode = execResp.ExitCode
break
}
time.Sleep(100 * time.Millisecond)
}
return exitCode, nil
}
func (c *DockerContainer) CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error {
fileContent, err := ioutil.ReadFile(hostFilePath)
if err != nil {
return err
}
buffer := &bytes.Buffer{}
tw := tar.NewWriter(buffer)
defer tw.Close()
hdr := &tar.Header{
Name: filepath.Base(containerFilePath),
Mode: fileMode,
Size: int64(len(fileContent)),
}
if err := tw.WriteHeader(hdr); err != nil {
return err
}
if _, err := tw.Write(fileContent); err != nil {
return err
}
return c.provider.client.CopyToContainer(ctx, c.ID, filepath.Dir(containerFilePath), buffer, types.CopyToContainerOptions{})
}
// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
go func() {
options := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
// if we can't get the logs, panic, we can't return an error to anything
// from within this goroutine
panic(err)
}
for {
select {
case <-c.stopProducer:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
panic(err)
}
return
default:
h := make([]byte, 8)
_, err := r.Read(h)
if err != nil {
// this explicitly ignores errors
// because we want to keep procesing even if one of our reads fails
continue
}
count := binary.BigEndian.Uint32(h[4:])
if count == 0 {
continue
}
logType := h[0]
if logType > 2 {
panic(fmt.Sprintf("received inavlid log type: %d", logType))
}
// a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs
logTypes := []string{"", StdoutLog, StderrLog}
b := make([]byte, count)
_, err = r.Read(b)
if err != nil {
// TODO: add-logger: use logger to log out this error
fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error())
continue
}
for _, c := range c.consumers {
c.Accept(Log{
LogType: logTypes[logType],
Content: b,
})
}
}
}
}()
return nil
}
// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.stopProducer <- true
return nil
}
// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
Driver string
Name string
provider *DockerProvider
terminationSignal chan bool
}
// Remove is used to remove the network. It is usually triggered by as defer function.
func (n *DockerNetwork) Remove(ctx context.Context) error {
return n.provider.client.NetworkRemove(ctx, n.ID)
}
// DockerProvider implements the ContainerProvider interface
type DockerProvider struct {
client *client.Client
hostCache string
defaultNetwork string // default container network
}
var _ ContainerProvider = (*DockerProvider)(nil)
// NewDockerProvider creates a Docker provider with the EnvClient
func NewDockerProvider() (*DockerProvider, error) {
client, err := client.NewEnvClient()
if err != nil {
return nil, err
}
client.NegotiateAPIVersion(context.Background())
p := &DockerProvider{
client: client,
}
return p, nil
}
// BuildImage will build and image from context and Dockerfile, then return the tag
func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (string, error) {
repo := uuid.New()
tag := uuid.New()
repoTag := fmt.Sprintf("%s:%s", repo, tag)
buildContext, err := img.GetContext()
if err != nil {
return "", err
}
buildOptions := types.ImageBuildOptions{
Dockerfile: img.GetDockerfile(),
Context: buildContext,
Tags: []string{repoTag},
}
resp, err := p.client.ImageBuild(ctx, buildContext, buildOptions)
if err != nil {
return "", err
}
// need to read the response from Docker, I think otherwise the image
// might not finish building before continuing to execute here
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(resp.Body)
if err != nil {
return "", err
}
resp.Body.Close()
return repoTag, nil
}
// CreateContainer fulfills a request for a container without starting it
func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) {
var err error
// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
p.defaultNetwork, err = getDefaultNetwork(ctx, p.client)
if err != nil {
return nil, err
}
// If default network is not bridge make sure it is attached to the request
// as container won't be attached to it automatically
if p.defaultNetwork != Bridge {
isAttached := false
for _, net := range req.Networks {
if net == p.defaultNetwork {
isAttached = true
break
}
}
if !isAttached {
req.Networks = append(req.Networks, p.defaultNetwork)
}
}
exposedPortSet, exposedPortMap, err := nat.ParsePortSpecs(req.ExposedPorts)
if err != nil {
return nil, err
}
env := []string{}
for envKey, envVar := range req.Env {
env = append(env, envKey+"="+envVar)
}
if req.Labels == nil {
req.Labels = make(map[string]string)
}
sessionID := uuid.New()
var termSignal chan bool
if !req.SkipReaper {
r, err := NewReaper(ctx, sessionID.String(), p, req.ReaperImage)
if err != nil {
return nil, errors.Wrap(err, "creating reaper failed")
}
termSignal, err = r.Connect()
if err != nil {
return nil, errors.Wrap(err, "connecting to reaper failed")
}
for k, v := range r.Labels() {
if _, ok := req.Labels[k]; !ok {
req.Labels[k] = v
}
}
}
if err = req.Validate(); err != nil {
return nil, err
}
var tag string
if req.ShouldBuildImage() {
tag, err = p.BuildImage(ctx, &req)
if err != nil {
return nil, err
}
} else {
tag = req.Image
var shouldPullImage bool
if req.AlwaysPullImage {
shouldPullImage = true // If requested always attempt to pull image
} else {
_, _, err = p.client.ImageInspectWithRaw(ctx, tag)
if err != nil {
if client.IsErrNotFound(err) {
shouldPullImage = true
} else {
return nil, err
}
}
}
if shouldPullImage {
pullOpt := types.ImagePullOptions{}
if req.RegistryCred != "" {
pullOpt.RegistryAuth = req.RegistryCred
}
if err := p.attemptToPullImage(ctx, tag, pullOpt); err != nil {
return nil, err
}
}
}
dockerInput := &container.Config{
Entrypoint: req.Entrypoint,
Image: tag,
Env: env,
ExposedPorts: exposedPortSet,
Labels: req.Labels,
Cmd: req.Cmd,
Hostname: req.Hostname,
}
// prepare mounts
mounts := []mount.Mount{}
for hostPath, innerPath := range req.BindMounts {
mounts = append(mounts, mount.Mount{
Type: mount.TypeBind,
Source: hostPath,
Target: innerPath,
})
}
for volumeName, innerPath := range req.VolumeMounts {
mounts = append(mounts, mount.Mount{
Type: mount.TypeVolume,
Source: volumeName,
Target: innerPath,
})
}
hostConfig := &container.HostConfig{
PortBindings: exposedPortMap,
Mounts: mounts,
Tmpfs: req.Tmpfs,
AutoRemove: req.AutoRemove,
Privileged: req.Privileged,
NetworkMode: req.NetworkMode,
}
endpointConfigs := map[string]*network.EndpointSettings{}
for _, n := range req.Networks {
nw, err := p.GetNetwork(ctx, NetworkRequest{
Name: n,
})
if err == nil {
endpointSetting := network.EndpointSettings{
Aliases: req.NetworkAliases[n],
NetworkID: nw.ID,
}
endpointConfigs[n] = &endpointSetting
}
}
networkingConfig := network.NetworkingConfig{
EndpointsConfig: endpointConfigs,
}
resp, err := p.client.ContainerCreate(ctx, dockerInput, hostConfig, &networkingConfig, req.Name)
if err != nil {
return nil, err
}
c := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: tag,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
skipReaper: req.SkipReaper,
stopProducer: make(chan bool),
}
return c, nil
}
// attemptToPullImage tries to pull the image while respecting the ctx cancellations.
// Besides, if the image cannot be pulled due to ErrorNotFound then no need to retry but terminate immediately.
func (p *DockerProvider) attemptToPullImage(ctx context.Context, tag string, pullOpt types.ImagePullOptions) error {
var (
err error
pull io.ReadCloser
)
err = backoff.Retry(func() error {
pull, err = p.client.ImagePull(ctx, tag, pullOpt)
if _, ok := err.(errdefs.ErrNotFound); ok {
return backoff.Permanent(err)
}
return err
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
if err != nil {
return err
}
defer pull.Close()
// download of docker image finishes at EOF of the pull request
_, err = ioutil.ReadAll(pull)
return err
}
// Helth measure the healthiness of the provider. Right now we leverage the
// docker-client ping endpoint to see if the daemon is reachable.
func (p *DockerProvider) Health(ctx context.Context) (err error) {
_, err = p.client.Ping(ctx)
return
}
// RunContainer takes a RequestContainer as input and it runs a container via the docker sdk
func (p *DockerProvider) RunContainer(ctx context.Context, req ContainerRequest) (Container, error) {
c, err := p.CreateContainer(ctx, req)
if err != nil {
return nil, err
}
if err := c.Start(ctx); err != nil {
return c, errors.Wrap(err, "could not start container")
}
return c, nil
}
// daemonHost gets the host or ip of the Docker daemon where ports are exposed on
// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
// You can use the "TC_HOST" env variable to set this yourself
func (p *DockerProvider) daemonHost(ctx context.Context) (string, error) {
if p.hostCache != "" {
return p.hostCache, nil
}
host, exists := os.LookupEnv("TC_HOST")
if exists {
p.hostCache = host
return p.hostCache, nil
}
// infer from Docker host
url, err := url.Parse(p.client.DaemonHost())
if err != nil {
return "", err
}
switch url.Scheme {
case "http", "https", "tcp":
p.hostCache = url.Hostname()
case "unix", "npipe":
if inAContainer() {
ip, err := p.GetGatewayIP(ctx)
if err != nil {
// fallback to getDefaultGatewayIP
ip, err = getDefaultGatewayIP()
if err != nil {
ip = "localhost"
}
}
p.hostCache = ip
} else {
p.hostCache = "localhost"
}
default:
return "", errors.New("Could not determine host through env or docker host")
}
return p.hostCache, nil
}
// CreateNetwork returns the object representing a new network identified by its name
func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) {
var err error
// Make sure that bridge network exists
// In case it is disabled we will create reaper_default network
p.defaultNetwork, err = getDefaultNetwork(ctx, p.client)
if req.Labels == nil {
req.Labels = make(map[string]string)
}
nc := types.NetworkCreate{
Driver: req.Driver,
CheckDuplicate: req.CheckDuplicate,
Internal: req.Internal,
EnableIPv6: req.EnableIPv6,
Attachable: req.Attachable,
Labels: req.Labels,
}
sessionID := uuid.New()
var termSignal chan bool
if !req.SkipReaper {
r, err := NewReaper(ctx, sessionID.String(), p, req.ReaperImage)
if err != nil {
return nil, errors.Wrap(err, "creating network reaper failed")
}
termSignal, err = r.Connect()
if err != nil {
return nil, errors.Wrap(err, "connecting to network reaper failed")
}
for k, v := range r.Labels() {
if _, ok := req.Labels[k]; !ok {
req.Labels[k] = v
}
}
}
response, err := p.client.NetworkCreate(ctx, req.Name, nc)
if err != nil {
return &DockerNetwork{}, err
}
n := &DockerNetwork{
ID: response.ID,
Driver: req.Driver,
Name: req.Name,
terminationSignal: termSignal,
provider: p,
}
return n, nil
}
// GetNetwork returns the object representing the network identified by its name
func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (types.NetworkResource, error) {
networkResource, err := p.client.NetworkInspect(ctx, req.Name, types.NetworkInspectOptions{
Verbose: true,
})
if err != nil {
return types.NetworkResource{}, err
}
return networkResource, err
}
func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
// Use a default network as defined in the DockerProvider
nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.defaultNetwork})
if err != nil {
return "", err
}
var ip string
for _, config := range nw.IPAM.Config {
if config.Gateway != "" {
ip = config.Gateway
break
}
}
if ip == "" {
return "", errors.New("Failed to get gateway IP from network settings")
}
return ip, nil
}
func inAContainer() bool {
// see https://github.com/testcontainers/testcontainers-java/blob/3ad8d80e2484864e554744a4800a81f6b7982168/core/src/main/java/org/testcontainers/dockerclient/DockerClientConfigUtils.java#L15
if _, err := os.Stat("/.dockerenv"); err == nil {
return true
}
return false
}
// deprecated
// see https://github.com/testcontainers/testcontainers-java/blob/master/core/src/main/java/org/testcontainers/dockerclient/DockerClientConfigUtils.java#L46
func getDefaultGatewayIP() (string, error) {
// see https://github.com/testcontainers/testcontainers-java/blob/3ad8d80e2484864e554744a4800a81f6b7982168/core/src/main/java/org/testcontainers/dockerclient/DockerClientConfigUtils.java#L27
cmd := exec.Command("sh", "-c", "ip route|awk '/default/ { print $3 }'")
stdout, err := cmd.Output()
if err != nil {
return "", errors.New("Failed to detect docker host")
}
ip := strings.TrimSpace(string(stdout))
if len(ip) == 0 {
return "", errors.New("Failed to parse default gateway IP")
}
return string(ip), nil
}
func getDefaultNetwork(ctx context.Context, cli *client.Client) (string, error) {
// Get list of available networks
networkResources, err := cli.NetworkList(ctx, types.NetworkListOptions{})
if err != nil {
return "", err
}
reaperNetwork := ReaperDefault
reaperNetworkExists := false
for _, net := range networkResources {
if net.Name == Bridge {
return Bridge, nil
}
if net.Name == reaperNetwork {
reaperNetworkExists = true
}
}
// Create a bridge network for the container communications
if !reaperNetworkExists {
_, err = cli.NetworkCreate(ctx, reaperNetwork, types.NetworkCreate{
Driver: Bridge,
Attachable: true,
Labels: map[string]string{
TestcontainerLabel: "true",
},
})
if err != nil {
return "", err
}
}
return reaperNetwork, nil
}