280 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			280 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
 | 
						|
import os
 | 
						|
 | 
						|
# import container_support as cs
 | 
						|
import argparse
 | 
						|
import json
 | 
						|
import pathlib
 | 
						|
import typing
 | 
						|
 | 
						|
import numpy as np
 | 
						|
import re
 | 
						|
import tensorflow as tf
 | 
						|
import zipfile
 | 
						|
 | 
						|
import tensorflow.keras.losses
 | 
						|
 | 
						|
# from tensorflow.keras import backend as K
 | 
						|
from tensorflow.keras import callbacks
 | 
						|
from tensorflow.keras.layers import Conv2D
 | 
						|
from tensorflow.keras.layers import Dropout, Flatten, Dense
 | 
						|
from tensorflow.keras.layers import Input
 | 
						|
from tensorflow.keras.layers import Layer
 | 
						|
from tensorflow.keras.models import Model
 | 
						|
from tensorflow.keras.preprocessing.image import load_img, img_to_array
 | 
						|
from numpy import typing as npt
 | 
						|
 | 
						|
# Accepted values of train()'s ``model_type`` argument (and therefore of the
# --model_type command-line option, whose default is MODEL_CATEGORICAL).
MODEL_CATEGORICAL = "categorical"
MODEL_LINEAR = "linear"
 | 
						|
 | 
						|
 | 
						|
def linear_bin(a: float, N: int = 15, offset: int = 1, R: float = 2.0) -> npt.NDArray[np.float64]:
    """
    One-hot encode a scalar into a vector of length N.

    The value is shifted by ``offset``, divided by the bin width
    ``R / (N - offset)``, rounded to the nearest bin index and clamped to
    ``[0, N - 1]``. With the defaults, ``a = 0`` lands in bin 7.

    :param a:       value to encode
    :param N:       number of bins (length of the returned vector)
    :param offset:  shift added to ``a`` before binning
    :param R:       width of the value range spread over the bins
    :return:        array of N zeros with a single 1 at the selected bin
    """
    shifted = a + offset
    bin_width = R / (N - offset)
    one_hot = np.zeros(N)
    # Out-of-range values are pinned to the first / last bin.
    index = clamp(round(shifted / bin_width), 0, N - 1)
    one_hot[int(index)] = 1
    return one_hot
 | 
						|
 | 
						|
 | 
						|
def clamp(n: int, min: int, max: int) -> int:
    """
    Restrict ``n`` to the closed interval ``[min, max]``.

    The lower bound is tested first, so ``min`` wins if the bounds are
    passed in the wrong order.

    :param n:    value to restrict
    :param min:  lower bound
    :param max:  upper bound
    :return:     ``min`` if ``n <= min``, ``max`` if ``n >= max``, else ``n``
    """
    return min if n <= min else (max if n >= max else n)
 | 
						|
 | 
						|
 | 
						|
def get_data(root_dir: pathlib.Path, filename: str) -> typing.List[typing.Any]:
    """
    Read one JSON record file and extract the fields used for training.

    :param root_dir:  directory containing the record file
    :param filename:  name of the JSON record file inside ``root_dir``
    :return:          ``[angle, root_dir, image_name]`` where ``angle`` is the
                      record's ``user/angle`` value and ``image_name`` its
                      ``cam/image_array`` value
    :raises KeyError: if the record lacks ``user/angle`` or ``cam/image_array``
    """
    # Use a context manager so the file handle is closed deterministically;
    # this function is called once per record, so leaked handles add up.
    with open(os.path.join(root_dir, filename)) as record_file:
        d = json.load(record_file)
    return [d['user/angle'], root_dir, d['cam/image_array']]
 | 
						|
 | 
						|
 | 
						|
# Pre-compiled pattern capturing a run of one or more decimal digits.
# NOTE(review): nothing visible in this file references it — confirm whether
# it is still needed (e.g. for sorting record files by number).
numbers = re.compile(r'(\d+)')
 | 
						|
 | 
						|
 | 
						|
