diff --git a/cmd/rc-throttle/rc-throttle.go b/cmd/rc-throttle/rc-throttle.go index 1d2d3e3..64d4491 100644 --- a/cmd/rc-throttle/rc-throttle.go +++ b/cmd/rc-throttle/rc-throttle.go @@ -16,7 +16,7 @@ const ( func main() { var mqttBroker, username, password, clientId string - var throttleTopic string + var throttleTopic, driveModeTopic, rcThrottleTopic string var minThrottle, maxThrottle float64 err := cli.SetFloat64DefaultValueFromEnv(&minThrottle, "THROTTLE_MIN", DefaultThrottleMin) @@ -34,6 +34,8 @@ func main() { cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) flag.StringVar(&throttleTopic, "mqtt-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic to publish throttle result, use MQTT_TOPIC_THROTTLE if args not set") + flag.StringVar(&driveModeTopic, "mqtt-topic-drive-mode", os.Getenv("MQTT_TOPIC_DRIVE_MODE"), "Mqtt topic that contains DriveMode value, use MQTT_TOPIC_DRIVE_MODE if args not set") + flag.StringVar(&rcThrottleTopic, "mqtt-topic-rc-throttle", os.Getenv("MQTT_TOPIC_RC_THROTTLE"), "Mqtt topic that contains RC Throttle value, use MQTT_TOPIC_RC_THROTTLE if args not set") flag.Float64Var(&minThrottle, "throttle-min", minThrottle, "Minimum throttle value, use THROTTLE_MIN if args not set") flag.Float64Var(&maxThrottle, "throttle-max", maxThrottle, "Minimum throttle value, use THROTTLE_MAX if args not set") @@ -51,7 +53,7 @@ func main() { pub := mqttdevice.NewPahoMqttPubSub(mqttBroker, username, password,clientId,mqttQos, mqttRetain) - p := part.NewPart(pub, throttleTopic, minThrottle, maxThrottle) + p := part.NewPart(client, pub, throttleTopic, driveModeTopic, rcThrottleTopic, minThrottle, maxThrottle, 2) defer p.Stop() cli.HandleExit(p) diff --git a/go.mod b/go.mod index 595396f..bb509c8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/cyrilix/robocar-throttle go 1.13 require ( - github.com/cyrilix/robocar-base v0.0.0-20191227142243-e35d6f13814e + github.com/cyrilix/robocar-base v0.0.0-20191227154304-47d48c39b0a2 github.com/eclipse/paho.mqtt.golang v1.2.0 golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect ) diff --git a/go.sum b/go.sum index 2fb511b..3d572e0 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXn github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= -github.com/cyrilix/robocar-base v0.0.0-20191227142243-e35d6f13814e h1:g7pMRmVOpaP71VcXE7JNQCjGp6/Wx3GKUASX8hhO4+Q= -github.com/cyrilix/robocar-base v0.0.0-20191227142243-e35d6f13814e/go.mod h1:/KZidG8Y4sKxCCkTcswpKz20oFN3j62tJvamEHcSgLM= +github.com/cyrilix/robocar-base v0.0.0-20191227154304-47d48c39b0a2 h1:7E0P2+YXKtRM++vnBZtaNVlhKEMkp+X3qYdd0CzseJY= +github.com/cyrilix/robocar-base v0.0.0-20191227154304-47d48c39b0a2/go.mod h1:/KZidG8Y4sKxCCkTcswpKz20oFN3j62tJvamEHcSgLM= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v0.7.3-0.20190506211059-b20a14b54661/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= diff --git a/part/part.go b/part/part.go index 30349ad..0d27645 100644 --- a/part/part.go +++ b/part/part.go @@ -2,23 +2,26 @@ package part import ( "github.com/cyrilix/robocar-base/mqttdevice" + "github.com/cyrilix/robocar-base/service" + "github.com/cyrilix/robocar-base/types" mqtt "github.com/eclipse/paho.mqtt.golang" + "log" + "sync" "time" ) -type CommandValue struct { - Value float64 - Confidence float64 -} -type Steering CommandValue -type Throttle CommandValue - -func NewPart(pub mqttdevice.Publisher, throttleTopic string, minValue, maxValue float64) *ThrottlePart { +func NewPart(client mqtt.Client, pub mqttdevice.Publisher, throttleTopic, driveModeTopic, rcThrottleTopic string, + minValue, maxValue float64, publishPilotFrequency int) *ThrottlePart { return &ThrottlePart{ - pub: pub, - throttleTopic: throttleTopic, - minThrottle: minValue, - maxThrottle: maxValue, + client: client, + pub: pub, + throttleTopic: throttleTopic, + driveModeTopic: driveModeTopic, + rcThrottleTopic: rcThrottleTopic, + minThrottle: minValue, + maxThrottle: maxValue, + driveMode: types.DriveModeUser, + publishPilotFrequency: publishPilotFrequency, } } @@ -28,27 +31,91 @@ type ThrottlePart struct { pub mqttdevice.Publisher throttleTopic string minThrottle, maxThrottle float64 - cancel chan interface{} + + muDriveMode sync.RWMutex + driveMode types.DriveMode + + cancel chan interface{} + publishPilotFrequency int + driveModeTopic, rcThrottleTopic string } func (p *ThrottlePart) Start() error { + if err := registerCallbacks(p); err != nil { + log.Printf("unable to rgeister callbacks: %v", err) + return err + } + p.cancel = make(chan interface{}) - ticker := time.NewTicker(500 * time.Millisecond) + ticker := time.NewTicker(1 * time.Second / time.Duration(p.publishPilotFrequency)) for { - - p.pub.Publish(p.throttleTopic, mqttdevice.NewMqttValue(Throttle{ - Value: p.minThrottle, - Confidence: 1.0, - })) - select { case <-ticker.C: + p.publishPilotValue() case <-p.cancel: break } } } +func (p *ThrottlePart) publishPilotValue() { + p.muDriveMode.RLock() + defer p.muDriveMode.RUnlock() + + if p.driveMode != types.DriveModePilot { + return + } + + p.pub.Publish(p.throttleTopic, mqttdevice.NewMqttValue(types.Throttle{ + Value: p.minThrottle, + Confidence: 1.0, + })) +} + func (p *ThrottlePart) Stop() { close(p.cancel) + service.StopService("throttle", p.client, p.driveModeTopic, p.rcThrottleTopic) +} + +func (p *ThrottlePart) onDriveMode(_ mqtt.Client, message mqtt.Message) { + payload := message.Payload() + value := mqttdevice.NewMqttValue(payload) + m, err := value.DriveModeValue() + if err != nil { + log.Printf("invalid drive mode: %v", err) + return + } + + p.muDriveMode.Lock() + defer p.muDriveMode.Unlock() + p.driveMode = m +} + +func (p *ThrottlePart) onRCThrottle(_ mqtt.Client, message mqtt.Message) { + payload := message.Payload() + value := mqttdevice.NewMqttValue(payload) + val, err := value.Float64Value() + if err != nil { + log.Printf("invalid throttle value from arduino: %v", err) + return + } + + p.muDriveMode.RLock() + defer p.muDriveMode.RUnlock() + if p.driveMode == types.DriveModeUser { + p.pub.Publish(p.throttleTopic, mqttdevice.NewMqttValue(types.Throttle{Value: val, Confidence: 1.0})) + } +} + +var registerCallbacks = func (p *ThrottlePart) error { + err := service.RegisterCallback(p.client, p.driveModeTopic, p.onDriveMode) + if err != nil { + return err + } + + err = service.RegisterCallback(p.client, p.rcThrottleTopic, p.onRCThrottle) + if err != nil { + return err + } + return nil } diff --git a/part/part_test.go b/part/part_test.go index 3ae59ce..4460b15 100644 --- a/part/part_test.go +++ b/part/part_test.go @@ -2,35 +2,72 @@ package part import ( "encoding/json" + "github.com/cyrilix/robocar-base/mqttdevice" "github.com/cyrilix/robocar-base/testtools" + "github.com/cyrilix/robocar-base/types" "testing" "time" ) -func TestDefaultThrottle(t *testing.T){ +func TestDefaultThrottle(t *testing.T) { + oldRegister := registerCallbacks + defer func(){ + registerCallbacks = oldRegister + }() + registerCallbacks = func(p *ThrottlePart) error { + return nil + } + throttleTopic := "topic/throttle" + driveModeTopic := "topic/driveMode" + rcThrottleTopic := "topic/rcThrottle" + minValue := 0.56 pub := testtools.NewFakePublisher() - p := NewPart(pub, throttleTopic, minValue, 1.) + p := NewPart(nil, pub, throttleTopic, driveModeTopic, rcThrottleTopic, minValue, 1., 200) + + cases := []struct { + driveMode types.DriveMode + rcThrottle float64 + expectedThrottle float64 + }{ + {types.DriveModeUser, 0.3, 0.3}, + {types.DriveModePilot, 0.5, minValue}, + {types.DriveModePilot, 0.4, minValue}, + {types.DriveModeUser, 0.5, 0.5}, + {types.DriveModeUser, 0.4, 0.4}, + {types.DriveModeUser, 0.6, 0.6}, + } go p.Start() - defer p.Stop() + defer func(){close(p.cancel)}() - time.Sleep(1 * time.Millisecond) + for _, c := range cases { - mqttValue := pub.PublishedEvent(throttleTopic) - var throttle Throttle - err := json.Unmarshal(mqttValue, &throttle) - if err != nil { - t.Errorf("unable to unmarshall response: %v", err) - t.Fail() - } + p.onDriveMode(nil, testtools.NewFakeMessage(driveModeTopic, mqttdevice.NewMqttValue(c.driveMode))) + p.onRCThrottle(nil, testtools.NewFakeMessage(rcThrottleTopic, mqttdevice.NewMqttValue(c.rcThrottle))) - if throttle.Value != minValue { - t.Errorf("bad throttle value: %v, wants %v", throttle.Value, minValue) - } - if throttle.Confidence != 1. { - t.Errorf("bad throtlle confidence: %v, wants %v", throttle.Confidence, 1.) + time.Sleep(10 * time.Millisecond) + + for i := 3; i >= 0; i-- { + + mqttValue := pub.PublishedEvent(throttleTopic) + var throttle types.Throttle + err := json.Unmarshal(mqttValue, &throttle) + if err != nil { + t.Errorf("unable to unmarshall response: %v", err) + t.Fail() + } + + if throttle.Value != c.expectedThrottle { + t.Errorf("bad throttle value for mode %v: %v, wants %v", c.driveMode, throttle.Value, c.expectedThrottle) + } + if throttle.Confidence != 1. { + t.Errorf("bad throtlle confidence: %v, wants %v", throttle.Confidence, 1.) + } + + time.Sleep(1 * time.Millisecond) + } } } diff --git a/vendor/github.com/cyrilix/robocar-base/cli/cli.go b/vendor/github.com/cyrilix/robocar-base/cli/cli.go index d0113cc..7cfaa8f 100644 --- a/vendor/github.com/cyrilix/robocar-base/cli/cli.go +++ b/vendor/github.com/cyrilix/robocar-base/cli/cli.go @@ -3,6 +3,7 @@ package cli import ( "flag" "fmt" + "github.com/cyrilix/robocar-base/service" MQTT "github.com/eclipse/paho.mqtt.golang" "log" "os" @@ -49,12 +50,7 @@ func SetFloat64DefaultValueFromEnv(value *float64, key string, defaultValue floa return nil } -type Part interface { - Start() error - Stop() -} - -func HandleExit(p Part) { +func HandleExit(p service.Part) { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Kill, os.Interrupt, syscall.SIGTERM) diff --git a/vendor/github.com/cyrilix/robocar-base/service/part.go b/vendor/github.com/cyrilix/robocar-base/service/part.go new file mode 100644 index 0000000..c370778 --- /dev/null +++ b/vendor/github.com/cyrilix/robocar-base/service/part.go @@ -0,0 +1,33 @@ +package service + +import ( + "fmt" + mqtt "github.com/eclipse/paho.mqtt.golang" + "log" +) + +func StopService(name string, client mqtt.Client, topics ...string) { + log.Printf("Stop %s service", name) + token := client.Unsubscribe(topics...) + token.Wait() + if token.Error() != nil { + log.Printf("unable to unsubscribe service: %v", token.Error()) + } + client.Disconnect(50) +} + +func RegisterCallback(client mqtt.Client, topic string, callback mqtt.MessageHandler) error { + log.Printf("Register callback on topic %v", topic) + token := client.Subscribe(topic, 0, callback) + token.Wait() + if token.Error() != nil { + return fmt.Errorf("unable to register callback on topic %s: %v", topic, token.Error()) + } + return nil +} + +type Part interface { + Start() error + Stop() +} + diff --git a/vendor/modules.txt b/vendor/modules.txt index 2bf917b..b01e4e8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,6 +1,7 @@ -# github.com/cyrilix/robocar-base v0.0.0-20191227142243-e35d6f13814e +# github.com/cyrilix/robocar-base v0.0.0-20191227154304-47d48c39b0a2 github.com/cyrilix/robocar-base/cli github.com/cyrilix/robocar-base/mqttdevice +github.com/cyrilix/robocar-base/service github.com/cyrilix/robocar-base/testtools github.com/cyrilix/robocar-base/types # github.com/eclipse/paho.mqtt.golang v1.2.0