This repository has been archived on 2022-06-07. You can view files and clone it, but cannot push or open issues or pull requests.
robocar-pca9685-python/donkeycar/utils.py

238 lines
4.3 KiB
Python

'''
utils.py
Functions that don't fit anywhere else.
'''
from io import BytesIO
import os
import glob
import socket
import zipfile
import sys
import itertools
import subprocess
import math
import random
import time
import signal
import logging
from typing import List, Any, Tuple, Union
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Reciprocal of 255: multiplying a value in 0..255 by this scales it into 0.0..1.0.
ONE_BYTE_SCALE = 1.0 / 255.0
class EqMemorizedString:
    """String wrapper that remembers what it was compared against.

    Every ``==`` comparison records the other operand in ``self.mem`` so the
    set of attempted values can be reported later through ``mem_as_str``.
    """

    def __init__(self, string):
        """Wrap ``string`` and start with an empty comparison memory."""
        self.string = string
        # Every operand this instance has been compared with so far.
        self.mem = set()

    def __eq__(self, other):
        """Record ``other`` and return whether it equals the wrapped string."""
        try:
            self.mem.add(other)
        except TypeError:
            # Unhashable operand (e.g. a list): remember its repr rather than
            # letting a plain equality test raise.
            self.mem.add(repr(other))
        return self.string == other

    def mem_as_str(self):
        """Return the remembered operands as a comma separated string.

        Operands are converted with ``str`` (so non-string comparands do not
        break the join) and sorted so the result is deterministic.
        """
        return ', '.join(sorted(str(item) for item in self.mem))
'''
FILES
'''
def most_recent_file(dir_path, ext=''):
    """
    Return the most recent file given a directory path and extension.

    The candidates are the entries matching ``dir_path + '/*' + ext`` and the
    one with the largest ``os.path.getctime`` value is returned.

    Raises ValueError if nothing matches the pattern.
    """
    query = dir_path + '/*' + ext
    # max(), not min(): the newest entry has the largest timestamp.
    return max(glob.iglob(query), key=os.path.getctime)
def make_dir(path):
    """
    Ensure a directory exists and return its user-expanded path.

    ``~`` in ``path`` is expanded first. Missing parent directories are
    created as needed. If the path already exists nothing is created.
    """
    real_path = os.path.expanduser(path)
    if not os.path.exists(real_path):
        # exist_ok avoids a FileExistsError when something else creates the
        # directory between the check above and this call.
        os.makedirs(real_path, exist_ok=True)
    return real_path
def zip_dir(dir_path, zip_path):
    """
    Create and save a zipfile of a one level directory.

    Only the entries matched by ``dir_path + "/*"`` are added; each is stored
    under ``<directory name>/<entry name>`` inside the archive.

    Returns ``zip_path``.
    """
    # Collect the entries before the archive is created so the archive itself
    # is never picked up when zip_path lies inside dir_path.
    file_paths = sorted(glob.glob(dir_path + "/*"))
    # normpath drops a trailing separator, which would otherwise make
    # basename() return '' and lose the directory prefix in the archive.
    dir_name = os.path.basename(os.path.normpath(dir_path))
    # The context manager closes the archive even if a write fails.
    with zipfile.ZipFile(zip_path, 'w') as zf:
        for p in file_paths:
            file_name = os.path.basename(p)
            zf.write(p, arcname=os.path.join(dir_name, file_name))
    return zip_path
