diff --git a/cmd/rc-simulator/rc-simultator.go b/cmd/rc-simulator/rc-simultator.go index 095afb4..2e5a956 100644 --- a/cmd/rc-simulator/rc-simultator.go +++ b/cmd/rc-simulator/rc-simultator.go @@ -3,7 +3,11 @@ package main import ( "flag" "github.com/cyrilix/robocar-base/cli" + events2 "github.com/cyrilix/robocar-protobuf/go/events" + "github.com/cyrilix/robocar-simulator/pkg/events" "github.com/cyrilix/robocar-simulator/pkg/gateway" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" "os" ) @@ -11,7 +15,8 @@ import ( const DefaultClientId = "robocar-simulator" func main() { - var mqttBroker, username, password, clientId, topicFrame string + var mqttBroker, username, password, clientId, topicFrame, topicSteering, topicThrottle string + var topicCtrlSteering, topicCtrlThrottle string var address string var debug bool @@ -20,7 +25,11 @@ func main() { cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) - flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set") + flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC_FRAME"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set") + flag.StringVar(&topicFrame, "events-topic-steering", os.Getenv("MQTT_TOPIC_STEERING"), "Mqtt topic to events gateway steering, use MQTT_TOPIC_STEERING if args not set") + flag.StringVar(&topicFrame, "events-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic to events gateway throttle, use MQTT_TOPIC_THROTTLE if args not set") + flag.StringVar(&topicFrame, "topic-steering-ctrl", os.Getenv("MQTT_TOPIC_STEERING_CTRL"), "Mqtt topic to send steering instructions, use MQTT_TOPIC_STEERING_CTRL if args not set") + flag.StringVar(&topicFrame, "topic-throttle-ctrl", os.Getenv("MQTT_TOPIC_THROTTLE_CTRL"), "Mqtt topic to send throttle instructions, use MQTT_TOPIC_THROTTLE_CTRL if args not set") flag.StringVar(&address, "simulator-address", "127.0.0.1:9091", "Simulator address") flag.BoolVar(&debug, "debug", false, "Debug logs") @@ -39,13 +48,56 @@ func main() { } defer client.Disconnect(10) - c := gateway.New(address) - defer c.Stop() + gtw := gateway.New(address) + defer gtw.Stop() - cli.HandleExit(c) + msgPub := events.NewMsgPublisher( + gtw, + events.NewMqttPublisher(client), + topicFrame, + topicSteering, + topicThrottle, + ) + defer msgPub.Stop() - err = c.Start() + cli.HandleExit(gtw) + + err = gtw.Start() if err != nil { log.Fatalf("unable to start service: %v", err) } + msgPub.Start() + + if topicCtrlSteering != "" { + log.Infof("configure mqtt route on steering command") + client.AddRoute(topicCtrlSteering, func(client mqtt.Client, message mqtt.Message) { + onSteeringCommand(gtw, message) + }) + } + if topicCtrlThrottle != "" { + log.Infof("configure mqtt route on throttle command") + client.AddRoute(topicCtrlThrottle, func(client mqtt.Client, message mqtt.Message) { + onThrottleCommand(gtw, message) + }) + } +} + +func onSteeringCommand(c *gateway.Gateway, message mqtt.Message) { + var steeringMsg *events2.SteeringMessage + err := proto.Unmarshal(message.Payload(), steeringMsg) + if err != nil { + log.Errorf("unable to unmarshal steering msg: %v", err) + return + } + c.WriteSteering(steeringMsg) +} + +func onThrottleCommand(c *gateway.Gateway, message mqtt.Message) { + var throttleMsg *events2.ThrottleMessage + err := proto.Unmarshal(message.Payload(), throttleMsg) + if err != nil { + log.Errorf("unable to unmarshal throttle msg: %v", err) + return + } + c.WriteThrottle(throttleMsg) } diff --git a/pkg/events/events.go b/pkg/events/events.go index 30f76b0..8ed73a2 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -131,6 +131,10 @@ type Publisher interface { Publish(topic string, payload []byte) error } +func NewMqttPublisher(client mqtt.Client) *MqttPublisher { + return &MqttPublisher{client: client} +} + type MqttPublisher struct { client mqtt.Client }