From 1d2e00b1e62438263910a50d989753c345a56838 Mon Sep 17 00:00:00 2001 From: Cyrille Nofficial Date: Mon, 1 Nov 2021 11:59:22 +0100 Subject: [PATCH] feat: publish region of interest to new mqtt topic --- cmd/rc-camera/rc-camera.go | 18 +++++++++--- pkg/camera/camera.go | 32 ++++++++++++++++++---- pkg/camera/camera_test.go | 56 ++++++++++++++++++++++++-------------- 3 files changed, 76 insertions(+), 30 deletions(-) diff --git a/cmd/rc-camera/rc-camera.go b/cmd/rc-camera/rc-camera.go index c045a6a..9cdc42a 100644 --- a/cmd/rc-camera/rc-camera.go +++ b/cmd/rc-camera/rc-camera.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "github.com/cyrilix/robocar-base/cli" "github.com/cyrilix/robocar-camera/pkg/camera" "go.uber.org/zap" @@ -13,8 +14,8 @@ import ( const DefaultClientId = "robocar-camera" func main() { - var mqttBroker, username, password, clientId, topicBase string - var pubFrequency int + var mqttBroker, username, password, clientId, topicBase, topicRoi string + var pubFrequency, horizon int var device, videoWidth, videoHeight int var debug bool @@ -22,14 +23,20 @@ func main() { _, mqttRetain := os.LookupEnv("MQTT_RETAIN") cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) - + err := cli.SetIntDefaultValueFromEnv(&horizon, "HORIZON", 0) + if err != nil { + log.Printf("unable to parse horizon value arg: %v", err) + } flag.StringVar(&topicBase, "mqtt-topic", os.Getenv("MQTT_TOPIC"), "Mqtt topic to publish camera frames, use MQTT_TOPIC if args not set") + flag.StringVar(&topicRoi, "mqtt-topic-roi", os.Getenv("MQTT_TOPIC_ROI"), "Mqtt topic to publish camera frames cropped to horizon value, mqtt-topic value with '-roi' suffix if args not set") flag.IntVar(&pubFrequency, "mqtt-pub-frequency", 25., "Number of messages to publish per second") flag.IntVar(&device, "video-device", 0, "Video device number") flag.IntVar(&videoWidth, "video-width", 160, "Video pixels width") flag.IntVar(&videoHeight, "video-height", 128, "Video pixels height") + flag.IntVar(&horizon, "horizon", horizon, "Limit region of interest to horizon in pixels from top, use HORIZON if args not set") + flag.BoolVar(&debug, "debug", false, "Display raw value to debug") flag.Parse() @@ -65,7 +72,10 @@ func main() { videoProperties[gocv.VideoCaptureFrameWidth] = float64(videoWidth) videoProperties[gocv.VideoCaptureFrameHeight] = float64(videoHeight) - c := camera.New(client, topicBase, pubFrequency, videoProperties) + if topicRoi == "" { + topicRoi = fmt.Sprintf( "%s-roi", topicBase) + } + c := camera.New(client, topicBase, topicRoi, pubFrequency, videoProperties, horizon) defer c.Stop() cli.HandleExit(c) diff --git a/pkg/camera/camera.go b/pkg/camera/camera.go index 54b6188..ab0a49d 100644 --- a/pkg/camera/camera.go +++ b/pkg/camera/camera.go @@ -8,6 +8,7 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" "go.uber.org/zap" "gocv.io/x/gocv" + "image" "io" "sync" "time" @@ -22,13 +23,16 @@ type OpencvCameraPart struct { client mqtt.Client vc VideoSource topic string + topicRoi string publishFrequency int muImgBuffered sync.Mutex imgBuffered *gocv.Mat + horizon int cancel chan interface{} } -func New(client mqtt.Client, topic string, publishFrequency int, videoProperties map[gocv.VideoCaptureProperties]float64) *OpencvCameraPart { +func New(client mqtt.Client, topic string, topicRoi string, publishFrequency int, + videoProperties map[gocv.VideoCaptureProperties]float64, horizon int) *OpencvCameraPart { zap.S().Info("run camera part") vc, err := gocv.OpenVideoCapture(0) @@ -44,6 +48,7 @@ func New(client mqtt.Client, topic string, publishFrequency int, videoProperties client: client, vc: vc, topic: topic, + topicRoi: topicRoi, publishFrequency: publishFrequency, imgBuffered: &img, } @@ -60,7 +65,7 @@ func (o *OpencvCameraPart) Start() error { select { case tickerTime := <-ticker.C: - o.publishFrame(tickerTime) + o.publishFrames(tickerTime) case <-o.cancel: return nil } @@ -79,15 +84,29 @@ func (o *OpencvCameraPart) Stop() { } } -func (o *OpencvCameraPart) publishFrame(tickerTime time.Time) { +func (o *OpencvCameraPart) publishFrames(tickerTime time.Time) { o.muImgBuffered.Lock() defer o.muImgBuffered.Unlock() o.vc.Read(o.imgBuffered) - img, err := gocv.IMEncode(gocv.JPEGFileExt, *o.imgBuffered) + // Publish raw image + o.publishFrame(tickerTime, o.topic, o.imgBuffered) + + if o.horizon == 0 { + return + } + + // Region of interest + roi := o.imgBuffered.Region(image.Rect(0, o.horizon, o.imgBuffered.Cols(), o.imgBuffered.Rows())) + defer roi.Close() + o.publishFrame(tickerTime, o.topicRoi, &roi) +} + +func (o *OpencvCameraPart) publishFrame(tickerTime time.Time, topic string, frame *gocv.Mat) { + img, err := gocv.IMEncode(gocv.JPEGFileExt, *frame) if err != nil { - zap.S().Errorf("unable to convert image to jpeg: %v", err) + zap.S().With("topic", topic).Errorf("unable to convert image to jpeg: %v", err) return } @@ -108,9 +127,10 @@ func (o *OpencvCameraPart) publishFrame(tickerTime time.Time) { zap.S().Errorf("unable to marshal protobuf message: %v", err) } - publish(o.client, o.topic, &payload) + publish(o.client, topic, &payload) } + var publish = func(client mqtt.Client, topic string, payload *[]byte) { token := client.Publish(topic, 0, false, *payload) token.WaitTimeout(10 * time.Millisecond) diff --git a/pkg/camera/camera_test.go b/pkg/camera/camera_test.go index acf6673..997f9e8 100644 --- a/pkg/camera/camera_test.go +++ b/pkg/camera/camera_test.go @@ -37,7 +37,7 @@ func TestOpencvCameraPart(t *testing.T) { publish = oldPublish }() waitEvent := sync.WaitGroup{} - waitEvent.Add(1) + waitEvent.Add(2) publish = func(_ mqtt.Client, topic string, payload *[]byte) { muPubEvents.Lock() defer muPubEvents.Unlock() @@ -46,40 +46,56 @@ func TestOpencvCameraPart(t *testing.T) { } const topic = "topic/test/camera" + const topicRoi = "topic/test/camera-roi" imgBuffer := gocv.NewMat() part := OpencvCameraPart{ client: nil, vc: fakeVideoSource{}, topic: topic, + topicRoi: topicRoi, publishFrequency: 2, // Send 2 img/s for tests imgBuffered: &imgBuffer, + horizon: 30, } go part.Start() waitEvent.Wait() - var frameMsg events.FrameMessage - muPubEvents.Lock() - err := proto.Unmarshal(*(publishedEvents[topic]), &frameMsg) - if err != nil { - t.Errorf("unable to unmarshal pubblished frame") - } - muPubEvents.Unlock() + for _, tpc := range []string{topic, topicRoi} { - if frameMsg.GetId().GetName() != "camera" { - t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetName(), "camera") - } - if len(frameMsg.GetId().GetId()) != 13 { - t.Errorf("bad id length: %v, wants %v", len(frameMsg.GetId().GetId()), 13) - } + var frameMsg events.FrameMessage + muPubEvents.Lock() + err := proto.Unmarshal(*(publishedEvents[tpc]), &frameMsg) + if err != nil { + t.Errorf("unable to unmarshal published frame to topic %v", tpc) + } + muPubEvents.Unlock() - if frameMsg.GetId().GetCreatedAt() == nil { - t.Errorf("missin CreatedAt field") - } + if frameMsg.GetId().GetName() != "camera" { + t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetName(), "camera") + } + if len(frameMsg.GetId().GetId()) != 13 { + t.Errorf("bad id length: %v, wants %v", len(frameMsg.GetId().GetId()), 13) + } - _, err = jpeg.Decode(bytes.NewReader(frameMsg.GetFrame())) - if err != nil { - t.Errorf("image published can't be decoded: %v", err) + if frameMsg.GetId().GetCreatedAt() == nil { + t.Errorf("missin CreatedAt field") + } + + _, err = jpeg.Decode(bytes.NewReader(frameMsg.GetFrame())) + if err != nil { + t.Errorf("image published can't be decoded: %v", err) + } + + // Uncomment to debug image cropping + /* + dir, f := path.Split(fmt.Sprintf("/tmp/%s.jpg", tpc)) + os.MkdirAll(dir, os.FileMode(0755)) + err = ioutil.WriteFile(path.Join(dir, f), frameMsg.GetFrame(), os.FileMode(0644) ) + if err != nil { + t.Errorf("unable to write image for topic %s: %v", tpc, err) + } + */ } }