diff --git a/cmd/rc-simulator/rc-simultator.go b/cmd/rc-simulator/rc-simultator.go index 224968a..547d479 100644 --- a/cmd/rc-simulator/rc-simultator.go +++ b/cmd/rc-simulator/rc-simultator.go @@ -4,7 +4,7 @@ import ( "flag" "github.com/cyrilix/robocar-base/cli" "github.com/cyrilix/robocar-simulator/pkg/gateway" - log"github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" "os" ) @@ -39,7 +39,6 @@ func main() { } defer client.Disconnect(10) - c := gateway.New(gateway.NewMqttPublisher(client, topicFrame, "", ""), address) defer c.Stop() diff --git a/pkg/controls/controls.go b/pkg/controls/controls.go deleted file mode 100644 index 1b0dcdf..0000000 --- a/pkg/controls/controls.go +++ /dev/null @@ -1,123 +0,0 @@ -package controls - -import ( - "bufio" - "encoding/json" - "fmt" - "github.com/cyrilix/robocar-protobuf/go/events" - "github.com/cyrilix/robocar-simulator/pkg/simulator" - log "github.com/sirupsen/logrus" - "io" - "net" - "sync" -) - -type SteeringController interface { - WriteSteering(message *events.SteeringMessage) -} - -type ThrottleController interface { - WriteThrottle(message *events.ThrottleMessage) -} - -func New(address string) (*Gateway, error) { - conn, err := net.Dial("tcp", address) - if err != nil { - return nil, fmt.Errorf("unable to connect to %v", address) - } - return &Gateway{ - conn: conn, - }, nil -} - -/* Simulator interface to publish command controls from mqtt topic */ -type Gateway struct { - cancel chan interface{} - - muControl sync.Mutex - lastControl *simulator.ControlMsg - conn io.WriteCloser -} - -func (g *Gateway) Start() { - log.Info("connect to simulator") - g.cancel = make(chan interface{}) -} - -func (g *Gateway) Stop() { - log.Info("close simulator gateway") - close(g.cancel) - - if err := g.Close(); err != nil { - log.Printf("unexpected error while simulator connection is closed: %v", err) - } -} - -func (g *Gateway) Close() error { - if g.conn == nil { - log.Warnln("no connection to close") - return nil - } - if err := g.conn.Close(); err != nil { - return fmt.Errorf("unable to close connection to simulator: %v", err) - } - return nil -} - -func (g *Gateway) WriteSteering(message *events.SteeringMessage) { - g.muControl.Lock() - defer g.muControl.Unlock() - g.initLastControlMsg() - - g.lastControl.Steering = message.Steering - g.writeContent() -} - -func (g *Gateway) WriteThrottle(message *events.ThrottleMessage) { - g.muControl.Lock() - defer g.muControl.Unlock() - g.initLastControlMsg() - - if message.Throttle > 0 { - g.lastControl.Throttle = message.Throttle - g.lastControl.Brake = 0. - } else { - g.lastControl.Throttle = 0. - g.lastControl.Brake = -1 * message.Throttle - } - - g.writeContent() -} - -func (g *Gateway) writeContent() { - w := bufio.NewWriter(g.conn) - content, err := json.Marshal(g.lastControl) - if err != nil { - log.Errorf("unable to marshall control msg \"%#v\": %v", g.lastControl, err) - return - } - - _, err = w.Write(append(content, '\n')) - if err != nil { - log.Errorf("unable to write control msg \"%#v\" to simulator: %v", g.lastControl, err) - return - } - err = w.Flush() - if err != nil { - log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", g.lastControl, err) - return - } -} - -func (g *Gateway) initLastControlMsg() { - if g.lastControl != nil { - return - } - g.lastControl = &simulator.ControlMsg{ - MsgType: "control", - Steering: 0., - Throttle: 0., - Brake: 0., - } -} - diff --git a/pkg/controls/controls_test.go b/pkg/gateway/controls_test.go similarity index 91% rename from pkg/controls/controls_test.go rename to pkg/gateway/controls_test.go index bc01caf..142bbcb 100644 --- a/pkg/controls/controls_test.go +++ b/pkg/gateway/controls_test.go @@ -1,4 +1,4 @@ -package controls +package gateway import ( "bufio" @@ -60,28 +60,30 @@ func TestGateway_WriteSteering(t *testing.T) { }}, } - simulatorMock := ConnMock{} + simulatorMock := Gw2SimMock{} err := simulatorMock.listen() if err != nil { - t.Errorf("unable to start mock server: %v", err) + t.Errorf("unable to start mock gw: %v", err) } - defer func(){ + defer func() { if err := simulatorMock.Close(); err != nil { t.Errorf("unable to stop simulator mock: %v", err) } }() - server, err := New(simulatorMock.Addr()) + gw := New(nil, simulatorMock.Addr()) if err != nil { t.Fatalf("unable to init simulator gateway: %v", err) } + go gw.Start() + defer gw.Close() for _, c := range cases { - server.lastControl = c.previousMsg + gw.lastControl = c.previousMsg - server.WriteSteering(c.msg) + gw.WriteSteering(c.msg) - ctrlMsg := <- simulatorMock.Notify() + ctrlMsg := <-simulatorMock.Notify() if *ctrlMsg != c.expectedMsg { t.Errorf("[%v] bad messge received: %#v, wants %#v", c.name, ctrlMsg, c.expectedMsg) } @@ -177,28 +179,28 @@ func TestGateway_WriteThrottle(t *testing.T) { }}, } - simulatorMock := ConnMock{} + simulatorMock := Gw2SimMock{} err := simulatorMock.listen() if err != nil { - t.Errorf("unable to start mock server: %v", err) + t.Errorf("unable to start mock gw: %v", err) } - defer func(){ + defer func() { if err := simulatorMock.Close(); err != nil { t.Errorf("unable to stop simulator mock: %v", err) } }() - server, err := New(simulatorMock.Addr()) + gw := New(nil, simulatorMock.Addr()) if err != nil { t.Fatalf("unable to init simulator gateway: %v", err) } for _, c := range cases { - server.lastControl = c.previousMsg + gw.lastControl = c.previousMsg - server.WriteThrottle(c.msg) + gw.WriteThrottle(c.msg) - ctrlMsg := <- simulatorMock.Notify() + ctrlMsg := <-simulatorMock.Notify() if *ctrlMsg != c.expectedMsg { t.Errorf("[%v] bad messge received: %#v, wants %#v", c.name, ctrlMsg, c.expectedMsg) } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 2d8edea..48a1dad 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -13,6 +13,7 @@ import ( log "github.com/sirupsen/logrus" "io" "net" + "sync" "time" ) @@ -32,7 +33,11 @@ type Gateway struct { cancel chan interface{} address string - conn io.ReadCloser + muConn sync.Mutex + conn io.ReadWriteCloser + + muControl sync.Mutex + lastControl *simulator.ControlMsg publisher Publisher log *log.Entry @@ -78,18 +83,7 @@ func (p *Gateway) Close() error { } func (p *Gateway) run(msgChan chan<- *simulator.TelemetryMsg) { - err := retry.Do(func() error { - p.log.Info("connect to simulator") - conn, err := connect(p.address) - if err != nil { - return fmt.Errorf("unable to connect to simulator at %v", p.address) - } - p.conn = conn - p.log.Info("connection success") - return nil - }, - retry.Delay(1*time.Second), - ) + err := p.connect() if err != nil { p.log.Panicf("unable to connect to simulator: %v", err) } @@ -104,6 +98,29 @@ func (p *Gateway) run(msgChan chan<- *simulator.TelemetryMsg) { } } +func (p *Gateway) connect() error { + p.muConn.Lock() + defer p.muConn.Unlock() + + if p.conn != nil { + // already connected + return nil + } + err := retry.Do(func() error { + p.log.Info("connect to simulator") + conn, err := connect(p.address) + if err != nil { + return fmt.Errorf("unable to connect to simulator at %v", p.address) + } + p.conn = conn + p.log.Info("connection success") + return nil + }, + retry.Delay(1*time.Second), + ) + return err +} + func (p *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.Reader) error { for { rawLine, err := reader.ReadBytes('\n') @@ -151,7 +168,7 @@ func (p *Gateway) publishFrame(msgSim *simulator.TelemetryMsg) { func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg) { steering := &events.SteeringMessage{ - Steering: float32(msgSim.SteeringAngle), + Steering: float32(msgSim.SteeringAngle), Confidence: 1.0, } @@ -165,7 +182,7 @@ func (p *Gateway) publishInputSteering(msgSim *simulator.TelemetryMsg) { func (p *Gateway) publishInputThrottle(msgSim *simulator.TelemetryMsg) { steering := &events.ThrottleMessage{ - Throttle: float32(msgSim.Throttle), + Throttle: float32(msgSim.Throttle), Confidence: 1.0, } @@ -185,6 +202,67 @@ var connect = func(address string) (io.ReadWriteCloser, error) { return conn, nil } +func (p *Gateway) WriteSteering(message *events.SteeringMessage) { + p.muControl.Lock() + defer p.muControl.Unlock() + p.initLastControlMsg() + + p.lastControl.Steering = message.Steering + p.writeControlCommandToSimulator() +} + +func (p *Gateway) writeControlCommandToSimulator() { + if err := p.connect(); err != nil { + p.log.Errorf("unable to connect to simulator to send control command: %v", err) + return + } + w := bufio.NewWriter(p.conn) + content, err := json.Marshal(p.lastControl) + if err != nil { + p.log.Errorf("unable to marshall control msg \"%#v\": %v", p.lastControl, err) + return + } + + _, err = w.Write(append(content, '\n')) + if err != nil { + p.log.Errorf("unable to write control msg \"%#v\" to simulator: %v", p.lastControl, err) + return + } + err = w.Flush() + if err != nil { + p.log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", p.lastControl, err) + return + } +} + +func (p *Gateway) WriteThrottle(message *events.ThrottleMessage) { + p.muControl.Lock() + defer p.muControl.Unlock() + p.initLastControlMsg() + + if message.Throttle > 0 { + p.lastControl.Throttle = message.Throttle + p.lastControl.Brake = 0. + } else { + p.lastControl.Throttle = 0. + p.lastControl.Brake = -1 * message.Throttle + } + + p.writeControlCommandToSimulator() +} + +func (p *Gateway) initLastControlMsg() { + if p.lastControl != nil { + return + } + p.lastControl = &simulator.ControlMsg{ + MsgType: "control", + Steering: 0., + Throttle: 0., + Brake: 0., + } +} + type Publisher interface { PublishFrame(payload []byte) PublishThrottle(payload []byte) @@ -193,16 +271,16 @@ type Publisher interface { func NewMqttPublisher(client mqtt.Client, topicFrame, topicThrottle, topicSteering string) Publisher { return &MqttPublisher{ - client: client, - topicFrame: topicFrame, + client: client, + topicFrame: topicFrame, topicSteering: topicSteering, topicThrottle: topicThrottle, } } type MqttPublisher struct { - client mqtt.Client - topicFrame string + client mqtt.Client + topicFrame string topicSteering string topicThrottle string } diff --git a/pkg/gateway/gateway_test.go b/pkg/gateway/gateway_test.go index d60b321..08d13ec 100644 --- a/pkg/gateway/gateway_test.go +++ b/pkg/gateway/gateway_test.go @@ -1,15 +1,11 @@ package gateway import ( - "bufio" "encoding/json" - "fmt" "github.com/cyrilix/robocar-protobuf/go/events" "github.com/golang/protobuf/proto" log "github.com/sirupsen/logrus" - "io" "io/ioutil" - "net" "strings" "sync" "testing" @@ -61,8 +57,8 @@ func (p *MockPublisher) NotifyThrottle() <-chan []byte { return p.notifyThrottleChan } -func TestPart_ListenEvents(t *testing.T) { - simulatorMock := SimulatorMock{} +func TestGateway_ListenEvents(t *testing.T) { + simulatorMock := Sim2GwMock{} err := simulatorMock.Start() if err != nil { t.Errorf("unable to start mock server: %v", err) @@ -147,7 +143,6 @@ func TestPart_ListenEvents(t *testing.T) { } } - func checkFrame(t *testing.T, byteMsg []byte) { var msg events.FrameMessage err := proto.Unmarshal(byteMsg, &msg) @@ -176,7 +171,7 @@ func checkSteering(t *testing.T, byteMsg []byte, rawLine string) { t.Errorf("unable to unmarshal steering msg: %v", err) } - if msg.GetSteering() != expectedSteering{ + if msg.GetSteering() != expectedSteering { t.Errorf("invalid steering value: %f, wants %f", msg.GetSteering(), expectedSteering) } if msg.Confidence != 1.0 { @@ -206,84 +201,3 @@ func checkThrottle(t *testing.T, byteMsg []byte, rawLine string) { t.Errorf("invalid throttle confidence: %f, wants %f", msg.Confidence, 1.0) } } - -type SimulatorMock struct { - ln net.Listener - muConn sync.Mutex - conn net.Conn - writer *bufio.Writer - newConnection chan net.Conn - logger *log.Entry -} - -func (c *SimulatorMock) EmitMsg(p string) (err error) { - c.muConn.Lock() - defer c.muConn.Unlock() - _, err = c.writer.WriteString(p + "\n") - if err != nil { - c.logger.Errorf("unable to write response: %v", err) - } - if err == io.EOF { - c.logger.Info("Connection closed") - return err - } - err = c.writer.Flush() - return err -} - -func (c *SimulatorMock) WaitConnection() { - c.muConn.Lock() - defer c.muConn.Unlock() - c.logger.Debug("simulator waiting connection") - if c.conn != nil { - return - } - c.logger.Debug("new connection") - conn := <-c.newConnection - - c.conn = conn - c.writer = bufio.NewWriter(conn) -} - -func (c *SimulatorMock) Start() error { - c.logger = log.WithField("simulator", "mock") - c.newConnection = make(chan net.Conn) - ln, err := net.Listen("tcp", "127.0.0.1:") - c.ln = ln - if err != nil { - return fmt.Errorf("unable to listen on port: %v", err) - } - go func() { - for { - conn, err := c.ln.Accept() - if err != nil && err == io.EOF { - c.logger.Errorf("connection close: %v", err) - break - } - if c.newConnection == nil { - break - } - c.newConnection <- conn - } - }() - return nil -} - -func (c *SimulatorMock) Addr() string { - return c.ln.Addr().String() -} - -func (c *SimulatorMock) Close() error { - c.logger.Debug("close mock server") - - if c == nil { - return nil - } - close(c.newConnection) - c.newConnection = nil - err := c.ln.Close() - if err != nil { - return fmt.Errorf("unable to close mock server: %v", err) - } - return nil -} diff --git a/pkg/gateway/util_test.go.go b/pkg/gateway/util_test.go.go new file mode 100644 index 0000000..4a7dfc6 --- /dev/null +++ b/pkg/gateway/util_test.go.go @@ -0,0 +1,168 @@ +package gateway + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/cyrilix/robocar-simulator/pkg/simulator" + log "github.com/sirupsen/logrus" + "io" + "net" + "sync" +) + +type Gw2SimMock struct { + initMsgsOnce sync.Once + + ln net.Listener + notifyChan chan *simulator.ControlMsg + initNotifyChan sync.Once +} + +func (c *Gw2SimMock) Notify() <-chan *simulator.ControlMsg { + c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) }) + return c.notifyChan +} + +func (c *Gw2SimMock) listen() error { + ln, err := net.Listen("tcp", "127.0.0.1:") + c.ln = ln + if err != nil { + + return fmt.Errorf("unable to listen on port: %v", err) + } + + go func() { + for { + conn, err := c.ln.Accept() + if err != nil { + log.Debugf("connection close: %v", err) + break + } + go c.handleConnection(conn) + } + }() + return nil +} + +func (c *Gw2SimMock) Addr() string { + return c.ln.Addr().String() +} + +func (c *Gw2SimMock) handleConnection(conn net.Conn) { + c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) }) + reader := bufio.NewReader(conn) + for { + rawCmd, err := reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + log.Debug("connection closed") + break + } + log.Errorf("unable to read request: %v", err) + return + } + + var msg simulator.ControlMsg + err = json.Unmarshal(rawCmd, &msg) + if err != nil { + log.Errorf("unable to unmarchal control msg \"%v\": %v", string(rawCmd), err) + continue + } + + c.notifyChan <- &msg + } +} + +func (c *Gw2SimMock) Close() error { + log.Debugf("close mock server") + err := c.ln.Close() + if err != nil { + return fmt.Errorf("unable to close mock server: %v", err) + } + if c.notifyChan != nil { + close(c.notifyChan) + } + return nil +} + +type Sim2GwMock struct { + ln net.Listener + muConn sync.Mutex + conn net.Conn + writer *bufio.Writer + newConnection chan net.Conn + logger *log.Entry +} + +func (c *Sim2GwMock) EmitMsg(p string) (err error) { + c.muConn.Lock() + defer c.muConn.Unlock() + _, err = c.writer.WriteString(p + "\n") + if err != nil { + c.logger.Errorf("unable to write response: %v", err) + } + if err == io.EOF { + c.logger.Info("Connection closed") + return err + } + err = c.writer.Flush() + return err +} + +func (c *Sim2GwMock) WaitConnection() { + c.muConn.Lock() + defer c.muConn.Unlock() + c.logger.Debug("simulator waiting connection") + if c.conn != nil { + return + } + c.logger.Debug("new connection") + conn := <-c.newConnection + + c.conn = conn + c.writer = bufio.NewWriter(conn) +} + +func (c *Sim2GwMock) Start() error { + c.logger = log.WithField("simulator", "mock") + c.newConnection = make(chan net.Conn) + ln, err := net.Listen("tcp", "127.0.0.1:") + c.ln = ln + if err != nil { + return fmt.Errorf("unable to listen on port: %v", err) + } + go func() { + for { + conn, err := c.ln.Accept() + if err != nil && err == io.EOF { + c.logger.Errorf("connection close: %v", err) + break + } + if c.newConnection == nil { + break + } + c.newConnection <- conn + } + }() + return nil +} + +func (c *Sim2GwMock) Addr() string { + return c.ln.Addr().String() +} + +func (c *Sim2GwMock) Close() error { + c.logger.Debug("close mock server") + + if c == nil { + return nil + } + close(c.newConnection) + c.newConnection = nil + err := c.ln.Close() + if err != nil { + return fmt.Errorf("unable to close mock server: %v", err) + } + return nil +}