'''
BINNING
Functions to help convert between floating point numbers and categories.
'''
def map_range(x, X_min, X_max, Y_min, Y_max):
    """
    Linear mapping between two ranges of values, floored to an int.

    ``x`` is taken relative to the source range [X_min, X_max] and mapped
    onto the target range [Y_min, Y_max]; the result is rounded down.

    When the target range is a single point (Y_min == Y_max) every input maps
    to that point. Raises ZeroDivisionError when the source range is empty
    (X_min == X_max) and the target range is not.
    """
    X_range = X_max - X_min
    Y_range = Y_max - Y_min
    if Y_range == 0:
        # Degenerate target range: the ratio below would divide by zero even
        # though the answer is simply the single target value.
        return int(Y_min // 1)
    XY_ratio = X_range / Y_range
    # "// 1" floors (rounds toward negative infinity) before the int cast.
    y = ((x - X_min) / XY_ratio + Y_min) // 1
    return int(y)
def map_range_float(x, X_min, X_max, Y_min, Y_max):
    """
    Same as map_range but returns a float, rounded to 2 decimal places.

    When the target range is a single point (Y_min == Y_max) every input maps
    to that point. Raises ZeroDivisionError when the source range is empty
    (X_min == X_max) and the target range is not.
    """
    X_range = X_max - X_min
    Y_range = Y_max - Y_min
    if Y_range == 0:
        # Degenerate target range: avoid dividing by a zero ratio below.
        return round(float(Y_min), 2)
    XY_ratio = X_range / Y_range
    y = ((x - X_min) / XY_ratio + Y_min)
    return round(y, 2)
'''
ANGLES
'''
def norm_deg(theta):
    """
    Normalize an angle in degrees to the half-open range [0, 360).

    Full turns are removed with a modulo, so the cost does not grow with the
    magnitude of ``theta`` and 360 maps to 0 just like 720 does.
    """
    wrapped = theta % 360
    # Float rounding can produce exactly 360.0 for tiny negative inputs;
    # fold that back so the result always stays below 360.
    if wrapped >= 360:
        wrapped -= 360
    return wrapped
# Conversion factor: radians per degree.
DEG_TO_RAD = math.pi / 180.0


def deg2rad(theta):
    """Convert an angle from degrees to radians."""
    radians = theta * DEG_TO_RAD
    return radians
'''
VECTORS
'''
def dist(x1, y1, x2, y2):
    """
    Return the Euclidean distance between points (x1, y1) and (x2, y2).

    math.hypot is used instead of squaring the differences by hand so that
    large coordinates do not raise OverflowError in the intermediate squares.
    """
    return math.hypot(x2 - x1, y2 - y1)
'''
NETWORKING
'''
def my_ip():
    """
    Return the local IP address used for outbound IPv4 traffic.

    A UDP socket is connected to a fixed address and the local end of that
    socket is read back; the first element of ``getsockname()`` is the address.

    Raises OSError if the socket cannot be connected.
    """
    # The context manager closes the socket, which was previously leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('192.0.0.8', 1027))
        return s.getsockname()[0]
'''
OTHER
'''
def map_frange(x, X_min, X_max, Y_min, Y_max):
    """
    Linear mapping between two ranges of values, returned unrounded.

    When the target range is a single point (Y_min == Y_max) every input maps
    to that point. Raises ZeroDivisionError when the source range is empty
    (X_min == X_max) and the target range is not.
    """
    X_range = X_max - X_min
    Y_range = Y_max - Y_min
    if Y_range == 0:
        # Degenerate target range: avoid dividing by a zero ratio below.
        return float(Y_min)
    XY_ratio = X_range / Y_range
    y = ((x - X_min) / XY_ratio + Y_min)
    return y
def merge_two_dicts(x, y):
    """Return a new shallow-copied dict holding x's items overlaid with y's.

    Neither input is modified; on duplicate keys the value from ``y`` wins.
    """
    merged = x.copy()
    merged.update(y)
    return merged
def param_gen(params):
    """
    Generate every combination of the given parameter options.

    ``params`` maps each parameter name to an iterable of candidate values.
    One dictionary is yielded per element of the cartesian product of those
    candidates, mapping each name to the value chosen for it.
    """
    for combo in itertools.product(*params.values()):
        yield dict(zip(params.keys(), combo))
def run_shell_command(cmd, cwd=None, timeout=15):
    """
    Run a command and collect its output.

    cmd: the command, passed unchanged to subprocess.Popen.
    cwd: optional working directory for the child process.
    timeout: seconds to wait before the child is interrupted with kill().

    Returns a tuple (out, err, pid): ``out`` is the list of decoded stdout
    lines, ``err`` is the list of stderr lines as raw bytes, and ``pid`` is
    the process id of the child.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd)
    try:
        # communicate() drains both pipes while waiting. A bare wait() can
        # deadlock once the child fills a pipe buffer nobody is reading.
        stdout_data, stderr_data = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        kill(proc.pid)
        # Retrying after the timeout loses no output and reaps the child.
        stdout_data, stderr_data = proc.communicate()
    # BytesIO.readlines() splits on b'\n' only and keeps the terminator, the
    # same way reading the pipe line by line did.
    out = [line.decode() for line in BytesIO(stdout_data).readlines()]
    # NOTE(review): stderr lines are returned undecoded, as before — confirm
    # whether callers rely on bytes here before aligning it with stdout.
    err = BytesIO(stderr_data).readlines()
    return out, err, proc.pid
def kill(proc_id):
    """Send SIGINT to the process with id ``proc_id``."""
    interrupt = signal.SIGINT
    os.kill(proc_id, interrupt)
def eprint(*args, **kwargs):
    """Print to standard error; accepts the same arguments as print()."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
"""
Timers
"""
class FPSTimer(object):
    """Print the average frames per second once every 100 frames."""

    def __init__(self):
        # Wall-clock time at which the current 100-frame window started.
        self.t = time.time()
        # Number of frames counted in the current window.
        self.iter = 0

    def reset(self):
        """Start a new measurement window from now."""
        self.t = time.time()
        self.iter = 0

    def on_frame(self):
        """Count one frame; on the 100th, print the rate and start over."""
        self.iter += 1
        if self.iter == 100:
            elapsed = time.time() - self.t
            # time.time() is not monotonic: a coarse or adjusted clock can
            # give a zero or negative interval, so skip the report instead
            # of dividing by zero or printing a meaningless rate.
            if elapsed > 0:
                print('fps', 100.0 / elapsed)
            self.t = time.time()
            self.iter = 0