From b50b54be342e932bdaa2d652b35c1a2f6773b23c Mon Sep 17 00:00:00 2001
From: Cyrille Nofficial
Date: Thu, 20 Oct 2022 16:02:24 +0200
Subject: [PATCH] refactor: split event loop process

---
 camera/depthai.py | 135 +++++++++++++++++++++++++---------------------
 1 file changed, 75 insertions(+), 60 deletions(-)

diff --git a/camera/depthai.py b/camera/depthai.py
index 8953efd..b05b005 100644
--- a/camera/depthai.py
+++ b/camera/depthai.py
@@ -103,67 +103,63 @@ class FramePublisher:
             if self._stop:
                 return
             try:
-                logger.debug("wait for new frame")
-                in_rgb = q_rgb.get()  # blocking call, will wait until a new data has arrived
-
-                im_resize = in_rgb.getCvFrame()
-
-                is_success, im_buf_arr = cv2.imencode(".jpg", im_resize)
-                byte_im = im_buf_arr.tobytes()
-
-                now = datetime.datetime.now()
-                frame_msg = events.events_pb2.FrameMessage()
-                frame_msg.id.name = "robocar-oak-camera-oak"
-                frame_msg.id.id = str(int(now.timestamp() * 1000))
-                frame_msg.id.created_at.FromDatetime(now)
-                frame_msg.frame = byte_im
-
-                logger.debug("publish frame event to %s", self._frame_topic)
-                self._mqtt_client.publish(topic=self._frame_topic,
-                                          payload=frame_msg.SerializeToString(),
-                                          qos=0,
-                                          retain=False)
-
-                in_nn = q_nn.get()
-
-                # get outputs
-                detection_boxes = np.array(in_nn.getLayerFp16("ExpandDims")).reshape((100, 4))
-                detection_scores = np.array(in_nn.getLayerFp16("ExpandDims_2")).reshape((100,))
-
-                # keep boxes bigger than threshold
-                mask = detection_scores >= self._objects_threshold
-                boxes = detection_boxes[mask]
-                scores = detection_scores[mask]
-
-                if boxes.shape[0] > 0:
-                    objects_msg = events.events_pb2.ObjectsMessage()
-                    objs = []
-                    for i in range(boxes.shape[0]):
-                        bbox = boxes[i]
-                        logger.debug("new object detected: %s", str(bbox))
-                        obj = events.events_pb2.Object()
-                        obj.type = events.events_pb2.TypeObject.ANY
-                        obj.top = bbox[0].astype(float)
-                        obj.right = bbox[3].astype(float)
-                        obj.bottom = bbox[2].astype(float)
-                        obj.left = bbox[1].astype(float)
-                        obj.confidence = scores[i].astype(float)
-                        objs.append(obj)
-                    objects_msg.objects.extend(objs)
-
-                    objects_msg.frame_ref.name = frame_msg.id.name
-                    objects_msg.frame_ref.id = frame_msg.id.id
-                    objects_msg.frame_ref.created_at.FromDatetime(now)
-
-                    logger.debug("publish object event to %s", self._frame_topic)
-                    self._mqtt_client.publish(topic=self._objects_topic,
-                                              payload=objects_msg.SerializeToString(),
-                                              qos=0,
-                                              retain=False)
-
+                self._loop_on_camera_events(q_nn, q_rgb)
             # pylint: disable=broad-except
             # bad frame or event must not stop loop
-            except Exception as excpt:
-                logger.exception("unexpected error: %s", str(excpt))
+            except Exception as ex:
+                logger.exception("unexpected error: %s", str(ex))
+
+    def _loop_on_camera_events(self, q_nn: dai.DataOutputQueue, q_rgb: dai.DataOutputQueue):
+        logger.debug("wait for new frame")
+
+        # Wait for frame
+        in_rgb = _get_as_imgframe(q_rgb)  # blocking call, will wait until a new data has arrived
+        frame_msg, now = self._read_and_publish_frame(in_rgb)
+
+        # Read NN result
+        in_nn = _get_as_nndata(q_nn)
+        # get outputs
+        detection_boxes = np.array(in_nn.getLayerFp16("ExpandDims")).reshape((100, 4))
+        detection_scores = np.array(in_nn.getLayerFp16("ExpandDims_2")).reshape((100,))
+        # keep boxes bigger than threshold
+        mask = detection_scores >= self._objects_threshold
+        boxes = detection_boxes[mask]
+        scores = detection_scores[mask]
+
+        if boxes.shape[0] > 0:
+            self._publish_objects(boxes, frame_msg, now, scores)
+
+    def _read_and_publish_frame(self, in_rgb: dai.ImgFrame) -> (events.events_pb2.FrameMessage,
+                                                                 datetime.datetime):
+        im_resize = in_rgb.getCvFrame()
+        is_success, im_buf_arr = cv2.imencode(".jpg", im_resize)
+        byte_im = im_buf_arr.tobytes()
+        now = datetime.datetime.now()
+        frame_msg = events.events_pb2.FrameMessage()
+        frame_msg.id.name = "robocar-oak-camera-oak"
+        frame_msg.id.id = str(int(now.timestamp() * 1000))
+        frame_msg.id.created_at.FromDatetime(now)
+        frame_msg.frame = byte_im
+        logger.debug("publish frame event to %s", self._frame_topic)
+        self._mqtt_client.publish(topic=self._frame_topic,
+                                  payload=frame_msg.SerializeToString(),
+                                  qos=0,
+                                  retain=False)
+        return frame_msg, now
+
+    def _publish_objects(self, boxes, frame_msg, now, scores):
+        objects_msg = events.events_pb2.ObjectsMessage()
+        objs = []
+        for i in range(boxes.shape[0]):
+            logger.debug("new object detected: %s", str(boxes[i]))
+            objs.append(_bbox_to_object(boxes[i], scores[i].astype(float)))
+        objects_msg.objects.extend(objs)
+        objects_msg.frame_ref.name = frame_msg.id.name
+        objects_msg.frame_ref.id = frame_msg.id.id
+        objects_msg.frame_ref.created_at.FromDatetime(now)
+        logger.debug("publish object event to %s", self._frame_topic)
+        self._mqtt_client.publish(topic=self._objects_topic,
+                                  payload=objects_msg.SerializeToString(),
+                                  qos=0,
+                                  retain=False)
 
     def stop(self):
         """
@@ -171,3 +167,22 @@ class FramePublisher:
         :return:
         """
         self._stop = True
+
+
+def _get_as_nndata(queue: dai.DataOutputQueue) -> dai.NNData:
+    return queue.get()
+
+
+def _get_as_imgframe(queue: dai.DataOutputQueue) -> dai.ImgFrame:
+    return queue.get()
+
+
+def _bbox_to_object(bbox: np.array, score: float) -> events.events_pb2.Object:
+    obj = events.events_pb2.Object()
+    obj.type = events.events_pb2.TypeObject.ANY
+    obj.top = bbox[0].astype(float)
+    obj.right = bbox[3].astype(float)
+    obj.bottom = bbox[2].astype(float)
+    obj.left = bbox[1].astype(float)
+    obj.confidence = score
+    return obj
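
A side benefit of extracting _bbox_to_object (and the _get_as_* queue getters) out of the event loop is that the bounding-box conversion can now be exercised without an OAK device or MQTT broker. Below is a minimal pytest-style sketch, assuming the test environment can import camera.depthai with its dependencies (depthai, cv2, paho-mqtt) and the generated events.events_pb2 module; the test name, module alias, and sample values are illustrative, not part of this patch.

    import numpy as np
    import events.events_pb2 as events_pb2
    from camera import depthai as oak_camera

    def test_bbox_to_object_maps_fields():
        # Detection boxes are consumed in [top, left, bottom, right] order;
        # the values are exactly representable as 32-bit floats so direct
        # equality on the protobuf fields is safe.
        bbox = np.array([0.25, 0.5, 0.75, 0.875])
        obj = oak_camera._bbox_to_object(bbox, 0.5)
        assert obj.type == events_pb2.TypeObject.ANY
        assert obj.top == 0.25
        assert obj.left == 0.5
        assert obj.bottom == 0.75
        assert obj.right == 0.875
        assert obj.confidence == 0.5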