2020-03-01 16:06:34 +00:00
package testcontainers
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
"os"
"os/exec"
"strings"
"time"
"github.com/cenkalti/backoff"
2020-09-06 12:31:23 +00:00
"github.com/docker/docker/errdefs"
2020-03-01 16:06:34 +00:00
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
2020-09-06 12:31:23 +00:00
"github.com/google/uuid"
2020-03-01 16:06:34 +00:00
"github.com/pkg/errors"
"github.com/testcontainers/testcontainers-go/wait"
)
// Implement interfaces
var _ Container = ( * DockerContainer ) ( nil )
// DockerContainer represents a container started using Docker
type DockerContainer struct {
// Container ID from Docker
ID string
WaitingFor wait . Strategy
Image string
// Cache to retrieve container information without re-fetching them from dockerd
raw * types . ContainerJSON
provider * DockerProvider
sessionID uuid . UUID
terminationSignal chan bool
skipReaper bool
consumers [ ] LogConsumer
stopProducer chan bool
}
func ( c * DockerContainer ) GetContainerID ( ) string {
return c . ID
}
// Endpoint gets proto://host:port string for the first exposed port
// Will returns just host:port if proto is ""
func ( c * DockerContainer ) Endpoint ( ctx context . Context , proto string ) ( string , error ) {
ports , err := c . Ports ( ctx )
if err != nil {
return "" , err
}
// get first port
var firstPort nat . Port
for p := range ports {
firstPort = p
break
}
return c . PortEndpoint ( ctx , firstPort , proto )
}
// PortEndpoint gets proto://host:port string for the given exposed port
// Will returns just host:port if proto is ""
func ( c * DockerContainer ) PortEndpoint ( ctx context . Context , port nat . Port , proto string ) ( string , error ) {
host , err := c . Host ( ctx )
if err != nil {
return "" , err
}
outerPort , err := c . MappedPort ( ctx , port )
if err != nil {
return "" , err
}
protoFull := ""
if proto != "" {
protoFull = fmt . Sprintf ( "%s://" , proto )
}
return fmt . Sprintf ( "%s%s:%s" , protoFull , host , outerPort . Port ( ) ) , nil
}
// Host gets host (ip or name) of the docker daemon where the container port is exposed
// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
// You can use the "TC_HOST" env variable to set this yourself
func ( c * DockerContainer ) Host ( ctx context . Context ) ( string , error ) {
host , err := c . provider . daemonHost ( )
if err != nil {
return "" , err
}
return host , nil
}
// MappedPort gets externally mapped port for a container port
func ( c * DockerContainer ) MappedPort ( ctx context . Context , port nat . Port ) ( nat . Port , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return "" , err
}
if inspect . ContainerJSONBase . HostConfig . NetworkMode == "host" {
return port , nil
}
ports , err := c . Ports ( ctx )
if err != nil {
return "" , err
}
for k , p := range ports {
if k . Port ( ) != port . Port ( ) {
continue
}
if port . Proto ( ) != "" && k . Proto ( ) != port . Proto ( ) {
continue
}
return nat . NewPort ( k . Proto ( ) , p [ 0 ] . HostPort )
}
return "" , errors . New ( "port not found" )
}
// Ports gets the exposed ports for the container.
func ( c * DockerContainer ) Ports ( ctx context . Context ) ( nat . PortMap , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return nil , err
}
return inspect . NetworkSettings . Ports , nil
}
// SessionID gets the current session id
func ( c * DockerContainer ) SessionID ( ) string {
return c . sessionID . String ( )
}
// Start will start an already created container
func ( c * DockerContainer ) Start ( ctx context . Context ) error {
shortID := c . ID [ : 12 ]
log . Printf ( "Starting container id: %s image: %s" , shortID , c . Image )
if err := c . provider . client . ContainerStart ( ctx , c . ID , types . ContainerStartOptions { } ) ; err != nil {
return err
}
// if a Wait Strategy has been specified, wait before returning
if c . WaitingFor != nil {
log . Printf ( "Waiting for container id %s image: %s" , shortID , c . Image )
if err := c . WaitingFor . WaitUntilReady ( ctx , c ) ; err != nil {
return err
}
}
log . Printf ( "Container is ready id: %s image: %s" , shortID , c . Image )
return nil
}
// Terminate is used to kill the container. It is usually triggered by as defer function.
func ( c * DockerContainer ) Terminate ( ctx context . Context ) error {
err := c . provider . client . ContainerRemove ( ctx , c . GetContainerID ( ) , types . ContainerRemoveOptions {
RemoveVolumes : true ,
Force : true ,
} )
if err == nil {
c . sessionID = uuid . UUID { }
c . raw = nil
}
return err
}
func ( c * DockerContainer ) inspectContainer ( ctx context . Context ) ( * types . ContainerJSON , error ) {
if c . raw != nil {
return c . raw , nil
}
inspect , err := c . provider . client . ContainerInspect ( ctx , c . ID )
if err != nil {
return nil , err
}
c . raw = & inspect
return c . raw , nil
}
// Logs will fetch both STDOUT and STDERR from the current container. Returns a
// ReadCloser and leaves it up to the caller to extract what it wants.
func ( c * DockerContainer ) Logs ( ctx context . Context ) ( io . ReadCloser , error ) {
options := types . ContainerLogsOptions {
ShowStdout : true ,
ShowStderr : true ,
}
return c . provider . client . ContainerLogs ( ctx , c . ID , options )
}
// FollowOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
func ( c * DockerContainer ) FollowOutput ( consumer LogConsumer ) {
if c . consumers == nil {
c . consumers = [ ] LogConsumer {
consumer ,
}
} else {
c . consumers = append ( c . consumers , consumer )
}
}
// Name gets the name of the container.
func ( c * DockerContainer ) Name ( ctx context . Context ) ( string , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return "" , err
}
return inspect . Name , nil
}
// Networks gets the names of the networks the container is attached to.
func ( c * DockerContainer ) Networks ( ctx context . Context ) ( [ ] string , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return [ ] string { } , err
}
networks := inspect . NetworkSettings . Networks
n := [ ] string { }
for k := range networks {
n = append ( n , k )
}
return n , nil
}
2020-09-06 12:31:23 +00:00
// ContainerIP gets the IP address of the primary network within the container.
func ( c * DockerContainer ) ContainerIP ( ctx context . Context ) ( string , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return "" , err
}
return inspect . NetworkSettings . IPAddress , nil
}
2020-03-01 16:06:34 +00:00
// NetworkAliases gets the aliases of the container for the networks it is attached to.
func ( c * DockerContainer ) NetworkAliases ( ctx context . Context ) ( map [ string ] [ ] string , error ) {
inspect , err := c . inspectContainer ( ctx )
if err != nil {
return map [ string ] [ ] string { } , err
}
networks := inspect . NetworkSettings . Networks
a := map [ string ] [ ] string { }
for k := range networks {
a [ k ] = networks [ k ] . Aliases
}
return a , nil
}
func ( c * DockerContainer ) Exec ( ctx context . Context , cmd [ ] string ) ( int , error ) {
cli := c . provider . client
response , err := cli . ContainerExecCreate ( ctx , c . ID , types . ExecConfig {
Cmd : cmd ,
Detach : false ,
} )
if err != nil {
return 0 , err
}
err = cli . ContainerExecStart ( ctx , response . ID , types . ExecStartCheck {
Detach : false ,
} )
if err != nil {
return 0 , err
}
var exitCode int
for {
execResp , err := cli . ContainerExecInspect ( ctx , response . ID )
if err != nil {
return 0 , err
}
if ! execResp . Running {
exitCode = execResp . ExitCode
break
}
time . Sleep ( 100 * time . Millisecond )
}
return exitCode , nil
}
// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func ( c * DockerContainer ) StartLogProducer ( ctx context . Context ) error {
go func ( ) {
options := types . ContainerLogsOptions {
ShowStdout : true ,
ShowStderr : true ,
Follow : true ,
}
ctx , cancel := context . WithTimeout ( ctx , time . Second * 5 )
defer cancel ( )
r , err := c . provider . client . ContainerLogs ( ctx , c . GetContainerID ( ) , options )
if err != nil {
// if we can't get the logs, panic, we can't return an error to anything
// from within this goroutine
panic ( err )
}
for {
select {
case <- c . stopProducer :
err := r . Close ( )
if err != nil {
// we can't close the read closer, this should never happen
panic ( err )
}
return
default :
h := make ( [ ] byte , 8 )
_ , err := r . Read ( h )
if err != nil {
// this explicitly ignores errors
// because we want to keep procesing even if one of our reads fails
continue
}
count := binary . BigEndian . Uint32 ( h [ 4 : ] )
if count == 0 {
continue
}
logType := h [ 0 ]
if logType > 2 {
panic ( fmt . Sprintf ( "received inavlid log type: %d" , logType ) )
}
// a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs
logTypes := [ ] string { "" , StdoutLog , StderrLog }
b := make ( [ ] byte , count )
_ , err = r . Read ( b )
if err != nil {
// TODO: add-logger: use logger to log out this error
fmt . Fprintf ( os . Stderr , "error occurred reading log with known length %s" , err . Error ( ) )
continue
}
for _ , c := range c . consumers {
c . Accept ( Log {
LogType : logTypes [ logType ] ,
Content : b ,
} )
}
}
}
} ( )
return nil
}
// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func ( c * DockerContainer ) StopLogProducer ( ) error {
c . stopProducer <- true
return nil
}
// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
Driver string
Name string
provider * DockerProvider
terminationSignal chan bool
}
// Remove is used to remove the network. It is usually triggered by as defer function.
2020-09-06 12:31:23 +00:00
func ( n * DockerNetwork ) Remove ( ctx context . Context ) error {
return n . provider . client . NetworkRemove ( ctx , n . ID )
2020-03-01 16:06:34 +00:00
}
// DockerProvider implements the ContainerProvider interface
type DockerProvider struct {
client * client . Client
hostCache string
}
var _ ContainerProvider = ( * DockerProvider ) ( nil )
// NewDockerProvider creates a Docker provider with the EnvClient
func NewDockerProvider ( ) ( * DockerProvider , error ) {
client , err := client . NewEnvClient ( )
if err != nil {
return nil , err
}
client . NegotiateAPIVersion ( context . Background ( ) )
p := & DockerProvider {
client : client ,
}
return p , nil
}
// BuildImage will build and image from context and Dockerfile, then return the tag
func ( p * DockerProvider ) BuildImage ( ctx context . Context , img ImageBuildInfo ) ( string , error ) {
2020-09-06 12:31:23 +00:00
repo := uuid . New ( )
tag := uuid . New ( )
2020-03-01 16:06:34 +00:00
repoTag := fmt . Sprintf ( "%s:%s" , repo , tag )
buildContext , err := img . GetContext ( )
if err != nil {
return "" , err
}
buildOptions := types . ImageBuildOptions {
Dockerfile : img . GetDockerfile ( ) ,
Context : buildContext ,
Tags : [ ] string { repoTag } ,
}
resp , err := p . client . ImageBuild ( ctx , buildContext , buildOptions )
if err != nil {
return "" , err
}
// need to read the response from Docker, I think otherwise the image
// might not finish building before continuing to execute here
buf := new ( bytes . Buffer )
_ , err = buf . ReadFrom ( resp . Body )
if err != nil {
return "" , err
}
resp . Body . Close ( )
return repoTag , nil
}
// CreateContainer fulfills a request for a container without starting it
func ( p * DockerProvider ) CreateContainer ( ctx context . Context , req ContainerRequest ) ( Container , error ) {
exposedPortSet , exposedPortMap , err := nat . ParsePortSpecs ( req . ExposedPorts )
if err != nil {
return nil , err
}
env := [ ] string { }
for envKey , envVar := range req . Env {
env = append ( env , envKey + "=" + envVar )
}
if req . Labels == nil {
req . Labels = make ( map [ string ] string )
}
2020-09-06 12:31:23 +00:00
sessionID := uuid . New ( )
2020-03-01 16:06:34 +00:00
var termSignal chan bool
if ! req . SkipReaper {
r , err := NewReaper ( ctx , sessionID . String ( ) , p , req . ReaperImage )
if err != nil {
return nil , errors . Wrap ( err , "creating reaper failed" )
}
termSignal , err = r . Connect ( )
if err != nil {
return nil , errors . Wrap ( err , "connecting to reaper failed" )
}
for k , v := range r . Labels ( ) {
if _ , ok := req . Labels [ k ] ; ! ok {
req . Labels [ k ] = v
}
}
}
if err = req . Validate ( ) ; err != nil {
return nil , err
}
var tag string
if req . ShouldBuildImage ( ) {
tag , err = p . BuildImage ( ctx , & req )
if err != nil {
return nil , err
}
} else {
tag = req . Image
2020-09-06 12:31:23 +00:00
var shouldPullImage bool
2020-03-01 16:06:34 +00:00
2020-09-06 12:31:23 +00:00
if req . AlwaysPullImage {
shouldPullImage = true // If requested always attempt to pull image
} else {
_ , _ , err = p . client . ImageInspectWithRaw ( ctx , tag )
if err != nil {
if client . IsErrNotFound ( err ) {
shouldPullImage = true
} else {
2020-03-01 16:06:34 +00:00
return nil , err
}
2020-09-06 12:31:23 +00:00
}
}
if shouldPullImage {
pullOpt := types . ImagePullOptions { }
if req . RegistryCred != "" {
pullOpt . RegistryAuth = req . RegistryCred
}
if err := p . attemptToPullImage ( ctx , tag , pullOpt ) ; err != nil {
2020-03-01 16:06:34 +00:00
return nil , err
}
}
}
dockerInput := & container . Config {
2020-09-06 12:31:23 +00:00
Entrypoint : req . Entrypoint ,
2020-03-01 16:06:34 +00:00
Image : tag ,
Env : env ,
ExposedPorts : exposedPortSet ,
Labels : req . Labels ,
Cmd : req . Cmd ,
}
// prepare mounts
mounts := [ ] mount . Mount { }
for hostPath , innerPath := range req . BindMounts {
mounts = append ( mounts , mount . Mount {
Type : mount . TypeBind ,
Source : hostPath ,
Target : innerPath ,
} )
}
for volumeName , innerPath := range req . VolumeMounts {
mounts = append ( mounts , mount . Mount {
Type : mount . TypeVolume ,
Source : volumeName ,
Target : innerPath ,
} )
}
hostConfig := & container . HostConfig {
PortBindings : exposedPortMap ,
Mounts : mounts ,
Tmpfs : req . Tmpfs ,
AutoRemove : req . AutoRemove ,
Privileged : req . Privileged ,
NetworkMode : req . NetworkMode ,
}
endpointConfigs := map [ string ] * network . EndpointSettings { }
for _ , n := range req . Networks {
nw , err := p . GetNetwork ( ctx , NetworkRequest {
Name : n ,
} )
if err == nil {
endpointSetting := network . EndpointSettings {
Aliases : req . NetworkAliases [ n ] ,
NetworkID : nw . ID ,
}
endpointConfigs [ n ] = & endpointSetting
}
}
networkingConfig := network . NetworkingConfig {
EndpointsConfig : endpointConfigs ,
}
resp , err := p . client . ContainerCreate ( ctx , dockerInput , hostConfig , & networkingConfig , req . Name )
if err != nil {
return nil , err
}
c := & DockerContainer {
ID : resp . ID ,
WaitingFor : req . WaitingFor ,
Image : tag ,
sessionID : sessionID ,
provider : p ,
terminationSignal : termSignal ,
skipReaper : req . SkipReaper ,
stopProducer : make ( chan bool ) ,
}
return c , nil
}
2020-09-06 12:31:23 +00:00
// attemptToPullImage tries to pull the image while respecting the ctx cancellations.
// Besides, if the image cannot be pulled due to ErrorNotFound then no need to retry but terminate immediately.
func ( p * DockerProvider ) attemptToPullImage ( ctx context . Context , tag string , pullOpt types . ImagePullOptions ) error {
var (
err error
pull io . ReadCloser
)
err = backoff . Retry ( func ( ) error {
pull , err = p . client . ImagePull ( ctx , tag , pullOpt )
if _ , ok := err . ( errdefs . ErrNotFound ) ; ok {
return backoff . Permanent ( err )
}
return err
} , backoff . WithContext ( backoff . NewExponentialBackOff ( ) , ctx ) )
if err != nil {
return err
}
defer pull . Close ( )
// download of docker image finishes at EOF of the pull request
_ , err = ioutil . ReadAll ( pull )
return err
}
// Helth measure the healthiness of the provider. Right now we leverage the
// docker-client ping endpoint to see if the daemon is reachable.
func ( p * DockerProvider ) Health ( ctx context . Context ) ( err error ) {
_ , err = p . client . Ping ( ctx )
return
}
2020-03-01 16:06:34 +00:00
// RunContainer takes a RequestContainer as input and it runs a container via the docker sdk
func ( p * DockerProvider ) RunContainer ( ctx context . Context , req ContainerRequest ) ( Container , error ) {
c , err := p . CreateContainer ( ctx , req )
if err != nil {
return nil , err
}
if err := c . Start ( ctx ) ; err != nil {
return c , errors . Wrap ( err , "could not start container" )
}
return c , nil
}
// daemonHost gets the host or ip of the Docker daemon where ports are exposed on
// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
// You can use the "TC_HOST" env variable to set this yourself
func ( p * DockerProvider ) daemonHost ( ) ( string , error ) {
if p . hostCache != "" {
return p . hostCache , nil
}
host , exists := os . LookupEnv ( "TC_HOST" )
if exists {
p . hostCache = host
return p . hostCache , nil
}
// infer from Docker host
url , err := url . Parse ( p . client . DaemonHost ( ) )
if err != nil {
return "" , err
}
switch url . Scheme {
case "http" , "https" , "tcp" :
p . hostCache = url . Hostname ( )
case "unix" , "npipe" :
if inAContainer ( ) {
ip , err := getGatewayIp ( )
if err != nil {
return "" , err
}
p . hostCache = ip
} else {
p . hostCache = "localhost"
}
default :
return "" , errors . New ( "Could not determine host through env or docker host" )
}
return p . hostCache , nil
}
// CreateNetwork returns the object representing a new network identified by its name
func ( p * DockerProvider ) CreateNetwork ( ctx context . Context , req NetworkRequest ) ( Network , error ) {
if req . Labels == nil {
req . Labels = make ( map [ string ] string )
}
nc := types . NetworkCreate {
Driver : req . Driver ,
CheckDuplicate : req . CheckDuplicate ,
Internal : req . Internal ,
EnableIPv6 : req . EnableIPv6 ,
Attachable : req . Attachable ,
Labels : req . Labels ,
}
2020-09-06 12:31:23 +00:00
sessionID := uuid . New ( )
2020-03-01 16:06:34 +00:00
var termSignal chan bool
if ! req . SkipReaper {
r , err := NewReaper ( ctx , sessionID . String ( ) , p , req . ReaperImage )
if err != nil {
return nil , errors . Wrap ( err , "creating network reaper failed" )
}
termSignal , err = r . Connect ( )
if err != nil {
return nil , errors . Wrap ( err , "connecting to network reaper failed" )
}
for k , v := range r . Labels ( ) {
if _ , ok := req . Labels [ k ] ; ! ok {
req . Labels [ k ] = v
}
}
}
response , err := p . client . NetworkCreate ( ctx , req . Name , nc )
if err != nil {
return & DockerNetwork { } , err
}
n := & DockerNetwork {
ID : response . ID ,
Driver : req . Driver ,
Name : req . Name ,
terminationSignal : termSignal ,
2020-09-06 12:31:23 +00:00
provider : p ,
2020-03-01 16:06:34 +00:00
}
return n , nil
}
// GetNetwork returns the object representing the network identified by its name
func ( p * DockerProvider ) GetNetwork ( ctx context . Context , req NetworkRequest ) ( types . NetworkResource , error ) {
networkResource , err := p . client . NetworkInspect ( ctx , req . Name , types . NetworkInspectOptions {
Verbose : true ,
} )
if err != nil {
return types . NetworkResource { } , err
}
return networkResource , err
}
func inAContainer ( ) bool {
// see https://github.com/testcontainers/testcontainers-java/blob/3ad8d80e2484864e554744a4800a81f6b7982168/core/src/main/java/org/testcontainers/dockerclient/DockerClientConfigUtils.java#L15
if _ , err := os . Stat ( "/.dockerenv" ) ; err == nil {
return true
}
return false
}
func getGatewayIp ( ) ( string , error ) {
// see https://github.com/testcontainers/testcontainers-java/blob/3ad8d80e2484864e554744a4800a81f6b7982168/core/src/main/java/org/testcontainers/dockerclient/DockerClientConfigUtils.java#L27
cmd := exec . Command ( "sh" , "-c" , "ip route|awk '/default/ { print $3 }'" )
stdout , err := cmd . Output ( )
if err != nil {
return "" , errors . New ( "Failed to detect docker host" )
}
ip := strings . TrimSpace ( string ( stdout ) )
if len ( ip ) == 0 {
return "" , errors . New ( "Failed to parse default gateway IP" )
}
return string ( ip ) , nil
}