package road

import (
	"log"

	"github.com/cyrilix/robocar-base/service"
	"github.com/cyrilix/robocar-protobuf/go/events"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
	"gocv.io/x/gocv"
	"google.golang.org/protobuf/proto"
)

// RoadPart receives camera frames over MQTT, runs the detector on each of
// them and publishes the detection result on the road topic.
type RoadPart struct {
	client       mqtt.Client
	frameChan    chan frameToProcess // frames decoded by OnFrame, consumed by Start
	readyForNext chan interface{}    // only closed by Stop in this file
	cancel       chan interface{}    // closed by Stop to end the Start loop
	detector     *Detector
	cameraTopic  string
	roadTopic    string
}

// NewPart builds a RoadPart that listens on cameraTopic, runs detector on the
// received frames and publishes the results on roadTopic through client.
//
// Every channel is allocated here: Stop closes readyForNext and cancel, and
// closing a nil channel panics.
func NewPart(client mqtt.Client, cameraTopic, roadTopic string, detector *Detector) *RoadPart {
	return &RoadPart{
		client:       client,
		frameChan:    make(chan frameToProcess),
		readyForNext: make(chan interface{}),
		cancel:       make(chan interface{}),
		detector:     detector,
		cameraTopic:  cameraTopic,
		roadTopic:    roadTopic,
	}
}

// Start registers the camera callback, then hands every received frame to its
// own processing goroutine until Stop closes the cancel channel. It always
// returns nil.
func (r *RoadPart) Start() error {
	registerCallBacks(r)

	for {
		select {
		case f := <-r.frameChan:
			// Ownership of the frame moves to the goroutine, which closes it
			// once processing is done. Sharing one loop variable between
			// iterations would let a newer frame overwrite (and close) the
			// Mat that a previous goroutine is still reading.
			go r.processAndClose(f)
		case <-r.cancel:
			zap.S().Info("Stop service")
			return nil
		}
	}
}

// processAndClose runs processFrame on a frame it exclusively owns and
// releases the underlying Mat afterwards.
func (r *RoadPart) processAndClose(f frameToProcess) {
	defer func() {
		if err := f.Close(); err != nil {
			zap.S().Errorf("unable to close msg: %v", err)
		}
	}()
	r.processFrame(&f)
}

// registerCallBacks subscribes r.OnFrame to the camera topic and panics when
// the registration fails. It is a package-level variable, so it can be
// reassigned.
var registerCallBacks = func(r *RoadPart) {
	err := service.RegisterCallback(r.client, r.cameraTopic, r.OnFrame)
	if err != nil {
		log.Panicf("unable to register callback to topic %v:%v", r.cameraTopic, err)
	}
}

// Stop ends the Start loop by closing the cancel channel, calls
// service.StopService for the road topic and finally closes the detector.
// It must be called at most once: closing an already closed channel panics.
func (r *RoadPart) Stop() {
	defer r.detector.Close()
	// Guard against a RoadPart that was not built through NewPart.
	if r.readyForNext != nil {
		close(r.readyForNext)
	}
	close(r.cancel)
	service.StopService("road", r.client, r.roadTopic)
}

// OnFrame is the MQTT callback for the camera topic. It unmarshals the
// payload as an events.FrameMessage, decodes the embedded image and forwards
// it to the Start loop. Invalid messages are logged and dropped.
func (r *RoadPart) OnFrame(_ mqtt.Client, msg mqtt.Message) {
	var frameMsg events.FrameMessage
	if err := proto.Unmarshal(msg.Payload(), &frameMsg); err != nil {
		// Pass a pointer: protobuf messages must not be copied by value.
		zap.S().Errorf("unable to unmarshal %T message: %v", &frameMsg, err)
		return
	}

	img, err := gocv.IMDecode(frameMsg.GetFrame(), gocv.IMReadUnchanged)
	if err != nil {
		zap.S().Errorf("unable to decode image: %v", err)
		return
	}

	frame := frameToProcess{
		ref: frameMsg.GetId(),
		Mat: img,
	}

	select {
	case r.frameChan <- frame:
	case <-r.cancel:
		// The part has been stopped, so nobody will ever receive the frame:
		// release the Mat instead of blocking this callback forever.
		if err := frame.Close(); err != nil {
			zap.S().Errorf("unable to close msg: %v", err)
		}
	}
}

// frameToProcess couples a decoded image with the reference of the frame
// message it was extracted from.
type frameToProcess struct {
	ref *events.FrameRef
	gocv.Mat
}
func (r *RoadPart) processFrame(frame *frameToProcess) { img := frame.Mat cntr, ellipse := r.detector.Detect(&img) msg := events.RoadMessage{ Contour: cntr, Ellipse: ellipse, FrameRef: frame.ref, } payload, err := proto.Marshal(&msg) if err != nil { zap.S().Errorf("unable to marshal %T to protobuf: %err", msg, err) return } publish(r.client, r.roadTopic, &payload) } var publish = func(client mqtt.Client, topic string, payload *[]byte) { client.Publish(topic, 0, false, *payload) }