feat: send car configuration at startime
This commit is contained in:
		@@ -18,7 +18,7 @@ func TestGateway_WriteSteering(t *testing.T) {
 | 
			
		||||
			&events.SteeringMessage{Steering: 0.5, Confidence: 1},
 | 
			
		||||
			nil,
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.50",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
@@ -26,13 +26,13 @@ func TestGateway_WriteSteering(t *testing.T) {
 | 
			
		||||
		{"Update steering",
 | 
			
		||||
			&events.SteeringMessage{Steering: -0.5, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "-0.50",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
@@ -40,13 +40,13 @@ func TestGateway_WriteSteering(t *testing.T) {
 | 
			
		||||
		{"Update steering shouldn't erase throttle value",
 | 
			
		||||
			&events.SteeringMessage{Steering: -0.3, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.6",
 | 
			
		||||
				Brake:    "0.1",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "-0.30",
 | 
			
		||||
				Throttle: "0.6",
 | 
			
		||||
				Brake:    "0.1",
 | 
			
		||||
@@ -95,7 +95,7 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.5, Confidence: 1},
 | 
			
		||||
			nil,
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.0",
 | 
			
		||||
				Throttle: "0.50",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
@@ -103,13 +103,13 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
		{"Update Throttle",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.6, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0",
 | 
			
		||||
				Throttle: "0.4",
 | 
			
		||||
				Brake:    "0",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0",
 | 
			
		||||
				Throttle: "0.60",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
@@ -117,13 +117,13 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
		{"Update steering shouldn't erase throttle value",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.3, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.6",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.30",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
@@ -131,13 +131,13 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
		{"Throttle to brake",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: -0.7, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.6",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.70",
 | 
			
		||||
@@ -145,13 +145,13 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
		{"Update brake",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: -0.2, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.5",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.20",
 | 
			
		||||
@@ -159,13 +159,13 @@ func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
		{"Brake to throttle",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.9, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.0",
 | 
			
		||||
				Brake:    "0.4",
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				MsgType: simulator.MsgTypeControl,
 | 
			
		||||
				Steering: "0.2",
 | 
			
		||||
				Throttle: "0.90",
 | 
			
		||||
				Brake:    "0.0",
 | 
			
		||||
 
 | 
			
		||||
@@ -31,7 +31,7 @@ type ThrottleSource interface {
 | 
			
		||||
	SubscribeThrottle() <-chan *events.ThrottleMessage
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(addressSimulator string) *Gateway {
 | 
			
		||||
func New(addressSimulator string, car *simulator.CarConfigMsg) *Gateway {
 | 
			
		||||
	l := log.WithField("simulator", addressSimulator)
 | 
			
		||||
	l.Info("run gateway from simulator")
 | 
			
		||||
 | 
			
		||||
@@ -41,6 +41,9 @@ func New(addressSimulator string) *Gateway {
 | 
			
		||||
		frameSubscribers:    make(map[chan<- *events.FrameMessage]interface{}),
 | 
			
		||||
		steeringSubscribers: make(map[chan<- *events.SteeringMessage]interface{}),
 | 
			
		||||
		throttleSubscribers: make(map[chan<- *events.ThrottleMessage]interface{}),
 | 
			
		||||
		telemetrySubscribers: make(map[chan *simulator.TelemetryMsg]interface{}),
 | 
			
		||||
		carSubscribers: make(map[chan *simulator.Msg]interface{}),
 | 
			
		||||
		carConfig:           car,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -49,6 +52,8 @@ type Gateway struct {
 | 
			
		||||
	cancel chan interface{}
 | 
			
		||||
 | 
			
		||||
	address string
 | 
			
		||||
 | 
			
		||||
	muCommand sync.Mutex
 | 
			
		||||
	muConn  sync.Mutex
 | 
			
		||||
	conn    io.ReadWriteCloser
 | 
			
		||||
 | 
			
		||||
@@ -60,14 +65,24 @@ type Gateway struct {
 | 
			
		||||
	frameSubscribers    map[chan<- *events.FrameMessage]interface{}
 | 
			
		||||
	steeringSubscribers map[chan<- *events.SteeringMessage]interface{}
 | 
			
		||||
	throttleSubscribers map[chan<- *events.ThrottleMessage]interface{}
 | 
			
		||||
 | 
			
		||||
	telemetrySubscribers map[chan *simulator.TelemetryMsg]interface{}
 | 
			
		||||
	carSubscribers map[chan *simulator.Msg]interface{}
 | 
			
		||||
 | 
			
		||||
	carConfig *simulator.CarConfigMsg
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) Start() error {
 | 
			
		||||
	g.log.Info("connect to simulator")
 | 
			
		||||
	g.cancel = make(chan interface{})
 | 
			
		||||
	msgChan := make(chan *simulator.TelemetryMsg)
 | 
			
		||||
	msgChan := g.subscribeTelemetryEvents()
 | 
			
		||||
 | 
			
		||||
	go g.run(msgChan)
 | 
			
		||||
	go g.run()
 | 
			
		||||
 | 
			
		||||
	err := g.writeCarConfig()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to configure car to server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
@@ -100,6 +115,12 @@ func (g *Gateway) Close() error {
 | 
			
		||||
	for c := range g.throttleSubscribers {
 | 
			
		||||
		close(c)
 | 
			
		||||
	}
 | 
			
		||||
	for c := range g.telemetrySubscribers {
 | 
			
		||||
		close(c)
 | 
			
		||||
	}
 | 
			
		||||
	for c := range g.carSubscribers {
 | 
			
		||||
		close(c)
 | 
			
		||||
	}
 | 
			
		||||
	if g.conn == nil {
 | 
			
		||||
		g.log.Warn("no connection to close")
 | 
			
		||||
		return nil
 | 
			
		||||
@@ -110,16 +131,14 @@ func (g *Gateway) Close() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) run(msgChan chan<- *simulator.TelemetryMsg) {
 | 
			
		||||
func (g *Gateway) run() {
 | 
			
		||||
	err := g.connect()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		g.log.Panicf("unable to connect to simulator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	reader := bufio.NewReader(g.conn)
 | 
			
		||||
 | 
			
		||||
	err = retry.Do(
 | 
			
		||||
		func() error { return g.listen(msgChan, reader) },
 | 
			
		||||
		func() error { return g.listen() },
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		g.log.Errorf("unable to connect to server: %v", err)
 | 
			
		||||
@@ -149,7 +168,9 @@ func (g *Gateway) connect() error {
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.Reader) error {
 | 
			
		||||
func (g *Gateway) listen() error {
 | 
			
		||||
	reader := bufio.NewReader(g.conn)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		rawLine, err := reader.ReadBytes('\n')
 | 
			
		||||
		if err == io.EOF {
 | 
			
		||||
@@ -160,15 +181,43 @@ func (g *Gateway) listen(msgChan chan<- *simulator.TelemetryMsg, reader *bufio.R
 | 
			
		||||
			return fmt.Errorf("unable to read response: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var msg simulator.TelemetryMsg
 | 
			
		||||
		var msg simulator.Msg
 | 
			
		||||
		err = json.Unmarshal(rawLine, &msg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			g.log.Errorf("unable to unmarshal simulator msg '%v': %v", string(rawLine), err)
 | 
			
		||||
		}
 | 
			
		||||
		if "telemetry" != msg.MsgType {
 | 
			
		||||
			continue
 | 
			
		||||
 | 
			
		||||
		switch msg.MsgType {
 | 
			
		||||
		case simulator.MsgTypeTelemetry:
 | 
			
		||||
			g.broadcastTelemetryMsg(rawLine)
 | 
			
		||||
		case simulator.MsgTypeCarLoaded:
 | 
			
		||||
			g.broadcastCarMsg(rawLine)
 | 
			
		||||
		default:
 | 
			
		||||
			log.Warnf("unmanaged simulator message: %v", rawLine)
 | 
			
		||||
		}
 | 
			
		||||
		msgChan <- &msg
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) broadcastTelemetryMsg(rawLine []byte) {
 | 
			
		||||
	for c := range g.telemetrySubscribers {
 | 
			
		||||
 | 
			
		||||
		var tMsg simulator.TelemetryMsg
 | 
			
		||||
		err := json.Unmarshal(rawLine, &tMsg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			g.log.Errorf("unable to unmarshal telemetry simulator msg '%v': %v", string(rawLine), err)
 | 
			
		||||
		}
 | 
			
		||||
		c <- &tMsg
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) broadcastCarMsg(rawLine []byte) {
 | 
			
		||||
	for c := range g.carSubscribers {
 | 
			
		||||
		var tMsg simulator.Msg
 | 
			
		||||
		err := json.Unmarshal(rawLine, &tMsg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			g.log.Errorf("unable to unmarshal car simulator msg '%v': %v", string(rawLine), err)
 | 
			
		||||
		}
 | 
			
		||||
		c <- &tMsg
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -240,6 +289,25 @@ func (g *Gateway) SubscribeThrottle() <-chan *events.ThrottleMessage {
 | 
			
		||||
	return throttleChan
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) subscribeTelemetryEvents() chan *simulator.TelemetryMsg {
 | 
			
		||||
	telemetryChan := make(chan *simulator.TelemetryMsg)
 | 
			
		||||
	g.telemetrySubscribers[telemetryChan] = struct{}{}
 | 
			
		||||
	return telemetryChan
 | 
			
		||||
}
 | 
			
		||||
func (g *Gateway) unsubscribeTelemetryEvents(telemetryChan chan *simulator.TelemetryMsg) {
 | 
			
		||||
	delete(g.telemetrySubscribers, telemetryChan)
 | 
			
		||||
	close(telemetryChan)
 | 
			
		||||
}
 | 
			
		||||
func (g *Gateway) subscribeCarEvents() chan *simulator.Msg {
 | 
			
		||||
	carChan := make(chan *simulator.Msg)
 | 
			
		||||
	g.carSubscribers[carChan] = struct{}{}
 | 
			
		||||
	return carChan
 | 
			
		||||
}
 | 
			
		||||
func (g *Gateway) unsubscribeCarEvents(carChan chan *simulator.Msg) {
 | 
			
		||||
	delete(g.carSubscribers, carChan)
 | 
			
		||||
	close(carChan)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var connect = func(address string) (io.ReadWriteCloser, error) {
 | 
			
		||||
	conn, err := net.Dial("tcp", address)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -258,28 +326,38 @@ func (g *Gateway) WriteSteering(message *events.SteeringMessage) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) writeControlCommandToSimulator() {
 | 
			
		||||
	if err := g.connect(); err != nil {
 | 
			
		||||
		g.log.Errorf("unable to connect to simulator to send control command: %v", err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	log.Debugf("write command to simulator: %v", g.lastControl)
 | 
			
		||||
	w := bufio.NewWriter(g.conn)
 | 
			
		||||
	content, err := json.Marshal(g.lastControl)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		g.log.Errorf("unable to marshall control msg \"%#v\": %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = w.Write(append(content, '\n'))
 | 
			
		||||
	err = g.writeCommand(content)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		g.log.Errorf("unable to write control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
		g.log.Errorf("unable to send control command to simulator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) writeCommand(content []byte) error {
 | 
			
		||||
	g.muCommand.Lock()
 | 
			
		||||
	defer g.muCommand.Unlock()
 | 
			
		||||
 | 
			
		||||
	if err := g.connect(); err != nil {
 | 
			
		||||
		g.log.Errorf("unable to connect to simulator to send control command: %v", err)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	log.Debugf("write command to simulator: %v", g.lastControl)
 | 
			
		||||
	w := bufio.NewWriter(g.conn)
 | 
			
		||||
 | 
			
		||||
	_, err := w.Write(append(content, '\n'))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to write control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
	}
 | 
			
		||||
	err = w.Flush()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		g.log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
		return fmt.Errorf("unable to flush control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) WriteThrottle(message *events.ThrottleMessage) {
 | 
			
		||||
@@ -292,7 +370,7 @@ func (g *Gateway) WriteThrottle(message *events.ThrottleMessage) {
 | 
			
		||||
		g.lastControl.Brake = "0.0"
 | 
			
		||||
	} else {
 | 
			
		||||
		g.lastControl.Throttle = "0.0"
 | 
			
		||||
		g.lastControl.Brake = fmt.Sprintf("%.2f", -1 * message.Throttle)
 | 
			
		||||
		g.lastControl.Brake = fmt.Sprintf("%.2f", -1*message.Throttle)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	g.writeControlCommandToSimulator()
 | 
			
		||||
@@ -303,9 +381,30 @@ func (g *Gateway) initLastControlMsg() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.lastControl = &simulator.ControlMsg{
 | 
			
		||||
		MsgType:  "control",
 | 
			
		||||
		MsgType:  simulator.MsgTypeControl,
 | 
			
		||||
		Steering: "0.0",
 | 
			
		||||
		Throttle: "0.0",
 | 
			
		||||
		Brake:    "0.0",
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) writeCarConfig() error {
 | 
			
		||||
	carChan := g.subscribeCarEvents()
 | 
			
		||||
	defer g.unsubscribeCarEvents(carChan)
 | 
			
		||||
 | 
			
		||||
	g.log.Info("Send car configuration")
 | 
			
		||||
 | 
			
		||||
	content, err := json.Marshal(g.carConfig)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to marshall car config msg \"%#v\": %v", g.lastControl, err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = g.writeCommand(content)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to send car config to simulator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	msg := <- carChan
 | 
			
		||||
	g.log.Infof("Car loaded: %v", msg)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user