Compare commits

...

25 Commits

Author SHA1 Message Date
b2f2570968 disable disparity frames 2024-03-18 19:27:32 +01:00
267c90750d chore: upgrade dependencies 2024-03-13 19:59:03 +01:00
01e2c8ce16 fix stereo filters 2024-01-28 17:51:14 +01:00
69b4e28323 fix disparity pipeline 2024-01-28 14:08:40 +01:00
31ceef93ae chore: bump to python 3.12 2024-01-27 11:49:59 +01:00
1703bb356f chore: upgrade dependecies 2024-01-27 11:49:59 +01:00
33791d0014 feat(disparity): fine tuning mode and post-processing filters 2024-01-27 11:49:59 +01:00
1ee37f65af feat(disparity): publish disparity messages 2024-01-23 23:39:10 +01:00
552f69e46e feat: tune camera exposition 2024-01-15 19:51:13 +01:00
4ec2aef409 feat: display fps 2024-01-14 10:36:40 +01:00
54977ee4e3 build: upgrade dependencies 2024-01-13 18:30:19 +01:00
87c1ee96e3 feat(object-detection): apply detection on big image 2024-01-13 18:28:21 +01:00
d4f8a12577 feat: refactor and option to configure camera fps 2023-10-01 17:24:05 +02:00
2593d5d953 chore: dependencies upgrade 2023-10-01 11:57:28 +02:00
c6f955a50c fix(camera): bad img height configuration 2022-12-25 11:22:06 +01:00
7ebd9093d9 fix(object_detection): add link from image_manip to nn node 2022-11-11 17:16:38 +01:00
642df5b927 add debug logs 2022-11-09 21:04:32 +01:00
c755d019e8 feat(cli): add flag to configure log level 2022-11-09 20:37:20 +01:00
befb4bacb3 fix: bad pipeline configuration 2022-11-05 16:09:30 +01:00
30f9876c1d build: fix docker build (pip index missing) 2022-11-02 16:31:46 +01:00
df8676ae5c fix(dependency): upgrade robocar-protobuf 2022-11-02 16:08:33 +01:00
9c07826898 build: upgrade dependencies 2022-11-02 15:51:16 +01:00
2149a01dd6 build: fix docker build 2022-11-02 15:35:47 +01:00
0db958e936 build: limit files to include into docker image 2022-11-02 13:56:41 +01:00
4faf3c2fee updaqte dockerignore 2022-11-02 13:55:52 +01:00
12 changed files with 1643 additions and 1990 deletions

View File

