[cli] Split topic prefix arg

This commit is contained in:
Cyrille Nofficial 2020-01-27 19:21:26 +01:00
parent f755c8fba0
commit f2ab59f4d0
3 changed files with 39 additions and 23 deletions

View File

@ -29,7 +29,7 @@ var (
type Part struct { type Part struct {
client mqtt.Client client mqtt.Client
topicBase string throttleTopic, steeringTopic, driveModeTopic, switchRecordTopic string
pubFrequency float64 pubFrequency float64
serial io.Reader serial io.Reader
mutex sync.Mutex mutex sync.Mutex
@ -41,7 +41,7 @@ type Part struct {
cancel chan interface{} cancel chan interface{}
} }
func NewPart(client mqtt.Client, name string, baud int, topicBase string, pubFrequency float64, debug bool) *Part { func NewPart(client mqtt.Client, name string, baud int, throttleTopic, steeringTopic, driveModeTopic, switchRecordTopic string, pubFrequency float64, debug bool) *Part {
c := &serial.Config{Name: name, Baud: baud} c := &serial.Config{Name: name, Baud: baud}
s, err := serial.OpenPort(c) s, err := serial.OpenPort(c)
if err != nil { if err != nil {
@ -50,7 +50,10 @@ func NewPart(client mqtt.Client, name string, baud int, topicBase string, pubFre
return &Part{ return &Part{
client: client, client: client,
serial: s, serial: s,
topicBase: topicBase, throttleTopic: throttleTopic,
steeringTopic: steeringTopic,
driveModeTopic:driveModeTopic,
switchRecordTopic:switchRecordTopic,
pubFrequency: pubFrequency, pubFrequency: pubFrequency,
driveMode: events.DriveMode_INVALID, driveMode: events.DriveMode_INVALID,
debug: debug, debug: debug,
@ -192,13 +195,12 @@ func (a *Part) processChannel6(v string) {
} }
func (a *Part) publishLoop() { func (a *Part) publishLoop() {
prefix := strings.TrimSuffix(a.topicBase, "/")
ticker := time.NewTicker(time.Second / time.Duration(int(a.pubFrequency))) ticker := time.NewTicker(time.Second / time.Duration(int(a.pubFrequency)))
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
a.publishValues(prefix) a.publishValues()
case <-a.cancel: case <-a.cancel:
ticker.Stop() ticker.Stop()
return return
@ -206,17 +208,17 @@ func (a *Part) publishLoop() {
} }
} }
func (a *Part) publishValues(prefix string) { func (a *Part) publishValues() {
a.mutex.Lock() a.mutex.Lock()
defer a.mutex.Unlock() defer a.mutex.Unlock()
a.publishThrottle(prefix) a.publishThrottle()
a.publishSteering(prefix) a.publishSteering()
a.publishDriveMode(prefix) a.publishDriveMode()
a.publishSwitchRecord(prefix) a.publishSwitchRecord()
} }
func (a *Part) publishThrottle(prefix string) { func (a *Part) publishThrottle() {
throttle := events.ThrottleMessage{ throttle := events.ThrottleMessage{
Throttle: a.throttle, Throttle: a.throttle,
Confidence: 1.0, Confidence: 1.0,
@ -226,10 +228,11 @@ func (a *Part) publishThrottle(prefix string) {
log.Errorf("unable to marshal protobuf throttle message: %v", err) log.Errorf("unable to marshal protobuf throttle message: %v", err)
return return
} }
publish(a.client, prefix+"/throttle", &throttleMessage) log.Infof("throttle channel: %v", a.throttle)
publish(a.client, a.throttleTopic, &throttleMessage)
} }
func (a *Part) publishSteering(prefix string) { func (a *Part) publishSteering() {
steering := events.SteeringMessage{ steering := events.SteeringMessage{
Steering: a.steering, Steering: a.steering,
Confidence: 1.0, Confidence: 1.0,
@ -239,10 +242,10 @@ func (a *Part) publishSteering(prefix string) {
log.Errorf("unable to marshal protobuf steering message: %v", err) log.Errorf("unable to marshal protobuf steering message: %v", err)
return return
} }
publish(a.client, prefix+"/steering", &steeringMessage) publish(a.client, a.steeringTopic, &steeringMessage)
} }
func (a *Part) publishDriveMode(prefix string) { func (a *Part) publishDriveMode() {
dm := events.DriveModeMessage{ dm := events.DriveModeMessage{
DriveMode: a.driveMode, DriveMode: a.driveMode,
} }
@ -251,10 +254,10 @@ func (a *Part) publishDriveMode(prefix string) {
log.Errorf("unable to marshal protobuf driveMode message: %v", err) log.Errorf("unable to marshal protobuf driveMode message: %v", err)
return return
} }
publish(a.client, prefix+"/drive_mode", &driveModeMessage) publish(a.client, a.driveModeTopic, &driveModeMessage)
} }
func (a *Part) publishSwitchRecord(prefix string) { func (a *Part) publishSwitchRecord() {
sr := events.SwitchRecordMessage{ sr := events.SwitchRecordMessage{
Enabled: a.ctrlRecord, Enabled: a.ctrlRecord,
} }
@ -263,7 +266,7 @@ func (a *Part) publishSwitchRecord(prefix string) {
log.Errorf("unable to marshal protobuf SwitchRecord message: %v", err) log.Errorf("unable to marshal protobuf SwitchRecord message: %v", err)
return return
} }
publish(a.client, prefix+"/switch_record", &switchRecordMessage) publish(a.client, a.switchRecordTopic, &switchRecordMessage)
} }
var publish = func(client mqtt.Client, topic string, payload *[]byte) { var publish = func(client mqtt.Client, topic string, payload *[]byte) {

View File

@ -192,7 +192,16 @@ func TestPublish(t *testing.T) {
defer conn.Close() defer conn.Close()
pubFrequency := 100. pubFrequency := 100.
a := Part{client: nil, serial: conn, pubFrequency: pubFrequency, topicBase: "car/part/arduino/", cancel:make(chan interface{})} a := Part{
client: nil,
serial: conn,
pubFrequency: pubFrequency,
throttleTopic: "car/part/arduino/throttle",
steeringTopic: "car/part/arduino/steering",
driveModeTopic: "car/part/arduino/drive_mode",
switchRecordTopic: "car/part/arduino/switch_record",
cancel: make(chan interface{}),
}
go a.Start() go a.Start()
defer a.Stop() defer a.Stop()

View File

@ -11,7 +11,8 @@ import (
const DefaultClientId = "robocar-arduino" const DefaultClientId = "robocar-arduino"
func main() { func main() {
var mqttBroker, username, password, clientId, topicBase string var mqttBroker, username, password, clientId string
var throttleTopic, steeringTopic, driveModeTopic, switchRecordTopic string
var device string var device string
var baud int var baud int
var pubFrequency float64 var pubFrequency float64
@ -23,7 +24,10 @@ func main() {
cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain)
flag.Float64Var(&pubFrequency, "mqtt-pub-frequency", 25., "Number of messages to publish per second") flag.Float64Var(&pubFrequency, "mqtt-pub-frequency", 25., "Number of messages to publish per second")
flag.StringVar(&topicBase, "mqtt-topic-base", os.Getenv("MQTT_TOPIC_BASE"), "Mqtt topic prefix, use MQTT_TOPIC_BASE if args not set") flag.StringVar(&throttleTopic, "mqtt-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic where to publish throttle values, use MQTT_TOPIC_THROTTLE if args not set")
flag.StringVar(&steeringTopic, "mqtt-topic-steering", os.Getenv("MQTT_TOPIC_STEERING"), "Mqtt topic where to publish steering values, use MQTT_TOPIC_STEERING if args not set")
flag.StringVar(&driveModeTopic, "mqtt-topic-drive-mode", os.Getenv("MQTT_TOPIC_DRIVE_MODE"), "Mqtt topic where to publish drive mode state, use MQTT_TOPIC_DRIVE_MODE if args not set")
flag.StringVar(&switchRecordTopic, "mqtt-topic-switch-record", os.Getenv("MQTT_TOPIC_SWITCH_RECORD"), "Mqtt topic where to publish switch record state, use MQTT_TOPIC_SWITCH_RECORD if args not set")
flag.StringVar(&device, "device", "/dev/serial0", "Serial device") flag.StringVar(&device, "device", "/dev/serial0", "Serial device")
flag.IntVar(&baud, "baud", 115200, "Serial baud") flag.IntVar(&baud, "baud", 115200, "Serial baud")
flag.BoolVar(&debug, "debug", false, "Display raw value to debug") flag.BoolVar(&debug, "debug", false, "Display raw value to debug")
@ -40,7 +44,7 @@ func main() {
} }
defer client.Disconnect(10) defer client.Disconnect(10)
a := arduino.NewPart(client, device, baud, topicBase, pubFrequency, debug) a := arduino.NewPart(client, device, baud, throttleTopic, steeringTopic, driveModeTopic, switchRecordTopic, pubFrequency, debug)
err = a.Start() err = a.Start()
if err != nil { if err != nil {
log.Printf("unable to start service: %v", err) log.Printf("unable to start service: %v", err)