def unzip_file(root: pathlib.Path, f: str) -> None:
    """
    Extract the zip archive ``f`` located in ``root`` into ``root`` itself.

    :param root:  directory that contains the archive and receives its content
    :param f:     file name of the zip archive inside ``root``
    """
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(os.path.join(root, f), 'r') as zip_ref:
        zip_ref.extractall(root)
 | 
						|
 | 
						|
 | 
						|
def train(model_type: str, batch_size: int, slide_size: int, img_height: int, img_width: int, img_depth: int,
          horizon: int, drop: float) -> None:
    """
    Train an angle-prediction model and export it.

    Records are read from ``/opt/ml/input/data/train`` (zip archives found
    there are extracted first). The trained model is written below
    ``/opt/ml/model`` as a Keras checkpoint, a TensorFlow SavedModel and a
    fully integer-quantized TFLite file.

    :param model_type:  MODEL_CATEGORICAL or MODEL_LINEAR
    :param batch_size:  batch size passed to ``model.fit``
    :param slide_size:  when > 0, image ``i`` is paired with the angle of
                        record ``i + slide_size``; 0 keeps the original pairing
    :param img_height:  image height in pixels (lower edge of the crop box)
    :param img_width:   image width in pixels (right edge of the crop box)
    :param img_depth:   number of image channels
    :param horizon:     when > 0, number of pixel rows cropped from the top of
                        every image
    :param drop:        dropout rate used by the model
    :raises ValueError: if ``model_type`` is not a supported model type
    """
    from datetime import datetime

    # Fail fast: reject an unknown model type before unpacking or loading data.
    if model_type not in (MODEL_CATEGORICAL, MODEL_LINEAR):
        raise ValueError("invalid model type")

    os.makedirs('logs', exist_ok=True)

    # ### Loading the files ###
    # Extract every archive first so the records they contain are visible to
    # the second walk below.
    for root, _dirs, files in os.walk('/opt/ml/input/data/train'):
        for f in files:
            if f.endswith('.zip'):
                unzip_file(pathlib.Path(root), f)

    # NOTE(review): names are sorted lexicographically (record_10 sorts before
    # record_2). That only affects the pairing when slide_size > 0, which
    # presumably assumes chronological order — confirm against the data set.
    data = []
    for root, _dirs, files in os.walk('/opt/ml/input/data/train'):
        data.extend(
            get_data(pathlib.Path(root), f)
            for f in sorted(files, key=str.lower)
            if f.startswith('record') and f.endswith('.json'))

    # ### Loading angles ###
    angle_array = np.array([d[0] for d in data])

    # ### Loading images ###
    if horizon > 0:
        # Keep only the rows from `horizon` down to the bottom of the image.
        crop_box = (0, horizon, img_width, img_height)
        images = np.array(
            [img_to_array(load_img(os.path.join(d[1], d[2])).crop(crop_box)) for d in data],
            'f')
    else:
        images = np.array([img_to_array(load_img(os.path.join(d[1], d[2]))) for d in data], 'f')

    # Shift angles against images by slide_size records.
    if slide_size > 0:
        images = images[:len(images) - slide_size]
        angle_array = angle_array[slide_size:]

    # ### Start training ###
    logdir = '/opt/ml/model/logs/' + datetime.now().strftime("%Y%m%d-%H%M%S")
    logs = callbacks.TensorBoard(log_dir=logdir, histogram_freq=0, write_graph=True, write_images=True)

    input_shape = (img_height - horizon, img_width, img_depth)
    if model_type == MODEL_CATEGORICAL:
        model_filepath = '/opt/ml/model/model_cat'
        # One-hot encode each angle for the softmax output.
        angle_targets = np.array([linear_bin(float(a)) for a in angle_array])
        model = default_categorical(input_shape=input_shape, drop=drop)
        loss = {'angle_out': 'categorical_crossentropy', }
        optimizer = 'adam'
    else:  # MODEL_LINEAR, guaranteed by the check at the top
        model_filepath = '/opt/ml/model/model_lin'
        angle_targets = np.array(angle_array)
        model = default_linear(input_shape=input_shape, drop=drop)
        # Use the concrete MSE loss; the abstract `Loss` base class cannot
        # compute anything and 'mse' is not a valid constructor argument for it.
        loss = tensorflow.keras.losses.MeanSquaredError()
        optimizer = 'rmsprop'

    save_best = callbacks.ModelCheckpoint(model_filepath, monitor='val_loss', verbose=1,
                                          save_best_only=True, mode='min')
    early_stop = callbacks.EarlyStopping(monitor='val_loss',
                                         min_delta=.0005,
                                         patience=5,
                                         verbose=1,
                                         mode='auto')
    callbacks_list = [save_best, early_stop, logs]

    model.compile(optimizer=optimizer,
                  loss=loss, )
    model.fit({'img_in': images}, {'angle_out': angle_targets, }, batch_size=batch_size,
              epochs=100, verbose=1, validation_split=0.2, shuffle=True, callbacks=callbacks_list)

    # Save the model in TensorFlow SavedModel format
    model.save("/opt/ml/model/tfModel", save_format="tf")

    def representative_dataset() -> typing.Iterator[typing.List[tf.Tensor]]:
        # Up to 100 single-image batches used to calibrate the quantization.
        for d in tf.data.Dataset.from_tensor_slices(images).batch(1).take(100):
            yield [tf.dtypes.cast(d, tf.float32)]

    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    # full quantization for edgeTpu
    # https://www.tensorflow.org/lite/performance/post_training_quantization#full_integer_quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.uint8  # or tf.int8
    converter.inference_output_type = tf.uint8  # or tf.int8

    tflite_model = converter.convert()

    # Save the quantized model.
    with open(file=f'/opt/ml/model/model_{model_type}_{img_width}x{img_height}h{horizon}.tflite',
              mode='wb') as f_output:
        f_output.write(tflite_model)
 | 
						|
 | 
						|
 | 
						|
