refactor: rewrite depthai node management
This commit is contained in:
		@@ -1,6 +1,7 @@
 | 
			
		||||
"""
 | 
			
		||||
Camera event loop
 | 
			
		||||
"""
 | 
			
		||||
import abc
 | 
			
		||||
import datetime
 | 
			
		||||
import logging
 | 
			
		||||
import typing
 | 
			
		||||
@@ -117,18 +118,105 @@ class FrameProcessor:
 | 
			
		||||
        return frame_msg.id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Source(abc.ABC):
 | 
			
		||||
    @abc.abstractmethod
 | 
			
		||||
    def get_stream_name(self) -> str:
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    @abc.abstractmethod
 | 
			
		||||
    def link_preview(self, input_node: dai.Node.Input):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ObjectDetectionNN:
 | 
			
		||||
    """
 | 
			
		||||
    Node to detect objects into image
 | 
			
		||||
 | 
			
		||||
    Read image as input and apply resize transformation before to run NN on it
 | 
			
		||||
    Result is available with 'get_stream_name()' stream
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, pipeline: dai.Pipeline):
 | 
			
		||||
        # Define a neural network that will make predictions based on the source frames
 | 
			
		||||
        detection_nn = pipeline.createNeuralNetwork()
 | 
			
		||||
        detection_nn.setBlobPath(_NN_PATH)
 | 
			
		||||
        detection_nn.setNumPoolFrames(4)
 | 
			
		||||
        detection_nn.input.setBlocking(False)
 | 
			
		||||
        detection_nn.setNumInferenceThreads(2)
 | 
			
		||||
        self._detection_nn = detection_nn
 | 
			
		||||
        self._xout = self._configure_xout_nn(pipeline)
 | 
			
		||||
        self._detection_nn.out.link(self._xout.input)
 | 
			
		||||
        self._manip_image = self._configure_manip(pipeline)
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _configure_manip(pipeline: dai.Pipeline) -> dai.node.ImageManip:
 | 
			
		||||
        # Resize image
 | 
			
		||||
        manip = pipeline.createImageManip()
 | 
			
		||||
        manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
 | 
			
		||||
        manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
 | 
			
		||||
        manip.initialConfig.setKeepAspectRatio(False)
 | 
			
		||||
        return manip
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
 | 
			
		||||
        xout_nn = pipeline.createXLinkOut()
 | 
			
		||||
        xout_nn.setStreamName("nn")
 | 
			
		||||
        xout_nn.input.setBlocking(False)
 | 
			
		||||
        return xout_nn
 | 
			
		||||
 | 
			
		||||
    def get_stream_name(self) -> str:
 | 
			
		||||
        return self._xout.getStreamName()
 | 
			
		||||
 | 
			
		||||
    def get_input(self) -> dai.Node.Input:
 | 
			
		||||
        return self._manip_image.inputImage
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CameraSource(Source):
 | 
			
		||||
    """Image source based on camera preview"""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, pipeline: dai.Pipeline, img_width: int, img_height: int):
 | 
			
		||||
        cam_rgb = pipeline.createColorCamera()
 | 
			
		||||
        xout_rgb = pipeline.createXLinkOut()
 | 
			
		||||
        xout_rgb.setStreamName("rgb")
 | 
			
		||||
 | 
			
		||||
        self._cam_rgb = cam_rgb
 | 
			
		||||
        self._xout_rgb = xout_rgb
 | 
			
		||||
 | 
			
		||||
        # Properties
 | 
			
		||||
        cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
 | 
			
		||||
        cam_rgb.setPreviewSize(width=img_width, height=img_height)
 | 
			
		||||
        cam_rgb.setInterleaved(False)
 | 
			
		||||
        cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
 | 
			
		||||
        cam_rgb.setFps(30)
 | 
			
		||||
 | 
			
		||||
        # link camera preview to output
 | 
			
		||||
        cam_rgb.preview.link(xout_rgb.input)
 | 
			
		||||
 | 
			
		||||
    def link_preview(self, input_node: dai.Node.Input):
 | 
			
		||||
        self._cam_rgb.preview.link(input_node)
 | 
			
		||||
 | 
			
		||||
    def get_stream_name(self) -> str:
 | 
			
		||||
        return self._xout_rgb.getStreamName()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class PipelineController:
 | 
			
		||||
    """
 | 
			
		||||
    Pipeline controller that drive camera device
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, img_width: int, img_height: int, frame_processor: FrameProcessor,
 | 
			
		||||
                 object_processor: ObjectProcessor):
 | 
			
		||||
                 object_processor: ObjectProcessor, camera: Source, object_node: ObjectDetectionNN):
 | 
			
		||||
        self._img_width = img_width
 | 
			
		||||
        self._img_height = img_height
 | 
			
		||||
        self._pipeline = self._configure_pipeline()
 | 
			
		||||
        self._frame_processor = frame_processor
 | 
			
		||||
        self._object_processor = object_processor
 | 
			
		||||
        self._camera = camera
 | 
			
		||||
        self._object_node = object_node
 | 
			
		||||
        self._stop = False
 | 
			
		||||
 | 
			
		||||
    def _configure_pipeline(self) -> dai.Pipeline:
 | 
			
		||||
