''' utils.py Functions that don't fit anywhere else. ''' from io import BytesIO import os import glob import socket import zipfile import sys import itertools import subprocess import math import random import time import signal import logging from typing import List, Any, Tuple, Union logger = logging.getLogger(__name__) ONE_BYTE_SCALE = 1.0 / 255.0 class EqMemorizedString: """ String that remembers what it was compared against """ def __init__(self, string): self.string = string self.mem = set() def __eq__(self, other): self.mem.add(other) return self.string == other def mem_as_str(self): return ', '.join(self.mem) ''' FILES ''' def most_recent_file(dir_path, ext=''): ''' return the most recent file given a directory path and extension ''' query = dir_path + '/*' + ext newest = min(glob.iglob(query), key=os.path.getctime) return newest def make_dir(path): real_path = os.path.expanduser(path) if not os.path.exists(real_path): os.makedirs(real_path) return real_path def zip_dir(dir_path, zip_path): """ Create and save a zipfile of a one level directory """ file_paths = glob.glob(dir_path + "/*") # create path to search for files. zf = zipfile.ZipFile(zip_path, 'w') dir_name = os.path.basename(dir_path) for p in file_paths: file_name = os.path.basename(p) zf.write(p, arcname=os.path.join(dir_name, file_name)) zf.close() return zip_path ''' BINNING functions to help converte between floating point numbers and categories. ''' def map_range(x, X_min, X_max, Y_min, Y_max): ''' Linear mapping between two ranges of values ''' X_range = X_max - X_min Y_range = Y_max - Y_min XY_ratio = X_range / Y_range y = ((x - X_min) / XY_ratio + Y_min) // 1 return int(y) def map_range_float(x, X_min, X_max, Y_min, Y_max): ''' Same as map_range but supports floats return, rounded to 2 decimal places ''' X_range = X_max - X_min Y_range = Y_max - Y_min XY_ratio = X_range / Y_range y = ((x - X_min) / XY_ratio + Y_min) # print("y= {}".format(y)) return round(y, 2) ''' ANGLES ''' def norm_deg(theta): while theta > 360: theta -= 360 while theta < 0: theta += 360 return theta DEG_TO_RAD = math.pi / 180.0 def deg2rad(theta): return theta * DEG_TO_RAD ''' VECTORS ''' def dist(x1, y1, x2, y2): return math.sqrt(math.pow(x2 - x1, 2) + math.pow(y2 - y1, 2)) ''' NETWORKING ''' def my_ip(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('192.0.0.8', 1027)) return s.getsockname()[0] ''' OTHER ''' def map_frange(x, X_min, X_max, Y_min, Y_max): ''' Linear mapping between two ranges of values ''' X_range = X_max - X_min Y_range = Y_max - Y_min XY_ratio = X_range / Y_range y = ((x - X_min) / XY_ratio + Y_min) return y def merge_two_dicts(x, y): """Given two dicts, merge them into a new dict as a shallow copy.""" z = x.copy() z.update(y) return z def param_gen(params): ''' Accepts a dictionary of parameter options and returns a list of dictionary with the permutations of the parameters. ''' for p in itertools.product(*params.values()): yield dict(zip(params.keys(), p)) def run_shell_command(cmd, cwd=None, timeout=15): proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) out = [] err = [] try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: kill(proc.pid) for line in proc.stdout.readlines(): out.append(line.decode()) for line in proc.stderr.readlines(): err.append(line) return out, err, proc.pid def kill(proc_id): os.kill(proc_id, signal.SIGINT) def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) """ Timers """ class FPSTimer(object): def __init__(self): self.t = time.time() self.iter = 0 def reset(self): self.t = time.time() self.iter = 0 def on_frame(self): self.iter += 1 if self.iter == 100: e = time.time() print('fps', 100.0 / (e - self.t)) self.t = time.time() self.iter = 0