#!/usr/bin/env python3
"""Train a Keras CNN on recorded camera images and export it.

Records are JSON files named ``record*.json`` that reference an image
(``cam/image_array``) and carry a label (``user/angle`` or ``speed_zone``).
The trained model is written as a Keras model and as a fully integer-quantized
TFLite model.
"""
import os
# import container_support as cs
import argparse
import json
import pathlib
import re
import typing
import zipfile
from datetime import datetime

import numpy as np
import tensorflow as tf
import tensorflow.keras.losses
from tensorflow.keras import callbacks
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Dropout, Flatten, Dense
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Layer
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from numpy import typing as npt

MODEL_CATEGORICAL = "categorical"
MODEL_LINEAR = "linear"

# Directory walked for zip archives and record files.
TRAIN_DATA_DIR = '/opt/ml/input/data/train'
# Directory receiving checkpoints, TensorBoard logs and exported models.
MODEL_DIR = '/opt/ml/model'


def linear_bin_speed_zone(a: int, N: int = 4) -> npt.NDArray[np.float64]:
    """
    Create a one-hot vector of length N with index ``a`` set to 1.

    :param a: index of the active bin; must be a valid index for a length-N array
    :param N: number of bins
    :return: array of N zeros with a single 1 at position ``a``
    """
    arr = np.zeros(N)
    arr[a] = 1
    return arr


def linear_bin(a: float, N: int = 15, offset: int = 1, R: float = 2.0) -> npt.NDArray[np.float64]:
    """
    Map a value onto one of N bins and return it one-hot encoded.

    The value is shifted by ``offset``, divided by the bin width
    ``R / (N - offset)``, rounded, and clamped to ``[0, N - 1]``. With the
    defaults, -1.0 lands in bin 0, 0.0 in bin 7 and 1.0 in bin 14.

    :param a: value to encode
    :param N: number of bins
    :param offset: shift applied to ``a`` before binning, commonly R/2
    :param R: width of the value range
    :return: array of N zeros with a single 1 at the selected bin
    """
    a = a + offset
    b = round(a / (R / (N - offset)))
    arr = np.zeros(N)
    b = clamp(b, 0, N - 1)
    arr[int(b)] = 1
    return arr


def clamp(n: int, min: int, max: int) -> int:
    """
    Restrict ``n`` to the inclusive range ``[min, max]``.

    The parameter names shadow the builtins; they are kept so that existing
    keyword callers keep working.
    """
    if n <= min:
        return min
    if n >= max:
        return max
    return n


def _load_record(root_dir: typing.Any, filename: str, field: str) -> typing.List[typing.Any]:
    """Read one JSON record and return ``[label, root_dir, image_file_name]``."""
    # Context manager so the handle is closed even when the JSON is malformed.
    with open(os.path.join(root_dir, filename), encoding='utf-8') as record_file:
        d = json.load(record_file)
    return [d[field], root_dir, d['cam/image_array']]


def get_data(root_dir: pathlib.Path, filename: str) -> typing.List[typing.Any]:
    """
    Load the ``user/angle`` label of a record.

    :param root_dir: directory containing the record
    :param filename: record file name
    :return: ``[angle, root_dir, image file name]``
    """
    # print('load data from file ' + filename)
    return _load_record(root_dir, filename, 'user/angle')


def get_data_speed_zone(root_dir, filename):
    """
    Load the ``speed_zone`` label of a record.

    :param root_dir: directory containing the record
    :param filename: record file name
    :return: ``[speed_zone, root_dir, image file name]``
    """
    print('load data from file ' + filename)
    return _load_record(root_dir, filename, 'speed_zone')


numbers = re.compile(r'(\d+)')


def _natural_sort_key(name: str) -> typing.List[typing.Any]:
    """Sort key ordering embedded numbers numerically and text case-insensitively."""
    # Splitting on a capturing group alternates text / digits / text ..., so
    # keys of two names always compare str with str and int with int.
    return [int(part) if part.isdigit() else part.lower() for part in numbers.split(name)]


