From 0f79efacf2d11fbcd870cc7eab2763a9735ba71a Mon Sep 17 00:00:00 2001 From: Cyrille Nofficial Date: Thu, 21 Jan 2021 23:39:52 +0100 Subject: [PATCH] Refactor gateway module to remove mqtt dependency --- cmd/rc-simulator/rc-simultator.go | 6 +- pkg/events/events.go | 145 +++++++++++ pkg/events/events_test.go | 236 ++++++++++++++++++ pkg/gateway/controls_test.go | 86 +------ pkg/gateway/gateway.go | 181 ++++++-------- pkg/gateway/gateway_test.go | 109 +++----- pkg/gateway/{util_test.go.go => util_test.go} | 0 .../{simulator.go.go => simulator.go} | 0 8 files changed, 496 insertions(+), 267 deletions(-) create mode 100644 pkg/events/events.go create mode 100644 pkg/events/events_test.go rename pkg/gateway/{util_test.go.go => util_test.go} (100%) rename pkg/simulator/{simulator.go.go => simulator.go} (100%) diff --git a/cmd/rc-simulator/rc-simultator.go b/cmd/rc-simulator/rc-simultator.go index 547d479..095afb4 100644 --- a/cmd/rc-simulator/rc-simultator.go +++ b/cmd/rc-simulator/rc-simultator.go @@ -20,7 +20,7 @@ func main() { cli.InitMqttFlags(DefaultClientId, &mqttBroker, &username, &password, &clientId, &mqttQos, &mqttRetain) - flag.StringVar(&topicFrame, "mqtt-topic-frame", os.Getenv("MQTT_TOPIC"), "Mqtt topic to publish gateway frames, use MQTT_TOPIC_FRAME if args not set") + flag.StringVar(&topicFrame, "events-topic-frame", os.Getenv("MQTT_TOPIC"), "Mqtt topic to events gateway frames, use MQTT_TOPIC_FRAME if args not set") flag.StringVar(&address, "simulator-address", "127.0.0.1:9091", "Simulator address") flag.BoolVar(&debug, "debug", false, "Debug logs") @@ -35,11 +35,11 @@ func main() { client, err := cli.Connect(mqttBroker, username, password, clientId) if err != nil { - log.Fatalf("unable to connect to mqtt broker: %v", err) + log.Fatalf("unable to connect to events broker: %v", err) } defer client.Disconnect(10) - c := gateway.New(gateway.NewMqttPublisher(client, topicFrame, "", ""), address) + c := gateway.New(address) defer c.Stop() cli.HandleExit(c) diff --git a/pkg/events/events.go b/pkg/events/events.go new file mode 100644 index 0000000..30f76b0 --- /dev/null +++ b/pkg/events/events.go @@ -0,0 +1,145 @@ +package events + +import ( + "fmt" + "github.com/cyrilix/robocar-simulator/pkg/gateway" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/golang/protobuf/proto" + log "github.com/sirupsen/logrus" + "sync" + "time" +) + +func NewMsgPublisher(srcEvents gateway.SimulatorSource, p Publisher, topicFrame, topicSteering, topicThrottle string) *MsgPublisher { + return &MsgPublisher{ + p: p, + topicFrame: topicFrame, + topicSteering: topicSteering, + topicThrottle: topicThrottle, + srcEvents: srcEvents, + muCancel: sync.Mutex{}, + cancel: nil, + } +} + +type MsgPublisher struct { + p Publisher + topicFrame string + topicSteering string + topicThrottle string + + srcEvents gateway.SimulatorSource + + muCancel sync.Mutex + cancel chan interface{} +} + +func (m *MsgPublisher) Start() { + m.muCancel.Lock() + defer m.muCancel.Unlock() + + m.cancel = make(chan interface{}) + + if m.topicThrottle != "" { + go m.listenThrottle() + } + if m.topicSteering != "" { + go m.listenSteering() + } + if m.topicFrame != "" { + go m.listenFrame() + } +} + +func (m *MsgPublisher) Stop() { + m.muCancel.Lock() + defer m.muCancel.Unlock() + close(m.cancel) + m.cancel = nil +} + +func (m *MsgPublisher) listenThrottle() { + logr := log.WithField("msg_type", "throttleChan") + msgChan := m.srcEvents.SubscribeThrottle() + for { + select { + case <-m.cancel: + logr.Debug("exit listen throttleChan loop") + return + case msg := <-msgChan: + + payload, err := proto.Marshal(msg) + if err != nil { + logr.Errorf("unable to marshal protobuf message: %v", err) + } else { + + err = m.p.Publish(m.topicThrottle, payload) + if err != nil { + logr.Errorf("unable to publish events message: %v", err) + } + } + } + } +} + +func (m *MsgPublisher) listenSteering() { + logr := log.WithField("msg_type", "steeringChan") + msgChan := m.srcEvents.SubscribeSteering() + for { + select { + + case msg := <-msgChan: + if m.topicSteering == "" { + return + } + + payload, err := proto.Marshal(msg) + if err != nil { + logr.Errorf("unable to marshal protobuf message: %v", err) + } else { + err = m.p.Publish(m.topicSteering, payload) + if err != nil { + logr.Errorf("unable to publish events message: %v", err) + } + } + } + } +} + +func (m *MsgPublisher) listenFrame() { + logr := log.WithField("msg_type", "frame") + msgChan := m.srcEvents.SubscribeFrame() + for { + msg := <-msgChan + if m.topicFrame == "" { + return + } + + payload, err := proto.Marshal(msg) + if err != nil { + logr.Errorf("unable to marshal protobuf message: %v", err) + continue + } + err = m.p.Publish(m.topicFrame, payload) + if err != nil { + logr.Errorf("unable to publish events message: %v", err) + } + } +} + +type Publisher interface { + Publish(topic string, payload []byte) error +} + +type MqttPublisher struct { + client mqtt.Client +} + +func (m *MqttPublisher) Publish(topic string, payload []byte) error { + token := m.client.Publish(topic, 0, false, payload) + token.WaitTimeout(10 * time.Millisecond) + if err := token.Error(); err != nil { + return fmt.Errorf("unable to events to topic: %v", err) + } + return nil +} diff --git a/pkg/events/events_test.go b/pkg/events/events_test.go new file mode 100644 index 0000000..f8968f9 --- /dev/null +++ b/pkg/events/events_test.go @@ -0,0 +1,236 @@ +package events + +import ( + "fmt" + "github.com/cyrilix/robocar-protobuf/go/events" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/timestamp" + "testing" + "time" +) + +func NewMockSimulator() *SrcEventsMock { + return &SrcEventsMock{ + frameChan: make(chan *events.FrameMessage), + steeringChan: make(chan *events.SteeringMessage), + throttleChan: make(chan *events.ThrottleMessage), + } +} + +type SrcEventsMock struct { + frameChan chan *events.FrameMessage + steeringChan chan *events.SteeringMessage + throttleChan chan *events.ThrottleMessage +} + +func (s *SrcEventsMock) Close() { + if s.frameChan != nil { + close(s.frameChan) + } + if s.steeringChan != nil { + close(s.steeringChan) + } + if s.throttleChan != nil { + close(s.throttleChan) + } +} + +func (s *SrcEventsMock) WriteFrame(msg *events.FrameMessage) { + s.frameChan <- msg +} +func (s *SrcEventsMock) WriteSteering(msg *events.SteeringMessage) { + s.steeringChan <- msg +} +func (s *SrcEventsMock) WriteThrottle(msg *events.ThrottleMessage) { + s.throttleChan <- msg +} + +func (s *SrcEventsMock) SubscribeFrame() <-chan *events.FrameMessage { + return s.frameChan +} + +func (s *SrcEventsMock) SubscribeSteering() <-chan *events.SteeringMessage { + return s.steeringChan +} + +func (s *SrcEventsMock) SubscribeThrottle() <-chan *events.ThrottleMessage { + return s.throttleChan +} + +func NewPublisherMock(topicFrame string, topicSteering string, topicThrottle string) *PublisherMock { + return &PublisherMock{ + topicFrame: topicFrame, + topicSteering: topicSteering, + topicThrottle: topicThrottle, + frameChan: make(chan []byte), + steeringChan: make(chan []byte), + throttleChan: make(chan []byte), + } +} + +type PublisherMock struct { + frameChan chan []byte + steeringChan chan []byte + throttleChan chan []byte + topicFrame string + topicSteering string + topicThrottle string +} + +func (p *PublisherMock) Close() { + close(p.frameChan) + close(p.steeringChan) + close(p.throttleChan) +} + +func (p *PublisherMock) NotifyFrame() <-chan []byte { + return p.frameChan +} +func (p *PublisherMock) NotifySteering() <-chan []byte { + return p.steeringChan +} +func (p *PublisherMock) NotifyThrottle() <-chan []byte { + return p.throttleChan +} + +func (p PublisherMock) Publish(topic string, payload []byte) error { + + switch topic { + case p.topicFrame: + p.frameChan <- payload + case p.topicSteering: + p.steeringChan <- payload + case p.topicThrottle: + p.throttleChan <- payload + default: + return fmt.Errorf("invalid topic: %v", topic) + } + return nil +} + +func TestMsgPublisher_Frame(t *testing.T) { + sem := NewMockSimulator() + defer sem.Close() + + p := NewPublisherMock("frame", "steering", "throttle") + defer p.Close() + + mp := NewMsgPublisher(sem, p, "frame", "steering", "throttle") + mp.Start() + defer mp.Stop() + + frameMsg := &events.FrameMessage{ + Id: &events.FrameRef{ + Name: "frame1", + Id: "1", + CreatedAt: ×tamp.Timestamp{ + Seconds: time.Now().Unix(), + }, + }, + Frame: []byte("image content"), + } + + go sem.WriteFrame(frameMsg) + + framePayload := <-p.NotifyFrame() + var frameSended events.FrameMessage + err := proto.Unmarshal(framePayload, &frameSended) + if err != nil { + t.Errorf("unable to unmarshal frame msg: %v", err) + } + if frameSended.Id.Id != frameMsg.Id.Id { + t.Errorf("invalid id frame '%v', wants %v", frameSended.Id.Id, frameMsg.Id.Id) + } + if frameSended.Id.Name != frameMsg.Id.Name { + t.Errorf("invalid name frame '%v', wants %v", frameSended.Id.Name, frameMsg.Id.Name) + } + +} +func TestMsgPublisher_Steering(t *testing.T) { + sem := NewMockSimulator() + defer sem.Close() + + p := NewPublisherMock("frame", "steering", "throttle") + defer p.Close() + + mp := NewMsgPublisher(sem, p, "frame", "steering", "throttle") + mp.Start() + defer mp.Stop() + + steeringMsg := &events.SteeringMessage{ + FrameRef: &events.FrameRef{ + Name: "frame1", + Id: "1", + CreatedAt: ×tamp.Timestamp{ + Seconds: time.Now().Unix(), + }, + }, + Steering: 0.8, + Confidence: 1.0, + } + + go sem.WriteSteering(steeringMsg) + + steeringPayload := <-p.NotifySteering() + var steeringSended events.SteeringMessage + err := proto.Unmarshal(steeringPayload, &steeringSended) + if err != nil { + t.Errorf("unable to unmarshal steering msg: %v", err) + } + if steeringSended.FrameRef.Id != steeringMsg.FrameRef.Id { + t.Errorf("invalid id frame '%v', wants %v", steeringSended.FrameRef.Id, steeringMsg.FrameRef.Id) + } + if steeringSended.FrameRef.Name != steeringMsg.FrameRef.Name { + t.Errorf("invalid name frame '%v', wants %v", steeringSended.FrameRef.Name, steeringMsg.FrameRef.Name) + } + if steeringSended.Steering != steeringMsg.Steering { + t.Errorf("invalid steering value '%v', wants '%v'", steeringSended.Steering, steeringMsg.Steering) + } + if steeringSended.Confidence != steeringMsg.Confidence { + t.Errorf("invalid steering confidence value '%v', wants '%v'", steeringSended.Confidence, steeringMsg.Confidence) + } +} +func TestMsgPublisher_Throttle(t *testing.T) { + sem := NewMockSimulator() + defer sem.Close() + + p := NewPublisherMock("frame", "steering", "throttle") + defer p.Close() + + mp := NewMsgPublisher(sem, p, "frame", "steering", "throttle") + mp.Start() + defer mp.Stop() + + throttleMsg := &events.ThrottleMessage{ + FrameRef: &events.FrameRef{ + Name: "frame1", + Id: "1", + CreatedAt: ×tamp.Timestamp{ + Seconds: time.Now().Unix(), + }, + }, + Throttle: 0.7, + Confidence: 1.0, + } + + go sem.WriteThrottle(throttleMsg) + + throttlePayload := <-p.NotifyThrottle() + var throttleSended events.ThrottleMessage + err := proto.Unmarshal(throttlePayload, &throttleSended) + if err != nil { + t.Errorf("unable to unmarshal throttle msg: %v", err) + } + if throttleSended.FrameRef.Id != throttleMsg.FrameRef.Id { + t.Errorf("invalid id frame '%v', wants %v", throttleSended.FrameRef.Id, throttleMsg.FrameRef.Id) + } + if throttleSended.FrameRef.Name != throttleMsg.FrameRef.Name { + t.Errorf("invalid name frame '%v', wants %v", throttleSended.FrameRef.Name, throttleMsg.FrameRef.Name) + } + if throttleSended.Throttle != throttleMsg.Throttle { + t.Errorf("invalid throttle value '%v', wants '%v'", throttleSended.Throttle, throttleMsg.Throttle) + } + if throttleSended.Confidence != throttleMsg.Confidence { + t.Errorf("invalid throttle confidence value '%v', wants '%v'", throttleSended.Confidence, throttleMsg.Confidence) + } +} diff --git a/pkg/gateway/controls_test.go b/pkg/gateway/controls_test.go index 142bbcb..a865f6b 100644 --- a/pkg/gateway/controls_test.go +++ b/pkg/gateway/controls_test.go @@ -1,15 +1,8 @@ package gateway import ( - "bufio" - "encoding/json" - "fmt" "github.com/cyrilix/robocar-protobuf/go/events" "github.com/cyrilix/robocar-simulator/pkg/simulator" - log "github.com/sirupsen/logrus" - "io" - "net" - "sync" "testing" ) @@ -71,7 +64,7 @@ func TestGateway_WriteSteering(t *testing.T) { } }() - gw := New(nil, simulatorMock.Addr()) + gw := New(simulatorMock.Addr()) if err != nil { t.Fatalf("unable to init simulator gateway: %v", err) } @@ -190,7 +183,7 @@ func TestGateway_WriteThrottle(t *testing.T) { } }() - gw := New(nil, simulatorMock.Addr()) + gw := New(simulatorMock.Addr()) if err != nil { t.Fatalf("unable to init simulator gateway: %v", err) } @@ -206,78 +199,3 @@ func TestGateway_WriteThrottle(t *testing.T) { } } } - -type ConnMock struct { - initMsgsOnce sync.Once - - ln net.Listener - notifyChan chan *simulator.ControlMsg - initNotifyChan sync.Once -} - -func (c *ConnMock) Notify() <-chan *simulator.ControlMsg { - c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) }) - return c.notifyChan -} - -func (c *ConnMock) listen() error { - ln, err := net.Listen("tcp", "127.0.0.1:") - c.ln = ln - if err != nil { - - return fmt.Errorf("unable to listen on port: %v", err) - } - - go func() { - for { - conn, err := c.ln.Accept() - if err != nil { - log.Infof("connection close: %v", err) - break - } - go c.handleConnection(conn) - } - }() - return nil -} - -func (c *ConnMock) Addr() string { - return c.ln.Addr().String() -} - -func (c *ConnMock) handleConnection(conn net.Conn) { - c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) }) - reader := bufio.NewReader(conn) - for { - rawCmd, err := reader.ReadBytes('\n') - if err != nil { - if err == io.EOF { - log.Info("connection closed") - break - } - log.Errorf("unable to read request: %v", err) - return - } - - var msg simulator.ControlMsg - err = json.Unmarshal(rawCmd, &msg) - if err != nil { - log.Errorf("unable to unmarchal control msg \"%v\": %v", string(rawCmd), err) - continue - } - - c.notifyChan <- &msg - } -} - -func (c *ConnMock) Close() error { - log.Infof("close mock server") - err := c.ln.Close() - if err != nil { - return fmt.Errorf("unable to close mock server: %v", err) - } - if c.notifyChan != nil { - close(c.notifyChan) - } - return nil -} diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 48a1dad..9696df0 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -7,8 +7,6 @@ import ( "github.com/avast/retry-go" "github.com/cyrilix/robocar-protobuf/go/events" "github.com/cyrilix/robocar-simulator/pkg/simulator" - mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/timestamp" log "github.com/sirupsen/logrus" "io" @@ -17,18 +15,36 @@ import ( "time" ) -func New(publisher Publisher, addressSimulator string) *Gateway { +type SimulatorSource interface { + FrameSource + SteeringSource + ThrottleSource +} + +type FrameSource interface { + SubscribeFrame() <-chan *events.FrameMessage +} +type SteeringSource interface { + SubscribeSteering() <-chan *events.SteeringMessage +} +type ThrottleSource interface { + SubscribeThrottle() <-chan *events.ThrottleMessage +} + +func New(addressSimulator string) *Gateway { l := log.WithField("simulator", addressSimulator) l.Info("run gateway from simulator") return &Gateway{ - address: addressSimulator, - publisher: publisher, - log: l, + address: addressSimulator, + log: l, + frameSubscribers: make(map[chan<- *events.FrameMessage]interface{}), + steeringSubscribers: make(map[chan<- *events.SteeringMessage]interface{}), + throttleSubscribers: make(map[chan<- *events.ThrottleMessage]interface{}), } } -/* Simulator interface to publish gateway frames into mqtt topicFrame */ +/* Simulator interface to events gateway frames into events topicFrame */ type Gateway struct { cancel chan interface{} @@ -39,8 +55,11 @@ type Gateway struct { muControl sync.Mutex lastControl *simulator.ControlMsg - publisher Publisher - log *log.Entry + log *log.Entry + + frameSubscribers map[chan<- *events.FrameMessage]interface{} + steeringSubscribers map[chan<- *events.SteeringMessage]interface{} + throttleSubscribers map[chan<- *events.ThrottleMessage]interface{} } func (p *Gateway) Start() error { @@ -53,9 +72,9 @@ func (p *Gateway) Start() error { for { select { case msg := <-msgChan: - go p.publishFrame(msg) - go p.publishInputSteering(msg) - go p.publishInputThrottle(msg) + fr := p.publishFrame(msg) + go p.publishInputSteering(msg, fr) + go p.publishInputThrottle(msg, fr) case <-p.cancel: return nil } @@ -72,6 +91,15 @@ func (p *Gateway) Stop() { } func (p *Gateway) Close() error { + for c := range p.frameSubscribers { + close(c) + } + for c := range p.steeringSubscribers { + close(c) + } + for c := range p.throttleSubscribers { + close(c) + } if p.conn == nil { p.log.Warn("no connection to close") return nil @@ -144,54 +172,68 @@ func (p *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.R } } -func (p *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) { +func (p *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) *events.FrameRef { now := time.Now() - msg := &events.FrameMessage{ - Id: &events.FrameRef{ - Name: "gateway", - Id: fmt.Sprintf("%d%03d", now.Unix(), now.Nanosecond()/1000/1000), - CreatedAt: ×tamp.Timestamp{ - Seconds: now.Unix(), - Nanos: int32(now.Nanosecond()), - }, + frameRef := &events.FrameRef{ + Name: "gateway", + Id: fmt.Sprintf("%d%03d", now.Unix(), now.Nanosecond()/1000/1000), + CreatedAt: ×tamp.Timestamp{ + Seconds: now.Unix(), + Nanos: int32(now.Nanosecond()), }, + } + msg := &events.FrameMessage{ + Id: frameRef, Frame: msgSim.Image, } - log.Debugf("publish frame '%v/%v'", msg.Id.Name, msg.Id.Id) - payload, err := proto.Marshal(msg) - if err != nil { - p.log.Errorf("unable to marshal protobuf message: %v", err) + log.Debugf("events frame '%v/%v'", msg.Id.Name, msg.Id.Id) + for fs := range p.frameSubscribers { + fs <- msg } - p.publisher.PublishFrame(payload) + return frameRef } -func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg) { +func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { steering := &events.SteeringMessage{ + FrameRef: frameRef, Steering: float32(msgSim.SteeringAngle), Confidence: 1.0, } - log.Debugf("publish steering '%v'", steering.Steering) - payload, err := proto.Marshal(steering) - if err != nil { - p.log.Errorf("unable to marshal protobuf message: %v", err) + log.Debugf("events steering '%v'", steering.Steering) + for ss := range p.steeringSubscribers { + ss <- steering } - p.publisher.PublishSteering(payload) } -func (p *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg) { - steering := &events.ThrottleMessage{ +func (p *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg, frameRef *events.FrameRef) { + msg := &events.ThrottleMessage{ + FrameRef: frameRef, Throttle: float32(msgSim.Throttle), Confidence: 1.0, } - log.Debugf("publish throttle '%v'", steering.Throttle) - payload, err := proto.Marshal(steering) - if err != nil { - p.log.Errorf("unable to marshal protobuf message: %v", err) + log.Debugf("events throttle '%v'", msg.Throttle) + for ts := range p.throttleSubscribers { + ts <- msg } - p.publisher.PublishThrottle(payload) +} + +func (p *Gateway) SubscribeFrame() <-chan *events.FrameMessage { + frameChan := make(chan *events.FrameMessage) + p.frameSubscribers[frameChan] = struct{}{} + return frameChan +} +func (p *Gateway) SubscribeSteering() <-chan *events.SteeringMessage { + steeringChan := make(chan *events.SteeringMessage) + p.steeringSubscribers[steeringChan] = struct{}{} + return steeringChan +} +func (p *Gateway) SubscribeThrottle() <-chan *events.ThrottleMessage { + throttleChan := make(chan *events.ThrottleMessage) + p.throttleSubscribers[throttleChan] = struct{}{} + return throttleChan } var connect = func(address string) (io.ReadWriteCloser, error) { @@ -262,64 +304,3 @@ func (p *Gateway) initLastControlMsg() { Brake: 0., } } - -type Publisher interface { - PublishFrame(payload []byte) - PublishThrottle(payload []byte) - PublishSteering(payload []byte) -} - -func NewMqttPublisher(client mqtt.Client, topicFrame, topicThrottle, topicSteering string) Publisher { - return &MqttPublisher{ - client: client, - topicFrame: topicFrame, - topicSteering: topicSteering, - topicThrottle: topicThrottle, - } -} - -type MqttPublisher struct { - client mqtt.Client - topicFrame string - topicSteering string - topicThrottle string -} - -func (m *MqttPublisher) PublishThrottle(payload []byte) { - if m.topicThrottle == "" { - return - } - err := m.publish(m.topicThrottle, payload) - if err != nil { - log.Errorf("unable to publish throttle: %v", err) - } -} - -func (m *MqttPublisher) PublishSteering(payload []byte) { - if m.topicSteering == "" { - return - } - err := m.publish(m.topicSteering, payload) - if err != nil { - log.Errorf("unable to publish steering: %v", err) - } -} - -func (m *MqttPublisher) PublishFrame(payload []byte) { - if m.topicFrame == "" { - return - } - err := m.publish(m.topicFrame, payload) - if err != nil { - log.Errorf("unable to publish frame: %v", err) - } -} - -func (m *MqttPublisher) publish(topic string, payload []byte) error { - token := m.client.Publish(topic, 0, false, payload) - token.WaitTimeout(10 * time.Millisecond) - if err := token.Error(); err != nil { - return fmt.Errorf("unable to publish to topic: %v", err) - } - return nil -} diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index 08d13ec..554ef24 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -3,7 +3,6 @@ package gateway import ( "encoding/json" "github.com/cyrilix/robocar-protobuf/go/events" - "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" "io/ioutil" "strings" @@ -12,51 +11,6 @@ import ( "time" ) -type MockPublisher struct { - notifyFrameChan chan []byte - initNotifyFrameChan sync.Once - notifySteeringChan chan []byte - initNotifySteeringChan sync.Once - notifyThrottleChan chan []byte - initNotifyThrottleChan sync.Once -} - -func (p *MockPublisher) Close() error { - if p.notifyFrameChan != nil { - close(p.notifyFrameChan) - } - if p.notifyThrottleChan != nil { - close(p.notifyThrottleChan) - } - if p.notifySteeringChan != nil { - close(p.notifySteeringChan) - } - return nil -} - -func (p *MockPublisher) PublishFrame(payload []byte) { - p.notifyFrameChan <- payload -} -func (p *MockPublisher) PublishSteering(payload []byte) { - p.notifySteeringChan <- payload -} -func (p *MockPublisher) PublishThrottle(payload []byte) { - p.notifyThrottleChan <- payload -} - -func (p *MockPublisher) NotifyFrame() <-chan []byte { - p.initNotifyFrameChan.Do(func() { p.notifyFrameChan = make(chan []byte) }) - return p.notifyFrameChan -} -func (p *MockPublisher) NotifySteering() <-chan []byte { - p.initNotifySteeringChan.Do(func() { p.notifySteeringChan = make(chan []byte) }) - return p.notifySteeringChan -} -func (p *MockPublisher) NotifyThrottle() <-chan []byte { - p.initNotifyThrottleChan.Do(func() { p.notifyThrottleChan = make(chan []byte) }) - return p.notifyThrottleChan -} - func TestGateway_ListenEvents(t *testing.T) { simulatorMock := Sim2GwMock{} err := simulatorMock.Start() @@ -69,20 +23,21 @@ func TestGateway_ListenEvents(t *testing.T) { } }() - publisher := MockPublisher{} - - part := New(&publisher, simulatorMock.Addr()) + gw := New(simulatorMock.Addr()) go func() { - err := part.Start() + err := gw.Start() if err != nil { t.Fatalf("unable to start gateway simulator: %v", err) } }() defer func() { - if err := part.Close(); err != nil { + if err := gw.Close(); err != nil { t.Errorf("unable to close gateway simulator: %v", err) } }() + frameChannel := gw.SubscribeFrame() + steeringChannel := gw.SubscribeSteering() + throttleChannel := gw.SubscribeThrottle() simulatorMock.WaitConnection() log.Trace("read test data") @@ -98,7 +53,7 @@ func TestGateway_ListenEvents(t *testing.T) { eventsType := map[string]bool{"frame": false, "steering": false, "throttle": false} nbEventsExpected := len(eventsType) wg := sync.WaitGroup{} - // Expect number mqtt event + // Expect number events event wg.Add(nbEventsExpected) finished := make(chan struct{}) @@ -110,19 +65,24 @@ func TestGateway_ListenEvents(t *testing.T) { timeout := time.Tick(100 * time.Millisecond) endLoop := false + var frameRef, steeringIdRef, throttleIdRef *events.FrameRef + for { select { - case byteMsg := <-publisher.NotifyFrame(): - checkFrame(t, byteMsg) + case msg := <-frameChannel: + checkFrame(t, msg) eventsType["frame"] = true + frameRef = msg.Id wg.Done() - case byteMsg := <-publisher.NotifySteering(): - checkSteering(t, byteMsg, line) + case msg := <-steeringChannel: + checkSteering(t, msg, line) eventsType["steering"] = true + steeringIdRef = msg.FrameRef wg.Done() - case byteMsg := <-publisher.NotifyThrottle(): - checkThrottle(t, byteMsg, line) + case msg := <-throttleChannel: + checkThrottle(t, msg, line) eventsType["throttle"] = true + throttleIdRef = msg.FrameRef wg.Done() case <-finished: log.Trace("loop ended") @@ -132,6 +92,12 @@ func TestGateway_ListenEvents(t *testing.T) { t.FailNow() } if endLoop { + if frameRef != steeringIdRef || steeringIdRef == nil { + t.Errorf("steering msg without frameRef '%#v', wants '%#v'", steeringIdRef, frameRef) + } + if frameRef != throttleIdRef || throttleIdRef == nil { + t.Errorf("throttle msg without frameRef '%#v', wants '%#v'", throttleIdRef, frameRef) + } break } } @@ -143,12 +109,7 @@ func TestGateway_ListenEvents(t *testing.T) { } } -func checkFrame(t *testing.T, byteMsg []byte) { - var msg events.FrameMessage - err := proto.Unmarshal(byteMsg, &msg) - if err != nil { - t.Errorf("unable to unmarshal frame msg: %v", err) - } +func checkFrame(t *testing.T, msg *events.FrameMessage) { if msg.GetId() == nil { t.Error("frame msg has not Id") } @@ -156,7 +117,7 @@ func checkFrame(t *testing.T, byteMsg []byte) { t.Errorf("[%v] invalid frame image: %v", msg.Id, msg.GetFrame()) } } -func checkSteering(t *testing.T, byteMsg []byte, rawLine string) { +func checkSteering(t *testing.T, msg *events.SteeringMessage, rawLine string) { var input map[string]interface{} err := json.Unmarshal([]byte(rawLine), &input) if err != nil { @@ -165,12 +126,6 @@ func checkSteering(t *testing.T, byteMsg []byte, rawLine string) { steering := input["steering_angle"].(float64) expectedSteering := float32(steering) - var msg events.SteeringMessage - err = proto.Unmarshal(byteMsg, &msg) - if err != nil { - t.Errorf("unable to unmarshal steering msg: %v", err) - } - if msg.GetSteering() != expectedSteering { t.Errorf("invalid steering value: %f, wants %f", msg.GetSteering(), expectedSteering) } @@ -179,7 +134,7 @@ func checkSteering(t *testing.T, byteMsg []byte, rawLine string) { } } -func checkThrottle(t *testing.T, byteMsg []byte, rawLine string) { +func checkThrottle(t *testing.T, msg *events.ThrottleMessage, rawLine string) { var input map[string]interface{} err := json.Unmarshal([]byte(rawLine), &input) if err != nil { @@ -188,14 +143,8 @@ func checkThrottle(t *testing.T, byteMsg []byte, rawLine string) { throttle := input["throttle"].(float64) expectedThrottle := float32(throttle) - var msg events.SteeringMessage - err = proto.Unmarshal(byteMsg, &msg) - if err != nil { - t.Errorf("unable to unmarshal throttle msg: %v", err) - } - - if msg.GetSteering() != expectedThrottle { - t.Errorf("invalid throttle value: %f, wants %f", msg.GetSteering(), expectedThrottle) + if msg.Throttle != expectedThrottle { + t.Errorf("invalid throttle value: %f, wants %f", msg.Throttle, expectedThrottle) } if msg.Confidence != 1.0 { t.Errorf("invalid throttle confidence: %f, wants %f", msg.Confidence, 1.0) diff --git a/pkg/gateway/util_test.go.go b/pkg/gateway/util_test.go similarity index 100% rename from pkg/gateway/util_test.go.go rename to pkg/gateway/util_test.go diff --git a/pkg/simulator/simulator.go.go b/pkg/simulator/simulator.go similarity index 100% rename from pkg/simulator/simulator.go.go rename to pkg/simulator/simulator.go