diff --git a/cmd/rc-tools/rc-tools.go b/cmd/rc-tools/rc-tools.go index bf7ea80..0542a98 100644 --- a/cmd/rc-tools/rc-tools.go +++ b/cmd/rc-tools/rc-tools.go @@ -28,8 +28,8 @@ func main() { var mqttBroker, username, password, clientId string var framePath string var fps int - var frameTopic, objectsTopic, roadTopic, recordTopic string - var withObjects, withRoad bool + var frameTopic, objectsTopic, roadTopic, recordTopic, throttleFeedbackTopic string + var withObjects, withRoad, withThrottleFeedback bool var recordsPath string var trainArchiveName string var trainSliceSize int @@ -81,6 +81,9 @@ func main() { displayCameraFlags.StringVar(&roadTopic, "mqtt-topic-road", os.Getenv("MQTT_TOPIC_ROAD"), "Mqtt topic that contains road description, use MQTT_TOPIC_ROAD if args not set") displayCameraFlags.BoolVar(&withRoad, "with-road", false, "Display detected road") + displayCameraFlags.StringVar(&throttleFeedbackTopic, "mqtt-topic-throttle-feedback", os.Getenv("MQTT_TOPIC_THROTTLE_FEEDBACK"), "Mqtt topic where to publish throttle feedback, use MQTT_TOPIC_THROTTLE_FEEDBACK if args not set") + displayCameraFlags.BoolVar(&withThrottleFeedback, "with-throttle-feedback", false, "Display throttle feedback") + recordFlags := flag.NewFlagSet("record", flag.ExitOnError) cli.InitMqttFlagSet(recordFlags, DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) recordFlags.StringVar(&recordTopic, "mqtt-topic-records", os.Getenv("MQTT_TOPIC_RECORDS"), "Mqtt topic that contains record data for training, use MQTT_TOPIC_RECORDS if args not set") @@ -195,7 +198,7 @@ func main() { zap.S().Fatalf("unable to connect to mqtt bus: %v", err) } defer client.Disconnect(50) - runDisplay(client, framePath, frameTopic, fps, objectsTopic, roadTopic, withObjects, withRoad) + runDisplay(client, framePath, frameTopic, fps, objectsTopic, roadTopic, throttleFeedbackTopic, withObjects, withRoad, withThrottleFeedback) default: displayFlags.PrintDefaults() os.Exit(0) @@ -325,7 +328,8 @@ func runDisplayRecord(client mqtt.Client, recordTopic string) { zap.S().Fatalf("unable to start service: %v", err) } } -func runDisplay(client mqtt.Client, framePath string, frameTopic string, fps int, objectsTopic string, roadTopic string, withObjects bool, withRoad bool) { +func runDisplay(client mqtt.Client, framePath string, frameTopic string, fps int, objectsTopic, roadTopic, throttleFeedbackTopic string, + withObjects, withRoad, withThrottleFeedback bool) { if framePath != "" { camera, err := video.NewCameraFake(client, frameTopic, framePath, fps) @@ -339,8 +343,8 @@ func runDisplay(client mqtt.Client, framePath string, frameTopic string, fps int } p := part.NewPart(client, frameTopic, - objectsTopic, roadTopic, - withObjects, withRoad) + objectsTopic, roadTopic, throttleFeedbackTopic, + withObjects, withRoad, withThrottleFeedback) defer p.Stop() cli.HandleExit(p) diff --git a/part/part.go b/part/part.go index 131da24..f6a2e5f 100644 --- a/part/part.go +++ b/part/part.go @@ -14,35 +14,41 @@ import ( "time" ) -func NewPart(client mqtt.Client, frameTopic, objectsTopic, roadTopic string, withObjects, withRoad bool) *FramePart { +func NewPart(client mqtt.Client, frameTopic, objectsTopic, roadTopic, throttleFeedbackTopic string, + withObjects, withRoad, withThrottleFeedback bool) *FramePart { return &FramePart{ - client: client, - frameTopic: frameTopic, - objectsTopic: objectsTopic, - roadTopic: roadTopic, - window: gocv.NewWindow("frameTopic"), - withObjects: withObjects, - withRoad: withRoad, - imgChan: make(chan gocv.Mat), - objectsChan: make(chan events.ObjectsMessage), - roadChan: make(chan events.RoadMessage), - cancel: make(chan interface{}), + client: client, + frameTopic: frameTopic, + objectsTopic: objectsTopic, + roadTopic: roadTopic, + throttleFeedbackTopic: throttleFeedbackTopic, + window: gocv.NewWindow("frameTopic"), + withObjects: withObjects, + withRoad: withRoad, + withThrottleFeedback: withThrottleFeedback, + imgChan: make(chan gocv.Mat), + objectsChan: make(chan events.ObjectsMessage), + roadChan: make(chan events.RoadMessage), + throttleFeedbackChan: make(chan events.ThrottleMessage), + cancel: make(chan interface{}), } } type FramePart struct { - client mqtt.Client - frameTopic, objectsTopic, roadTopic string + client mqtt.Client + frameTopic, objectsTopic, roadTopic, throttleFeedbackTopic string - window *gocv.Window - withObjects bool - withRoad bool + window *gocv.Window + withObjects bool + withRoad bool + withThrottleFeedback bool - imgChan chan gocv.Mat - objectsChan chan events.ObjectsMessage - roadChan chan events.RoadMessage - cancel chan interface{} + imgChan chan gocv.Mat + objectsChan chan events.ObjectsMessage + roadChan chan events.RoadMessage + throttleFeedbackChan chan events.ThrottleMessage + cancel chan interface{} } func (p *FramePart) Start() error { @@ -53,6 +59,8 @@ func (p *FramePart) Start() error { var img = gocv.NewMat() var objectsMsg events.ObjectsMessage var roadMsg events.RoadMessage + var throttleFeedbackMsg events.ThrottleMessage + ticker := time.NewTicker(1 * time.Second) for { select { @@ -65,11 +73,13 @@ func (p *FramePart) Start() error { objectsMsg = objects case road := <-p.roadChan: roadMsg = road + case throttleFeedback := <-p.throttleFeedbackChan: + throttleFeedbackMsg = throttleFeedback case <-p.cancel: img.Close() return nil } - p.drawFrame(&img, &objectsMsg, &roadMsg) + p.drawFrame(&img, &objectsMsg, &roadMsg, &throttleFeedbackMsg) ticker.Reset(1 * time.Second) } } @@ -125,6 +135,18 @@ func (p *FramePart) onRoad(_ mqtt.Client, message mqtt.Message) { p.roadChan <- msg } +func (p *FramePart) onThrottleFeedback(_ mqtt.Client, message mqtt.Message) { + var msg events.ThrottleMessage + + err := proto.Unmarshal(message.Payload(), &msg) + if err != nil { + zap.S().Errorf("unable to unmarshal msg %T: %v", msg, err) + return + } + + p.throttleFeedbackChan <- msg +} + func (p *FramePart) registerCallbacks() error { err := RegisterCallback(p.client, p.frameTopic, p.onFrame) if err != nil { @@ -144,10 +166,16 @@ func (p *FramePart) registerCallbacks() error { return err } } + if p.withThrottleFeedback { + err := service.RegisterCallback(p.client, p.throttleFeedbackTopic, p.onThrottleFeedback) + if err != nil { + return err + } + } return nil } -func (p *FramePart) drawFrame(img *gocv.Mat, objects *events.ObjectsMessage, road *events.RoadMessage) { +func (p *FramePart) drawFrame(img *gocv.Mat, objects *events.ObjectsMessage, road *events.RoadMessage, tf *events.ThrottleMessage) { if p.withObjects { p.drawObjects(img, objects) @@ -155,6 +183,9 @@ func (p *FramePart) drawFrame(img *gocv.Mat, objects *events.ObjectsMessage, roa if p.withRoad { p.drawRoad(img, road) } + if p.withThrottleFeedback { + p.drawThrottleFeedbackText(img, tf) + } p.window.IMShow(*img) p.window.WaitKey(1) @@ -216,6 +247,18 @@ func (p *FramePart) drawRoadText(img *gocv.Mat, road *events.RoadMessage) { ) } +func (p *FramePart) drawThrottleFeedbackText(img *gocv.Mat, tf *events.ThrottleMessage) { + gocv.PutText( + img, + fmt.Sprintf("Throttle feedback: %.3f", tf.Throttle), + image.Point{X: 5, Y: 20}, + gocv.FontHersheyPlain, + 0.6, + color.RGBA{R: 0, G: 255, B: 255, A: 255}, + 1, + ) +} + func StopService(name string, client mqtt.Client, topics ...string) { zap.S().Infof("Stop %s service", name) token := client.Unsubscribe(topics...)