build: fix all pylint/mypy errors

Cyrille Nofficial 2022-10-27 10:34:04 +02:00
parent 4daf4d3c23
commit 55d8ce06c6
5 changed files with 68 additions and 49 deletions

View File

@@ -5,10 +5,12 @@ import argparse
 import logging
 import os
 import signal
+import typing, types
+import depthai as dai
 import paho.mqtt.client as mqtt
-from . import depthai as cam
+from . import depthai as cam  # pylint: disable=reimported
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO)
@@ -54,7 +56,7 @@ def _parse_args_cli() -> argparse.Namespace:
     return args
-def _init_mqtt_client(broker_host: str, broker_port, user: str, password: str, client_id: str) -> mqtt.Client:
+def _init_mqtt_client(broker_host: str, broker_port: int, user: str, password: str, client_id: str) -> mqtt.Client:
     logger.info("Start part.py-robocar-oak-camera")
     client = mqtt.Client(client_id=client_id, clean_session=True, userdata=None, protocol=mqtt.MQTTv311)
@@ -85,10 +87,17 @@ def execute_from_command_line() -> None:
                                           objects_topic=args.mqtt_topic_robocar_objects,
                                           objects_threshold=args.objects_threshold)
+    pipeline = dai.Pipeline()
     pipeline_controller = cam.PipelineController(frame_processor=frame_processor,
-                                                 object_processor=object_processor)
+                                                 object_processor=object_processor,
+                                                 object_node=cam.ObjectDetectionNN(pipeline=pipeline),
+                                                 camera=cam.CameraSource(pipeline=pipeline,
+                                                                         img_width=args.image_width,
+                                                                         img_height=args.image_width,
+                                                                         ))
-    def sigterm_handler():
+    def sigterm_handler(signum: int, frame: typing.Optional[
+            types.FrameType]) -> None:  # pylint: disable=unused-argument # need to implement handler signature
         logger.info("exit on SIGTERM")
         pipeline_controller.stop()
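
For context, a minimal standalone sketch of how a SIGTERM handler with this signature is registered through the standard library signal module; the handler body below is illustrative and not part of this commit:

    import signal
    import types
    import typing


    def sigterm_handler(signum: int, frame: typing.Optional[types.FrameType]) -> None:
        # signal.signal() always invokes the handler with (signal number, current stack frame),
        # so both parameters must appear in the signature even if they are unused.
        print(f"received signal {signum}, shutting down")


    signal.signal(signal.SIGTERM, sigterm_handler)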

View File

@@ -4,15 +4,17 @@ Camera event loop
 import abc
 import datetime
 import logging
+import pathlib
 import typing
 from dataclasses import dataclass
 import cv2
 import depthai as dai
-import events.events_pb2
+import events.events_pb2 as evt
 import numpy as np
 import numpy.typing as npt
 import paho.mqtt.client as mqtt
+from depthai import Device
 logger = logging.getLogger(__name__)
@@ -31,7 +33,7 @@ class ObjectProcessor:
         self._objects_topic = objects_topic
         self._objects_threshold = objects_threshold
-    def process(self, in_nn: dai.NNData, frame_ref) -> None:
+    def process(self, in_nn: dai.NNData, frame_ref: evt.FrameRef) -> None:
         """
         Parse and publish result of NeuralNetwork result
         :param in_nn: NeuralNetwork result read from device
@@ -48,8 +50,8 @@ class ObjectProcessor:
         if boxes.shape[0] > 0:
             self._publish_objects(boxes, frame_ref, scores)
-    def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref, scores: npt.NDArray[np.float64]) -> None:
-        objects_msg = events.events_pb2.ObjectsMessage()
+    def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref: evt.FrameRef, scores: npt.NDArray[np.float64]) -> None:
+        objects_msg = evt.ObjectsMessage()
         objs = []
         for i in range(boxes.shape[0]):
             logger.debug("new object detected: %s", str(boxes[i]))
@@ -105,7 +107,7 @@ class FrameProcessor:
         byte_im = im_buf_arr.tobytes()
         now = datetime.datetime.now()
-        frame_msg = events.events_pb2.FrameMessage()
+        frame_msg = evt.FrameMessage()
         frame_msg.id.name = "robocar-oak-camera-oak"
         frame_msg.id.id = str(int(now.timestamp() * 1000))
         frame_msg.id.created_at.FromDatetime(now)
@@ -149,7 +151,7 @@ class ObjectDetectionNN:
     def __init__(self, pipeline: dai.Pipeline):
         # Define a neural network that will make predictions based on the source frames
        detection_nn = pipeline.createNeuralNetwork()
-        detection_nn.setBlobPath(_NN_PATH)
+        detection_nn.setBlobPath(pathlib.Path(_NN_PATH))
         detection_nn.setNumPoolFrames(4)
         detection_nn.input.setBlocking(False)
         detection_nn.setNumInferenceThreads(2)
@@ -230,7 +232,7 @@ class MqttConfig:
 class MqttSource(Source):
     """Image source based onto mqtt stream"""
-    def __init__(self, device: dai.Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
+    def __init__(self, device: Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
         self._mqtt_config = mqtt_config
         self._client = mqtt.Client()
         self._client.user_data_set(mqtt_config)
@@ -264,7 +266,7 @@ class MqttSource(Source):
     # pylint: disable=unused-argument
     def _on_message(self, _: mqtt.Client, user_data: MqttConfig, msg: mqtt.MQTTMessage) -> None:
-        frame_msg = events.events_pb2.FrameMessage()
+        frame_msg = evt.FrameMessage()
         frame_msg.ParseFromString(msg.payload)
         frame = np.asarray(frame_msg.frame, dtype="uint8")
@@ -276,11 +278,11 @@ class MqttSource(Source):
     def get_stream_name(self) -> str:
         return self._img_out.getStreamName()
-    def link(self, input_node: dai.Node.Input):
+    def link(self, input_node: dai.Node.Input) -> None:
         self._img_in.out.link(input_node)
-def _to_planar(arr: npt.NDArray[int], shape: tuple[int, int]) -> list[int]:
+def _to_planar(arr: npt.NDArray[np.uint8], shape: tuple[int, int]) -> list[int]:
     return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
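
For context, a self-contained sketch of what _to_planar computes: it resizes an interleaved HWC image and flattens it channel by channel into a planar list. The image shape and target size below are illustrative only:

    import cv2
    import numpy as np
    import numpy.typing as npt


    def _to_planar(arr: npt.NDArray[np.uint8], shape: tuple[int, int]) -> list[int]:
        # cv2.resize keeps the HWC layout; transpose(2, 0, 1) reorders it to CHW,
        # and the nested comprehension flattens the planes into a single list.
        return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]


    img = np.zeros((480, 640, 3), dtype=np.uint8)   # HWC BGR frame
    planar = _to_planar(img, (300, 300))            # 3 * 300 * 300 values, plane by plane
    assert len(planar) == 3 * 300 * 300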
@@ -316,17 +318,19 @@ class PipelineController:
         :return:
         """
         # Connect to device and start pipeline
-        with dai.Device(pipeline=self._pipeline) as device:
-            logger.info('MxId: %s', device.getDeviceInfo().getMxId())
-            logger.info('USB speed: %s', device.getUsbSpeed())
-            logger.info('Connected cameras: %s', str(device.getConnectedCameras()))
-            logger.info("output queues found: %s", str(device.getOutputQueueNames()))
-            device.startPipeline()
+        with Device(pipeline=self._pipeline) as dev:
+            logger.info('MxId: %s', dev.getDeviceInfo().getMxId())
+            logger.info('USB speed: %s', dev.getUsbSpeed())
+            logger.info('Connected cameras: %s', str(dev.getConnectedCameras()))
+            logger.info("output queues found: %s", str(''.join(dev.getOutputQueueNames())))  # type: ignore
+            dev.startPipeline()
             # Queues
             queue_size = 4
-            q_rgb = device.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size, blocking=False)
-            q_nn = device.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, blocking=False)
+            q_rgb = dev.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size,  # type: ignore
+                                       blocking=False)
+            q_nn = dev.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size,  # type: ignore
+                                      blocking=False)
             self._stop = False
             while True:
@@ -343,14 +347,14 @@ class PipelineController:
                 logger.debug("wait for new frame")
                 # Wait for frame
-                in_rgb: dai.ImgFrame = q_rgb.get()  # blocking call, will wait until a new data has arrived
+                in_rgb: dai.ImgFrame = q_rgb.get()  # type: ignore # blocking call, will wait until a new data has arrived
                 try:
                     frame_ref = self._frame_processor.process(in_rgb)
                 except FrameProcessError as ex:
                     logger.error("unable to process frame: %s", str(ex))
                     return
                 # Read NN result
-                in_nn: dai.NNData = q_nn.get()
+                in_nn: dai.NNData = q_nn.get()  # type: ignore
                 self._object_processor.process(in_nn, frame_ref)
     def stop(self) -> None:
@@ -361,9 +365,9 @@ class PipelineController:
         self._stop = True
-def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> events.events_pb2.Object:
-    obj = events.events_pb2.Object()
-    obj.type = events.events_pb2.TypeObject.ANY
+def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> evt.Object:
+    obj = evt.Object()
+    obj.type = evt.TypeObject.ANY
     obj.top = bbox[0].astype(float)
     obj.right = bbox[3].astype(float)
     obj.bottom = bbox[2].astype(float)
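
For context, a short sketch of the box layout _bbox_to_object consumes, as inferred from this diff (the test fixtures order boxes as [top, left, bottom, right]); the numeric values below are made up:

    import numpy as np
    import events.events_pb2 as evt

    # Example detection box in the [top, left, bottom, right] order used by the test fixtures.
    bbox = np.array([0.1, 0.2, 0.4, 0.5], dtype=np.float64)

    obj = evt.Object()
    obj.type = evt.TypeObject.ANY
    obj.top = float(bbox[0])      # bbox[0] -> top
    obj.bottom = float(bbox[2])   # bbox[2] -> bottom
    obj.right = float(bbox[3])    # bbox[3] -> right, which leaves bbox[1] for the left edge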

View File

@@ -1,26 +1,28 @@
 import datetime
+import typing
 import unittest.mock
 import depthai as dai
-import events.events_pb2
 import numpy as np
+import numpy.typing as npt
 import paho.mqtt.client as mqtt
 import pytest
 import pytest_mock
 import camera.depthai
+import events.events_pb2
 Object = dict[str, float]
 @pytest.fixture
 def mqtt_client(mocker: pytest_mock.MockerFixture) -> mqtt.Client:
-    return mocker.MagicMock()
+    return mocker.MagicMock()  # type: ignore
 class TestObjectProcessor:
     @pytest.fixture
-    def frame_ref(self):
+    def frame_ref(self) -> events.events_pb2.FrameRef:
         now = datetime.datetime.now()
         frame_msg = events.events_pb2.FrameMessage()
         frame_msg.id.name = "robocar-oak-camera-oak"
@@ -42,7 +44,7 @@ class TestObjectProcessor:
     def raw_objects_empty(self, mocker: pytest_mock.MockerFixture) -> dai.NNData:
         raw_objects = mocker.MagicMock()
-        def mock_return(name):
+        def mock_return(name: str) -> typing.List[typing.Union[int, typing.List[int]]]:
            if name == "ExpandDims":
                return [[0] * 4] * 100
            elif name == "ExpandDims_2":
@@ -56,7 +58,7 @@ class TestObjectProcessor:
     @pytest.fixture
     def raw_objects_one(self, mocker: pytest_mock.MockerFixture, object1: Object) -> dai.NNData:
-        def mock_return(name):
+        def mock_return(name: str) -> typing.Union[npt.NDArray[np.int64], typing.List[float]]:
            if name == "ExpandDims":  # Detection boxes
                boxes: list[list[float]] = [[0.] * 4] * 100
                boxes[0] = [object1["top"], object1["left"], object1["bottom"], object1["right"]]
@@ -77,20 +79,24 @@ class TestObjectProcessor:
     def object_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.ObjectProcessor:
         return camera.depthai.ObjectProcessor(mqtt_client, "topic/object", 0.2)
-    def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                    raw_objects_empty, frame_ref):
+    def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
+                                    raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None:
         object_processor.process(raw_objects_empty, frame_ref)
-        mqtt_client.publish.assert_not_called()
+        publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
+        publish_mock.assert_not_called()
-    def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                                raw_objects_one, frame_ref):
+    def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor,
+                                                mqtt_client: mqtt.Client, raw_objects_one: dai.NNData,
+                                                frame_ref: events.events_pb2.FrameRef) -> None:
         object_processor._objects_threshold = 0.9
         object_processor.process(raw_objects_one, frame_ref)
-        mqtt_client.publish.assert_not_called()
+        publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
+        publish_mock.assert_not_called()
     def test_process_with_one_object(self,
-                                     object_processor: camera.depthai.ObjectProcessor, mqtt_client,
-                                     raw_objects_one, frame_ref, object1: Object):
+                                     object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
+                                     raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef,
+                                     object1: Object) -> None:
         object_processor.process(raw_objects_one, frame_ref)
         left = object1["left"]
         right = object1["right"]
@@ -98,7 +104,7 @@ class TestObjectProcessor:
         bottom = object1["bottom"]
         score = object1["score"]
-        pub_mock: unittest.mock.MagicMock = mqtt_client.publish
+        pub_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
         pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/object")
         payload = pub_mock.call_args.kwargs['payload']
         objects_msg = events.events_pb2.ObjectsMessage()
@@ -118,13 +124,13 @@ class TestFrameProcessor:
         return camera.depthai.FrameProcessor(mqtt_client, "topic/frame")
     def test_process(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
-                     mqtt_client: mqtt.Client):
+                     mqtt_client: mqtt.Client) -> None:
         img: dai.ImgFrame = mocker.MagicMock()
         mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
         frame_ref = frame_processor.process(img)
-        pub_mock: unittest.mock.MagicMock = mqtt_client.publish
+        pub_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
         pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/frame")
         payload = pub_mock.call_args.kwargs['payload']
         frame_msg = events.events_pb2.FrameMessage()
@@ -140,7 +146,7 @@ class TestFrameProcessor:
             milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10)
     def test_process_error(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
-                           mqtt_client: mqtt.Client):
+                           mqtt_client: mqtt.Client) -> None:
         img: dai.ImgFrame = mocker.MagicMock()
         mocker.patch(target="cv2.imencode").return_value = (False, None)
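
For context, a self-contained sketch of the typing pattern these tests adopt: the fixture advertises the real mqtt.Client type while returning a MagicMock, so the mock API is reached through a locally annotated variable plus a targeted type: ignore. This example is standalone and not part of the project's test suite:

    import unittest.mock

    import paho.mqtt.client as mqtt
    import pytest
    import pytest_mock


    @pytest.fixture
    def mqtt_client(mocker: pytest_mock.MockerFixture) -> mqtt.Client:
        # Declared as mqtt.Client for the tests that consume it, but backed by a MagicMock.
        return mocker.MagicMock()  # type: ignore


    def test_publish_not_called(mqtt_client: mqtt.Client) -> None:
        # Re-binding the mocked method with an explicit MagicMock annotation keeps mypy
        # happy about assert_not_called(), which mqtt.Client.publish does not define.
        publish_mock: unittest.mock.MagicMock = mqtt_client.publish  # type: ignore
        publish_mock.assert_not_called()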

View File

@@ -1,6 +1,6 @@
 # Python: 3.8.0 (tags/v3.8.0:fa919fd, Oct 14 2019, 19:37:50) [MSC v.1916 64 bit (AMD64)]
-# Library: cv2, version: 4.4.0
-# Module: cv2.cv2, version: 4.4.0
+# Library: cv2-stubs, version: 4.4.0
+# Module: cv2-stubs.cv2-stubs, version: 4.4.0
 import typing
 import __init__ as _mod_cv2

poetry.lock (generated)
View File

@@ -771,7 +771,7 @@ requests = ">=2.0.1,<3.0.0"
 [[package]]
 name = "robocar-protobuf"
-version = "1.1.1"
+version = "1.1.2"
 description = "Protobuf message definitions for robocar"
 category = "main"
 optional = false
@@ -1526,8 +1526,8 @@ requests-toolbelt = [
     {file = "requests_toolbelt-0.9.1-py2.py3-none-any.whl", hash = "sha256:380606e1d10dc85c3bd47bf5a6095f815ec007be7a8b69c878507068df059e6f"},
 ]
 robocar-protobuf = [
-    {file = "robocar_protobuf-1.1.1-py3-none-any.whl", hash = "sha256:d04b8e4cdacb7286d3425d74fb4402210422469e1240951921029c3dcf8c3e83"},
-    {file = "robocar_protobuf-1.1.1.tar.gz", hash = "sha256:c41dfa9bcc143ea88ac38dee7c52f9672bb06f13ad1bccad3b244c32d3f12073"},
+    {file = "robocar_protobuf-1.1.2-py3-none-any.whl", hash = "sha256:3f47608464576cf10377b1635aa1f2a494445c71dbca1bd7ae1e97c4d09301e6"},
+    {file = "robocar_protobuf-1.1.2.tar.gz", hash = "sha256:7ae5fe6c2b53edd7314d685a5492d945ad315a5a498c1342e95a3b46bf684bbc"},
 ]
 s3transfer = [
     {file = "s3transfer-0.6.0-py3-none-any.whl", hash = "sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd"},