feat: publish region of interest to new mqtt topic

This commit is contained in:
Cyrille Nofficial 2021-11-01 11:59:22 +01:00
parent 98382833de
commit 1d2e00b1e6
3 changed files with 76 additions and 30 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"flag" "flag"
"fmt"
"github.com/cyrilix/robocar-base/cli" "github.com/cyrilix/robocar-base/cli"
"github.com/cyrilix/robocar-camera/pkg/camera" "github.com/cyrilix/robocar-camera/pkg/camera"
"go.uber.org/zap" "go.uber.org/zap"
@ -13,8 +14,8 @@ import (
const DefaultClientId = "robocar-camera" const DefaultClientId = "robocar-camera"
func main() { func main() {
var mqttBroker, username, password, clientId, topicBase string var mqttBroker, username, password, clientId, topicBase, topicRoi string
var pubFrequency int var pubFrequency, horizon int
var device, videoWidth, videoHeight int var device, videoWidth, videoHeight int
var debug bool var debug bool
@ -22,14 +23,20 @@ func main() {
_, mqttRetain := os.LookupEnv("MQTT_RETAIN") _, mqttRetain := os.LookupEnv("MQTT_RETAIN")
cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain)
err := cli.SetIntDefaultValueFromEnv(&horizon, "HORIZON", 0)
if err != nil {
log.Printf("unable to parse horizon value arg: %v", err)
}
flag.StringVar(&topicBase, "mqtt-topic", os.Getenv("MQTT_TOPIC"), "Mqtt topic to publish camera frames, use MQTT_TOPIC if args not set") flag.StringVar(&topicBase, "mqtt-topic", os.Getenv("MQTT_TOPIC"), "Mqtt topic to publish camera frames, use MQTT_TOPIC if args not set")
flag.StringVar(&topicRoi, "mqtt-topic-roi", os.Getenv("MQTT_TOPIC_ROI"), "Mqtt topic to publish camera frames cropped to horizon value, mqtt-topic value with '-roi' suffix if args not set")
flag.IntVar(&pubFrequency, "mqtt-pub-frequency", 25., "Number of messages to publish per second") flag.IntVar(&pubFrequency, "mqtt-pub-frequency", 25., "Number of messages to publish per second")
flag.IntVar(&device, "video-device", 0, "Video device number") flag.IntVar(&device, "video-device", 0, "Video device number")
flag.IntVar(&videoWidth, "video-width", 160, "Video pixels width") flag.IntVar(&videoWidth, "video-width", 160, "Video pixels width")
flag.IntVar(&videoHeight, "video-height", 128, "Video pixels height") flag.IntVar(&videoHeight, "video-height", 128, "Video pixels height")
flag.IntVar(&horizon, "horizon", horizon, "Limit region of interest to horizon in pixels from top, use HORIZON if args not set")
flag.BoolVar(&debug, "debug", false, "Display raw value to debug") flag.BoolVar(&debug, "debug", false, "Display raw value to debug")
flag.Parse() flag.Parse()
@ -65,7 +72,10 @@ func main() {
videoProperties[gocv.VideoCaptureFrameWidth] = float64(videoWidth) videoProperties[gocv.VideoCaptureFrameWidth] = float64(videoWidth)
videoProperties[gocv.VideoCaptureFrameHeight] = float64(videoHeight) videoProperties[gocv.VideoCaptureFrameHeight] = float64(videoHeight)
c := camera.New(client, topicBase, pubFrequency, videoProperties) if topicRoi == "" {
topicRoi = fmt.Sprintf( "%s-roi", topicBase)
}
c := camera.New(client, topicBase, topicRoi, pubFrequency, videoProperties, horizon)
defer c.Stop() defer c.Stop()
cli.HandleExit(c) cli.HandleExit(c)

View File

@ -8,6 +8,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"go.uber.org/zap" "go.uber.org/zap"
"gocv.io/x/gocv" "gocv.io/x/gocv"
"image"
"io" "io"
"sync" "sync"
"time" "time"
@ -22,13 +23,16 @@ type OpencvCameraPart struct {
client mqtt.Client client mqtt.Client
vc VideoSource vc VideoSource
topic string topic string
topicRoi string
publishFrequency int publishFrequency int
muImgBuffered sync.Mutex muImgBuffered sync.Mutex
imgBuffered *gocv.Mat imgBuffered *gocv.Mat
horizon int
cancel chan interface{} cancel chan interface{}
} }
func New(client mqtt.Client, topic string, publishFrequency int, videoProperties map[gocv.VideoCaptureProperties]float64) *OpencvCameraPart { func New(client mqtt.Client, topic string, topicRoi string, publishFrequency int,
videoProperties map[gocv.VideoCaptureProperties]float64, horizon int) *OpencvCameraPart {
zap.S().Info("run camera part") zap.S().Info("run camera part")
vc, err := gocv.OpenVideoCapture(0) vc, err := gocv.OpenVideoCapture(0)
@ -44,6 +48,7 @@ func New(client mqtt.Client, topic string, publishFrequency int, videoProperties
client: client, client: client,
vc: vc, vc: vc,
topic: topic, topic: topic,
topicRoi: topicRoi,
publishFrequency: publishFrequency, publishFrequency: publishFrequency,
imgBuffered: &img, imgBuffered: &img,
} }
@ -60,7 +65,7 @@ func (o *OpencvCameraPart) Start() error {
select { select {
case tickerTime := <-ticker.C: case tickerTime := <-ticker.C:
o.publishFrame(tickerTime) o.publishFrames(tickerTime)
case <-o.cancel: case <-o.cancel:
return nil return nil
} }
@ -79,15 +84,29 @@ func (o *OpencvCameraPart) Stop() {
} }
} }
func (o *OpencvCameraPart) publishFrame(tickerTime time.Time) { func (o *OpencvCameraPart) publishFrames(tickerTime time.Time) {
o.muImgBuffered.Lock() o.muImgBuffered.Lock()
defer o.muImgBuffered.Unlock() defer o.muImgBuffered.Unlock()
o.vc.Read(o.imgBuffered) o.vc.Read(o.imgBuffered)
img, err := gocv.IMEncode(gocv.JPEGFileExt, *o.imgBuffered) // Publish raw image
o.publishFrame(tickerTime, o.topic, o.imgBuffered)
if o.horizon == 0 {
return
}
// Region of interest
roi := o.imgBuffered.Region(image.Rect(0, o.horizon, o.imgBuffered.Cols(), o.imgBuffered.Rows()))
defer roi.Close()
o.publishFrame(tickerTime, o.topicRoi, &roi)
}
func (o *OpencvCameraPart) publishFrame(tickerTime time.Time, topic string, frame *gocv.Mat) {
img, err := gocv.IMEncode(gocv.JPEGFileExt, *frame)
if err != nil { if err != nil {
zap.S().Errorf("unable to convert image to jpeg: %v", err) zap.S().With("topic", topic).Errorf("unable to convert image to jpeg: %v", err)
return return
} }
@ -108,9 +127,10 @@ func (o *OpencvCameraPart) publishFrame(tickerTime time.Time) {
zap.S().Errorf("unable to marshal protobuf message: %v", err) zap.S().Errorf("unable to marshal protobuf message: %v", err)
} }
publish(o.client, o.topic, &payload) publish(o.client, topic, &payload)
} }
var publish = func(client mqtt.Client, topic string, payload *[]byte) { var publish = func(client mqtt.Client, topic string, payload *[]byte) {
token := client.Publish(topic, 0, false, *payload) token := client.Publish(topic, 0, false, *payload)
token.WaitTimeout(10 * time.Millisecond) token.WaitTimeout(10 * time.Millisecond)

View File

@ -37,7 +37,7 @@ func TestOpencvCameraPart(t *testing.T) {
publish = oldPublish publish = oldPublish
}() }()
waitEvent := sync.WaitGroup{} waitEvent := sync.WaitGroup{}
waitEvent.Add(1) waitEvent.Add(2)
publish = func(_ mqtt.Client, topic string, payload *[]byte) { publish = func(_ mqtt.Client, topic string, payload *[]byte) {
muPubEvents.Lock() muPubEvents.Lock()
defer muPubEvents.Unlock() defer muPubEvents.Unlock()
@ -46,40 +46,56 @@ func TestOpencvCameraPart(t *testing.T) {
} }
const topic = "topic/test/camera" const topic = "topic/test/camera"
const topicRoi = "topic/test/camera-roi"
imgBuffer := gocv.NewMat() imgBuffer := gocv.NewMat()
part := OpencvCameraPart{ part := OpencvCameraPart{
client: nil, client: nil,
vc: fakeVideoSource{}, vc: fakeVideoSource{},
topic: topic, topic: topic,
topicRoi: topicRoi,
publishFrequency: 2, // Send 2 img/s for tests publishFrequency: 2, // Send 2 img/s for tests
imgBuffered: &imgBuffer, imgBuffered: &imgBuffer,
horizon: 30,
} }
go part.Start() go part.Start()
waitEvent.Wait() waitEvent.Wait()
var frameMsg events.FrameMessage for _, tpc := range []string{topic, topicRoi} {
muPubEvents.Lock()
err := proto.Unmarshal(*(publishedEvents[topic]), &frameMsg)
if err != nil {
t.Errorf("unable to unmarshal pubblished frame")
}
muPubEvents.Unlock()
if frameMsg.GetId().GetName() != "camera" { var frameMsg events.FrameMessage
t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetName(), "camera") muPubEvents.Lock()
} err := proto.Unmarshal(*(publishedEvents[tpc]), &frameMsg)
if len(frameMsg.GetId().GetId()) != 13 { if err != nil {
t.Errorf("bad id length: %v, wants %v", len(frameMsg.GetId().GetId()), 13) t.Errorf("unable to unmarshal published frame to topic %v", tpc)
} }
muPubEvents.Unlock()
if frameMsg.GetId().GetCreatedAt() == nil { if frameMsg.GetId().GetName() != "camera" {
t.Errorf("missin CreatedAt field") t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetName(), "camera")
} }
if len(frameMsg.GetId().GetId()) != 13 {
t.Errorf("bad id length: %v, wants %v", len(frameMsg.GetId().GetId()), 13)
}
_, err = jpeg.Decode(bytes.NewReader(frameMsg.GetFrame())) if frameMsg.GetId().GetCreatedAt() == nil {
if err != nil { t.Errorf("missin CreatedAt field")
t.Errorf("image published can't be decoded: %v", err) }
_, err = jpeg.Decode(bytes.NewReader(frameMsg.GetFrame()))
if err != nil {
t.Errorf("image published can't be decoded: %v", err)
}
// Uncomment to debug image cropping
/*
dir, f := path.Split(fmt.Sprintf("/tmp/%s.jpg", tpc))
os.MkdirAll(dir, os.FileMode(0755))
err = ioutil.WriteFile(path.Join(dir, f), frameMsg.GetFrame(), os.FileMode(0644) )
if err != nil {
t.Errorf("unable to write image for topic %s: %v", tpc, err)
}
*/
} }
} }