Watch mqtt topic to control simulator car
This commit is contained in:
		@@ -3,7 +3,11 @@ package main
 | 
			
		||||
import (
 | 
			
		||||
	"flag"
 | 
			
		||||
	"github.com/cyrilix/robocar-base/cli"
 | 
			
		||||
	events2 "github.com/cyrilix/robocar-protobuf/go/events"
 | 
			
		||||
	"github.com/cyrilix/robocar-simulator/pkg/events"
 | 
			
		||||
	"github.com/cyrilix/robocar-simulator/pkg/gateway"
 | 
			
		||||
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
			
		||||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	log "github.com/sirupsen/logrus"
 | 
			
		||||
	"os"
 | 
			
		||||
)
 | 
			
		||||
@@ -11,7 +15,8 @@ import (
 | 
			
		||||
const DefaultClientId = "robocar-simulator"
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	var mqttBroker, username, password, clientId, topicFrame string
 | 
			
		||||
	var mqttBroker, username, password, clientId, topicFrame, topicSteering, topicThrottle string
 | 
			
		||||
	var topicCtrlSteering, topicCtrlThrottle string
 | 
			
		||||
	var address string
 | 
			
		||||
	var debug bool
 | 
			
		||||
 | 
			
		||||
@@ -20,7 +25,11 @@ func main() {
 | 
			
		||||
 | 
			
		||||
	cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain)
 | 
			
		||||
 | 
			
		||||
	flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set")
 | 
			
		||||
	flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC_FRAME"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set")
 | 
			
		||||
	flag.StringVar(&topicFrame, "events-topic-steering", os.Getenv("MQTT_TOPIC_STEERING"), "Mqtt topic to events gateway steering, use MQTT_TOPIC_STEERING if args not set")
 | 
			
		||||
	flag.StringVar(&topicFrame, "events-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic to events gateway throttle, use MQTT_TOPIC_THROTTLE if args not set")
 | 
			
		||||
	flag.StringVar(&topicFrame, "topic-steering-ctrl", os.Getenv("MQTT_TOPIC_STEERING_CTRL"), "Mqtt topic to send steering instructions, use MQTT_TOPIC_STEERING_CTRL if args not set")
 | 
			
		||||
	flag.StringVar(&topicFrame, "topic-throttle-ctrl", os.Getenv("MQTT_TOPIC_THROTTLE_CTRL"), "Mqtt topic to send throttle instructions, use MQTT_TOPIC_THROTTLE_CTRL if args not set")
 | 
			
		||||
	flag.StringVar(&address, "simulator-address", "127.0.0.1:9091", "Simulator address")
 | 
			
		||||
	flag.BoolVar(&debug, "debug", false, "Debug logs")
 | 
			
		||||
 | 
			
		||||
@@ -39,13 +48,56 @@ func main() {
 | 
			
		||||
	}
 | 
			
		||||
	defer client.Disconnect(10)
 | 
			
		||||
 | 
			
		||||
	c := gateway.New(address)
 | 
			
		||||
	defer c.Stop()
 | 
			
		||||
	gtw := gateway.New(address)
 | 
			
		||||
	defer gtw.Stop()
 | 
			
		||||
 | 
			
		||||
	cli.HandleExit(c)
 | 
			
		||||
	msgPub := events.NewMsgPublisher(
 | 
			
		||||
		gtw,
 | 
			
		||||
		events.NewMqttPublisher(client),
 | 
			
		||||
		topicFrame,
 | 
			
		||||
		topicSteering,
 | 
			
		||||
		topicThrottle,
 | 
			
		||||
	)
 | 
			
		||||
	defer msgPub.Stop()
 | 
			
		||||
 | 
			
		||||
	err = c.Start()
 | 
			
		||||
	cli.HandleExit(gtw)
 | 
			
		||||
 | 
			
		||||
	err = gtw.Start()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalf("unable to start service: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	msgPub.Start()
 | 
			
		||||
 | 
			
		||||
	if topicCtrlSteering != "" {
 | 
			
		||||
		log.Infof("configure mqtt route on steering command")
 | 
			
		||||
		client.AddRoute(topicCtrlSteering, func(client mqtt.Client, message mqtt.Message) {
 | 
			
		||||
			onSteeringCommand(gtw, message)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	if topicCtrlThrottle != "" {
 | 
			
		||||
		log.Infof("configure mqtt route on throttle command")
 | 
			
		||||
		client.AddRoute(topicCtrlThrottle, func(client mqtt.Client, message mqtt.Message) {
 | 
			
		||||
			onThrottleCommand(gtw, message)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func onSteeringCommand(c *gateway.Gateway, message mqtt.Message) {
 | 
			
		||||
	var steeringMsg *events2.SteeringMessage
 | 
			
		||||
	err := proto.Unmarshal(message.Payload(), steeringMsg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to unmarshal steering msg: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	c.WriteSteering(steeringMsg)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func onThrottleCommand(c *gateway.Gateway, message mqtt.Message) {
 | 
			
		||||
	var throttleMsg *events2.ThrottleMessage
 | 
			
		||||
	err := proto.Unmarshal(message.Payload(), throttleMsg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to unmarshal throttle msg: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	c.WriteThrottle(throttleMsg)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -131,6 +131,10 @@ type Publisher interface {
 | 
			
		||||
	Publish(topic string, payload []byte) error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewMqttPublisher(client mqtt.Client) *MqttPublisher {
 | 
			
		||||
	return &MqttPublisher{client: client}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MqttPublisher struct {
 | 
			
		||||
	client mqtt.Client
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user