feat: check typing with mypy
This commit is contained in:
		@@ -85,9 +85,7 @@ def execute_from_command_line() -> None:
 | 
			
		||||
                                           objects_topic=args.mqtt_topic_robocar_objects,
 | 
			
		||||
                                           objects_threshold=args.objects_threshold)
 | 
			
		||||
 | 
			
		||||
    pipeline_controller = cam.PipelineController(img_width=args.image_width,
 | 
			
		||||
                                                 img_height=args.image_height,
 | 
			
		||||
                                                 frame_processor=frame_processor,
 | 
			
		||||
    pipeline_controller = cam.PipelineController(frame_processor=frame_processor,
 | 
			
		||||
                                                 object_processor=object_processor)
 | 
			
		||||
 | 
			
		||||
    def sigterm_handler():
 | 
			
		||||
 
 | 
			
		||||
@@ -11,6 +11,7 @@ import cv2
 | 
			
		||||
import depthai as dai
 | 
			
		||||
import events.events_pb2
 | 
			
		||||
import numpy as np
 | 
			
		||||
import numpy.typing as npt
 | 
			
		||||
import paho.mqtt.client as mqtt
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
@@ -47,8 +48,7 @@ class ObjectProcessor:
 | 
			
		||||
        if boxes.shape[0] > 0:
 | 
			
		||||
            self._publish_objects(boxes, frame_ref, scores)
 | 
			
		||||
 | 
			
		||||
    def _publish_objects(self, boxes: np.array, frame_ref, scores: np.array) -> None:
 | 
			
		||||
 | 
			
		||||
    def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref, scores: npt.NDArray[np.float64]) -> None:
 | 
			
		||||
        objects_msg = events.events_pb2.ObjectsMessage()
 | 
			
		||||
        objs = []
 | 
			
		||||
        for i in range(boxes.shape[0]):
 | 
			
		||||
@@ -276,11 +276,11 @@ class MqttSource(Source):
 | 
			
		||||
    def get_stream_name(self) -> str:
 | 
			
		||||
        return self._img_out.getStreamName()
 | 
			
		||||
 | 
			
		||||
    def link_preview(self, input_node: dai.Node.Input):
 | 
			
		||||
    def link(self, input_node: dai.Node.Input):
 | 
			
		||||
        self._img_in.out.link(input_node)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _to_planar(arr: np.ndarray, shape: tuple) -> list:
 | 
			
		||||
def _to_planar(arr: npt.NDArray[int], shape: tuple[int, int]) -> list[int]:
 | 
			
		||||
    return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -289,7 +289,7 @@ class PipelineController:
 | 
			
		||||
    Pipeline controller that drive camera device
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, img_width: int, img_height: int, frame_processor: FrameProcessor,
 | 
			
		||||
    def __init__(self, frame_processor: FrameProcessor,
 | 
			
		||||
                 object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN):
 | 
			
		||||
        self._pipeline = self._configure_pipeline()
 | 
			
		||||
        self._frame_processor = frame_processor
 | 
			
		||||
@@ -319,8 +319,8 @@ class PipelineController:
 | 
			
		||||
        with dai.Device(pipeline=self._pipeline) as device:
 | 
			
		||||
            logger.info('MxId: %s', device.getDeviceInfo().getMxId())
 | 
			
		||||
            logger.info('USB speed: %s', device.getUsbSpeed())
 | 
			
		||||
            logger.info('Connected cameras: %s', device.getConnectedCameras())
 | 
			
		||||
            logger.info("output queues found: %s", device.getOutputQueueNames())
 | 
			
		||||
            logger.info('Connected cameras: %s', str(device.getConnectedCameras()))
 | 
			
		||||
            logger.info("output queues found: %s", str(device.getOutputQueueNames()))
 | 
			
		||||
 | 
			
		||||
            device.startPipeline()
 | 
			
		||||
            # Queues
 | 
			
		||||
@@ -361,7 +361,7 @@ class PipelineController:
 | 
			
		||||
        self._stop = True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _bbox_to_object(bbox: np.array, score: float) -> events.events_pb2.Object:
 | 
			
		||||
def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> events.events_pb2.Object:
 | 
			
		||||
    obj = events.events_pb2.Object()
 | 
			
		||||
    obj.type = events.events_pb2.TypeObject.ANY
 | 
			
		||||
    obj.top = bbox[0].astype(float)
 | 
			
		||||
 
 | 
			
		||||
@@ -58,12 +58,12 @@ class TestObjectProcessor:
 | 
			
		||||
    def raw_objects_one(self, mocker: pytest_mock.MockerFixture, object1: Object) -> dai.NNData:
 | 
			
		||||
        def mock_return(name):
 | 
			
		||||
            if name == "ExpandDims":  # Detection boxes
 | 
			
		||||
                boxes = [[0] * 4] * 100
 | 
			
		||||
                boxes: list[list[float]] = [[0.] * 4] * 100
 | 
			
		||||
                boxes[0] = [object1["top"], object1["left"], object1["bottom"], object1["right"]]
 | 
			
		||||
                return np.array(boxes)
 | 
			
		||||
 | 
			
		||||
            elif name == "ExpandDims_2":  # Detection scores
 | 
			
		||||
                scores = [0] * 100
 | 
			
		||||
                scores: list[float] = [0.] * 100
 | 
			
		||||
                scores[0] = object1["score"]
 | 
			
		||||
                return scores
 | 
			
		||||
            else:
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user