@@ -137,54 +225,12 @@ class PipelineController:
 | 
			
		||||
 | 
			
		||||
        pipeline.setOpenVINOVersion(version=dai.OpenVINO.VERSION_2021_4)
 | 
			
		||||
 | 
			
		||||
        detection_nn = self._configure_detection_nn(pipeline)
 | 
			
		||||
        xout_nn = self._configure_xout_nn(pipeline)
 | 
			
		||||
 | 
			
		||||
        # Resize image
 | 
			
		||||
        manip = pipeline.create(dai.node.ImageManip)
 | 
			
		||||
        manip.initialConfig.setResize(_NN_WIDTH, _NN_HEIGHT)
 | 
			
		||||
        manip.initialConfig.setFrameType(dai.ImgFrame.Type.RGB888p)
 | 
			
		||||
        manip.initialConfig.setKeepAspectRatio(False)
 | 
			
		||||
 | 
			
		||||
        cam_rgb = pipeline.create(dai.node.ColorCamera)
 | 
			
		||||
        xout_rgb = pipeline.create(dai.node.XLinkOut)
 | 
			
		||||
        xout_rgb.setStreamName("rgb")
 | 
			
		||||
 | 
			
		||||
        # Properties
 | 
			
		||||
        cam_rgb.setBoardSocket(dai.CameraBoardSocket.RGB)
 | 
			
		||||
        cam_rgb.setPreviewSize(width=self._img_width, height=self._img_height)
 | 
			
		||||
        cam_rgb.setInterleaved(False)
 | 
			
		||||
        cam_rgb.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB)
 | 
			
		||||
        cam_rgb.setFps(30)
 | 
			
		||||
 | 
			
		||||
        # Link preview to manip and manip to nn
 | 
			
		||||
        cam_rgb.preview.link(manip.inputImage)
 | 
			
		||||
        manip.out.link(detection_nn.input)
 | 
			
		||||
 | 
			
		||||
        # Linking to output
 | 
			
		||||
        cam_rgb.preview.link(xout_rgb.input)
 | 
			
		||||
        detection_nn.out.link(xout_nn.input)
 | 
			
		||||
        self._camera.link_preview(self._object_node.get_input())
 | 
			
		||||
 | 
			
		||||
        logger.info("pipeline configured")
 | 
			
		||||
        return pipeline
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _configure_xout_nn(pipeline: dai.Pipeline) -> dai.node.XLinkOut:
 | 
			
		||||
        xout_nn = pipeline.create(dai.node.XLinkOut)
 | 
			
		||||
        xout_nn.setStreamName("nn")
 | 
			
		||||
        xout_nn.input.setBlocking(False)
 | 
			
		||||
        return xout_nn
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _configure_detection_nn(pipeline: dai.Pipeline) -> dai.node.NeuralNetwork:
 | 
			
		||||
        # Define a neural network that will make predictions based on the source frames
 | 
			
		||||
        detection_nn = pipeline.create(dai.node.NeuralNetwork)
 | 
			
		||||
        detection_nn.setBlobPath(_NN_PATH)
 | 
			
		||||
        detection_nn.setNumPoolFrames(4)
 | 
			
		||||
        detection_nn.input.setBlocking(False)
 | 
			
		||||
        detection_nn.setNumInferenceThreads(2)
 | 
			
		||||
        return detection_nn
 | 
			
		||||
 | 
			
		||||
    def run(self) -> None:
 | 
			
		||||
        """
 | 
			
		||||
        Start event loop
 | 
			
		||||
@@ -200,8 +246,8 @@ class PipelineController:
 | 
			
		||||
            device.startPipeline()
 | 
			
		||||
            # Queues
 | 
			
		||||
            queue_size = 4
 | 
			
		||||
            q_rgb = device.getOutputQueue(name="rgb", maxSize=queue_size, blocking=False)
 | 
			
		||||
            q_nn = device.getOutputQueue(name="nn", maxSize=queue_size, blocking=False)
 | 
			
		||||
            q_rgb = device.getOutputQueue(name=self._camera.get_stream_name(), maxSize=queue_size, blocking=False)
 | 
			
		||||
            q_nn = device.getOutputQueue(name=self._object_node.get_stream_name(), maxSize=queue_size, blocking=False)
 | 
			
		||||
 | 
			
		||||
            self._stop = False
 | 
			
		||||
            while True:
 | 
			
		||||
@@ -223,6 +269,7 @@ class PipelineController:
 | 
			
		||||
            frame_ref = self._frame_processor.process(in_rgb)
 | 
			
		||||
        except FrameProcessError as ex:
 | 
			
		||||
            logger.error("unable to process frame: %s", str(ex))
 | 
			
		||||
            return
 | 
			
		||||
        # Read NN result
 | 
			
		||||
        in_nn: dai.NNData = q_nn.get()
 | 
			
		||||
        self._object_processor.process(in_nn, frame_ref)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user