package steering import ( "github.com/cyrilix/robocar-base/service" "github.com/cyrilix/robocar-protobuf/go/events" mqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" "google.golang.org/protobuf/proto" "sync" ) func NewController(client mqtt.Client, steeringTopic, driveModeTopic, rcSteeringTopic, tfSteeringTopic, objectsTopic string) *Controller { return &Controller{ client: client, steeringTopic: steeringTopic, driveModeTopic: driveModeTopic, rcSteeringTopic: rcSteeringTopic, tfSteeringTopic: tfSteeringTopic, objectsTopic: objectsTopic, driveMode: events.DriveMode_USER, } } type Controller struct { client mqtt.Client steeringTopic string muDriveMode sync.RWMutex driveMode events.DriveMode cancel chan interface{} driveModeTopic, rcSteeringTopic, tfSteeringTopic, objectsTopic string muObjects sync.RWMutex objects []*events.Object debug bool } func (p *Controller) Start() error { if err := registerCallbacks(p); err != nil { zap.S().Errorf("unable to rgeister callbacks: %v", err) return err } p.cancel = make(chan interface{}) <-p.cancel return nil } func (p *Controller) Stop() { close(p.cancel) service.StopService("throttle", p.client, p.driveModeTopic, p.rcSteeringTopic, p.tfSteeringTopic) } func (p *Controller) onObjects(_ mqtt.Client, message mqtt.Message) { var msg events.ObjectsMessage err := proto.Unmarshal(message.Payload(), &msg) if err != nil { zap.S().Errorf("unable to unmarshal protobuf %T message: %v", msg, err) return } p.muObjects.Lock() defer p.muObjects.Unlock() p.objects = msg.GetObjects() } func (p *Controller) onDriveMode(_ mqtt.Client, message mqtt.Message) { var msg events.DriveModeMessage err := proto.Unmarshal(message.Payload(), &msg) if err != nil { zap.S().Errorf("unable to unmarshal protobuf %T message: %v", msg, err) return } p.muDriveMode.Lock() defer p.muDriveMode.Unlock() p.driveMode = msg.GetDriveMode() } func (p *Controller) onRCSteering(_ mqtt.Client, message mqtt.Message) { p.muDriveMode.RLock() defer p.muDriveMode.RUnlock() if p.debug { var evt events.SteeringMessage err := proto.Unmarshal(message.Payload(), &evt) if err != nil { zap.S().Debugf("unable to unmarshal rc event: %v", err) } else { zap.S().Debugf("receive steering message from radio command: %0.00f", evt.GetSteering()) } } if p.driveMode == events.DriveMode_USER { // Republish same content payload := message.Payload() publish(p.client, p.steeringTopic, &payload) } } func (p *Controller) onTFSteering(_ mqtt.Client, message mqtt.Message) { p.muDriveMode.RLock() defer p.muDriveMode.RUnlock() if p.debug { var evt events.SteeringMessage err := proto.Unmarshal(message.Payload(), &evt) if err != nil { zap.S().Debugf("unable to unmarshal tensorflow event: %v", err) } else { zap.S().Debugf("receive steering message from tensorflow: %0.00f", evt.GetSteering()) } } if p.driveMode == events.DriveMode_PILOT { // Republish same content payload := message.Payload() publish(p.client, p.steeringTopic, &payload) } } var registerCallbacks = func(p *Controller) error { err := service.RegisterCallback(p.client, p.driveModeTopic, p.onDriveMode) if err != nil { return err } err = service.RegisterCallback(p.client, p.rcSteeringTopic, p.onRCSteering) if err != nil { return err } err = service.RegisterCallback(p.client, p.tfSteeringTopic, p.onTFSteering) if err != nil { return err } err = service.RegisterCallback(p.client, p.objectsTopic, p.onObjects) if err != nil { return err } return nil } var publish = func(client mqtt.Client, topic string, payload *[]byte) { client.Publish(topic, 0, false, *payload) }