robocar-oak-camera/camera/tests/test_depthai.py

202 lines
8.7 KiB
Python
Raw Permalink Normal View History

2022-10-21 09:01:38 +00:00
import datetime
2022-10-27 08:34:04 +00:00
import typing
2022-10-21 09:01:38 +00:00
import unittest.mock
import depthai as dai
2022-10-27 08:34:04 +00:00
import events.events_pb2
2022-10-21 09:01:38 +00:00
import numpy as np
2022-10-27 08:34:04 +00:00
import numpy.typing as npt
2022-10-21 09:01:38 +00:00
import paho.mqtt.client as mqtt
import pytest
import pytest_mock
from camera.oak_pipeline import DisparityProcessor, ObjectProcessor, FrameProcessor, FrameProcessError
2022-10-21 09:01:38 +00:00
Object = dict[str, float]
@pytest.fixture
def mqtt_client(mocker: pytest_mock.MockerFixture) -> mqtt.Client:
2022-10-27 08:34:04 +00:00
return mocker.MagicMock() # type: ignore
2022-10-21 09:01:38 +00:00
class TestObjectProcessor:
@pytest.fixture
2022-10-27 08:34:04 +00:00
def frame_ref(self) -> events.events_pb2.FrameRef:
2022-10-21 09:01:38 +00:00
now = datetime.datetime.now()
frame_msg = events.events_pb2.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
return frame_msg.id
@pytest.fixture
def object1(self) -> Object:
return {
"left": 0.3,
"right": 0.7,
"top": 0.1,
"bottom": 0.6,
"score": 0.8,
}
@pytest.fixture
def raw_objects_empty(self, mocker: pytest_mock.MockerFixture) -> dai.NNData:
raw_objects = mocker.MagicMock()
2022-10-27 08:34:04 +00:00
def mock_return(name: str) -> typing.List[typing.Union[int, typing.List[int]]]:
2022-10-21 09:01:38 +00:00
if name == "ExpandDims":
return [[0] * 4] * 100
elif name == "ExpandDims_2":
return [0] * 100
else:
raise ValueError(f"{name} is not a valid arg")
m = mocker.patch(target='depthai.NNData.getLayerFp16', autospec=True)
m.getLayerFp16 = mock_return
return m
@pytest.fixture
def raw_objects_one(self, mocker: pytest_mock.MockerFixture, object1: Object) -> dai.NNData:
2022-10-27 08:34:04 +00:00
def mock_return(name: str) -> typing.Union[npt.NDArray[np.int64], typing.List[float]]:
2022-10-21 09:01:38 +00:00
if name == "ExpandDims": # Detection boxes
2022-10-27 07:05:00 +00:00
boxes: list[list[float]] = [[0.] * 4] * 100
2022-10-21 09:01:38 +00:00
boxes[0] = [object1["top"], object1["left"], object1["bottom"], object1["right"]]
return np.array(boxes)
elif name == "ExpandDims_2": # Detection scores
2022-10-27 07:05:00 +00:00
scores: list[float] = [0.] * 100
2022-10-21 09:01:38 +00:00
scores[0] = object1["score"]
return scores
else:
raise ValueError(f"{name} is not a valid arg")
m = mocker.patch(target='depthai.NNData.getLayerFp16', autospec=True)
m.getLayerFp16 = mock_return
return m
@pytest.fixture
def object_processor(self, mqtt_client: mqtt.Client) -> ObjectProcessor:
return ObjectProcessor(mqtt_client, "topic/object", 0.2)
2022-10-21 09:01:38 +00:00
def test_process_without_object(self, object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
2022-10-27 08:34:04 +00:00
raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None:
2022-10-21 09:01:38 +00:00
object_processor.process(raw_objects_empty, frame_ref)
2022-10-27 08:34:04 +00:00
publish_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
publish_mock.assert_not_called()
2022-10-21 09:01:38 +00:00
def test_process_with_object_with_low_score(self, object_processor: ObjectProcessor,
2022-10-27 08:34:04 +00:00
mqtt_client: mqtt.Client, raw_objects_one: dai.NNData,
frame_ref: events.events_pb2.FrameRef) -> None:
2022-10-21 09:01:38 +00:00
object_processor._objects_threshold = 0.9
object_processor.process(raw_objects_one, frame_ref)
2022-10-27 08:34:04 +00:00
publish_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
publish_mock.assert_not_called()
2022-10-21 09:01:38 +00:00
def test_process_with_one_object(self,
object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
2022-10-27 08:34:04 +00:00
raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef,
object1: Object) -> None:
2022-10-21 09:01:38 +00:00
object_processor.process(raw_objects_one, frame_ref)
left = object1["left"]
right = object1["right"]
top = object1["top"]
bottom = object1["bottom"]
score = object1["score"]
2022-10-27 08:34:04 +00:00
pub_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
2022-10-21 09:01:38 +00:00
pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/object")
payload = pub_mock.call_args.kwargs['payload']
objects_msg = events.events_pb2.ObjectsMessage()
objects_msg.ParseFromString(payload)
assert len(objects_msg.objects) == 1
assert left - 0.0001 < objects_msg.objects[0].left < left + 0.0001
assert right - 0.0001 < objects_msg.objects[0].right < right + 0.0001
assert top - 0.0001 < objects_msg.objects[0].top < top + 0.0001
assert bottom - 0.0001 < objects_msg.objects[0].bottom < bottom + 0.0001
assert score - 0.0001 < objects_msg.objects[0].confidence < score + 0.0001
assert objects_msg.frame_ref == frame_ref
class TestFrameProcessor:
@pytest.fixture
def frame_processor(self, mqtt_client: mqtt.Client) -> FrameProcessor:
return FrameProcessor(mqtt_client, "topic/frame")
2022-10-21 09:01:38 +00:00
def test_process(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
2022-10-27 08:34:04 +00:00
mqtt_client: mqtt.Client) -> None:
2022-10-21 09:01:38 +00:00
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
frame_ref = frame_processor.process(img)
2022-10-27 08:34:04 +00:00
pub_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
2022-10-21 09:01:38 +00:00
pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/frame")
payload = pub_mock.call_args.kwargs['payload']
frame_msg = events.events_pb2.FrameMessage()
frame_msg.ParseFromString(payload)
assert frame_msg.id == frame_ref
assert frame_msg.frame == b"img content"
assert frame_msg.id.name == "robocar-oak-camera-oak"
assert len(frame_msg.id.id) is 13
now = datetime.datetime.now()
assert now - datetime.timedelta(
milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10)
def test_process_error(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
2022-10-27 08:34:04 +00:00
mqtt_client: mqtt.Client) -> None:
2022-10-21 09:01:38 +00:00
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (False, None)
with pytest.raises(FrameProcessError) as ex:
2022-10-21 09:01:38 +00:00
_ = frame_processor.process(img)
exception_raised = ex.value
assert exception_raised.message == "unable to process to encode frame to jpg"
class TestDisparityProcessor:
@pytest.fixture
def frame_ref(self) -> events.events_pb2.FrameRef:
now = datetime.datetime.now()
frame_msg = events.events_pb2.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
return frame_msg.id
@pytest.fixture
def disparity_processor(self, mqtt_client: mqtt.Client) -> DisparityProcessor:
return DisparityProcessor(mqtt_client, "topic/disparity")
def test_process(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
frame_ref: events.events_pb2.FrameRef,
mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
disparity_processor.process(img, frame_ref, 42)
pub_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/disparity")
payload = pub_mock.call_args.kwargs['payload']
disparity_msg = events.events_pb2.DisparityMessage()
disparity_msg.ParseFromString(payload)
assert disparity_msg.frame_ref == frame_ref
assert disparity_msg.disparity == b"img content"
assert disparity_msg.focal_length_in_pixels == 42
assert disparity_msg.baseline_in_mm == 75
def test_process_error(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
frame_ref: events.events_pb2.FrameRef,
mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (False, None)
with pytest.raises(FrameProcessError) as ex:
disparity_processor.process(img, frame_ref, 42)
exception_raised = ex.value
assert exception_raised.message == "unable to process to encode frame to jpg"