19 Commits

Author SHA1 Message Date
c6f955a50c fix(camera): bad img height configuration 2022-12-25 11:22:06 +01:00
7ebd9093d9 fix(object_detection): add link from image_manip to nn node 2022-11-11 17:16:38 +01:00
642df5b927 add debug logs 2022-11-09 21:04:32 +01:00
c755d019e8 feat(cli): add flag to configure log level 2022-11-09 20:37:20 +01:00
befb4bacb3 fix: bad pipeline configuration 2022-11-05 16:09:30 +01:00
30f9876c1d build: fix docker build (pip index missing) 2022-11-02 16:31:46 +01:00
df8676ae5c fix(dependency): upgrade robocar-protobuf 2022-11-02 16:08:33 +01:00
9c07826898 build: upgrade dependencies 2022-11-02 15:51:16 +01:00
2149a01dd6 build: fix docker build 2022-11-02 15:35:47 +01:00
0db958e936 build: limit files to include into docker image 2022-11-02 13:56:41 +01:00
4faf3c2fee updaqte dockerignore 2022-11-02 13:55:52 +01:00
7c65f87d58 build: merge mypy.ini with pyproject.toml 2022-10-27 16:04:32 +02:00
55d8ce06c6 build: fix all pylint/mypy errors 2022-10-27 16:03:23 +02:00
4daf4d3c23 build: upgrade dependencies 2022-10-27 09:13:41 +02:00
aed8e9f8c2 feat: check typing with mypy 2022-10-27 09:09:04 +02:00
7670b8b01a refactor: fix pylint 2022-10-26 17:32:35 +02:00
6db1afce75 build: use opencv headless flavor 2022-10-26 10:48:13 +02:00
667c6903ef feat(simulator): add simulator source 2022-10-25 16:59:18 +02:00
24e4410c25 refactor: rewrite depthai node management 2022-10-25 16:44:16 +02:00
11 changed files with 4957 additions and 300 deletions

View File

@@ -1,5 +1,5 @@
 venv
-dist/
+dist/*
 build-docker.sh
 Dockerfile

View File

@@ -1,7 +1,8 @@
 FROM docker.io/library/python:3.10-slim as base

 # Configure piwheels repo to use pre-compiled numpy wheels for arm
-RUN echo -n "[global]\nextra-index-url=https://www.piwheels.org/simple\n" >> /etc/pip.conf
+RUN echo -n "[global]\n" > /etc/pip.conf &&\
+    echo -n "extra-index-url = https://www.piwheels.org/simple https://git.cyrilix.bzh/api/packages/robocars/pypi/simple \n" >> /etc/pip.conf

 RUN apt-get update && apt-get install -y libgl1 libglib2.0-0
@@ -20,7 +21,14 @@ FROM base as builder
 RUN apt-get install -y git && \
     pip3 install poetry==1.2.0 && \
     poetry self add "poetry-dynamic-versioning[plugin]"

-ADD . .
+ADD poetry.lock .
+ADD pyproject.toml .
+ADD camera camera
+ADD README.md .
+# Poetry expects to find a git project
+ADD .git .git

 RUN poetry build
@@ -31,7 +39,7 @@ RUN mkdir /models
 COPY --from=model-builder /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob
 COPY --from=builder dist/*.whl /tmp/
-RUN pip3 install /tmp/*whl
+RUN pip3 install /tmp/*.whl

 WORKDIR /tmp
 USER 1234
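
Note: the two echo lines above write a /etc/pip.conf whose extra-index-url now lists both piwheels and the project's private registry. A quick way to check what pip actually resolved inside the image (a sketch, assuming pip >= 10 with the `pip config` subcommand):

import subprocess

# print the effective pip configuration as pip itself resolves it
print(subprocess.run(["pip3", "config", "list"], capture_output=True, text=True).stdout)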

View File

@@ -7,6 +7,6 @@ PLATFORM="linux/amd64,linux/arm64"
 #PLATFORM="linux/amd64,linux/arm64,linux/arm/v7"

 podman build . --platform "${PLATFORM}" --manifest "${IMAGE_NAME}:${TAG}"
-podman manifest push --all --format v2s2 "localhost/${IMAGE_NAME}:${TAG}" "docker://${FULL_IMAGE_NAME}"
+podman manifest push --all "localhost/${IMAGE_NAME}:${TAG}" "docker://${FULL_IMAGE_NAME}"
 printf "\nImage %s published" "docker://${FULL_IMAGE_NAME}"

View File

@@ -5,13 +5,14 @@ import argparse
 import logging
 import os
 import signal
+import typing, types

+import depthai as dai
 import paho.mqtt.client as mqtt

-from . import depthai as cam
+from . import depthai as cam  # pylint: disable=reimported

 logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.INFO)

 _DEFAULT_CLIENT_ID = "robocar-depthai"
@@ -50,11 +51,15 @@ def _parse_args_cli() -> argparse.Namespace:
     parser.add_argument("-W", "--image-width", help="image width",
                         type=int,
                         default=_get_env_int_value("IMAGE_WIDTH", 126))
+    parser.add_argument("--log", help="Log level",
+                        type=str,
+                        default="info",
+                        choices=["info", "debug"])
     args = parser.parse_args()
     return args


-def _init_mqtt_client(broker_host: str, broker_port, user: str, password: str, client_id: str) -> mqtt.Client:
+def _init_mqtt_client(broker_host: str, broker_port: int, user: str, password: str, client_id: str) -> mqtt.Client:
     logger.info("Start part.py-robocar-oak-camera")
     client = mqtt.Client(client_id=client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311)
@@ -70,9 +75,12 @@ def execute_from_command_line() -> None:
     Cli entrypoint
     :return:
     """
-    logging.basicConfig(level=logging.INFO)
     args = _parse_args_cli()
+    if args.log == "info":
+        logging.basicConfig(level=logging.INFO)
+    elif args.log == "debug":
+        logging.basicConfig(level=logging.DEBUG)

     client = _init_mqtt_client(broker_host=args.mqtt_broker_host,
                                broker_port=args.mqtt_broker_port,
@@ -85,12 +93,18 @@ def execute_from_command_line() -> None:
                                        objects_topic=args.mqtt_topic_robocar_objects,
                                        objects_threshold=args.objects_threshold)

-    pipeline_controller = cam.PipelineController(img_width=args.image_width,
-                                                 img_height=args.image_height,
-                                                 frame_processor=frame_processor,
-                                                 object_processor=object_processor)
+    pipeline = dai.Pipeline()
+    pipeline_controller = cam.PipelineController(pipeline=pipeline,
+                                                 frame_processor=frame_processor,
+                                                 object_processor=object_processor,
+                                                 object_node=cam.ObjectDetectionNN(pipeline=pipeline),
+                                                 camera=cam.CameraSource(pipeline=pipeline,
+                                                                         img_width=args.image_width,
+                                                                         img_height=args.image_height,
+                                                                         ))

-    def sigterm_handler():
+    def sigterm_handler(signum: int, frame: typing.Optional[
+            types.FrameType]) -> None:  # pylint: disable=unused-argument # need to implement handler signature
         logger.info("exit on SIGTERM")
         pipeline_controller.stop()
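
Note: the new --log flag simply selects the root logging level before the pipeline starts. A more compact equivalent of the if/elif mapping (a sketch, not what the commit does) relies on the logging module exposing its levels as module constants:

import argparse
import logging

parser = argparse.ArgumentParser()
parser.add_argument("--log", default="info", choices=["info", "debug"])
args = parser.parse_args()
# logging.INFO / logging.DEBUG are module-level constants, so the flag
# value can be mapped by name instead of an if/elif chain
logging.basicConfig(level=getattr(logging, args.log.upper()))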

View File

@@ -1,16 +1,20 @@
 """
 Camera event loop
 """
+import abc
 import datetime
 import logging
+import pathlib
 import typing
+from dataclasses import dataclass

 import cv2
 import depthai as dai
+import events.events_pb2 as evt
 import numpy as np
+import numpy.typing as npt
 import paho.mqtt.client as mqtt
+from depthai import Device

-import events.events_pb2

 logger = logging.getLogger(__name__)
@@ -29,7 +33,7 @@ class ObjectProcessor:
         self._objects_topic = objects_topic
         self._objects_threshold = objects_threshold

-    def process(self, in_nn: dai.NNData, frame_ref) -> None:
+    def process(self, in_nn: dai.NNData, frame_ref: evt.FrameRef) -> None:
         """
         Parse and publish result of NeuralNetwork result

         :param in_nn: NeuralNetwork result read from device
@@ -46,9 +50,8 @@ class ObjectProcessor:
         if boxes.shape[0] > 0:
             self._publish_objects(boxes, frame_ref, scores)

-    def _publish_objects(self, boxes: np.array, frame_ref, scores: np.array) -> None:
-
-        objects_msg = events.events_pb2.ObjectsMessage()
+    def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref: evt.FrameRef, scores: npt.NDArray[np.float64]) -> None:
+        objects_msg = evt.ObjectsMessage()
         objs = []
         for i in range(boxes.shape[0]):
             logger.debug("new object detected: %s", str(boxes[i]))
@@ -91,7 +94,7 @@ class FrameProcessor:
     def process(self, img: dai.ImgFrame) -> typing.Any:
         """
         Publish camera frames

-        :param img:
+        :param img: image read from camera
         :return:
             id frame reference
         :raise:
@@ -104,7 +107,7 @@ class FrameProcessor:
         byte_im = im_buf_arr.tobytes()

         now = datetime.datetime.now()
-        frame_msg = events.events_pb2.FrameMessage()
+        frame_msg = evt.FrameMessage()
         frame_msg.id.name = "robocar-oak-camera-oak"
         frame_msg.id.id = str(int(now.timestamp() * 1000))
         frame_msg.id.created_at.FromDatetime(now)
@@ -117,73 +120,195 @@ class FrameProcessor:
         return frame_msg.id


+class Source(abc.ABC):
+    """Base class for image source"""
+
+    @abc.abstractmethod
+    def get_stream_name(self) -> str:
+        """
+        Queue/stream name to use to get data
+
+        :return: stream name
+        """
+
+    @abc.abstractmethod
+    def link(self, input_node: dai.Node.Input) -> None:
+        """
+        Link this source to the input node
+
+        :param: input_node: input node to link
+        """
+
+
+class ObjectDetectionNN:
+    """
+    Node to detect objects into image
+
+    Read image as input and apply resize transformation before running the NN on it
+    Result is available with 'get_stream_name()' stream
+    """
+
+    def __init__(self, pipeline: dai.Pipeline):
+        # Define a neural network that will make predictions based on the source frames
+        detection_nn = pipeline.createNeuralNetwork()
+        detection_nn.setBlobPath(pathlib.Path(_NN_PATH))
+        detection_nn.setNumPoolFrames(4)
+        detection_nn.input.setBlocking(False)
+        detection_nn.setNumInferenceThreads(2)
+        self._detection_nn = detection_nn
+        self._xout = self._configure_xout_nn(pipeline)
+        self._detection_nn.out.link(self._xout.input)
+        self._manip_image = self._configure_manip(pipeline)
+        self._manip_image.out.link(self._detection_nn.input)
+
+    @staticmethod
+    def _configure_manip(pipeline: dai.Pipeline) -> dai.node.ImageManip:
+        # Resize image
+        manip = pipeline.createImageManip()
+        manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
+        manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
+        manip.initialConfig.setKeepAspectRatio(False)
+        return manip
+
+    @staticmethod
+    def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
+        xout_nn = pipeline.createXLinkOut()
+        xout_nn.setStreamName("nn")
+        xout_nn.input.setBlocking(False)
+        return xout_nn
+
+    def get_stream_name(self) -> str:
+        """
+        Queue/stream name to use to get data
+
+        :return: stream name
+        """
+        return self._xout.getStreamName()
+
+    def get_input(self) -> dai.Node.Input:
+        """
+        Get input node to use to link with source node
+
+        :return: input to link with source output, see Source.link()
+        """
+        return self._manip_image.inputImage
+
+
+class CameraSource(Source):
+    """Image source based on camera preview"""
+
+    def __init__(self, pipeline: dai.Pipeline, img_width: int, img_height: int):
+        self._cam_rgb = pipeline.createColorCamera()
+        self._xout_rgb = pipeline.createXLinkOut()
+        self._xout_rgb.setStreamName("rgb")
+
+        # Properties
+        self._cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
+        self._cam_rgb.setPreviewSize(width=img_width, height=img_height)
+        self._cam_rgb.setInterleaved(False)
+        self._cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
+        self._cam_rgb.setFps(30)
+
+        # link camera preview to output
+        self._cam_rgb.preview.link(self._xout_rgb.input)
+
+    def link(self, input_node: dai.Node.Input) -> None:
+        self._cam_rgb.preview.link(input_node)
+
+    def get_stream_name(self) -> str:
+        return self._xout_rgb.getStreamName()
+
+
+@dataclass
+class MqttConfig:
+    """MQTT configuration"""
+    host: str
+    topic: str
+    port: int = 1883
+    qos: int = 0
+
+
+class MqttSource(Source):
+    """Image source based onto mqtt stream"""
+
+    def __init__(self, device: Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
+        self._mqtt_config = mqtt_config
+        self._client = mqtt.Client()
+        self._client.user_data_set(mqtt_config)
+        self._client.on_connect = self._on_connect
+        self._client.on_message = self._on_message
+
+        self._img_in = pipeline.createXLinkIn()
+        self._img_in.setStreamName("img_input")
+        self._img_out = pipeline.createXLinkOut()
+        self._img_out.setStreamName("img_output")
+        self._img_in.out.link(self._img_out.input)
+
+        self._img_in_queue = device.getInputQueue(self._img_in.getStreamName())
+
+    def run(self) -> None:
+        """ Connect and start mqtt loop """
+        self._client.connect(host=self._mqtt_config.host, port=self._mqtt_config.port)
+        self._client.loop_start()
+
+    def stop(self) -> None:
+        """Stop and disconnect mqtt loop"""
+        self._client.loop_stop()
+        self._client.disconnect()
+
+    @staticmethod
+    # pylint: disable=unused-argument
+    def _on_connect(client: mqtt.Client, userdata: MqttConfig, flags: typing.Any,
+                    result_connection: typing.Any) -> None:
+        # if we lose the connection and reconnect then subscriptions will be renewed.
+        client.subscribe(topic=userdata.topic, qos=userdata.qos)
+
+    # pylint: disable=unused-argument
+    def _on_message(self, _: mqtt.Client, user_data: MqttConfig, msg: mqtt.MQTTMessage) -> None:
+        frame_msg = evt.FrameMessage()
+        frame_msg.ParseFromString(msg.payload)
+
+        frame = np.asarray(frame_msg.frame, dtype="uint8")
+        frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
+        nn_data = dai.NNData()
+        nn_data.setLayer("data", _to_planar(frame, (300, 300)))
+        self._img_in_queue.send(nn_data)
+
+    def get_stream_name(self) -> str:
+        return self._img_out.getStreamName()
+
+    def link(self, input_node: dai.Node.Input) -> None:
+        self._img_in.out.link(input_node)
+
+
+def _to_planar(arr: npt.NDArray[np.uint8], shape: tuple[int, int]) -> list[int]:
+    return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
+
+
 class PipelineController:
     """
     Pipeline controller that drive camera device
     """

-    def __init__(self, img_width: int, img_height: int, frame_processor: FrameProcessor,
-                 object_processor: ObjectProcessor):
-        self._img_width = img_width
-        self._img_height = img_height
-        self._pipeline = self._configure_pipeline()
+    def __init__(self, frame_processor: FrameProcessor,
+                 object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN,
+                 pipeline: dai.Pipeline):
         self._frame_processor = frame_processor
         self._object_processor = object_processor
+        self._camera = camera
+        self._object_node = object_node
         self._stop = False
+        self._pipeline = pipeline
+        self._configure_pipeline()

-    def _configure_pipeline(self) -> dai.Pipeline:
+    def _configure_pipeline(self) -> None:
         logger.info("configure pipeline")
-        pipeline = dai.Pipeline()

-        pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)
+        self._pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)

-        detection_nn = self._configure_detection_nn(pipeline)
-        xout_nn = self._configure_xout_nn(pipeline)
-
-        # Resize image
-        manip = pipeline.create(dai.node.ImageManip)
-        manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
-        manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
-        manip.initialConfig.setKeepAspectRatio(False)
-
-        cam_rgb = pipeline.create(dai.node.ColorCamera)
-        xout_rgb = pipeline.create(dai.node.XLinkOut)
-        xout_rgb.setStreamName("rgb")
-
-        # Properties
-        cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
-        cam_rgb.setPreviewSize(width=self._img_width, height=self._img_height)
-        cam_rgb.setInterleaved(False)
-        cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
-        cam_rgb.setFps(30)
-
         # Link preview to manip and manip to nn
-        cam_rgb.preview.link(manip.inputImage)
-        manip.out.link(detection_nn.input)
-
-        # Linking to output
-        cam_rgb.preview.link(xout_rgb.input)
-        detection_nn.out.link(xout_nn.input)
+        self._camera.link(self._object_node.get_input())
         logger.info("pipeline configured")
-        return pipeline
-
-    @staticmethod
-    def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
-        xout_nn = pipeline.create(dai.node.XLinkOut)
-        xout_nn.setStreamName("nn")
-        xout_nn.input.setBlocking(False)
-        return xout_nn
-
-    @staticmethod
-    def _configure_detection_nn(pipeline: dai.Pipeline) -> dai.node.NeuralNetwork:
-        # Define a neural network that will make predictions based on the source frames
-        detection_nn = pipeline.create(dai.node.NeuralNetwork)
-        detection_nn.setBlobPath(_NN_PATH)
-        detection_nn.setNumPoolFrames(4)
-        detection_nn.input.setBlocking(False)
-        detection_nn.setNumInferenceThreads(2)
-        return detection_nn
     def run(self) -> None:
         """
@@ -191,17 +316,19 @@
         :return:
         """
         # Connect to device and start pipeline
-        with dai.Device(self._pipeline) as device:
-            logger.info('MxId: %s', device.getDeviceInfo().getMxId())
-            logger.info('USB speed: %s', device.getUsbSpeed())
-            logger.info('Connected cameras: %s', device.getConnectedCameras())
-            logger.info("output queues found: %s", device.getOutputQueueNames())
-            device.startPipeline()
+        with Device(pipeline=self._pipeline) as dev:
+            logger.info('MxId: %s', dev.getDeviceInfo().getMxId())
+            logger.info('USB speed: %s', dev.getUsbSpeed())
+            logger.info('Connected cameras: %s', str(dev.getConnectedCameras()))
+            logger.info("output queues found: %s", str(''.join(dev.getOutputQueueNames())))  # type: ignore
+            dev.startPipeline()

             # Queues
             queue_size = 4
-            q_rgb = device.getOutputQueue(name="rgb", maxSize=queue_size, blocking=False)
-            q_nn = device.getOutputQueue(name="nn", maxSize=queue_size, blocking=False)
+            q_rgb = dev.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size,  # type: ignore
+                                       blocking=False)
+            q_nn = dev.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size,  # type: ignore
+                                      blocking=False)

             self._stop = False
             while True:
@@ -214,20 +341,27 @@
                 except Exception as ex:
                     logger.exception("unexpected error: %s", str(ex))

-    def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue):
+    def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue) -> None:
         logger.debug("wait for new frame")

         # Wait for frame
-        in_rgb: dai.ImgFrame = q_rgb.get()  # blocking call, will wait until a new data has arrived
+        in_rgb: dai.ImgFrame = q_rgb.get()  # type: ignore # blocking call, will wait until a new data has arrived
         try:
+            logger.debug("process frame")
             frame_ref = self._frame_processor.process(in_rgb)
         except FrameProcessError as ex:
             logger.error("unable to process frame: %s", str(ex))
-        # Read NN result
-        in_nn: dai.NNData = q_nn.get()
-        self._object_processor.process(in_nn, frame_ref)
+            return
+        logger.debug("frame processed")

-    def stop(self):
+        logger.debug("wait for nn response")
+        # Read NN result
+        in_nn: dai.NNData = q_nn.get()  # type: ignore
+        logger.debug("process objects")
+        self._object_processor.process(in_nn, frame_ref)
+        logger.debug("objects processed")
+
+    def stop(self) -> None:
         """
         Stop event loop, if loop is not running, do nothing
         :return:
@@ -235,9 +369,9 @@
         self._stop = True


-def _bbox_to_object(bbox: np.array, score: float) -> events.events_pb2.Object:
-    obj = events.events_pb2.Object()
-    obj.type = events.events_pb2.TypeObject.ANY
+def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> evt.Object:
+    obj = evt.Object()
+    obj.type = evt.TypeObject.ANY
     obj.top = bbox[0].astype(float)
     obj.right = bbox[3].astype(float)
     obj.bottom = bbox[2].astype(float)
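
Note: this refactor decouples frame acquisition from detection; anything implementing Source can feed ObjectDetectionNN. A minimal wiring sketch (illustrative values; assumes the model blob ships at the packaged path, as ObjectDetectionNN loads it in its constructor):

import depthai as dai
from camera import depthai as cam

pipeline = dai.Pipeline()
nn = cam.ObjectDetectionNN(pipeline=pipeline)

# camera preview as the frame source, linked into the resize/NN chain
camera = cam.CameraSource(pipeline=pipeline, img_width=126, img_height=126)
camera.link(nn.get_input())

# With a running dai.Device, an MQTT stream could stand in for the camera:
#   source = cam.MqttSource(device, pipeline, cam.MqttConfig(host="broker", topic="camera/frames"))
#   source.link(nn.get_input())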

View File

@@ -1,26 +1,28 @@
 import datetime
+import typing
 import unittest.mock

 import depthai as dai
+import events.events_pb2
 import numpy as np
+import numpy.typing as npt
 import paho.mqtt.client as mqtt
 import pytest
 import pytest_mock

 import camera.depthai
-import events.events_pb2

 Object = dict[str, float]


 @pytest.fixture
 def mqtt_client(mocker: pytest_mock.MockerFixture) -> mqtt.Client:
-    return mocker.MagicMock()
+    return mocker.MagicMock()  # type: ignore


 class TestObjectProcessor:
     @pytest.fixture
-    def frame_ref(self):
+    def frame_ref(self) -> events.events_pb2.FrameRef:
         now = datetime.datetime.now()
         frame_msg = events.events_pb2.FrameMessage()
         frame_msg.id.name = "robocar-oak-camera-oak"
@@ -42,7 +44,7 @@ class TestObjectProcessor:
     def raw_objects_empty(self, mocker: pytest_mock.MockerFixture) -> dai.NNData:
         raw_objects = mocker.MagicMock()

-        def mock_return(name):
+        def mock_return(name: str) -> typing.List[typing.Union[int, typing.List[int]]]:
             if name == "ExpandDims":
                 return [[0] * 4] * 100
             elif name == "ExpandDims_2":
@@ -56,14 +58,14 @@ class TestObjectProcessor:
     @pytest.fixture
     def raw_objects_one(self, mocker: pytest_mock.MockerFixture, object1: Object) -> dai.NNData:
-        def mock_return(name):
+        def mock_return(name: str) -> typing.Union[npt.NDArray[np.int64], typing.List[float]]:
             if name == "ExpandDims":  # Detection boxes
-                boxes = [[0] * 4] * 100
+                boxes: list[list[float]] = [[0.] * 4] * 100
                 boxes[0] = [object1["top"], object1["left"], object1["bottom"], object1["right"]]
                 return np.array(boxes)
             elif name == "ExpandDims_2":  # Detection scores
-                scores = [0] * 100
+                scores: list[float] = [0.] * 100
                 scores[0] = object1["score"]
                 return scores
             else:
@@ -77,20 +79,24 @@ class TestObjectProcessor:
     def object_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.ObjectProcessor:
         return camera.depthai.ObjectProcessor(mqtt_client, "topic/object", 0.2)

-    def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                    raw_objects_empty, frame_ref):
+    def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
+                                    raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None:
         object_processor.process(raw_objects_empty, frame_ref)
-        mqtt_client.publish.assert_not_called()
+        publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
+        publish_mock.assert_not_called()

-    def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                                raw_objects_one, frame_ref):
+    def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor,
+                                                mqtt_client: mqtt.Client, raw_objects_one: dai.NNData,
+                                                frame_ref: events.events_pb2.FrameRef) -> None:
         object_processor._objects_threshold = 0.9
         object_processor.process(raw_objects_one, frame_ref)
-        mqtt_client.publish.assert_not_called()
+        publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
+        publish_mock.assert_not_called()

     def test_process_with_one_object(self,
-                                     object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                     raw_objects_one, frame_ref, object1: Object):
+                                     object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
+                                     raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef,
+                                     object1: Object) -> None:
         object_processor.process(raw_objects_one, frame_ref)
         left = object1["left"]
         right = object1["right"]
@@ -98,7 +104,7 @@ class TestObjectProcessor:
         bottom = object1["bottom"]
         score = object1["score"]

-        pub_mock: unittest.mock.MagicMock = mqtt_client.publish
+        pub_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
         pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/object")
         payload = pub_mock.call_args.kwargs['payload']
         objects_msg = events.events_pb2.ObjectsMessage()
@@ -118,13 +124,13 @@ class TestFrameProcessor:
         return camera.depthai.FrameProcessor(mqtt_client, "topic/frame")

     def test_process(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
-                     mqtt_client: mqtt.Client):
+                     mqtt_client: mqtt.Client) -> None:
         img: dai.ImgFrame = mocker.MagicMock()
         mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
         frame_ref = frame_processor.process(img)

-        pub_mock: unittest.mock.MagicMock = mqtt_client.publish
+        pub_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
         pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/frame")
         payload = pub_mock.call_args.kwargs['payload']
         frame_msg = events.events_pb2.FrameMessage()
@@ -140,7 +146,7 @@ class TestFrameProcessor:
                milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10)

     def test_process_error(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
-                           mqtt_client: mqtt.Client):
+                           mqtt_client: mqtt.Client) -> None:
         img: dai.ImgFrame = mocker.MagicMock()
         mocker.patch(target="cv2.imencode").return_value = (False, None)
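
Note: a recurring change in this file rebinds mqtt_client.publish to a MagicMock-typed name before calling assert_* on it; under the newly enabled strict mypy, publish is typed as a plain method, so the rebind plus a type: ignore is what keeps the mock assertions type-checkable. A stripped-down sketch of the pattern (function name is illustrative):

import unittest.mock
import paho.mqtt.client as mqtt

def check_nothing_published(mqtt_client: mqtt.Client) -> None:
    # mypy sees publish as Client.publish, not a MagicMock, hence the ignore
    publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
    publish_mock.assert_not_called()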

cv2-stubs/__init__.pyi — new file, 3627 lines. File diff suppressed because one or more lines are too long.

View File

View File

@@ -1,53 +0,0 @@ (file deleted; all lines below removed)
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: events/events.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x65vents/events.proto\x12\x0erobocar.events\x1a\x1fgoogle/protobuf/timestamp.proto\"T\n\x08\x46rameRef\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\n\n\x02id\x18\x02 \x01(\t\x12.\n\ncreated_at\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\"C\n\x0c\x46rameMessage\x12$\n\x02id\x18\x01 \x01(\x0b\x32\x18.robocar.events.FrameRef\x12\r\n\x05\x66rame\x18\x02 \x01(\x0c\"d\n\x0fSteeringMessage\x12\x10\n\x08steering\x18\x01 \x01(\x02\x12\x12\n\nconfidence\x18\x02 \x01(\x02\x12+\n\tframe_ref\x18\x03 \x01(\x0b\x32\x18.robocar.events.FrameRef\"d\n\x0fThrottleMessage\x12\x10\n\x08throttle\x18\x01 \x01(\x02\x12\x12\n\nconfidence\x18\x02 \x01(\x02\x12+\n\tframe_ref\x18\x03 \x01(\x0b\x32\x18.robocar.events.FrameRef\"A\n\x10\x44riveModeMessage\x12-\n\ndrive_mode\x18\x01 \x01(\x0e\x32\x19.robocar.events.DriveMode\"f\n\x0eObjectsMessage\x12\'\n\x07objects\x18\x01 \x03(\x0b\x32\x16.robocar.events.Object\x12+\n\tframe_ref\x18\x02 \x01(\x0b\x32\x18.robocar.events.FrameRef\"\x80\x01\n\x06Object\x12(\n\x04type\x18\x01 \x01(\x0e\x32\x1a.robocar.events.TypeObject\x12\x0c\n\x04left\x18\x02 \x01(\x02\x12\x0b\n\x03top\x18\x03 \x01(\x02\x12\r\n\x05right\x18\x04 \x01(\x02\x12\x0e\n\x06\x62ottom\x18\x05 \x01(\x02\x12\x12\n\nconfidence\x18\x06 \x01(\x02\"&\n\x13SwitchRecordMessage\x12\x0f\n\x07\x65nabled\x18\x01 \x01(\x08\"\x8c\x01\n\x0bRoadMessage\x12&\n\x07\x63ontour\x18\x01 \x03(\x0b\x32\x15.robocar.events.Point\x12(\n\x07\x65llipse\x18\x02 \x01(\x0b\x32\x17.robocar.events.Ellipse\x12+\n\tframe_ref\x18\x03 \x01(\x0b\x32\x18.robocar.events.FrameRef\"\x1d\n\x05Point\x12\t\n\x01x\x18\x01 \x01(\x05\x12\t\n\x01y\x18\x02 \x01(\x05\"r\n\x07\x45llipse\x12%\n\x06\x63\x65nter\x18\x01 \x01(\x0b\x32\x15.robocar.events.Point\x12\r\n\x05width\x18\x02 \x01(\x05\x12\x0e\n\x06height\x18\x03 \x01(\x05\x12\r\n\x05\x61ngle\x18\x04 \x01(\x02\x12\x12\n\nconfidence\x18\x05 \x01(\x02\"\x82\x01\n\rRecordMessage\x12+\n\x05\x66rame\x18\x01 \x01(\x0b\x32\x1c.robocar.events.FrameMessage\x12\x31\n\x08steering\x18\x02 \x01(\x0b\x32\x1f.robocar.events.SteeringMessage\x12\x11\n\trecordSet\x18\x03 \x01(\t*-\n\tDriveMode\x12\x0b\n\x07INVALID\x10\x00\x12\x08\n\x04USER\x10\x01\x12\t\n\x05PILOT\x10\x02*2\n\nTypeObject\x12\x07\n\x03\x41NY\x10\x00\x12\x07\n\x03\x43\x41R\x10\x01\x12\x08\n\x04\x42UMP\x10\x02\x12\x08\n\x04PLOT\x10\x03\x42\nZ\x08./eventsb\x06proto3')
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'events.events_pb2', globals())
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'Z\010./events'
_DRIVEMODE._serialized_start=1196
_DRIVEMODE._serialized_end=1241
_TYPEOBJECT._serialized_start=1243
_TYPEOBJECT._serialized_end=1293
_FRAMEREF._serialized_start=72
_FRAMEREF._serialized_end=156
_FRAMEMESSAGE._serialized_start=158
_FRAMEMESSAGE._serialized_end=225
_STEERINGMESSAGE._serialized_start=227
_STEERINGMESSAGE._serialized_end=327
_THROTTLEMESSAGE._serialized_start=329
_THROTTLEMESSAGE._serialized_end=429
_DRIVEMODEMESSAGE._serialized_start=431
_DRIVEMODEMESSAGE._serialized_end=496
_OBJECTSMESSAGE._serialized_start=498
_OBJECTSMESSAGE._serialized_end=600
_OBJECT._serialized_start=603
_OBJECT._serialized_end=731
_SWITCHRECORDMESSAGE._serialized_start=733
_SWITCHRECORDMESSAGE._serialized_end=771
_ROADMESSAGE._serialized_start=774
_ROADMESSAGE._serialized_end=914
_POINT._serialized_start=916
_POINT._serialized_end=945
_ELLIPSE._serialized_start=947
_ELLIPSE._serialized_end=1061
_RECORDMESSAGE._serialized_start=1064
_RECORDMESSAGE._serialized_end=1194
# @@protoc_insertion_point(module_scope)

poetry.lock — generated, 1176 lines changed. File diff suppressed because it is too large.

View File

@@ -6,18 +6,18 @@ authors = ["Cyrille Nofficial <cynoffic@cyrilix.fr>"]
 readme = "README.md"
 packages = [
     { include = "camera" },
-    { include = "events" },
 ]

 [tool.poetry.dependencies]
 python = "^3.10"
 paho-mqtt = "^1.6.1"
-depthai = "^2.17.4.0"
+depthai = "^2.19.0"
 protobuf3 = "^0.2.1"
 google = "^3.0.0"
-opencv-python = "^4.6.0.66"
 blobconverter = "^1.3.0"
 protobuf = "^4.21.8"
+opencv-python-headless = "^4.6.0.66"
+robocar-protobuf = {version = "^1.1.2", source = "robocar"}

 [tool.poetry.group.test.dependencies]
@@ -27,6 +27,16 @@ pytest-mock = "^3.10.0"

 [tool.poetry.group.dev.dependencies]
 pylint = "^2.15.4"
+mypy = "^0.982"
+types-paho-mqtt = "^1.6.0.1"
+types-protobuf = "^3.20.4.2"
+
+[[tool.poetry.source]]
+name = "robocar"
+url = "https://git.cyrilix.bzh/api/packages/robocars/pypi/simple"
+default = false
+secondary = false

 [build-system]
 requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"]
@@ -41,3 +51,8 @@ style = 'semver'
 vcs = 'git'
 dirty = true
 bump = true
+
+[tool.mypy]
+strict = true
+warn_unused_configs = true
+plugins = 'numpy.typing.mypy_plugin'
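
Note: the new [tool.mypy] block enables strict mode with the numpy typing plugin, which is what drives the npt.NDArray annotations seen throughout this diff. A tiny illustration (hypothetical function, not from the repository) of the annotation style the plugin supports:

import numpy as np
import numpy.typing as npt

def unit_boxes(count: int) -> npt.NDArray[np.float64]:
    # a (count, 4) zero-initialized box array; the dtype is carried in the type
    return np.zeros((count, 4), dtype=np.float64)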