robocar-oak-camera/camera/depthai.py

373 lines
12 KiB
Python

"""
Camera event loop
"""
import abc
import datetime
import logging
import typing
from dataclasses import dataclass
import cv2
import depthai as dai
import events.events_pb2
import numpy as np
import paho.mqtt.client as mqtt
logger = logging.getLogger(__name__)
_NN_PATH = "/models/mobile_object_localizer_192x192_openvino_2021.4_6shave.blob"
_NN_WIDTH = 192
_NN_HEIGHT = 192
class ObjectProcessor:
"""
Processor for Object detection
"""
def __init__(self, mqtt_client: mqtt.Client, objects_topic: str, objects_threshold: float):
self._mqtt_client = mqtt_client
self._objects_topic = objects_topic
self._objects_threshold = objects_threshold
def process(self, in_nn: dai.NNData, frame_ref) -> None:
"""
Parse and publish result of NeuralNetwork result
:param in_nn: NeuralNetwork result read from device
:param frame_ref: Id of the frame where objects are been detected
:return:
"""
detection_boxes = np.array(in_nn.getLayerFp16("ExpandDims")).reshape((100, 4))
detection_scores = np.array(in_nn.getLayerFp16("ExpandDims_2")).reshape((100,))
# keep boxes bigger than threshold
mask = detection_scores >= self._objects_threshold
boxes = detection_boxes[mask]
scores = detection_scores[mask]
if boxes.shape[0] > 0:
self._publish_objects(boxes, frame_ref, scores)
def _publish_objects(self, boxes: np.array, frame_ref, scores: np.array) -> None:
objects_msg = events.events_pb2.ObjectsMessage()
objs = []
for i in range(boxes.shape[0]):
logger.debug("new object detected: %s", str(boxes[i]))
objs.append(_bbox_to_object(boxes[i], scores[i].astype(float)))
objects_msg.objects.extend(objs)
objects_msg.frame_ref.name = frame_ref.name
objects_msg.frame_ref.id = frame_ref.id
objects_msg.frame_ref.created_at.FromDatetime(frame_ref.created_at.ToDatetime())
logger.debug("publish object event to %s", self._objects_topic)
self._mqtt_client.publish(topic=self._objects_topic,
payload=objects_msg.SerializeToString(),
qos=0,
retain=False)
class FrameProcessError(Exception):
"""
Error base for invalid frame processing
Attributes:
message -- explanation of the error
"""
def __init__(self, message: str):
"""
:param message: explanation of the error
"""
self.message = message
class FrameProcessor:
"""
Processor for camera frames
"""
def __init__(self, mqtt_client: mqtt.Client, frame_topic: str):
self._mqtt_client = mqtt_client
self._frame_topic = frame_topic
def process(self, img: dai.ImgFrame) -> typing.Any:
"""
Publish camera frames
:param img: image read from camera
:return:
id frame reference
:raise:
FrameProcessError if frame can't be processed
"""
im_resize = img.getCvFrame()
is_success, im_buf_arr = cv2.imencode(".jpg", im_resize)
if not is_success:
raise FrameProcessError("unable to process to encode frame to jpg")
byte_im = im_buf_arr.tobytes()
now = datetime.datetime.now()
frame_msg = events.events_pb2.FrameMessage()
frame_msg.id.name = "robocar-oak-camera-oak"
frame_msg.id.id = str(int(now.timestamp() * 1000))
frame_msg.id.created_at.FromDatetime(now)
frame_msg.frame = byte_im
logger.debug("publish frame event to %s", self._frame_topic)
self._mqtt_client.publish(topic=self._frame_topic,
payload=frame_msg.SerializeToString(),
qos=0,
retain=False)
return frame_msg.id
class Source(abc.ABC):
"""Base class for image source"""
@abc.abstractmethod
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: steam name
"""
@abc.abstractmethod
def link(self, input_node: dai.Node.Input) -> None:
"""
Link this source to the input node
:param: input_node: input node to link
"""
class ObjectDetectionNN:
"""
Node to detect objects into image
Read image as input and apply resize transformation before to run NN on it
Result is available with 'get_stream_name()' stream
"""
def __init__(self, pipeline: dai.Pipeline):
# Define a neural network that will make predictions based on the source frames
detection_nn = pipeline.createNeuralNetwork()
detection_nn.setBlobPath(_NN_PATH)
detection_nn.setNumPoolFrames(4)
detection_nn.input.setBlocking(False)
detection_nn.setNumInferenceThreads(2)
self._detection_nn = detection_nn
self._xout = self._configure_xout_nn(pipeline)
self._detection_nn.out.link(self._xout.input)
self._manip_image = self._configure_manip(pipeline)
@staticmethod
def _configure_manip(pipeline: dai.Pipeline) -> dai.node.ImageManip:
# Resize image
manip = pipeline.createImageManip()
manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
manip.initialConfig.setKeepAspectRatio(False)
return manip
@staticmethod
def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
xout_nn = pipeline.createXLinkOut()
xout_nn.setStreamName("nn")
xout_nn.input.setBlocking(False)
return xout_nn
def get_stream_name(self) -> str:
"""
Queue/stream name to use to get data
:return: stream name
"""
return self._xout.getStreamName()
def get_input(self) -> dai.Node.Input:
"""
Get input node to use to link with source node
:return: input to link with source output, see Source.link()
"""
return self._manip_image.inputImage
class CameraSource(Source):
"""Image source based on camera preview"""
def __init__(self, pipeline: dai.Pipeline, img_width: int, img_height: int):
cam_rgb = pipeline.createColorCamera()
xout_rgb = pipeline.createXLinkOut()
xout_rgb.setStreamName("rgb")
self._cam_rgb = cam_rgb
self._xout_rgb = xout_rgb
# Properties
cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
cam_rgb.setPreviewSize(width=img_width, height=img_height)
cam_rgb.setInterleaved(False)
cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
cam_rgb.setFps(30)
# link camera preview to output
cam_rgb.preview.link(xout_rgb.input)
def link(self, input_node: dai.Node.Input) -> None:
self._cam_rgb.preview.link(input_node)
def get_stream_name(self) -> str:
return self._xout_rgb.getStreamName()
@dataclass
class MqttConfig:
"""MQTT configuration"""
host: str
topic: str
port: int = 1883
qos: int = 0
class MqttSource(Source):
"""Image source based onto mqtt stream"""
def __init__(self, device: dai.Device, pipeline: dai.Pipeline, mqtt_config: MqttConfig):
self._mqtt_config = mqtt_config
self._client = mqtt.Client()
self._client.user_data_set(mqtt_config)
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._img_in = pipeline.createXLinkIn()
self._img_in.setStreamName("img_input")
self._img_out = pipeline.createXLinkOut()
self._img_out.setStreamName("img_output")
self._img_in.out.link(self._img_out.input)
self._img_in_queue = device.getInputQueue(self._img_in.getStreamName())
def run(self) -> None:
""" Connect and start mqtt loop """
self._client.connect(host=self._mqtt_config.host, port=self._mqtt_config.port)
self._client.loop_start()
def stop(self) -> None:
"""Stop and disconnect mqtt loop"""
self._client.loop_stop()
self._client.disconnect()
@staticmethod
# pylint: disable=unused-argument
def _on_connect(client: mqtt.Client, userdata: MqttConfig, flags: typing.Any,
result_connection: typing.Any) -> None:
# if we lose the connection and reconnect then subscriptions will be renewed.
client.subscribe(topic=userdata.topic, qos=userdata.qos)
# pylint: disable=unused-argument
def _on_message(self, _: mqtt.Client, user_data: MqttConfig, msg: mqtt.MQTTMessage) -> None:
frame_msg = events.events_pb2.FrameMessage()
frame_msg.ParseFromString(msg.payload)
frame = np.asarray(frame_msg.frame, dtype="uint8")
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
nn_data = dai.NNData()
nn_data.setLayer("data", _to_planar(frame, (300, 300)))
self._img_in_queue.send(nn_data)
def get_stream_name(self) -> str:
return self._img_out.getStreamName()
def link_preview(self, input_node: dai.Node.Input):
self._img_in.out.link(input_node)
def _to_planar(arr: np.ndarray, shape: tuple) -> list:
return [val for channel in cv2.resize(arr, shape).transpose(2, 0, 1) for y_col in channel for val in y_col]
class PipelineController:
"""
Pipeline controller that drive camera device
"""
def __init__(self, img_width: int, img_height: int, frame_processor: FrameProcessor,
object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN):
self._pipeline = self._configure_pipeline()
self._frame_processor = frame_processor
self._object_processor = object_processor
self._camera = camera
self._object_node = object_node
self._stop = False
def _configure_pipeline(self) -> dai.Pipeline:
logger.info("configure pipeline")
pipeline = dai.Pipeline()
pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)
# Link preview to manip and manip to nn
self._camera.link(self._object_node.get_input())
logger.info("pipeline configured")
return pipeline
def run(self) -> None:
"""
Start event loop
:return:
"""
# Connect to device and start pipeline
with dai.Device(pipeline=self._pipeline) as device:
logger.info('MxId: %s', device.getDeviceInfo().getMxId())
logger.info('USB speed: %s', device.getUsbSpeed())
logger.info('Connected cameras: %s', device.getConnectedCameras())
logger.info("output queues found: %s", device.getOutputQueueNames())
device.startPipeline()
# Queues
queue_size = 4
q_rgb = device.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size, blocking=False)
q_nn = device.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, blocking=False)
self._stop = False
while True:
if self._stop:
logger.info("stop loop event")
return
try:
self._loop_on_camera_events(q_nn, q_rgb)
# pylint: disable=broad-except # bad frame or event must not stop loop
except Exception as ex:
logger.exception("unexpected error: %s", str(ex))
def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue) -> None:
logger.debug("wait for new frame")
# Wait for frame
in_rgb: dai.ImgFrame = q_rgb.get() # blocking call, will wait until a new data has arrived
try:
frame_ref = self._frame_processor.process(in_rgb)
except FrameProcessError as ex:
logger.error("unable to process frame: %s", str(ex))
return
# Read NN result
in_nn: dai.NNData = q_nn.get()
self._object_processor.process(in_nn, frame_ref)
def stop(self) -> None:
"""
Stop event loop, if loop is not running, do nothing
:return:
"""
self._stop = True
def _bbox_to_object(bbox: np.array, score: float) -> events.events_pb2.Object:
obj = events.events_pb2.Object()
obj.type = events.events_pb2.TypeObject.ANY
obj.top = bbox[0].astype(float)
obj.right = bbox[3].astype(float)
obj.bottom = bbox[2].astype(float)
obj.left = bbox[1].astype(float)
obj.confidence = score
return obj