def conv2d(filters: float, kernel: typing.Union[int, typing.Tuple[int, int]], strides: typing.Union[int, typing.Tuple[int, int]], layer_num: int,
           activation: str = 'relu') -> Conv2D:
    """
    Helper function to create a standard valid-padded convolutional layer
    with a unified naming convention
    :param filters:     channel dimension of the layer; converted to int
                        because Conv2D documents ``filters`` as an integer
    :param kernel:      an int creates a square (kernel, kernel) kernel,
                        a tuple is used as given
    :param strides:     an int creates a (strides, strides) stride,
                        a tuple is used as given
    :param layer_num:   used in labelling the layer ('conv2d_<layer_num>')
    :param activation:  activation, defaults to relu
    :return:            tf.keras Convolution2D layer
    """

    def _as_pair(value: typing.Union[int, typing.Tuple[int, int]]) -> typing.Tuple[int, int]:
        # The annotations allow tuples; wrapping one again would nest it.
        return value if isinstance(value, tuple) else (value, value)

    return Conv2D(filters=int(filters),
                  kernel_size=_as_pair(kernel),
                  strides=_as_pair(strides),
                  activation=activation,
                  name='conv2d_' + str(layer_num))
 | 
						|
 | 
						|
 | 
						|
def core_cnn_layers(img_in: Input, img_height: int, img_width: int, drop: float, l4_stride: int = 1) -> Layer:
    """
    Returns the core CNN layers that are shared by the linear and the
    categorical model: five convolutions, each followed by dropout, then a
    flatten layer.
    :param img_in:          input layer of network
    :param img_height:      image height; the first convolution gets
                            img_height // 5 filters
    :param img_width:       image width; the second convolution gets
                            img_width // 5 filters
    :param drop:            dropout rate
    :param l4_stride:       4-th layer stride, default 1
    :return:                flattened output of the CNN stack
    """
    # (filters, kernel, strides) per convolution, in layer order. Floor
    # division keeps the filter counts integral, as Conv2D expects.
    conv_specs = (
        (img_height // 5, 5, 2),
        (img_width // 5, 5, 2),
        (64, 5, 2),
        (64, 3, l4_stride),
        (64, 3, 1),
    )
    x = img_in
    for layer_num, (filters, kernel, strides) in enumerate(conv_specs, start=1):
        x = conv2d(filters, kernel, strides, layer_num)(x)
        x = Dropout(drop)(x)
    return Flatten(name='flattened')(x)
 | 
						|
 | 
						|
 | 
						|
def default_linear(input_shape: typing.Tuple[int, int, int] = (120, 160, 3), drop: float = 0.2) -> Model:
    """
    Build the regression model: shared CNN stack, two dense/dropout stages
    and a single linear output unit named 'angle_out'.
    :param input_shape:     (height, width, depth) of the input images
    :param drop:            dropout rate
    :return:                uncompiled Keras model named 'linear'
    """
    height, width = input_shape[0], input_shape[1]
    img_in = Input(shape=input_shape, name='img_in')
    features = core_cnn_layers(img_in, img_width=width, img_height=height, drop=drop)

    for units, layer_name in ((100, 'dense_1'), (50, 'dense_2')):
        features = Dense(units, activation='relu', name=layer_name)(features)
        features = Dropout(drop)(features)

    angle_out = Dense(1, activation='linear', name='angle_out')(features)
    return Model(inputs=[img_in], outputs=[angle_out], name='linear')
 | 
						|
 | 
						|
 | 
						|
def default_categorical(input_shape: typing.Tuple[int, int, int] = (120, 160, 3), drop: float = 0.2) -> Model:
    """
    Build the classification model: shared CNN stack (4-th layer stride 2),
    two dense/dropout stages and a 15-way softmax output named 'angle_out'.
    :param input_shape:     (height, width, depth) of the input images
    :param drop:            dropout rate
    :return:                uncompiled Keras model named 'categorical'
    """
    height, width = input_shape[0], input_shape[1]
    img_in = Input(shape=input_shape, name='img_in')
    features = core_cnn_layers(img_in, img_width=width, img_height=height, drop=drop, l4_stride=2)

    for units, layer_name in ((100, "dense_1"), (50, "dense_2")):
        features = Dense(units, activation='relu', name=layer_name)(features)
        features = Dropout(drop)(features)

    # One output per angle bin; 15 is also linear_bin's default bin count.
    angle_out = Dense(15, activation='softmax', name='angle_out')(features)
    return Model(inputs=[img_in], outputs=[angle_out], name='categorical')
 | 
						|
 | 
						|
 | 
						|
def main() -> None:
    """
    Command-line entry point: parse the training options and run train().
    """
    parser = argparse.ArgumentParser()

    # Integer options and their defaults, in the order they are registered.
    int_options = (
        ("--slide_size", 0),
        ("--img_height", 120),
        ("--img_width", 160),
        ("--img_depth", 3),
        ("--horizon", 0),
        ("--batch_size", 32),
    )
    for flag, default in int_options:
        parser.add_argument(flag, type=int, default=default)
    parser.add_argument("--drop", type=float, default=0.2)
    parser.add_argument("--model_type", type=str, default=MODEL_CATEGORICAL)

    args = parser.parse_args()
    train(
        model_type=args.model_type,
        batch_size=args.batch_size,
        slide_size=args.slide_size,
        img_height=args.img_height,
        img_width=args.img_width,
        img_depth=args.img_depth,
        horizon=args.horizon,
        drop=args.drop,
    )
 | 
						|
 | 
						|
 | 
						|
# Run the command-line entry point only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    main()
 |