feat(train): add new command to interact with aws and train models

2021-10-17 19:15:44 +02:00
parent 5436dfebc2
commit 538cea18f2
1064 changed files with 282251 additions and 89305 deletions

pkg/train/archives.go Normal file

@@ -0,0 +1,47 @@
package train
import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"log"
)
// ListArchives lists the training data archive objects stored in the given S3 bucket and prints them.
func ListArchives(ctx context.Context, bucket string) error {
client := s3.NewFromConfig(mustLoadConfig())
prefix := aws.String("input/data/train/train.zip")
objects, err := client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: prefix,
})
if err != nil {
return fmt.Errorf("unable to list objects in bucket %v: %w", bucket, err)
}
fmt.Printf("objects: %v\n", objects)
return nil
}
// UploadArchive uploads the zipped training data to the configured S3 bucket under the expected key.
func (t Training) UploadArchive(ctx context.Context, archive []byte) error {
client := s3.NewFromConfig(t.config)
key := aws.String("input/data/train/train.zip")
log.Printf("upload archive to bucket '%s/%s'\n", t.bucketName, *key)
_, err := client.PutObject(
ctx,
&s3.PutObjectInput{
ACL: types.ObjectCannedACLPrivate,
Body: bytes.NewReader(archive),
Bucket: aws.String(t.bucketName),
Key: key,
})
if err != nil {
return fmt.Errorf("unable to upload archive: %w", err)
}
log.Println("archive uploaded")
return nil
}

pkg/train/archives_test.go Normal file

@@ -0,0 +1,12 @@
package train
import (
"context"
"testing"
)
func TestListArchives(t *testing.T) {
// NOTE: placeholder bucket name; running the test requires valid AWS credentials and an existing bucket.
err := ListArchives(context.Background(), "my-training-bucket")
if err != nil {
t.Errorf("unable to list archives: %v", err)
}
}

pkg/train/config.go Normal file

@@ -0,0 +1,22 @@
package train
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"log"
)
const (
// prefixInput is the S3 key prefix under which training data archives are uploaded.
prefixInput = "input/data/train/"
)
// mustLoadConfig loads the default AWS configuration and panics when it cannot be resolved.
func mustLoadConfig() aws.Config {
c, err := config.LoadDefaultConfig(context.Background())
if err != nil {
log.Panicf("unable to load aws default config: %v", err)
}
return c
}

pkg/train/train.go Normal file

@@ -0,0 +1,210 @@
package train
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/sagemaker"
"github.com/aws/aws-sdk-go-v2/service/sagemaker/types"
"github.com/cyrilix/robocar-tools/pkg/data"
"io/fs"
"io/ioutil"
"log"
"strconv"
"time"
)
// New returns a Training configured for the given S3 bucket, training OCI image and SageMaker execution role.
func New(bucketName string, ociImage, roleArn string) *Training {
return &Training{
config: mustLoadConfig(),
bucketName: bucketName,
ociImage: ociImage,
roleArn: roleArn,
prefixInput: prefixInput,
outputBucket: fmt.Sprintf("s3://%s/output", bucketName),
}
}
// Training holds the AWS configuration and the S3/SageMaker settings used to run a training job.
type Training struct {
config aws.Config
bucketName string
ociImage string
roleArn string
prefixInput string
outputBucket string
}
// TrainDir builds an archive from basedir, uploads it to S3, runs a SageMaker training job and downloads the resulting model.
func (t *Training) TrainDir(ctx context.Context, jobName, basedir string, sliceSize int, outputModelFile string) error {
log.Printf("run training with data from %s\n", basedir)
archive, err := data.BuildArchive(basedir, sliceSize)
if err != nil {
return fmt.Errorf("unable to build data archive: %w", err)
}
log.Println("")
err = t.UploadArchive(ctx, archive)
if err != nil {
return fmt.Errorf("unable to upload data arrchive: %w", err)
}
log.Println("")
err = t.runTraining(
ctx,
jobName,
sliceSize,
120,
160,
)
if err != nil {
return fmt.Errorf("unable to run training: %w", err)
}
err = t.GetTrainingOutput(ctx, jobName, outputModelFile)
if err != nil {
return fmt.Errorf("unable to get output model file '%s': %w", outputModelFile, err)
}
return nil
}
// List prints the first page of objects stored under the training data prefix of bucketName.
func List(bucketName string) error {
pfxInput := prefixInput
// Create an Amazon S3 service client
client := s3.NewFromConfig(mustLoadConfig())
// Get the first page of results for ListObjectsV2 for a bucket
output, err := client.ListObjectsV2(context.TODO(), &s3.ListObjectsV2Input{
Bucket: aws.String(bucketName),
Prefix: &pfxInput,
})
if err != nil {
return fmt.Errorf("unable to list objects in bucket %v: %w", bucketName, err)
}
log.Println("first page results:")
for _, object := range output.Contents {
if *object.Key == pfxInput {
continue
}
log.Printf("key=%s size=%d", aws.ToString(object.Key), object.Size)
}
return nil
}
// runTraining starts a managed-spot SageMaker training job and polls its status until completion.
func (t *Training) runTraining(ctx context.Context, jobName string, slideSize int, imgHeight, imgWidth int) error {
client := sagemaker.NewFromConfig(mustLoadConfig())
log.Printf("Start training job '%s'\n", jobName)
// TODO: check that train data exists
jobOutput, err := client.CreateTrainingJob(
ctx,
&sagemaker.CreateTrainingJobInput{
EnableManagedSpotTraining: true,
AlgorithmSpecification: &types.AlgorithmSpecification{
TrainingInputMode: types.TrainingInputModeFile,
TrainingImage: aws.String(t.ociImage),
},
OutputDataConfig: &types.OutputDataConfig{
S3OutputPath: aws.String(t.outputBucket),
},
ResourceConfig: &types.ResourceConfig{
InstanceCount: 1,
InstanceType: types.TrainingInstanceTypeMlP2Xlarge,
VolumeSizeInGB: 1,
},
RoleArn: aws.String(t.roleArn),
StoppingCondition: &types.StoppingCondition{
MaxRuntimeInSeconds: 1800,
MaxWaitTimeInSeconds: aws.Int32(3600),
},
TrainingJobName: aws.String(jobName),
HyperParameters: map[string]string{
"sagemaker_region": "eu-west-1",
"slide_size": strconv.Itoa(slideSize),
"img_height": strconv.Itoa(imgHeight),
"img_width": strconv.Itoa(imgWidth),
},
InputDataConfig: []types.Channel{
{
ChannelName: aws.String("train"),
DataSource: &types.DataSource{
S3DataSource: &types.S3DataSource{
S3DataType: types.S3DataTypeS3Prefix,
S3Uri: aws.String(fmt.Sprintf("s3://%s/%s", t.bucketName, t.prefixInput)),
S3DataDistributionType: types.S3DataDistributionFullyReplicated,
},
},
},
},
},
)
if err != nil {
return fmt.Errorf("unable to run sagemeker job: %w", err)
}
for {
time.Sleep(30 * time.Second)
status, err := client.DescribeTrainingJob(
ctx,
&sagemaker.DescribeTrainingJobInput{
TrainingJobName: aws.String(jobName),
},
)
if err != nil {
log.Printf("unable to get status from ob %v: %v\n", jobOutput.TrainingJobArn, err)
continue
}
switch status.TrainingJobStatus {
case types.TrainingJobStatusInProgress:
// Guard against an empty transition list to avoid dereferencing a missing status message.
secondaryMsg := ""
if nb := len(status.SecondaryStatusTransitions); nb > 0 {
secondaryMsg = aws.ToString(status.SecondaryStatusTransitions[nb-1].StatusMessage)
}
log.Printf("job in progress: %v - %v - %v\n", status.TrainingJobStatus, status.SecondaryStatus, secondaryMsg)
continue
case types.TrainingJobStatusFailed:
return fmt.Errorf("job %s finished with status %v", jobName, status.TrainingJobStatus)
default:
log.Printf("job %s finished with status %v\n", jobName, status.TrainingJobStatus)
return nil
}
}
}
// GetTrainingOutput downloads the model archive produced by jobName and writes it to outputFile.
func (t *Training) GetTrainingOutput(ctx context.Context, jobName, outputFile string) error {
// Create an Amazon S3 service client
client := s3.NewFromConfig(t.config)
// Fetch the model archive generated by the training job
output, err := client.GetObject(
ctx,
&s3.GetObjectInput{
Bucket: aws.String(t.bucketName),
Key: aws.String(fmt.Sprintf("output/%s/model.tar.gz", jobName)),
},
)
if err != nil {
return fmt.Errorf("unable to get resource: %w", err)
}
content, err := ioutil.ReadAll(output.Body)
if err != nil {
return fmt.Errorf("unable read output content: %w", err)
}
err = ioutil.WriteFile(outputFile, content, fs.ModePerm)
if err != nil {
return fmt.Errorf("unable to write content to '%v': %w", outputFile, err)
}
return nil
}
// ListJob prints the name and status of the account's SageMaker training jobs.
func ListJob(ctx context.Context) error {
client := sagemaker.NewFromConfig(mustLoadConfig())
jobs, err := client.ListTrainingJobs(ctx, &sagemaker.ListTrainingJobsInput{})
if err != nil {
return fmt.Errorf("unable to list trainings jobs: %w", err)
}
for _, job := range jobs.TrainingJobSummaries {
fmt.Printf("%s\t\t%s\n", *job.TrainingJobName, job.TrainingJobStatus)
}
return nil
}
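
For context, a minimal sketch (not part of this commit) of how the new train package might be driven from a command; the bucket name, ECR image URI, role ARN, job name and slice size below are placeholder values:

package main

import (
	"context"
	"log"

	"github.com/cyrilix/robocar-tools/pkg/train"
)

func main() {
	ctx := context.Background()

	// Placeholder values: replace with a real bucket, training image and SageMaker execution role.
	training := train.New(
		"my-robocar-bucket",
		"123456789012.dkr.ecr.eu-west-1.amazonaws.com/robocar-train:latest",
		"arn:aws:iam::123456789012:role/robocar-sagemaker",
	)

	// Archive the records under ./data, upload them to S3, run the SageMaker job and download the model.
	if err := training.TrainDir(ctx, "robocar-train-job", "./data", 2, "model.tar.gz"); err != nil {
		log.Fatalf("training failed: %v", err)
	}
}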

pkg/train/train_test.go Normal file

@@ -0,0 +1,22 @@
package train
import "testing"
func TestList(t *testing.T) {
tests := []struct {
name string
bucket string
wantErr bool
}{
{
name: "default",
// NOTE: placeholder bucket name; running the test requires valid AWS credentials and an existing bucket.
bucket: "my-training-bucket",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := List(tt.bucket); (err != nil) != tt.wantErr {
t.Errorf("List() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}