def unzip_file(root: pathlib.Path, f: str) -> None:
    """
    Extract the archive ``f`` located in ``root`` into ``root``.

    :param root: directory holding the archive; also the extraction target
    :param f: archive file name
    """
    with zipfile.ZipFile(os.path.join(root, f), 'r') as zip_ref:
        zip_ref.extractall(root)


def train(model_type: str, record_field: str, batch_size: int, slide_size: int, img_height: int, img_width: int,
          img_depth: int, horizon: int, drop: float) -> None:
    """
    Load the records, train a model and export it as Keras and TFLite models.

    :param model_type: ``MODEL_CATEGORICAL`` or ``MODEL_LINEAR``
    :param record_field: label to learn, ``'angle'`` or ``'speed_zone'``
    :param batch_size: training batch size
    :param slide_size: when > 0, image ``i`` is paired with label ``i + slide_size``
    :param img_height: height of the source images
    :param img_width: width of the source images
    :param img_depth: number of image channels
    :param horizon: when > 0, number of rows cropped off the top of each image
    :param drop: dropout rate
    :raises ValueError: if no record file is found
    :raises Exception: if ``model_type`` is not a known model type
    """
    # env = cs.TrainingEnvironment()

    # Best-effort, like the shell `mkdir -p` it replaces: nothing below reads
    # this directory, so a failure here must not abort the run.
    try:
        os.makedirs('logs', exist_ok=True)
    except OSError as err:
        print(f"could not create 'logs' directory: {err}")

    # ### Loading the files ###
    # Archives are extracted in place first so their records are picked up by
    # the walk below.
    for root, dirs, files in os.walk(TRAIN_DATA_DIR):
        for f in files:
            if f.endswith('.zip'):
                unzip_file(pathlib.Path(root), f)

    if record_field == 'angle':
        output_name = 'angle_out'
        load_record = get_data
    elif record_field == 'speed_zone':
        output_name = 'speed_zone_output'
        load_record = get_data_speed_zone
    else:
        print(f"invalid record filed: {record_field}")
        return

    data = []
    for root, dirs, files in os.walk(TRAIN_DATA_DIR):
        # Natural sort: a plain string sort puts record_10 before record_2,
        # which scrambles the sequence the slide_size shift depends on.
        data.extend(
            load_record(root, f)
            for f in sorted(files, key=_natural_sort_key)
            if f.startswith('record') and f.endswith('.json')
        )

    if not data:
        raise ValueError(f"no record*.json file found under {TRAIN_DATA_DIR}")

    # ### Loading values (angle or speed_zone) ###
    value_array = np.array([d[0] for d in data])

    # ### Loading images ###
    def load_image(record: typing.List[typing.Any]) -> typing.Any:
        img = load_img(os.path.join(record[1], record[2]))
        if horizon > 0:
            # Drop the top `horizon` rows of the image.
            img = img.crop((0, horizon, img_width, img_height))
        return img_to_array(img)

    images = np.array([load_image(d) for d in data], 'f')

    # slide images vs orders
    if slide_size > 0:
        images = images[:len(images) - slide_size]
        value_array = value_array[slide_size:]

    # ### Start training ###
    logdir = MODEL_DIR + '/logs/' + datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard = callbacks.TensorBoard(log_dir=logdir, histogram_freq=0, write_graph=True, write_images=True)

    # Creates a file writer for the log directory.
    # file_writer = tf.summary.create_file_writer(logdir)

    # Using the file writer, log the reshaped image.
    # with file_writer.as_default():
    #     # Don't forget to reshape.
    #     imgs = np.reshape(images[0:25], (-1, img_height, img_width, img_depth))
    #     tf.summary.image("25 training data examples", imgs, max_outputs=25, step=0)

    # The network sees the image after the horizon rows were removed.
    input_shape = (img_height - horizon, img_width, img_depth)

    if model_type == MODEL_CATEGORICAL:
        model_filepath = MODEL_DIR + '/model_cat'
        if record_field == 'angle':
            input_value_array = np.array([linear_bin(float(a)) for a in value_array])
            output_bin = 15
        else:
            # record_field was validated above, so this is 'speed_zone'.
            # int() so a label decoded as a float still works as an index.
            input_value_array = np.array([linear_bin_speed_zone(int(a)) for a in value_array])
            output_bin = 4
        model = default_categorical(input_shape=input_shape,
                                    drop=drop,
                                    output_name=output_name,
                                    output_bin=output_bin)
        loss = {output_name: 'categorical_crossentropy', }
        optimizer = 'adam'
    elif model_type == MODEL_LINEAR:
        model_filepath = MODEL_DIR + '/model_lin'
        input_value_array = np.array(value_array)
        model = default_linear(input_shape=input_shape, drop=drop, output_name=output_name)
        loss = 'mse'
        optimizer = 'rmsprop'
    else:
        raise Exception("invalid model type")

    # Display the model's architecture
    model.summary()

    save_best = callbacks.ModelCheckpoint(model_filepath, monitor='val_loss', verbose=1,
                                          save_best_only=True, mode='min')
    early_stop = callbacks.EarlyStopping(monitor='val_loss',
                                         min_delta=.0005,
                                         patience=5,
                                         verbose=1,
                                         mode='auto')
    callbacks_list = [save_best, early_stop, tensorboard]

    model.compile(optimizer=optimizer,
                  loss=loss,)
    model.fit({'img_in': images}, {output_name: input_value_array, },
              batch_size=batch_size,
              epochs=100, verbose=1,
              validation_split=0.2,
              shuffle=True,
              callbacks=callbacks_list)

    # Save model for tensorflow using
    export_path = (f'{MODEL_DIR}/model_{record_field.replace("_", "")}_{model_type}_'
                   f'{img_width}x{img_height}h{horizon}')
    model.save(export_path)

    def representative_dataset() -> typing.Iterator[typing.List[tf.Tensor]]:
        # Up to 100 training images, one per batch, used to calibrate the
        # quantization ranges.
        for d in tf.data.Dataset.from_tensor_slices(images).batch(1).take(100):
            yield [tf.dtypes.cast(d, tf.float32)]

    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    # full quantization for edgeTpu
    # https://www.tensorflow.org/lite/performance/post_training_quantization#full_integer_quantization
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.representative_dataset = representative_dataset
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.uint8  # or tf.int8
    converter.inference_output_type = tf.uint8  # or tf.int8
    tflite_model = converter.convert()

    # Save the model.
    with open(export_path + '.tflite', 'wb') as f:
        f.write(tflite_model)