@ -1,5 +1,5 @@
venv venv
dist/ dist/*
build-docker.sh build-docker.sh
Dockerfile Dockerfile

View File

@ -1,9 +1,10 @@
FROM docker.io/library/python:3.10-slim as base FROM docker.io/library/python:3.12-slim as base
# Configure piwheels repo to use pre-compiled numpy wheels for arm # Configure piwheels repo to use pre-compiled numpy wheels for arm
RUN echo -n "[global]\nextra-index-url=https://www.piwheels.org/simple\n" >> /etc/pip.conf RUN echo -n "[global]\n" > /etc/pip.conf &&\
echo -n "extra-index-url = https://www.piwheels.org/simple https://git.cyrilix.bzh/api/packages/robocars/pypi/simple \n" >> /etc/pip.conf
RUN apt-get update && apt-get install -y libgl1 libglib2.0-0 RUN apt-get update && apt-get install -y libgl1 libglib2.0-0 procps cmake g++ gcc
################# #################
FROM base as model-builder FROM base as model-builder
@ -18,20 +19,29 @@ RUN blobconverter --zoo-name mobile_object_localizer_192x192 --zoo-type depthai
FROM base as builder FROM base as builder
RUN apt-get install -y git && \ RUN apt-get install -y git && \
pip3 install poetry==1.2.0 && \ pip3 install poetry && \
poetry self add "poetry-dynamic-versioning[plugin]" poetry self add "poetry-dynamic-versioning[plugin]"
ADD . .
ADD poetry.lock .
ADD pyproject.toml .
ADD camera camera
ADD README.md .
# Poetry expect to found a git project
ADD .git .git
RUN poetry build RUN poetry build
################# #################
FROM base FROM base
COPY camera_tunning /camera_tuning
RUN mkdir /models RUN mkdir /models
COPY --from=model-builder /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob COPY --from=model-builder /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob /models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob
COPY --from=builder dist/*.whl /tmp/ COPY --from=builder dist/*.whl /tmp/
RUN pip3 install /tmp/*whl RUN pip3 install /tmp/*.whl
WORKDIR /tmp WORKDIR /tmp
USER 1234 USER 1234

133
README.md
View File

@ -9,3 +9,136 @@ To build images, run script:
```bash ```bash
./build-docker.sh ./build-docker.sh
``` ```
## Usage
```shell
usage: cli.py [-h] [-u MQTT_USERNAME] [-p MQTT_PASSWORD] [-b MQTT_BROKER_HOST]
[-P MQTT_BROKER_PORT] [-C MQTT_CLIENT_ID]
[-c MQTT_TOPIC_ROBOCAR_OAK_CAMERA]
[-o MQTT_TOPIC_ROBOCAR_OBJECTS] [-t OBJECTS_THRESHOLD]
[-d MQTT_TOPIC_ROBOCAR_DISPARITY] [-f CAMERA_FPS]
[--camera-tuning-exposition {default,500us,8300us}]
[-H IMAGE_HEIGHT] [-W IMAGE_WIDTH] [--log {info,debug}]
[--stereo-mode-lr-check] [--stereo-mode-extended-disparity]
[--stereo-mode-subpixel]
[--stereo-post-processing-median-filter]
[--stereo-post-processing-median-value {MEDIAN_OFF,KERNEL_3x3,KERNEL_5x5,KERNEL_7x7}]
[--stereo-post-processing-speckle-filter]
[--stereo-post-processing-speckle-enable STEREO_POST_PROCESSING_SPECKLE_ENABLE]
[--stereo-post-processing-speckle-range STEREO_POST_PROCESSING_SPECKLE_RANGE]
[--stereo-post-processing-temporal-filter]
[--stereo-post-processing-temporal-persistency-mode {PERSISTENCY_OFF,VALID_8_OUT_OF_8,VALID_2_IN_LAST_3,VALID_2_IN_LAST_4,VALID_2_OUT_OF_8,VALID_1_IN_LAST_2,VALID_1_IN_LAST_5,VALID_1_IN_LAST_8,PERSISTENCY_INDEFINITELY}]
[--stereo-post-processing-temporal-alpha STEREO_POST_PROCESSING_TEMPORAL_ALPHA]
[--stereo-post-processing-temporal-delta STEREO_POST_PROCESSING_TEMPORAL_DELTA]
[--stereo-post-processing-spatial-filter]
[--stereo-post-processing-spatial-enable STEREO_POST_PROCESSING_SPATIAL_ENABLE]
[--stereo-post-processing-spatial-hole-filling-radius STEREO_POST_PROCESSING_SPATIAL_HOLE_FILLING_RADIUS]
[--stereo-post-processing-spatial-alpha STEREO_POST_PROCESSING_SPATIAL_ALPHA]
[--stereo-post-processing-spatial-delta STEREO_POST_PROCESSING_SPATIAL_DELTA]
[--stereo-post-processing-spatial-num-iterations STEREO_POST_PROCESSING_SPATIAL_NUM_ITERATIONS]
[--stereo-post-processing-threshold-filter]
[--stereo-post-processing-threshold-min-range STEREO_POST_PROCESSING_THRESHOLD_MIN_RANGE]
[--stereo-post-processing-threshold-max-range STEREO_POST_PROCESSING_THRESHOLD_MAX_RANGE]
[--stereo-post-processing-decimation-filter]
[--stereo-post-processing-decimation-decimal-factor {1,2,3,4}]
[--stereo-post-processing-decimation-mode {PIXEL_SKIPPING,NON_ZERO_MEDIAN,NON_ZERO_MEAN}]
options:
-h, --help show this help message and exit
-u MQTT_USERNAME, --mqtt-username MQTT_USERNAME
MQTT user
-p MQTT_PASSWORD, --mqtt-password MQTT_PASSWORD
MQTT password
-b MQTT_BROKER_HOST, --mqtt-broker-host MQTT_BROKER_HOST
MQTT broker host
-P MQTT_BROKER_PORT, --mqtt-broker-port MQTT_BROKER_PORT
MQTT broker port
-C MQTT_CLIENT_ID, --mqtt-client-id MQTT_CLIENT_ID
MQTT client id
-c MQTT_TOPIC_ROBOCAR_OAK_CAMERA, --mqtt-topic-robocar-oak-camera MQTT_TOPIC_ROBOCAR_OAK_CAMERA
MQTT topic where to publish robocar-oak-camera frames
-o MQTT_TOPIC_ROBOCAR_OBJECTS, ---mqtt-topic-robocar-objects MQTT_TOPIC_ROBOCAR_OBJECTS
MQTT topic where to publish objects detection results
-t OBJECTS_THRESHOLD, --objects-threshold OBJECTS_THRESHOLD
threshold to filter detected objects
-d MQTT_TOPIC_ROBOCAR_DISPARITY, ---mqtt-topic-robocar-disparity MQTT_TOPIC_ROBOCAR_DISPARITY
MQTT topic where to publish disparity results
-f CAMERA_FPS, --camera-fps CAMERA_FPS
set rate at which camera should produce frames
--camera-tuning-exposition {default,500us,8300us}
override camera exposition configuration
-H IMAGE_HEIGHT, --image-height IMAGE_HEIGHT
image height
-W IMAGE_WIDTH, --image-width IMAGE_WIDTH
image width
--log {info,debug} Log level
--stereo-mode-lr-check
remove incorrectly calculated disparity pixels due to
occlusions at object borders
--stereo-mode-extended-disparity
allows detecting closer distance objects for the given
baseline. This increases the maximum disparity search
from 96 to 191, meaning the range is now: [0..190]
--stereo-mode-subpixel
iimproves the precision and is especially useful for
long range measurements
--stereo-post-processing-median-filter
enable post-processing median filter
--stereo-post-processing-median-value {MEDIAN_OFF,KERNEL_3x3,KERNEL_5x5,KERNEL_7x7}
Median filter config
--stereo-post-processing-speckle-filter
enable post-processing speckle filter
--stereo-post-processing-speckle-enable STEREO_POST_PROCESSING_SPECKLE_ENABLE
enable post-processing speckle filter
--stereo-post-processing-speckle-range STEREO_POST_PROCESSING_SPECKLE_RANGE
Speckle search range
--stereo-post-processing-temporal-filter
enable post-processing temporal filter
--stereo-post-processing-temporal-persistency-mode {PERSISTENCY_OFF,VALID_8_OUT_OF_8,VALID_2_IN_LAST_3,VALID_2_IN_LAST_4,VALID_2_OUT_OF_8,VALID_1_IN_LAST_2,VALID_1_IN_LAST_5,VALID_1_IN_LAST_8,PERSISTENCY_INDEFINITELY}
Persistency mode.
--stereo-post-processing-temporal-alpha STEREO_POST_PROCESSING_TEMPORAL_ALPHA
The Alpha factor in an exponential moving average with
Alpha=1 - no filter. Alpha = 0 - infinite filter.
Determines the extent of the temporal history that
should be averaged.
--stereo-post-processing-temporal-delta STEREO_POST_PROCESSING_TEMPORAL_DELTA
Step-size boundary. Establishes the threshold used to
preserve surfaces (edges). If the disparity value
between neighboring pixels exceed the disparity
threshold set by this delta parameter, then filtering
will be temporarily disabled. Default value 0 means
auto: 3 disparity integer levels. In case of subpixel
mode its 3*number of subpixel levels.
--stereo-post-processing-spatial-filter
enable post-processing spatial filter
--stereo-post-processing-spatial-enable STEREO_POST_PROCESSING_SPATIAL_ENABLE
Whether to enable or disable the filter
--stereo-post-processing-spatial-hole-filling-radius STEREO_POST_PROCESSING_SPATIAL_HOLE_FILLING_RADIUS
An in-place heuristic symmetric hole-filling mode
applied horizontally during the filter passes
--stereo-post-processing-spatial-alpha STEREO_POST_PROCESSING_SPATIAL_ALPHA
The Alpha factor in an exponential moving average with
Alpha=1 - no filter. Alpha = 0 - infinite filter
--stereo-post-processing-spatial-delta STEREO_POST_PROCESSING_SPATIAL_DELTA
Step-size boundary. Establishes the threshold used to
preserve edges
--stereo-post-processing-spatial-num-iterations STEREO_POST_PROCESSING_SPATIAL_NUM_ITERATIONS
Number of iterations over the image in both horizontal
and vertical direction
--stereo-post-processing-threshold-filter
enable post-processing threshold filter
--stereo-post-processing-threshold-min-range STEREO_POST_PROCESSING_THRESHOLD_MIN_RANGE
Minimum range in depth units. Depth values under this
value are invalidated
--stereo-post-processing-threshold-max-range STEREO_POST_PROCESSING_THRESHOLD_MAX_RANGE
Maximum range in depth units. Depth values over this
value are invalidated.
--stereo-post-processing-decimation-filter
enable post-processing decimation filter
--stereo-post-processing-decimation-decimal-factor {1,2,3,4}
Decimation factor
--stereo-post-processing-decimation-mode {PIXEL_SKIPPING,NON_ZERO_MEDIAN,NON_ZERO_MEAN}
Decimation algorithm type
```

View File

@ -7,6 +7,6 @@ PLATFORM="linux/amd64,linux/arm64"
#PLATFORM="linux/amd64,linux/arm64,linux/arm/v7" #PLATFORM="linux/amd64,linux/arm64,linux/arm/v7"
podman build . --platform "${PLATFORM}" --manifest "${IMAGE_NAME}:${TAG}" podman build . --platform "${PLATFORM}" --manifest "${IMAGE_NAME}:${TAG}"
podman manifest push --all --format v2s2 "localhost/${IMAGE_NAME}:${TAG}" "docker://${FULL_IMAGE_NAME}" podman manifest push --all "localhost/${IMAGE_NAME}:${TAG}" "docker://${FULL_IMAGE_NAME}"
printf "\nImage %s published" "docker://${FULL_IMAGE_NAME}" printf "\nImage %s published" "docker://${FULL_IMAGE_NAME}"

View File

@ -5,15 +5,22 @@ import argparse
import logging import logging
import os import os
import signal import signal
import typing, types import types
import typing
from typing import List
import depthai as dai import depthai as dai
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from . import depthai as cam # pylint: disable=reimported from camera import oak_pipeline as cam
from camera.oak_pipeline import StereoDepthPostFilter, MedianFilter, SpeckleFilter, TemporalFilter, SpatialFilter, \
ThresholdFilter, DecimationFilter
CAMERA_EXPOSITION_DEFAULT = "default"
CAMERA_EXPOSITION_8300US = "8300us"
CAMERA_EXPOSITION_500US = "500us"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
_DEFAULT_CLIENT_ID = "robocar-depthai" _DEFAULT_CLIENT_ID = "robocar-depthai"
@ -46,12 +53,147 @@ def _parse_args_cli() -> argparse.Namespace:
help="threshold to filter detected objects", help="threshold to filter detected objects",
type=float, type=float,
default=_get_env_float_value("OBJECTS_THRESHOLD", 0.2)) default=_get_env_float_value("OBJECTS_THRESHOLD", 0.2))
parser.add_argument("-d", "---mqtt-topic-robocar-disparity",
help="MQTT topic where to publish disparity results",
default=_get_env_value("MQTT_TOPIC_DISPARITY", "/disparity"))
parser.add_argument("-f", "--camera-fps",
help="set rate at which camera should produce frames",
type=int,
default=30)
parser.add_argument("--camera-tuning-exposition", type=str,
default=CAMERA_EXPOSITION_DEFAULT,
help="override camera exposition configuration",
choices=[CAMERA_EXPOSITION_DEFAULT, CAMERA_EXPOSITION_500US, CAMERA_EXPOSITION_8300US])
parser.add_argument("-H", "--image-height", help="image height", parser.add_argument("-H", "--image-height", help="image height",
type=int, type=int,
default=_get_env_int_value("IMAGE_HEIGHT", 120)) default=_get_env_int_value("IMAGE_HEIGHT", 120))
parser.add_argument("-W", "--image-width", help="image width", parser.add_argument("-W", "--image-width", help="image width",
type=int, type=int,
default=_get_env_int_value("IMAGE_WIDTH", 126)) default=_get_env_int_value("IMAGE_WIDTH", 126))
parser.add_argument("--log", help="Log level",
type=str,
default="info",
choices=["info", "debug"])
parser.add_argument("--disable-disparity", action="store_true",
help="enable disparity frame",
default=False
)
parser.add_argument("--stereo-mode-lr-check",
help="remove incorrectly calculated disparity pixels due to occlusions at object borders",
default=False, action="store_true"
)
parser.add_argument("--stereo-mode-extended-disparity",
help="allows detecting closer distance objects for the given baseline. This increases the maximum disparity search from 96 to 191, meaning the range is now: [0..190]",
default=False, action="store_true"
)
parser.add_argument("--stereo-mode-subpixel",
help="iimproves the precision and is especially useful for long range measurements",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-median-filter",
help="enable post-processing median filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-median-value",
help="Median filter config ",
type=str,
choices=["MEDIAN_OFF", "KERNEL_3x3", "KERNEL_5x5", "KERNEL_7x7"],
default="KERNEL_7x7",
)
parser.add_argument("--stereo-post-processing-speckle-filter",
help="enable post-processing speckle filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-speckle-enable",
help="enable post-processing speckle filter",
type=bool, default=False
)
parser.add_argument("--stereo-post-processing-speckle-range",
help="Speckle search range",
type=int, default=50
)
parser.add_argument("--stereo-post-processing-temporal-filter",
help="enable post-processing temporal filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-temporal-persistency-mode",
help="Persistency mode.",
type=str, default="VALID_2_IN_LAST_4",
choices=["PERSISTENCY_OFF", "VALID_8_OUT_OF_8", "VALID_2_IN_LAST_3", "VALID_2_IN_LAST_4",
"VALID_2_OUT_OF_8", "VALID_1_IN_LAST_2", "VALID_1_IN_LAST_5", "VALID_1_IN_LAST_8",
"PERSISTENCY_INDEFINITELY"]
)
parser.add_argument("--stereo-post-processing-temporal-alpha",
help="The Alpha factor in an exponential moving average with Alpha=1 - no filter. "
"Alpha = 0 - infinite filter. Determines the extent of the temporal history that should be "
"averaged. ",
type=float, default=0.4,
)
parser.add_argument("--stereo-post-processing-temporal-delta",
help="Step-size boundary. Establishes the threshold used to preserve surfaces (edges). "
"If the disparity value between neighboring pixels exceed the disparity threshold set by "
"this delta parameter, then filtering will be temporarily disabled. Default value 0 means "
"auto: 3 disparity integer levels. In case of subpixel mode its 3*number of subpixel "
"levels.",
type=int, default=0,
)
parser.add_argument("--stereo-post-processing-spatial-filter",
help="enable post-processing spatial filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-spatial-enable",
help="Whether to enable or disable the filter",
type=bool, default=False,
)
parser.add_argument("--stereo-post-processing-spatial-hole-filling-radius",
help="An in-place heuristic symmetric hole-filling mode applied horizontally during the filter passes",
type=int, default=2,
)
parser.add_argument("--stereo-post-processing-spatial-alpha",
help="The Alpha factor in an exponential moving average with Alpha=1 - no filter. Alpha = 0 - infinite filter",
type=float, default=0.5,
)
parser.add_argument("--stereo-post-processing-spatial-delta",
help="Step-size boundary. Establishes the threshold used to preserve edges",
type=int, default=0,
)
parser.add_argument("--stereo-post-processing-spatial-num-iterations",
help="Number of iterations over the image in both horizontal and vertical direction",
type=int, default=1,
)
parser.add_argument("--stereo-post-processing-threshold-filter",
help="enable post-processing threshold filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-threshold-min-range",
help="Minimum range in depth units. Depth values under this value are invalidated",
type=int, default=500,
)
parser.add_argument("--stereo-post-processing-threshold-max-range",
help="Maximum range in depth units. Depth values over this value are invalidated.",
type=int, default=15000,
)
parser.add_argument("--stereo-post-processing-decimation-filter",
help="enable post-processing decimation filter",
default=False, action="store_true"
)
parser.add_argument("--stereo-post-processing-decimation-decimal-factor",
help="Decimation factor",
type=int, default=1, choices=[1, 2, 3, 4]
)
parser.add_argument("--stereo-post-processing-decimation-mode",
help="Decimation algorithm type",
type=str, default="PIXEL_SKIPPING",
choices=["PIXEL_SKIPPING", "NON_ZERO_MEDIAN", "NON_ZERO_MEAN"]
)
args = parser.parse_args() args = parser.parse_args()
return args return args
@ -72,9 +214,12 @@ def execute_from_command_line() -> None:
Cli entrypoint Cli entrypoint
:return: :return:
""" """
logging.basicConfig(level=logging.INFO)
args = _parse_args_cli() args = _parse_args_cli()
if args.log == "info":
logging.basicConfig(level=logging.INFO)
elif args.log == "debug":
logging.basicConfig(level=logging.DEBUG)
client = _init_mqtt_client(broker_host=args.mqtt_broker_host, client = _init_mqtt_client(broker_host=args.mqtt_broker_host,
broker_port=args.mqtt_broker_port, broker_port=args.mqtt_broker_port,
@ -86,15 +231,36 @@ def execute_from_command_line() -> None:
object_processor = cam.ObjectProcessor(mqtt_client=client, object_processor = cam.ObjectProcessor(mqtt_client=client,
objects_topic=args.mqtt_topic_robocar_objects, objects_topic=args.mqtt_topic_robocar_objects,
objects_threshold=args.objects_threshold) objects_threshold=args.objects_threshold)
if args.disable_disparity == False:
depth_source = cam.DepthSource(pipeline=pipeline,
extended_disparity=args.stereo_mode_extended_disparity,
subpixel=args.stereo_mode_subpixel,
lr_check=args.stereo_mode_lr_check,
stereo_filters=stereo_filters),
disparity_processor = cam.DisparityProcessor(mqtt_client=client, disparity_topic=args.mqtt_topic_robocar_disparity)
else:
disparity_processor = None
depth_source = None
pipeline = dai.Pipeline() pipeline = dai.Pipeline()
pipeline_controller = cam.PipelineController(frame_processor=frame_processor, if args.camera_tuning_exposition == CAMERA_EXPOSITION_500US:
pipeline.setCameraTuningBlobPath('/camera_tuning/tuning_exp_limit_500us.bin')
elif args.camera_tuning_exposition == CAMERA_EXPOSITION_8300US:
pipeline.setCameraTuningBlobPath('/camera_tuning/tuning_exp_limit_8300us.bin')
stereo_filters = _get_stereo_filters(args)
pipeline_controller = cam.PipelineController(pipeline=pipeline,
frame_processor=frame_processor,
object_processor=object_processor, object_processor=object_processor,
object_node=cam.ObjectDetectionNN(pipeline=pipeline), object_node=cam.ObjectDetectionNN(pipeline=pipeline),
camera=cam.CameraSource(pipeline=pipeline, camera=cam.CameraSource(pipeline=pipeline,
img_width=args.image_width, img_width=args.image_width,
img_height=args.image_width, img_height=args.image_height,
)) fps=args.camera_fps,
),
depth_source=depth_source,
disparity_processor=disparity_processor)
def sigterm_handler(signum: int, frame: typing.Optional[ def sigterm_handler(signum: int, frame: typing.Optional[
types.FrameType]) -> None: # pylint: disable=unused-argument # need to implement handler signature types.FrameType]) -> None: # pylint: disable=unused-argument # need to implement handler signature
@ -119,3 +285,89 @@ def _get_env_int_value(env_var: str, default_value: int) -> int:
def _get_env_float_value(env_var: str, default_value: float) -> float: def _get_env_float_value(env_var: str, default_value: float) -> float:
value = _get_env_value(env_var, str(default_value)) value = _get_env_value(env_var, str(default_value))
return float(value) return float(value)
def _get_stereo_filters(args: argparse.Namespace) -> List[StereoDepthPostFilter]:
filters = []
if args.stereo_post_processing_median_filter:
if args.stereo_post_processing_median_value == "MEDIAN_OFF":
value = dai.MedianFilter.MEDIAN_OFF
elif args.stereo_post_processing_median_value == "KERNEL_3x3":
value = dai.MedianFilter.KERNEL_3x3
elif args.stereo_post_processing_median_value == "KERNEL_5x5":
value = dai.MedianFilter.KERNEL_5x5
elif args.stereo_post_processing_median_value == "KERNEL_7x7":
value = dai.MedianFilter.KERNEL_7x7
else:
value = dai.MedianFilter.KERNEL_7x7
filters.append(MedianFilter(value=value))
if args.stereo_post_processing_speckle_filter:
filters.append(SpeckleFilter(enable=args.stereo_post_processing_speckle_enable,
speckle_range=args.stereo_post_processing_speckle_range))
if args.stereo_post_processing_temporal_filter:
if args.stereo_post_processing_temporal_persistency-mode == "PERSISTENCY_OFF":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.PERSISTENCY_OFF
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_8_OUT_OF_8":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_8_OUT_OF_8
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_2_IN_LAST_3":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_2_IN_LAST_3
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_2_IN_LAST_4":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_2_IN_LAST_4
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_2_OUT_OF_8":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_2_OUT_OF_8
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_1_IN_LAST_2":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_1_IN_LAST_2
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_1_IN_LAST_5":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_1_IN_LAST_5
elif args.stereo_post_processing_temporal_persistency-mode == "VALID_1_IN_LAST_8":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_1_IN_LAST_8
elif args.stereo_post_processing_temporal_persistency-mode == "PERSISTENCY_INDEFINITELY":
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.PERSISTENCY_INDEFINITELY
else:
mode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_2_IN_LAST_4
filters.append(TemporalFilter(
enable=args.stereo_post_processing_temporal_enable,
persistencyMode=mode,
alpha=args.stereo_post_processing_temporal_alpha,
delta=args.stereo_post_processing_temporal_delta
))
if args.stereo_post_processing_spatial_filter:
filters.append(SpatialFilter(enable=args.stereo_post_processing_spatial_enable,
hole_filling_radius=args.stereo_post_processing_spatial_hole_filling_radius,
alpha=args.stereo_post_processing_spatial_alpha,
delta=args.stereo_post_processing_spatial_delta,
num_iterations=args.stereo_post_processing_spatial_num_iterations,
))
if args.stereo_post_processing_threshold_filter:
filters.append(ThresholdFilter(
min_range=args.stereo_post_processing_threshold_min_range,
max_range=args.stereo_post_processing_threshold_max_range,
))
if args.stereo_post_processing_decimation_filter:
if args.stereo_post_processing_decimation_mode == "PIXEL_SKIPPING":
mode=dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode.PIXEL_SKIPPING
if args.stereo_post_processing_decimation_mode == "NON_ZERO_MEDIAN":
mode=dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode.NON_ZERO_MEDIAN
if args.stereo_post_processing_decimation_mode == "NON_ZERO_MEAN":
mode=dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode.NON_ZERO_MEAN
else:
mode=dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode.PIXEL_SKIPPING
filters.append(DecimationFilter(
decimation_factor=args.stereo_post_processing_decimation_decimal_factor,
mode=mode
))
return filters
if __name__ == '__main__':
execute_from_command_line()

View File

@ -1,376 +0,0 @@
"""
Camera event loop
"""
import abc
import datetime
import logging
import pathlib
import typing
from dataclasses import dataclass
import cv2
import depthai as dai
import events.events_pb2 as evt
import numpy as np
import numpy.typing as npt
import paho.mqtt.client as mqtt
from depthai import Device
logger = logging.getLogger(__name__)
_NN_PATH = "/models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob"
_NN_WIDTH = 192
_NN_HEIGHT = 192
class ObjectProcessor:
"""
Processor for Object detection
"""
def __init__(self, mqtt_client: mqtt.Client, objects_topic: str, objects_threshold: float):
self._mqtt_client = mqtt_client
self._objects_topic = objects_topic
self._objects_threshold = objects_threshold
def process(self, in_nn: dai.NNData, frame_ref: evt.FrameRef) -> None:
"""
Parse and publish result of NeuralNetwork result
:param in_nn: NeuralNetwork result read from device
:param frame_ref: Id of the frame where objects are been detected
:return:
"""
detection_boxes = np.array(in_nn.getLayerFp16("ExpandDims")).reshape((100, 4))
detection_scores = np.array(in_nn.getLayerFp16("ExpandDims_2")).reshape((100,))
# keep boxes bigger than threshold
mask = detection_scores >= self._objects_threshold
boxes = detection_boxes[mask]
scores = detection_scores[mask]
if boxes.shape[0] > 0:
self._publish_objects(boxes, frame_ref, scores)
def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref: evt.FrameRef, scores: npt.NDArray[np.float64]) -> None:
objects_msg = evt.ObjectsMessage()
objs = []
for i in range(boxes.shape[0]):
logger.debug("new object detected: %s", str(boxes[i]))
objs.append(_bbox_to_object(boxes[i], scores[i].astype(float)))
objects_msg.objects.extend(objs)
objects_msg.frame_ref.name = frame_ref.name
objects_msg.frame_ref.id = frame_ref.id
objects_msg.frame_ref.created_at.FromDatetime(frame_ref.created_at.ToDatetime())
logger.debug("publish object event to %s", self._objects_topic)
self._mqtt_client.publish(topic=self._objects_topic,
payload=objects_msg.SerializeToString(),
qos=0,
retain=False)
class FrameProcessError(Exception):
"""
Error base for invalid frame processing
Attributes:
message -- explanation of the error
"""
def __init__(self, message: str):
"""
:param message: explanation of the error
"""
self.message = message
class FrameProcessor:
"""
Processor for camera frames
"""
def __init__(self, mqtt_client: mqtt.Client, frame_topic: str):
self._mqtt_client = mqtt_client
self._frame_topic = frame_topic
def process(self, img: dai.ImgFrame) -> typing.Any:
"""
Publish camera frames
:param img: image read from camera
:return:
id frame reference
:raise:
FrameProcessError if frame can't be processed
"""
im_resize = img.getCvFrame()
is_success, im_buf_arr = cv2.imencode(".jpg", im_resize)
if not is_success:
raise FrameProcessError("unable to process to encode frame to jpg")
byte_im = im_buf_arr.tobytes()
now = datetime.datetime.now()
frame_msg = evt.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
frame_msg.frame = byte_im
logger.debug("publish frame event to %s", self._frame_topic)
self._mqtt_client.publish(topic=self._frame_topic,
payload=frame_msg.SerializeToString(),
qos=0,
retain=False)
return frame_msg.id
class Source(abc.ABC):
"""Base class for image source"""
@abc.abstractmethod
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: steam name
"""
@abc.abstractmethod
def link(self, input_node: dai.Node.Input) -> None:
"""
Link this source to the input node
:param: input_node: input node to link
"""
class ObjectDetectionNN:
"""
Node to detect objects into image
Read image as input and apply resize transformation before to run NN on it
Result is available with 'get_stream_name()' stream
"""
def __init__(self, pipeline: dai.Pipeline):
# Define a neural network that will make predictions based on the source frames
detection_nn = pipeline.createNeuralNetwork()
detection_nn.setBlobPath(pathlib.Path(_NN_PATH))
detection_nn.setNumPoolFrames(4)
detection_nn.input.setBlocking(False)
detection_nn.setNumInferenceThreads(2)
self._detection_nn = detection_nn
self._xout = self._configure_xout_nn(pipeline)
self._detection_nn.out.link(self._xout.input)
self._manip_image = self._configure_manip(pipeline)
@staticmethod
def _configure_manip(pipeline: dai.Pipeline) -> dai.node.ImageManip:
# Resize image
manip = pipeline.createImageManip()
manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
manip.initialConfig.setKeepAspectRatio(False)
return manip
@staticmethod
def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
xout_nn = pipeline.createXLinkOut()
xout_nn.setStreamName("nn")
xout_nn.input.setBlocking(False)
return xout_nn
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: stream name
"""
return self._xout.getStreamName()
def get_input(self) -> dai.Node.Input:
"""
Get input node to use to link with source node
:return: input to link with source output, see Source.link()
"""
return self._manip_image.inputImage
class CameraSource(Source):
"""Image source based on camera preview"""
def __init__(self, pipeline: dai.Pipeline, img_width: int, img_height: int):
cam_rgb = pipeline.createColorCamera()
xout_rgb = pipeline.createXLinkOut()
xout_rgb.setStreamName("rgb")
self._cam_rgb = cam_rgb
self._xout_rgb = xout_rgb
# Properties
cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
cam_rgb.setPreviewSize(width=img_width, height=img_height)
cam_rgb.setInterleaved(False)
cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
cam_rgb.setFps(30)
# link camera preview to output
cam_rgb.preview.link(xout_rgb.input)
def link(self, input_node: dai.Node.Input) -> None:
self._cam_rgb.preview.link(input_node)
def get_stream_name(self) -> str:
return self._xout_rgb.getStreamName()
@dataclass
class MqttConfig:
"""MQTT configuration"""
host: str
topic: str
port: int = 1883
qos: int = 0
class MqttSource(Source):
"""Image source based onto mqtt stream"""
def __init__(self, device: Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
self._mqtt_config = mqtt_config
self._client = mqtt.Client()
self._client.user_data_set(mqtt_config)
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._img_in = pipeline.createXLinkIn()
self._img_in.setStreamName("img_input")
self._img_out = pipeline.createXLinkOut()
self._img_out.setStreamName("img_output")
self._img_in.out.link(self._img_out.input)
self._img_in_queue = device.getInputQueue(self._img_in.getStreamName())
def run(self) -> None:
""" Connect and start mqtt loop """
self._client.connect(host=self._mqtt_config.host, port=self._mqtt_config.port)
self._client.loop_start()
def stop(self) -> None:
"""Stop and disconnect mqtt loop"""
self._client.loop_stop()
self._client.disconnect()
@staticmethod
# pylint: disable=unused-argument
def _on_connect(client: mqtt.Client, userdata: MqttConfig, flags: typing.Any,
result_connection: typing.Any) -> None:
# if we lose the connection and reconnect then subscriptions will be renewed.
client.subscribe(topic=userdata.topic, qos=userdata.qos)
# pylint: disable=unused-argument
def _on_message(self, _: mqtt.Client, user_data: MqttConfig, msg: mqtt.MQTTMessage) -> None:
frame_msg = evt.FrameMessage()
frame_msg.ParseFromString(msg.payload)
frame = np.asarray(frame_msg.frame, dtype="uint8")
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
nn_data = dai.NNData()
nn_data.setLayer("data", _to_planar(frame, (300, 300)))
self._img_in_queue.send(nn_data)
def get_stream_name(self) -> str:
return self._img_out.getStreamName()
def link(self, input_node: dai.Node.Input) -> None:
self._img_in.out.link(input_node)
def _to_planar(arr: npt.NDArray[np.uint8], shape: tuple[int, int]) -> list[int]:
return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
class PipelineController:
"""
Pipeline controller that drive camera device
"""
def __init__(self, frame_processor: FrameProcessor,
object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN):
self._pipeline = self._configure_pipeline()
self._frame_processor = frame_processor
self._object_processor = object_processor
self._camera = camera
self._object_node = object_node
self._stop = False
def _configure_pipeline(self) -> dai.Pipeline:
logger.info("configure pipeline")
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)
# Link preview to manip and manip to nn
self._camera.link(self._object_node.get_input())
logger.info("pipeline configured")
return pipeline
def run(self) -> None:
"""
Start event loop
:return:
"""
# Connect to device and start pipeline
with Device(pipeline=self._pipeline) as dev:
logger.info('MxId: %s', dev.getDeviceInfo().getMxId())
logger.info('USB speed: %s', dev.getUsbSpeed())
logger.info('Connected cameras: %s', str(dev.getConnectedCameras()))
logger.info("output queues found: %s", str(''.join(dev.getOutputQueueNames()))) # type: ignore
dev.startPipeline()
# Queues
queue_size = 4
q_rgb = dev.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size, # type: ignore
blocking=False)
q_nn = dev.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, # type: ignore
blocking=False)
self._stop = False
while True:
if self._stop:
logger.info("stop loop event")
return
try:
self._loop_on_camera_events(q_nn, q_rgb)
# pylint: disable=broad-except # bad frame or event must not stop loop
except Exception as ex:
logger.exception("unexpected error: %s", str(ex))
def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue) -> None:
logger.debug("wait for new frame")
# Wait for frame
in_rgb: dai.ImgFrame = q_rgb.get() # type: ignore # blocking call, will wait until a new data has arrived
try:
frame_ref = self._frame_processor.process(in_rgb)
except FrameProcessError as ex:
logger.error("unable to process frame: %s", str(ex))
return
# Read NN result
in_nn: dai.NNData = q_nn.get() # type: ignore
self._object_processor.process(in_nn, frame_ref)
def stop(self) -> None:
"""
Stop event loop, if loop is not running, do nothing
:return:
"""
self._stop = True
def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> evt.Object:
obj = evt.Object()
obj.type = evt.TypeObject.ANY
obj.top = bbox[0].astype(float)
obj.right = bbox[3].astype(float)
obj.bottom = bbox[2].astype(float)
obj.left = bbox[1].astype(float)
obj.confidence = score
return obj

666
camera/oak_pipeline.py Normal file
View File

@ -0,0 +1,666 @@
"""
Camera event loop
"""
import abc
import datetime
import logging
import pathlib
import time
from dataclasses import dataclass
from typing import List, Any
import cv2
import depthai as dai
import events.events_pb2 as evt
import numpy as np
import numpy.typing as npt
import paho.mqtt.client as mqtt
from depthai import Device
logger = logging.getLogger(__name__)
_NN_PATH = "/models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob"
_NN_WIDTH = 192
_NN_HEIGHT = 192
_PREVIEW_WIDTH = 640
_PREVIEW_HEIGHT = 480
_CAMERA_BASELINE_IN_MM = 75
class ObjectProcessor:
"""
Processor for Object detection
"""
def __init__(self, mqtt_client: mqtt.Client, objects_topic: str, objects_threshold: float):
self._mqtt_client = mqtt_client
self._objects_topic = objects_topic
self._objects_threshold = objects_threshold
def process(self, in_nn: dai.NNData, frame_ref: evt.FrameRef) -> None:
"""
Parse and publish result of NeuralNetwork result
:param in_nn: NeuralNetwork result read from device
:param frame_ref: Id of the frame where objects are been detected
:return:
"""
detection_boxes = np.array(in_nn.getLayerFp16("ExpandDims")).reshape((100, 4))
detection_scores = np.array(in_nn.getLayerFp16("ExpandDims_2")).reshape((100,))
# keep boxes bigger than threshold
mask = detection_scores >= self._objects_threshold
boxes = detection_boxes[mask]
scores = detection_scores[mask]
if boxes.shape[0] > 0:
self._publish_objects(boxes, frame_ref, scores)
def _publish_objects(self, boxes: npt.NDArray[np.float64], frame_ref: evt.FrameRef, scores: npt.NDArray[np.float64]) -> None:
objects_msg = evt.ObjectsMessage()
objs = []
for i in range(boxes.shape[0]):
logger.debug("new object detected: %s", str(boxes[i]))
objs.append(_bbox_to_object(boxes[i], scores[i].astype(float)))
objects_msg.objects.extend(objs)
objects_msg.frame_ref.name = frame_ref.name
objects_msg.frame_ref.id = frame_ref.id
objects_msg.frame_ref.created_at.FromDatetime(frame_ref.created_at.ToDatetime())
logger.debug("publish object event to %s", self._objects_topic)
self._mqtt_client.publish(topic=self._objects_topic,
payload=objects_msg.SerializeToString(),
qos=0,
retain=False)
class FrameProcessError(Exception):
"""
Error base for invalid frame processing
Attributes:
message -- explanation of the error
"""
def __init__(self, message: str):
"""
:param message: explanation of the error
"""
self.message = message
class FrameProcessor:
"""
Processor for camera frames
"""
def __init__(self, mqtt_client: mqtt.Client, frame_topic: str):
self._mqtt_client = mqtt_client
self._frame_topic = frame_topic
def process(self, img: dai.ImgFrame) -> Any:
"""
Publish camera frames
:param img: image read from camera
:return:
id frame reference
:raise:
FrameProcessError if frame can't be processed
"""
im_resize = img.getCvFrame()
is_success, im_buf_arr = cv2.imencode(".jpg", im_resize)
if not is_success:
raise FrameProcessError("unable to process to encode frame to jpg")
byte_im = im_buf_arr.tobytes()
now = datetime.datetime.now()
frame_msg = evt.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
frame_msg.frame = byte_im
logger.debug("publish frame event to %s", self._frame_topic)
self._mqtt_client.publish(topic=self._frame_topic,
payload=frame_msg.SerializeToString(),
qos=0,
retain=False)
return frame_msg.id
class DisparityProcessor:
"""
Processor for camera frames
"""
def __init__(self, mqtt_client: mqtt.Client, disparity_topic: str):
self._mqtt_client = mqtt_client
self._disparity_topic = disparity_topic
def process(self, img: dai.ImgFrame, frame_ref: evt.FrameRef, focal_length_in_pixels: float,
baseline_mm: float = _CAMERA_BASELINE_IN_MM) -> None:
im_frame = img.getCvFrame()
is_success, im_buf_arr = cv2.imencode(".jpg", im_frame)
if not is_success:
raise FrameProcessError("unable to process to encode frame to jpg")
byte_im = im_buf_arr.tobytes()
disparity_msg = evt.DisparityMessage()
disparity_msg.disparity = byte_im
disparity_msg.frame_ref.name = frame_ref.name
disparity_msg.frame_ref.id = frame_ref.id
disparity_msg.frame_ref.created_at.FromDatetime(frame_ref.created_at.ToDatetime())
disparity_msg.focal_length_in_pixels = focal_length_in_pixels
disparity_msg.baseline_in_mm = baseline_mm
self._mqtt_client.publish(topic=self._disparity_topic,
payload=disparity_msg.SerializeToString(),
qos=0,
retain=False)
class Source(abc.ABC):
"""Base class for image source"""
@abc.abstractmethod
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: steam name
"""
@abc.abstractmethod
def link(self, input_node: dai.Node.Input) -> None:
"""
Link this source to the input node
:param: input_node: input node to link
"""
class ObjectDetectionNN:
"""
Node to detect objects into image
Read image as input and apply resize transformation before to run NN on it
Result is available with 'get_stream_name()' stream
"""
def __init__(self, pipeline: dai.Pipeline):
# Define a neural network that will make predictions based on the source frames
detection_nn = pipeline.createNeuralNetwork()
detection_nn.setBlobPath(pathlib.Path(_NN_PATH))
detection_nn.setNumPoolFrames(4)
detection_nn.input.setBlocking(False)
detection_nn.setNumInferenceThreads(2)
self._detection_nn = detection_nn
self._xout = self._configure_xout_nn(pipeline)
self._detection_nn.out.link(self._xout.input)
self._manip_image = self._configure_manip(pipeline)
self._manip_image.out.link(self._detection_nn.input)
@staticmethod
def _configure_manip(pipeline: dai.Pipeline) -> dai.node.ImageManip:
# Resize image
manip = pipeline.createImageManip()
manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
manip.initialConfig.setKeepAspectRatio(False)
return manip
@staticmethod
def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
xout_nn = pipeline.createXLinkOut()
xout_nn.setStreamName("nn")
xout_nn.input.setBlocking(False)
return xout_nn
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: stream name
"""
return self._xout.getStreamName()
def get_input(self) -> dai.Node.Input:
"""
Get input node to use to link with source node
:return: input to link with source output, see Source.link()
"""
return self._manip_image.inputImage
class CameraSource(Source):
"""Image source based on camera preview"""
def __init__(self, pipeline: dai.Pipeline, img_width: int, img_height: int, fps: int):
self._cam_rgb = pipeline.createColorCamera()
self._xout_rgb = pipeline.createXLinkOut()
self._xout_rgb.setStreamName("rgb")
# Properties
self._cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
self._cam_rgb.setPreviewSize(width=_PREVIEW_WIDTH, height=_PREVIEW_HEIGHT)
self._cam_rgb.setInterleaved(False)
self._cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
self._cam_rgb.setFps(fps)
self._resize_manip = self._configure_manip(pipeline=pipeline, img_width=img_width, img_height=img_height)
# link camera preview to output
self._cam_rgb.preview.link(self._resize_manip.inputImage)
self._resize_manip.out.link(self._xout_rgb.input)
def link(self, input_node: dai.Node.Input) -> None:
self._cam_rgb.preview.link(input_node)
def get_stream_name(self) -> str:
return self._xout_rgb.getStreamName()
@staticmethod
def _configure_manip(pipeline: dai.Pipeline, img_width: int, img_height: int) -> dai.node.ImageManip:
# Resize image
manip = pipeline.createImageManip()
manip.initialConfig.setResize(img_width, img_height)
manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
manip.initialConfig.setKeepAspectRatio(False)
return manip
class StereoDepthPostFilter(abc.ABC):
@abc.abstractmethod
def apply(self, config: dai.RawStereoDepthConfig) -> None:
pass
class MedianFilter(StereoDepthPostFilter):
"""
This is a non-edge preserving Median filter, which can be used to reduce noise and smoothen the depth map.
Median filter is implemented in hardware, so its the fastest filter.
"""
def __init__(self, value: dai.MedianFilter = dai.MedianFilter.KERNEL_7x7) -> None:
self._value = value
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.median.value = self._value
class SpeckleFilter(StereoDepthPostFilter):
"""
Speckle Filter is used to reduce the speckle noise. Speckle noise is a region with huge variance between
neighboring disparity/depth pixels, and speckle filter tries to filter this region.
"""
def __init__(self, enable: bool = True, speckle_range: int = 50) -> None:
"""
:param enable: Whether to enable or disable the filter.
:param speckle_range: Speckle search range.
"""
self._enable = enable
self._speckle_range = speckle_range
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.speckleFilter.enable = self._enable
config.postProcessing.speckleFilter.speckleRange = self._speckle_range
class TemporalFilter(StereoDepthPostFilter):
"""
Temporal Filter is intended to improve the depth data persistency by manipulating per-pixel values based on
previous frames. The filter performs a single pass on the data, adjusting the depth values while also updating the
tracking history. In cases where the pixel data is missing or invalid, the filter uses a user-defined persistency
mode to decide whether the missing value should be rectified with stored data. Note that due to its reliance on
historic data the filter may introduce visible blurring/smearing artifacts, and therefore is best-suited for
static scenes.
"""
def __init__(self,
enable: bool = True,
persistencyMode: dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode=dai.RawStereoDepthConfig.PostProcessing.TemporalFilter.PersistencyMode.VALID_2_IN_LAST_4,
alpha: float = 0.4,
delta: int = 0):
"""
:param enable: Whether to enable or disable the filter.
:param persistencyMode: Persistency mode. If the current disparity/depth value is invalid, it will be replaced
by an older value, based on persistency mode.
:param alpha: The Alpha factor in an exponential moving average with Alpha=1 - no filter.
Alpha = 0 - infinite filter. Determines the extent of the temporal history that should be averaged.
:param delta: Step-size boundary. Establishes the threshold used to preserve surfaces (edges).
If the disparity value between neighboring pixels exceed the disparity threshold set by this delta parameter,
then filtering will be temporarily disabled. Default value 0 means auto: 3 disparity integer levels.
In case of subpixel mode its 3*number of subpixel levels.
"""
self._enable = enable
self._persistencyMode = persistencyMode
self._alpha = alpha
self._delta = delta
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.temporalFilter.enable = self._enable
config.postProcessing.temporalFilter.persistencyMode = self._persistencyMode
config.postProcessing.temporalFilter.alpha = self._alpha
config.postProcessing.temporalFilter.delta = self._delta
class SpatialFilter(StereoDepthPostFilter):
"""
Spatial Edge-Preserving Filter will fill invalid depth pixels with valid neighboring depth pixels. It performs a
series of 1D horizontal and vertical passes or iterations, to enhance the smoothness of the reconstructed data.
"""
def __init__(self,
enable: bool = True,
hole_filling_radius: int = 2,
alpha: float = 0.5,
delta: int = 0,
num_iterations: int = 1):
"""
:param enable: Whether to enable or disable the filter.
:param hole_filling_radius: An in-place heuristic symmetric hole-filling mode applied horizontally during
the filter passes. Intended to rectify minor artefacts with minimal performance impact. Search radius for
hole filling.
:param alpha: The Alpha factor in an exponential moving average with Alpha=1 - no filter.
Alpha = 0 - infinite filter. Determines the amount of smoothing.
:param delta: Step-size boundary. Establishes the threshold used to preserve edges. If the disparity value
between neighboring pixels exceed the disparity threshold set by this delta parameter, then filtering will be
temporarily disabled. Default value 0 means auto: 3 disparity integer levels. In case of subpixel mode its
3*number of subpixel levels.
:param num_iterations: Number of iterations over the image in both horizontal and vertical direction.
"""
self._enable = enable
self._hole_filling_radius = hole_filling_radius
self._alpha = alpha
self._delta = delta
self._num_iterations = num_iterations
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.spatialFilter.enable = self._enable
config.postProcessing.spatialFilter.holeFillingRadius = self._hole_filling_radius
config.postProcessing.spatialFilter.alpha = self._alpha
config.postProcessing.spatialFilter.delta = self._delta
config.postProcessing.spatialFilter.numIterations = self._num_iterations
class ThresholdFilter(StereoDepthPostFilter):
"""
Threshold Filter filters out all disparity/depth pixels outside the configured min/max threshold values.
"""
def __init__(self, min_range: int = 400, max_range: int = 15000):
"""
:param min_range: Minimum range in depth units. Depth values under this value are invalidated.
:param max_range: Maximum range in depth units. Depth values over this value are invalidated.
"""
self._min_range = min_range
self._max_range = max_range
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.thresholdFilter.minRange = self._min_range
config.postProcessing.thresholdFilter.maxRange = self._max_range
class DecimationFilter(StereoDepthPostFilter):
"""
Decimation Filter will sub-samples the depth map, which means it reduces the depth scene complexity and allows
other filters to run faster. Setting decimationFactor to 2 will downscale 1280x800 depth map to 640x400.
"""
def __init__(self,
decimation_factor: int = 1,
decimation_mode: dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode = dai.RawStereoDepthConfig.PostProcessing.DecimationFilter.DecimationMode.PIXEL_SKIPPING
):
"""
:param decimation_factor: Decimation factor. Valid values are 1,2,3,4. Disparity/depth map x/y resolution will
be decimated with this value.
:param decimation_mode: Decimation algorithm type.
"""
self._decimation_factor = decimation_factor
self._mode = decimation_mode
def apply(self, config: dai.RawStereoDepthConfig) -> None:
config.postProcessing.decimationFilter.decimationFactor = self._decimation_factor
config.postProcessing.decimationFilter.decimationMode = self._mode
class DepthSource(Source):
def __init__(self, pipeline: dai.Pipeline,
extended_disparity: bool = False,
subpixel: bool = False,
lr_check: bool = True,
stereo_filters: List[StereoDepthPostFilter] = []
) -> None:
"""
# Closer-in minimum depth, disparity range is doubled (from 95 to 190):
extended_disparity = False
# Better accuracy for longer distance, fractional disparity 32-levels:
subpixel = False
# Better handling for occlusions:
lr_check = True
"""
self._monoLeft = pipeline.create(dai.node.MonoCamera)
self._monoRight = pipeline.create(dai.node.MonoCamera)
self._depth = pipeline.create(dai.node.StereoDepth)
self._xout_disparity = pipeline.create(dai.node.XLinkOut)
self._xout_disparity.setStreamName("disparity")
# Properties
self._monoLeft.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
self._monoLeft.setCamera("left")
self._monoLeft.out.link(self._depth.left)
self._monoRight.setResolution(dai.MonoCameraProperties.SensorResolution.THE_400_P)
self._monoRight.setCamera("right")
self._monoRight.out.link(self._depth.right)
# Create a node that will produce the depth map
# (using disparity output as it's easier to visualize depth this way)
self._depth.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DENSITY)
# Options: MEDIAN_OFF, KERNEL_3x3, KERNEL_5x5, KERNEL_7x7 (default)
self._depth.initialConfig.setMedianFilter(dai.MedianFilter.KERNEL_7x7)
self._depth.setLeftRightCheck(lr_check)
self._depth.setExtendedDisparity(extended_disparity)
self._depth.setSubpixel(subpixel)
self._depth.disparity.link(self._xout_disparity.input)
if len(stereo_filters) > 0:
# Configure post-processing filters
config = self._depth.initialConfig.get()
for filter in stereo_filters:
filter.apply(config)
self._depth.initialConfig.set(config)
def get_stream_name(self) -> str:
return self._xout_disparity.getStreamName()
def link(self, input_node: dai.Node.Input) -> None:
self._depth.disparity.link(input_node)
@dataclass
class MqttConfig:
"""MQTT configuration"""
host: str
topic: str
port: int = 1883
qos: int = 0
class MqttSource(Source):
"""Image source based onto mqtt stream"""
def __init__(self, device: Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
self._mqtt_config = mqtt_config
self._client = mqtt.Client()
self._client.user_data_set(mqtt_config)
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._img_in = pipeline.createXLinkIn()
self._img_in.setStreamName("img_input")
self._img_out = pipeline.createXLinkOut()
self._img_out.setStreamName("img_output")
self._img_in.out.link(self._img_out.input)
self._img_in_queue = device.getInputQueue(self._img_in.getStreamName())
def run(self) -> None:
""" Connect and start mqtt loop """
self._client.connect(host=self._mqtt_config.host, port=self._mqtt_config.port)
self._client.loop_start()
def stop(self) -> None:
"""Stop and disconnect mqtt loop"""
self._client.loop_stop()
self._client.disconnect()
@staticmethod
# pylint: disable=unused-argument
def _on_connect(client: mqtt.Client, userdata: MqttConfig, flags: Any,
result_connection: Any) -> None:
# if we lose the connection and reconnect then subscriptions will be renewed.
client.subscribe(topic=userdata.topic, qos=userdata.qos)
# pylint: disable=unused-argument
def _on_message(self, _: mqtt.Client, user_data: MqttConfig, msg: mqtt.MQTTMessage) -> None:
frame_msg = evt.FrameMessage()
frame_msg.ParseFromString(msg.payload)
frame = np.asarray(frame_msg.frame, dtype="uint8")
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
nn_data = dai.NNData()
nn_data.setLayer("data", _to_planar(frame, (300, 300)))
self._img_in_queue.send(nn_data)
def get_stream_name(self) -> str:
return self._img_out.getStreamName()
def link(self, input_node: dai.Node.Input) -> None:
self._img_in.out.link(input_node)
def _to_planar(arr: npt.NDArray[np.uint8], shape: tuple[int, int]) -> list[int]:
return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
class PipelineController:
"""
Pipeline controller that drive camera device
"""
def __init__(self, frame_processor: FrameProcessor,
object_processor: ObjectProcessor, disparity_processor: DisparityProcessor,
camera: Source, depth_source: Source, object_node: ObjectDetectionNN,
pipeline: dai.Pipeline):
self._frame_processor = frame_processor
self._object_processor = object_processor
self._disparity_processor = disparity_processor
self._camera = camera
self._depth_source = depth_source
self._object_node = object_node
self._stop = False
self._pipeline = pipeline
self._configure_pipeline()
self._focal_length_in_pixels: float | None = None
def _configure_pipeline(self) -> None:
logger.info("configure pipeline")
self._pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)
# Link preview to manip and manip to nn
self._camera.link(self._object_node.get_input())
logger.info("pipeline configured")
def run(self) -> None:
"""
Start event loop
:return:
"""
# Connect to device and start pipeline
with Device(pipeline=self._pipeline) as dev:
logger.info('MxId: %s', dev.getDeviceInfo().getMxId())
logger.info('USB speed: %s', dev.getUsbSpeed())
logger.info('Connected cameras: %s', str(dev.getConnectedCameras()))
logger.info("output queues found: %s", str(''.join(dev.getOutputQueueNames()))) # type: ignore
calib_data = dev.readCalibration()
intrinsics = calib_data.getCameraIntrinsics(dai.CameraBoardSocket.CAM_C)
self._focal_length_in_pixels = intrinsics[0][0]
logger.info('Right mono camera focal length in pixels: %s', self._focal_length_in_pixels)
dev.startPipeline()
# Queues
queue_size = 4
q_rgb = dev.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size, # type: ignore
blocking=False)
q_nn = dev.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, # type: ignore
blocking=False)
if self._disparity_processor is not None:
q_disparity = dev.getOutputQueue(name=self._depth_source.get_stream_name(), maxSize=queue_size, # type: ignore
blocking=False)
else:
q_disparity = None
start_time = time.time()
counter = 0
fps = 0
display_time = time.time()
self._stop = False
while True:
if self._stop:
logger.info("stop loop event")
return
try:
self._loop_on_camera_events(q_nn, q_rgb, q_disparity)
# pylint: disable=broad-except # bad frame or event must not stop loop
except Exception as ex:
logger.exception("unexpected error: %s", str(ex))
counter += 1
if (time.time() - start_time) > 1:
fps = counter / (time.time() - start_time)
counter = 0
start_time = time.time()
if (time.time() - display_time) >= 10:
display_time = time.time()
logger.info("fps: %s", fps)
def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue, q_disparity: dai.DataOutputQueue) -> None:
logger.debug("wait for new frame")
# Wait for frame
in_rgb: dai.ImgFrame = q_rgb.get() # type: ignore # blocking call, will wait until a new data has arrived
try:
logger.debug("process frame")
frame_ref = self._frame_processor.process(in_rgb)
except FrameProcessError as ex:
logger.error("unable to process frame: %s", str(ex))
return
logger.debug("frame processed")
logger.debug("wait for nn response")
# Read NN result
in_nn: dai.NNData = q_nn.get() # type: ignore
logger.debug("process objects")
self._object_processor.process(in_nn, frame_ref)
logger.debug("objects processed")
logger.debug("process disparity")
if self._disparity_processor is not None:
in_disparity: dai.ImgFrame = q_disparity.get() # type: ignore
self._disparity_processor.process(in_disparity, frame_ref=frame_ref,
focal_length_in_pixels=self._focal_length_in_pixels)
logger.debug("disparity processed")
def stop(self) -> None:
"""
Stop event loop, if loop is not running, do nothing
:return:
"""
self._stop = True
def _bbox_to_object(bbox: npt.NDArray[np.float64], score: float) -> evt.Object:
obj = evt.Object()
obj.type = evt.TypeObject.ANY
obj.top = bbox[0].astype(float)
obj.right = bbox[3].astype(float)
obj.bottom = bbox[2].astype(float)
obj.left = bbox[1].astype(float)
obj.confidence = score
return obj

View File

@ -10,7 +10,7 @@ import paho.mqtt.client as mqtt
import pytest import pytest
import pytest_mock import pytest_mock
import camera.depthai from camera.oak_pipeline import DisparityProcessor, ObjectProcessor, FrameProcessor, FrameProcessError
Object = dict[str, float] Object = dict[str, float]
@ -76,16 +76,16 @@ class TestObjectProcessor:
return m return m
@pytest.fixture @pytest.fixture
def object_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.ObjectProcessor: def object_processor(self, mqtt_client: mqtt.Client) -> ObjectProcessor:
return camera.depthai.ObjectProcessor(mqtt_client, "topic/object", 0.2) return ObjectProcessor(mqtt_client, "topic/object", 0.2)
def test_process_without_object(self, object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client, def test_process_without_object(self, object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None: raw_objects_empty: dai.NNData, frame_ref: events.events_pb2.FrameRef) -> None:
object_processor.process(raw_objects_empty, frame_ref) object_processor.process(raw_objects_empty, frame_ref)
publish_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore publish_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
publish_mock.assert_not_called() publish_mock.assert_not_called()
def test_process_with_object_with_low_score(self, object_processor: camera.depthai.ObjectProcessor, def test_process_with_object_with_low_score(self, object_processor: ObjectProcessor,
mqtt_client: mqtt.Client, raw_objects_one: dai.NNData, mqtt_client: mqtt.Client, raw_objects_one: dai.NNData,
frame_ref: events.events_pb2.FrameRef) -> None: frame_ref: events.events_pb2.FrameRef) -> None:
object_processor._objects_threshold = 0.9 object_processor._objects_threshold = 0.9
@ -94,7 +94,7 @@ class TestObjectProcessor:
publish_mock.assert_not_called() publish_mock.assert_not_called()
def test_process_with_one_object(self, def test_process_with_one_object(self,
object_processor: camera.depthai.ObjectProcessor, mqtt_client: mqtt.Client, object_processor: ObjectProcessor, mqtt_client: mqtt.Client,
raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef, raw_objects_one: dai.NNData, frame_ref: events.events_pb2.FrameRef,
object1: Object) -> None: object1: Object) -> None:
object_processor.process(raw_objects_one, frame_ref) object_processor.process(raw_objects_one, frame_ref)
@ -120,10 +120,10 @@ class TestObjectProcessor:
class TestFrameProcessor: class TestFrameProcessor:
@pytest.fixture @pytest.fixture
def frame_processor(self, mqtt_client: mqtt.Client) -> camera.depthai.FrameProcessor: def frame_processor(self, mqtt_client: mqtt.Client) -> FrameProcessor:
return camera.depthai.FrameProcessor(mqtt_client, "topic/frame") return FrameProcessor(mqtt_client, "topic/frame")
def test_process(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture, def test_process(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
mqtt_client: mqtt.Client) -> None: mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock() img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content")) mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
@ -145,12 +145,57 @@ class TestFrameProcessor:
assert now - datetime.timedelta( assert now - datetime.timedelta(
milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10) milliseconds=10) < frame_msg.id.created_at.ToDatetime() < now + datetime.timedelta(milliseconds=10)
def test_process_error(self, frame_processor: camera.depthai.FrameProcessor, mocker: pytest_mock.MockerFixture, def test_process_error(self, frame_processor: FrameProcessor, mocker: pytest_mock.MockerFixture,
mqtt_client: mqtt.Client) -> None: mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock() img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (False, None) mocker.patch(target="cv2.imencode").return_value = (False, None)
with pytest.raises(camera.depthai.FrameProcessError) as ex: with pytest.raises(FrameProcessError) as ex:
_ = frame_processor.process(img) _ = frame_processor.process(img)
exception_raised = ex.value exception_raised = ex.value
assert exception_raised.message == "unable to process to encode frame to jpg" assert exception_raised.message == "unable to process to encode frame to jpg"
class TestDisparityProcessor:
@pytest.fixture
def frame_ref(self) -> events.events_pb2.FrameRef:
now = datetime.datetime.now()
frame_msg = events.events_pb2.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
return frame_msg.id
@pytest.fixture
def disparity_processor(self, mqtt_client: mqtt.Client) -> DisparityProcessor:
return DisparityProcessor(mqtt_client, "topic/disparity")
def test_process(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
frame_ref: events.events_pb2.FrameRef,
mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (True, np.array(b"img content"))
disparity_processor.process(img, frame_ref, 42)
pub_mock: unittest.mock.MagicMock = mqtt_client.publish # type: ignore
pub_mock.assert_called_once_with(payload=unittest.mock.ANY, qos=0, retain=False, topic="topic/disparity")
payload = pub_mock.call_args.kwargs['payload']
disparity_msg = events.events_pb2.DisparityMessage()
disparity_msg.ParseFromString(payload)
assert disparity_msg.frame_ref == frame_ref
assert disparity_msg.disparity == b"img content"
assert disparity_msg.focal_length_in_pixels == 42
assert disparity_msg.baseline_in_mm == 75
def test_process_error(self, disparity_processor: DisparityProcessor, mocker: pytest_mock.MockerFixture,
frame_ref: events.events_pb2.FrameRef,
mqtt_client: mqtt.Client) -> None:
img: dai.ImgFrame = mocker.MagicMock()
mocker.patch(target="cv2.imencode").return_value = (False, None)
with pytest.raises(FrameProcessError) as ex:
disparity_processor.process(img, frame_ref, 42)
exception_raised = ex.value
assert exception_raised.message == "unable to process to encode frame to jpg"

Binary file not shown.

Binary file not shown.

2083
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,15 +9,14 @@ packages = [
] ]
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.12"
paho-mqtt = "^1.6.1" paho-mqtt = "^1.6"
depthai = "^2.17.4.0" depthai = "^2"
protobuf3 = "^0.2.1" protobuf3 = "^0.2.1"
google = "^3.0.0" google = "^3.0.0"
blobconverter = "^1.3.0" protobuf = "^4.21"
protobuf = "^4.21.8" opencv-python-headless = "^4.6.0"
opencv-python-headless = "^4.6.0.66" robocar-protobuf = {version = "^1.6", source = "robocar"}
robocar-protobuf = { version = "^1.1.1", source = "robocar" }
[tool.poetry.group.test.dependencies] [tool.poetry.group.test.dependencies]
@ -35,8 +34,7 @@ types-protobuf = "^3.20.4.2"
[[tool.poetry.source]] [[tool.poetry.source]]
name = "robocar" name = "robocar"
url = "https://git.cyrilix.bzh/api/packages/robocars/pypi/simple" url = "https://git.cyrilix.bzh/api/packages/robocars/pypi/simple"
default = false priority = "explicit"
secondary = false
[build-system] [build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"] requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning"]