diff --git a/cmd/rc-simulator/rc-simultator.go b/cmd/rc-simulator/rc-simultator.go index 2e5a956..589952d 100644 --- a/cmd/rc-simulator/rc-simultator.go +++ b/cmd/rc-simulator/rc-simultator.go @@ -25,11 +25,11 @@ func main() { cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) - flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC_FRAME"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set") - flag.StringVar(&topicFrame, "events-topic-steering", os.Getenv("MQTT_TOPIC_STEERING"), "Mqtt topic to events gateway steering, use MQTT_TOPIC_STEERING if args not set") - flag.StringVar(&topicFrame, "events-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic to events gateway throttle, use MQTT_TOPIC_THROTTLE if args not set") - flag.StringVar(&topicFrame, "topic-steering-ctrl", os.Getenv("MQTT_TOPIC_STEERING_CTRL"), "Mqtt topic to send steering instructions, use MQTT_TOPIC_STEERING_CTRL if args not set") - flag.StringVar(&topicFrame, "topic-throttle-ctrl", os.Getenv("MQTT_TOPIC_THROTTLE_CTRL"), "Mqtt topic to send throttle instructions, use MQTT_TOPIC_THROTTLE_CTRL if args not set") + flag.StringVar(&topicFrame, "events-topic-camera", os.Getenv("MQTT_TOPIC_CAMERA"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_CAMERA if args not set") + flag.StringVar(&topicSteering, "events-topic-steering", os.Getenv("MQTT_TOPIC_STEERING"), "Mqtt topic to events gateway steering, use MQTT_TOPIC_STEERING if args not set") + flag.StringVar(&topicThrottle, "events-topic-throttle", os.Getenv("MQTT_TOPIC_THROTTLE"), "Mqtt topic to events gateway throttle, use MQTT_TOPIC_THROTTLE if args not set") + flag.StringVar(&topicCtrlSteering, "topic-steering-ctrl", os.Getenv("MQTT_TOPIC_STEERING_CTRL"), "Mqtt topic to send steering instructions, use MQTT_TOPIC_STEERING_CTRL if args not set") + flag.StringVar(&topicCtrlThrottle, "topic-throttle-ctrl", os.Getenv("MQTT_TOPIC_THROTTLE_CTRL"), "Mqtt topic to send throttle instructions, use MQTT_TOPIC_THROTTLE_CTRL if args not set") flag.StringVar(&address, "simulator-address", "127.0.0.1:9091", "Simulator address") flag.BoolVar(&debug, "debug", false, "Debug logs") @@ -59,6 +59,7 @@ func main() { topicThrottle, ) defer msgPub.Stop() + msgPub.Start() cli.HandleExit(gtw) @@ -66,7 +67,7 @@ func main() { if err != nil { log.Fatalf("unable to start service: %v", err) } - msgPub.Start() + if topicCtrlSteering != "" { log.Infof("configure mqtt route on steering command") diff --git a/pkg/events/events.go b/pkg/events/events.go index 8ed73a2..8c8da09 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -111,6 +111,7 @@ func (m *MsgPublisher) listenFrame() { msgChan := m.srcEvents.SubscribeFrame() for { msg := <-msgChan + logr.Debugf("new frame %v", msg.Id) if m.topicFrame == "" { return } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 9696df0..d6a248f 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -62,86 +62,86 @@ type Gateway struct { throttleSubscribers map[chan<- *events.ThrottleMessage]interface{} } -func (p *Gateway) Start() error { - p.log.Info("connect to simulator") - p.cancel = make(chan interface{}) +func (g *Gateway) Start() error { + g.log.Info("connect to simulator") + g.cancel = make(chan interface{}) msgChan := make(chan *simulator.TelemetryMsg) - go p.run(msgChan) + go g.run(msgChan) for { select { case msg := <-msgChan: - fr := p.publishFrame(msg) - go p.publishInputSteering(msg, fr) - go p.publishInputThrottle(msg, fr) - case <-p.cancel: + fr := g.publishFrame(msg) + go g.publishInputSteering(msg, fr) + go g.publishInputThrottle(msg, fr) + case <-g.cancel: return nil } } } -func (p *Gateway) Stop() { - p.log.Info("close simulator gateway") - close(p.cancel) +func (g *Gateway) Stop() { + g.log.Info("close simulator gateway") + close(g.cancel) - if err := p.Close(); err != nil { - p.log.Warnf("unexpected error while simulator connection is closed: %v", err) + if err := g.Close(); err != nil { + g.log.Warnf("unexpected error while simulator connection is closed: %v", err) } } -func (p *Gateway) Close() error { - for c := range p.frameSubscribers { +func (g *Gateway) Close() error { + for c := range g.frameSubscribers { close(c) } - for c := range p.steeringSubscribers { + for c := range g.steeringSubscribers { close(c) } - for c := range p.throttleSubscribers { + for c := range g.throttleSubscribers { close(c) } - if p.conn == nil { - p.log.Warn("no connection to close") + if g.conn == nil { + g.log.Warn("no connection to close") return nil } - if err := p.conn.Close(); err != nil { + if err := g.conn.Close(); err != nil { return fmt.Errorf("unable to close connection to simulator: %v", err) } return nil } -func (p *Gateway) run(msgChan chan<- *simulator.TelemetryMsg) { - err := p.connect() +func (g *Gateway) run(msgChan chan<- *simulator.TelemetryMsg) { + err := g.connect() if err != nil { - p.log.Panicf("unable to connect to simulator: %v", err) + g.log.Panicf("unable to connect to simulator: %v", err) } - reader := bufio.NewReader(p.conn) + reader := bufio.NewReader(g.conn) err = retry.Do( - func() error { return p.listen(msgChan, reader) }, + func() error { return g.listen(msgChan, reader) }, ) if err != nil { - p.log.Errorf("unable to connect to server: %v", err) + g.log.Errorf("unable to connect to server: %v", err) } } -func (p *Gateway) connect() error { - p.muConn.Lock() - defer p.muConn.Unlock() +func (g *Gateway) connect() error { + g.muConn.Lock() + defer g.muConn.Unlock() - if p.conn != nil { + if g.conn != nil { // already connected return nil } err := retry.Do(func() error { - p.log.Info("connect to simulator") - conn, err := connect(p.address) + g.log.Info("connect to simulator") + conn, err := connect(g.address) if err != nil { - return fmt.Errorf("unable to connect to simulator at %v", p.address) + return fmt.Errorf("unable to connect to simulator at %v", g.address) } - p.conn = conn - p.log.Info("connection success") + g.conn = conn + g.log.Info("connection success") return nil }, retry.Delay(1*time.Second), @@ -149,11 +149,11 @@ func (p *Gateway) connect() error { return err } -func (p *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.Reader) error { +func (g *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.Reader) error { for { rawLine, err := reader.ReadBytes('\n') if err == io.EOF { - p.log.Info("Connection closed") + g.log.Info("Connection closed") return err } if err != nil { @@ -163,7 +163,7 @@ func (p *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.R var msg simulator.TelemetryMsg err = json.Unmarshal(rawLine, &msg) if err != nil { - p.log.Errorf("unable to unmarshal simulator msg '%v': %v", string(rawLine), err) + g.log.Errorf("unable to unmarshal simulator msg '%v': %v", string(rawLine), err) } if "telemetry" != msg.MsgType { continue @@ -172,7 +172,7 @@ func (p *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.R } } -func (p *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) *events.FrameRef { +func (g *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) *events.FrameRef { now := time.Now() frameRef := &events.FrameRef{ Name: "gateway", @@ -188,13 +188,17 @@ func (p *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) *events.FrameRef } log.Debugf("events frame '%v/%v'", msg.Id.Name, msg.Id.Id) - for fs := range p.frameSubscribers { + log.Infof("publish frame to %v receiver", len(g.frameSubscribers)) + + for fs := range g.frameSubscribers { + log.Info("publish frame") fs <- msg + log.Info("frame published") } return frameRef } -func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { +func (g *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { steering := &events.SteeringMessage{ FrameRef: frameRef, Steering: float32(msgSim.SteeringAngle), @@ -202,12 +206,12 @@ func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg, frameRef } log.Debugf("events steering '%v'", steering.Steering) - for ss := range p.steeringSubscribers { + for ss := range g.steeringSubscribers { ss <- steering } } -func (p *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { +func (g *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { msg := &events.ThrottleMessage{ FrameRef: frameRef, Throttle: float32(msgSim.Throttle), @@ -215,24 +219,24 @@ func (p *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg, frameRef } log.Debugf("events throttle '%v'", msg.Throttle) - for ts := range p.throttleSubscribers { + for ts := range g.throttleSubscribers { ts <- msg } } -func (p *Gateway) SubscribeFrame() <-chan *events.FrameMessage { +func (g *Gateway) SubscribeFrame() <-chan *events.FrameMessage { frameChan := make(chan *events.FrameMessage) - p.frameSubscribers[frameChan] = struct{}{} + g.frameSubscribers[frameChan] = struct{}{} return frameChan } -func (p *Gateway) SubscribeSteering() <-chan *events.SteeringMessage { +func (g *Gateway) SubscribeSteering() <-chan *events.SteeringMessage { steeringChan := make(chan *events.SteeringMessage) - p.steeringSubscribers[steeringChan] = struct{}{} + g.steeringSubscribers[steeringChan] = struct{}{} return steeringChan } -func (p *Gateway) SubscribeThrottle() <-chan *events.ThrottleMessage { +func (g *Gateway) SubscribeThrottle() <-chan *events.ThrottleMessage { throttleChan := make(chan *events.ThrottleMessage) - p.throttleSubscribers[throttleChan] = struct{}{} + g.throttleSubscribers[throttleChan] = struct{}{} return throttleChan } @@ -244,60 +248,60 @@ var connect = func(address string) (io.ReadWriteCloser, error) { return conn, nil } -func (p *Gateway) WriteSteering(message *events.SteeringMessage) { - p.muControl.Lock() - defer p.muControl.Unlock() - p.initLastControlMsg() +func (g *Gateway) WriteSteering(message *events.SteeringMessage) { + g.muControl.Lock() + defer g.muControl.Unlock() + g.initLastControlMsg() - p.lastControl.Steering = message.Steering - p.writeControlCommandToSimulator() + g.lastControl.Steering = message.Steering + g.writeControlCommandToSimulator() } -func (p *Gateway) writeControlCommandToSimulator() { - if err := p.connect(); err != nil { - p.log.Errorf("unable to connect to simulator to send control command: %v", err) +func (g *Gateway) writeControlCommandToSimulator() { + if err := g.connect(); err != nil { + g.log.Errorf("unable to connect to simulator to send control command: %v", err) return } - w := bufio.NewWriter(p.conn) - content, err := json.Marshal(p.lastControl) + w := bufio.NewWriter(g.conn) + content, err := json.Marshal(g.lastControl) if err != nil { - p.log.Errorf("unable to marshall control msg \"%#v\": %v", p.lastControl, err) + g.log.Errorf("unable to marshall control msg \"%#v\": %v", g.lastControl, err) return } _, err = w.Write(append(content, '\n')) if err != nil { - p.log.Errorf("unable to write control msg \"%#v\" to simulator: %v", p.lastControl, err) + g.log.Errorf("unable to write control msg \"%#v\" to simulator: %v", g.lastControl, err) return } err = w.Flush() if err != nil { - p.log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", p.lastControl, err) + g.log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", g.lastControl, err) return } } -func (p *Gateway) WriteThrottle(message *events.ThrottleMessage) { - p.muControl.Lock() - defer p.muControl.Unlock() - p.initLastControlMsg() +func (g *Gateway) WriteThrottle(message *events.ThrottleMessage) { + g.muControl.Lock() + defer g.muControl.Unlock() + g.initLastControlMsg() if message.Throttle > 0 { - p.lastControl.Throttle = message.Throttle - p.lastControl.Brake = 0. + g.lastControl.Throttle = message.Throttle + g.lastControl.Brake = 0. } else { - p.lastControl.Throttle = 0. - p.lastControl.Brake = -1 * message.Throttle + g.lastControl.Throttle = 0. + g.lastControl.Brake = -1 * message.Throttle } - p.writeControlCommandToSimulator() + g.writeControlCommandToSimulator() } -func (p *Gateway) initLastControlMsg() { - if p.lastControl != nil { +func (g *Gateway) initLastControlMsg() { + if g.lastControl != nil { return } - p.lastControl = &simulator.ControlMsg{ + g.lastControl = &simulator.ControlMsg{ MsgType: "control", Steering: 0., Throttle: 0.,