def conv2d(filters: float, kernel: typing.Union[int, typing.Tuple[int, int]],
           strides: typing.Union[int, typing.Tuple[int, int]], layer_num: int,
           activation: str = 'relu') -> Conv2D:
    """
    Helper function to create a standard valid-padded convolutional layer
    with square kernel and strides and unified naming convention

    :param filters:     channel dimension of the layer; truncated to an int
    :param kernel:      creates (kernel, kernel) kernel matrix dimension
    :param strides:     creates (strides, strides) stride
    :param layer_num:   used in labelling the layer
    :param activation:  activation, defaults to relu
    :return:            tf.keras Convolution2D layer
    """
    # Callers may pass the result of a true division; Conv2D expects an
    # integer channel count.
    return Conv2D(filters=int(filters),
                  kernel_size=(kernel, kernel),
                  strides=(strides, strides),
                  activation=activation,
                  name='conv2d_' + str(layer_num))


def core_cnn_layers(img_in: Input, img_height: int, img_width: int, drop: float, l4_stride: int = 1) -> Layer:
    """
    Returns the core CNN layers that are shared among the different models,
    like linear, imu, behavioural

    :param img_width:   image width; the second layer has img_width // 5 filters
    :param img_height:  image height; the first layer has img_height // 5 filters
    :param img_in:      input layer of network
    :param drop:        dropout rate
    :param l4_stride:   4-th layer stride, default 1
    :return:            stack of CNN layers
    """
    # (filters, kernel, stride) for layers 1..5, each followed by dropout.
    layer_specs = [
        (img_height // 5, 5, 2),
        (img_width // 5, 5, 2),
        (64, 5, 2),
        (64, 3, l4_stride),
        (64, 3, 1),
    ]
    x = img_in
    for layer_num, (filters, kernel, strides) in enumerate(layer_specs, start=1):
        x = conv2d(filters, kernel, strides, layer_num)(x)
        x = Dropout(drop)(x)
    x = Flatten(name='flattened')(x)
    return x


def _dense_head(x: typing.Any, drop: float) -> typing.Any:
    """Append the two dense + dropout layers shared by both model types."""
    x = Dense(100, activation='relu', name='dense_1')(x)
    x = Dropout(drop)(x)
    x = Dense(50, activation='relu', name='dense_2')(x)
    x = Dropout(drop)(x)
    return x


def default_linear(input_shape: typing.Tuple[int, int, int] = (120, 160, 3), drop: float = 0.2,
                   output_name: str = 'angle_out') -> Model:
    """
    Build the regression model: core CNN, dense head, one linear output unit.

    :param input_shape: (height, width, depth) of the input images
    :param drop:        dropout rate
    :param output_name: name of the output layer
    :return:            uncompiled Keras model named 'linear'
    """
    img_in = Input(shape=input_shape, name='img_in')
    x = core_cnn_layers(img_in, img_width=input_shape[1], img_height=input_shape[0], drop=drop)
    x = _dense_head(x, drop)

    value_out = Dense(1, activation='linear', name=output_name)(x)

    model = Model(inputs=[img_in], outputs=[value_out], name='linear')
    return model


def default_categorical(input_shape: typing.Tuple[int, int, int] = (120, 160, 3), drop: float = 0.2,
                        output_name: str = 'angle_out', output_bin: int = 15) -> Model:
    """
    Build the classification model: core CNN (4-th layer stride 2), dense
    head, softmax output over ``output_bin`` classes.

    :param input_shape: (height, width, depth) of the input images
    :param drop:        dropout rate
    :param output_name: name of the output layer
    :param output_bin:  number of output classes
    :return:            uncompiled Keras model named 'categorical'
    """
    img_in = Input(shape=input_shape, name='img_in')
    x = core_cnn_layers(img_in, img_width=input_shape[1], img_height=input_shape[0], drop=drop, l4_stride=2)
    x = _dense_head(x, drop)

    # Categorical output over output_bin bins
    value_out = Dense(output_bin, activation='softmax', name=output_name)(x)

    model = Model(inputs=[img_in], outputs=[value_out], name='categorical')
    return model


def main() -> None:
    """Parse the command-line hyperparameters and run the training."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--slide_size", type=int, default=0)
    parser.add_argument("--img_height", type=int, default=120)
    parser.add_argument("--img_width", type=int, default=160)
    parser.add_argument("--img_depth", type=int, default=3)
    parser.add_argument("--horizon", type=int, default=0)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--drop", type=float, default=0.2)
    # choices: reject an unknown model type here rather than after the whole
    # dataset has been loaded.
    parser.add_argument("--model_type", type=str, choices=[MODEL_CATEGORICAL, MODEL_LINEAR],
                        default=MODEL_CATEGORICAL)
    parser.add_argument("--record_field", type=str, choices=['angle', 'speed_zone'], default='angle')

    args = parser.parse_args()
    train(
        model_type=args.model_type,
        record_field=args.record_field,
        batch_size=args.batch_size,
        slide_size=args.slide_size,
        img_height=args.img_height,
        img_width=args.img_width,
        img_depth=args.img_depth,
        horizon=args.horizon,
        drop=args.drop,
    )


if __name__ == "__main__":
    main()