feat(disparity): publish disparity messages
This commit is contained in:
parent
552f69e46e
commit
1ee37f65af
@ -11,6 +11,7 @@ import depthai as dai
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
from camera import oak_pipeline as cam
|
||||
from oak_pipeline import DisparityProcessor
|
||||
|
||||
CAMERA_EXPOSITION_DEFAULT = "default"
|
||||
CAMERA_EXPOSITION_8300US = "8300us"
|
||||
@ -49,6 +50,9 @@ def _parse_args_cli() -> argparse.Namespace:
|
||||
help="threshold to filter detected objects",
|
||||
type=float,
|
||||
default=_get_env_float_value("OBJECTS_THRESHOLD", 0.2))
|
||||
parser.add_argument("-o", "---mqtt-topic-robocar-disparity",
|
||||
help="MQTT topic where to publish disparity results",
|
||||
default=_get_env_value("MQTT_TOPIC_DISPARITY", "/disparity"))
|
||||
parser.add_argument("-f", "--camera-fps",
|
||||
help="set rate at which camera should produce frames",
|
||||
type=int,
|
||||
@ -104,6 +108,7 @@ def execute_from_command_line() -> None:
|
||||
object_processor = cam.ObjectProcessor(mqtt_client=client,
|
||||
objects_topic=args.mqtt_topic_robocar_objects,
|
||||
objects_threshold=args.objects_threshold)
|
||||
disparity_processor = cam.DisparityProcessor(mqtt_client=client, disparity_topic=args.mqtt_topic_robocar_disparity)
|
||||
|
||||
pipeline = dai.Pipeline()
|
||||
if args.camera_tuning_exposition == CAMERA_EXPOSITION_500US:
|
||||
@ -120,7 +125,9 @@ def execute_from_command_line() -> None:
|
||||
img_width=args.image_width,
|
||||
img_height=args.image_height,
|
||||
fps=args.camera_fps,
|
||||
))
|
||||
),
|
||||
depth_source=cam.DepthSource(pipeline=pipeline),
|
||||
disparity_processor=disparity_processor)
|
||||
|
||||
def sigterm_handler(signum: int, frame: typing.Optional[
|
||||
types.FrameType]) -> None: # pylint: disable=unused-argument # need to implement handler signature
|
||||
|
@ -26,6 +26,8 @@ _NN_HEIGHT = 192
|
||||
_PREVIEW_WIDTH = 640
|
||||
_PREVIEW_HEIGHT = 480
|
||||
|
||||
_CAMERA_BASELINE_IN_MM = 75
|
||||
|
||||
|
||||
class ObjectProcessor:
|
||||
"""
|
||||
@ -124,6 +126,37 @@ class FrameProcessor:
|
||||
return frame_msg.id
|
||||
|
||||
|
||||
class DisparityProcessor:
|
||||
"""
|
||||
Processor for camera frames
|
||||
"""
|
||||
|
||||
def __init__(self, mqtt_client: mqtt.Client, disparity_topic: str):
|
||||
self._mqtt_client = mqtt_client
|
||||
self._disparity_topic = disparity_topic
|
||||
|
||||
def process(self, img: dai.ImgFrame, frame_ref: evt.FrameRef, focal_length_in_pixels: float,
|
||||
baseline_mm: float = _CAMERA_BASELINE_IN_MM) -> None:
|
||||
im_frame = img.getCvFrame()
|
||||
is_success, im_buf_arr = cv2.imencode(".jpg", im_frame)
|
||||
if not is_success:
|
||||
raise FrameProcessError("unable to process to encode frame to jpg")
|
||||
byte_im = im_buf_arr.tobytes()
|
||||
|
||||
disparity_msg = evt.DisparityMessage()
|
||||
disparity_msg.disparity = byte_im
|
||||
disparity_msg.frame_ref.name = frame_ref.name
|
||||
disparity_msg.frame_ref.id = frame_ref.id
|
||||
disparity_msg.frame_ref.created_at.FromDatetime(frame_ref.created_at.ToDatetime())
|
||||
disparity_msg.focal_length_in_pixels = focal_length_in_pixels
|
||||
disparity_msg.baseline_in_mm = baseline_mm
|
||||
|
||||
self._mqtt_client.publish(topic=self._disparity_topic,
|
||||
payload=disparity_msg.SerializeToString(),
|
||||
qos=0,
|
||||
retain=False)
|
||||
|
||||
|
||||
class Source(abc.ABC):
|
||||
"""Base class for image source"""
|
||||
|
||||
@ -233,6 +266,49 @@ class CameraSource(Source):
|
||||
return manip
|
||||
|
||||
|
||||
class DepthSource(Source):
|
||||
def __init__(self, pipeline: dai.Pipeline,
|
||||
extended_disparity: bool = False,
|
||||
subpixel: bool = False,
|
||||
lr_check: bool = True
|
||||
) -> None:
|
||||
"""
|
||||
# Closer-in minimum depth, disparity range is doubled (from 95 to 190):
|
||||
extended_disparity = False
|
||||
# Better accuracy for longer distance, fractional disparity 32-levels:
|
||||
subpixel = False
|
||||
# Better handling for occlusions:
|
||||
lr_check = True
|
||||
"""
|
||||
self._monoLeft = pipeline.create(dai.node.MonoCamera)
|
||||
self._monoRight = pipeline.create(dai.node.MonoCamera)
|
||||
self._depth = pipeline.create(dai.node.StereoDepth)
|
||||
self._xout_disparity = pipeline.create(dai.node.XLinkOut)
|
||||
|
||||
self._xout_disparity.setStreamName("disparity")
|
||||
|
||||
# Properties
|
||||
self._monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
|
||||
self._monoLeft.setCamera("left")
|
||||
self._monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
|
||||
self._monoRight.setCamera("right")
|
||||
|
||||
# Create a node that will produce the depth map
|
||||
# (using disparity output as it's easier to visualize depth this way)
|
||||
self._depth.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
|
||||
# Options: MEDIAN_OFF, KERNEL_3x3, KERNEL_5x5, KERNEL_7x7 (default)
|
||||
self._depth.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
|
||||
self._depth.setLeftRightCheck(lr_check)
|
||||
self._depth.setExtendedDisparity(extended_disparity)
|
||||
self._depth.setSubpixel(subpixel)
|
||||
|
||||
def get_stream_name(self) -> str:
|
||||
return self._xout_disparity.getStreamName()
|
||||
|
||||
def link(self, input_node: dai.Node.Input) -> None:
|
||||
self._depth.disparity.link(input_node)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MqttConfig:
|
||||
"""MQTT configuration"""
|
||||
@ -305,15 +381,19 @@ class PipelineController:
|
||||
"""
|
||||
|
||||
def __init__(self, frame_processor: FrameProcessor,
|
||||
object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN,
|
||||
object_processor: ObjectProcessor, disparity_processor: DisparityProcessor,
|
||||
camera: Source, depth_source: Source, object_node: ObjectDetectionNN,
|
||||
pipeline: dai.Pipeline):
|
||||
self._frame_processor = frame_processor
|
||||
self._object_processor = object_processor
|
||||
self._disparity_processor = disparity_processor
|
||||
self._camera = camera
|
||||
self._depth_source = depth_source
|
||||
self._object_node = object_node
|
||||
self._stop = False
|
||||
self._pipeline = pipeline
|
||||
self._configure_pipeline()
|
||||
self._focal_length_in_pixels: float | None = None
|
||||
|
||||
def _configure_pipeline(self) -> None:
|
||||
logger.info("configure pipeline")
|
||||
@ -337,6 +417,11 @@ class PipelineController:
|
||||
logger.info('Connected cameras: %s', str(dev.getConnectedCameras()))
|
||||
logger.info("output queues found: %s", str(''.join(dev.getOutputQueueNames()))) # type: ignore
|
||||
|
||||
calib_data = dev.readCalibration()
|
||||
intrinsics = calib_data.getCameraIntrinsics(dai.CameraBoardSocket.CAM_C)
|
||||
self._focal_length_in_pixels = intrinsics[0][0]
|
||||
logger.info('Right mono camera focal length in pixels: %s', self._focal_length_in_pixels)
|
||||
|
||||
dev.startPipeline()
|
||||
# Queues
|
||||
queue_size = 4
|
||||
@ -344,6 +429,8 @@ class PipelineController:
|
||||
blocking=False)
|
||||
q_nn = dev.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, # type: ignore
|
||||
blocking=False)
|
||||
q_disparity = dev.getOutputQueue(name=self._depth_source.get_stream_name(), maxSize=queue_size, # type: ignore
|
||||
blocking=False)
|
||||
|
||||
start_time = time.time()
|
||||
counter = 0
|
||||
@ -355,7 +442,7 @@ class PipelineController:
|
||||
logger.info("stop loop event")
|
||||
return
|
||||
try:
|
||||
self._loop_on_camera_events(q_nn, q_rgb)
|
||||
self._loop_on_camera_events(q_nn, q_rgb, q_disparity)
|
||||
# pylint: disable=broad-except # bad frame or event must not stop loop
|
||||
except Exception as ex:
|
||||
logger.exception("unexpected error: %s", str(ex))
|
||||
@ -369,8 +456,7 @@ class PipelineController:
|
||||
display_time = time.time()
|
||||
logger.info("fps: %s", fps)
|
||||
|
||||
|
||||
def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue) -> None:
|
||||
def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue, q_disparity: dai.DataOutputQueue) -> None:
|
||||
logger.debug("wait for new frame")
|
||||
|
||||
# Wait for frame
|
||||
@ -390,6 +476,11 @@ class PipelineController:
|
||||
self._object_processor.process(in_nn, frame_ref)
|
||||
logger.debug("objects processed")
|
||||
|
||||
logger.debug("process disparity")
|
||||
in_disparity: dai.ImgFrame = q_disparity.get() # type: ignore
|
||||
self._disparity_processor.process(in_disparity, frame_ref=frame_ref,
|
||||
focal_length_in_pixels=self._focal_length_in_pixels)
|
||||
logger.debug("disparity processed")
|
||||
|
||||
def stop(self) -> None:
|
||||
"""
|
||||
|
@ -10,7 +10,7 @@ import paho.mqtt.client as mqtt
|
||||
import pytest
|
||||
import pytest_mock
|
||||
|
||||
import camera.depthai
|
||||
from camera.oak_pipeline import DisparityProcessor, ObjectProcessor, FrameProcessor, FrameProcessError
|
||||
|
||||
Object = dict[str, float]
|
||||
|
||||
@ -76,16 +76,16 @@ class TestObjectProcessor:
|
||||
return m
|
||||
|
||||
@pytest.fixture
|
||||
def object_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.ObjectProcessor:
|
||||
return camera.depthai.ObjectProcessor(mqtt_client, "topic/object", 0.2)
|
||||
def object_processor(self, mqtt_client: mqtt.Client) -> ObjectProcessor:
|
||||
return ObjectProcessor(mqtt_client, "topic/object", 0.2)
|
||||
|
||||
def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
|
||||
def test_process_without_object(self, object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
|
||||
raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None:
|
||||
object_processor.process(raw_objects_empty, frame_ref)
|
||||
publish_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
|
||||
publish_mock.assert_not_called()
|
||||
|
||||
def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor,
|
||||
def test_process_with_object_with_low_score(self, object_processor: ObjectProcessor,
|
||||
mqtt_client: mqtt.Client, raw_objects_one: dai.NNData,
|
||||
frame_ref: events.events_pb2.FrameRef) -> None:
|
||||
object_processor._objects_threshold = 0.9
|
||||
@ -94,7 +94,7 @@ class TestObjectProcessor:
|
||||
publish_mock.assert_not_called()
|
||||
|
||||
def test_process_with_one_object(self,
|
||||
object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client,
|
||||
object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
|
||||
raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef,
|
||||
object1: Object) -> None:
|
||||
object_processor.process(raw_objects_one, frame_ref)
|
||||
@ -120,10 +120,10 @@ class TestObjectProcessor:
|
||||
|
||||
class TestFrameProcessor:
|
||||
@pytest.fixture
|
||||
def frame_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.FrameProcessor:
|
||||
return camera.depthai.FrameProcessor(mqtt_client, "topic/frame")
|
||||
def frame_processor(self, mqtt_client: mqtt.Client) -> FrameProcessor:
|
||||
return FrameProcessor(mqtt_client, "topic/frame")
|
||||
|
||||
def test_process(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
|
||||
def test_process(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
|
||||
mqtt_client: mqtt.Client) -> None:
|
||||
img: dai.ImgFrame = mocker.MagicMock()
|
||||
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
|
||||
@ -145,12 +145,57 @@ class TestFrameProcessor:
|
||||
assert now - datetime.timedelta(
|
||||
milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10)
|
||||
|
||||
def test_process_error(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture,
|
||||
def test_process_error(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
|
||||
mqtt_client: mqtt.Client) -> None:
|
||||
img: dai.ImgFrame = mocker.MagicMock()
|
||||
mocker.patch(target="cv2.imencode").return_value = (False, None)
|
||||
|
||||
with pytest.raises(camera.depthai.FrameProcessError) as ex:
|
||||
with pytest.raises(FrameProcessError) as ex:
|
||||
_ = frame_processor.process(img)
|
||||
exception_raised = ex.value
|
||||
assert exception_raised.message == "unable to process to encode frame to jpg"
|
||||
|
||||
|
||||
class TestDisparityProcessor:
|
||||
@pytest.fixture
|
||||
def frame_ref(self) -> events.events_pb2.FrameRef:
|
||||
now = datetime.datetime.now()
|
||||
frame_msg = events.events_pb2.FrameMessage()
|
||||
frame_msg.id.name = "robocar-oak-camera-oak"
|
||||
frame_msg.id.id = str(int(now.timestamp() * 1000))
|
||||
frame_msg.id.created_at.FromDatetime(now)
|
||||
return frame_msg.id
|
||||
|
||||
@pytest.fixture
|
||||
def disparity_processor(self, mqtt_client: mqtt.Client) -> DisparityProcessor:
|
||||
return DisparityProcessor(mqtt_client, "topic/disparity")
|
||||
|
||||
def test_process(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
|
||||
frame_ref: events.events_pb2.FrameRef,
|
||||
mqtt_client: mqtt.Client) -> None:
|
||||
img: dai.ImgFrame = mocker.MagicMock()
|
||||
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
|
||||
|
||||
disparity_processor.process(img, frame_ref, 42)
|
||||
|
||||
pub_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
|
||||
pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/disparity")
|
||||
payload = pub_mock.call_args.kwargs['payload']
|
||||
disparity_msg = events.events_pb2.DisparityMessage()
|
||||
disparity_msg.ParseFromString(payload)
|
||||
|
||||
assert disparity_msg.frame_ref == frame_ref
|
||||
assert disparity_msg.disparity == b"img content"
|
||||
assert disparity_msg.focal_length_in_pixels == 42
|
||||
assert disparity_msg.baseline_in_mm == 75
|
||||
|
||||
def test_process_error(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
|
||||
frame_ref: events.events_pb2.FrameRef,
|
||||
mqtt_client: mqtt.Client) -> None:
|
||||
img: dai.ImgFrame = mocker.MagicMock()
|
||||
mocker.patch(target="cv2.imencode").return_value = (False, None)
|
||||
|
||||
with pytest.raises(FrameProcessError) as ex:
|
||||
disparity_processor.process(img, frame_ref, 42)
|
||||
exception_raised = ex.value
|
||||
assert exception_raised.message == "unable to process to encode frame to jpg"
|
||||
|
8
poetry.lock
generated
8
poetry.lock
generated
@ -1325,13 +1325,13 @@ requests = ">=2.0.1,<3.0.0"
|
||||
|
||||
[[package]]
|
||||
name = "robocar-protobuf"
|
||||
version = "1.3.0"
|
||||
version = "1.5.2"
|
||||
description = "Protobuf message definitions for robocar"
|
||||
optional = false
|
||||
python-versions = ">=3.10,<4.0"
|
||||
files = [
|
||||
{file = "robocar_protobuf-1.3.0-py3-none-any.whl", hash = "sha256:61216d6957f650cca9adaf958a707e740b11f8030888202bfaf66f0752a3026f"},
|
||||
{file = "robocar_protobuf-1.3.0.tar.gz", hash = "sha256:935d2e3542ed3db9f15a8d94d728f69276f4850d66b9c1987d0899abb95ab2ac"},
|
||||
{file = "robocar_protobuf-1.5.2-py3-none-any.whl", hash = "sha256:79b977c996cb1a65b2724bb8c524b8175bd2138fb5687a9d00d564c9037fbd37"},
|
||||
{file = "robocar_protobuf-1.5.2.tar.gz", hash = "sha256:ac1ab1322fd5f23a73d5d88905438b06e0a83eb918c6f882643d37dd29efde70"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
@ -1652,4 +1652,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "8fd14cd097e9dee4d06b2264cb5a11979f95a00863920002b8d0106a20e7c65a"
|
||||
content-hash = "a092e2de71959e10e690264dcf83191a20f87d9b63bc4f45e9e7f20999e35c6e"
|
||||
|
@ -16,7 +16,7 @@ protobuf3 = "^0.2.1"
|
||||
google = "^3.0.0"
|
||||
protobuf = "^4.21.8"
|
||||
opencv-python-headless = "^4.6.0.66"
|
||||
robocar-protobuf = {version = "^1.3.0", source = "robocar"}
|
||||
robocar-protobuf = {version = "^1.5", source = "robocar"}
|
||||
|
||||
|
||||
[tool.poetry.group.test.dependencies]
|
||||
|
Loading…
Reference in New Issue
Block a user