Implement steering/throttle reading from simulator
This commit is contained in:
		
							
								
								
									
										123
									
								
								pkg/controls/controls.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										123
									
								
								pkg/controls/controls.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,123 @@
 | 
			
		||||
package controls
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/cyrilix/robocar-protobuf/go/events"
 | 
			
		||||
	"github.com/cyrilix/robocar-simulator/pkg/simulator"
 | 
			
		||||
	log "github.com/sirupsen/logrus"
 | 
			
		||||
	"io"
 | 
			
		||||
	"net"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type SteeringController interface {
 | 
			
		||||
 	WriteSteering(message *events.SteeringMessage)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ThrottleController interface {
 | 
			
		||||
	WriteThrottle(message *events.ThrottleMessage)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(address string) (*Gateway, error) {
 | 
			
		||||
	conn, err := net.Dial("tcp", address)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("unable to connect to %v", address)
 | 
			
		||||
	}
 | 
			
		||||
	return &Gateway{
 | 
			
		||||
		conn:  conn,
 | 
			
		||||
	}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* Simulator interface to publish command controls from mqtt topic */
 | 
			
		||||
type Gateway struct {
 | 
			
		||||
	cancel           chan interface{}
 | 
			
		||||
 | 
			
		||||
	muControl   sync.Mutex
 | 
			
		||||
	lastControl *simulator.ControlMsg
 | 
			
		||||
	conn        io.WriteCloser
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) Start() {
 | 
			
		||||
	log.Info("connect to simulator")
 | 
			
		||||
	g.cancel = make(chan interface{})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) Stop() {
 | 
			
		||||
	log.Info("close simulator gateway")
 | 
			
		||||
	close(g.cancel)
 | 
			
		||||
 | 
			
		||||
	if err := g.Close(); err != nil {
 | 
			
		||||
		log.Printf("unexpected error while simulator connection is closed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) Close() error {
 | 
			
		||||
	if g.conn == nil {
 | 
			
		||||
		log.Warnln("no connection to close")
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	if err := g.conn.Close(); err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to close connection to simulator: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) WriteSteering(message *events.SteeringMessage) {
 | 
			
		||||
	g.muControl.Lock()
 | 
			
		||||
	defer g.muControl.Unlock()
 | 
			
		||||
	g.initLastControlMsg()
 | 
			
		||||
 | 
			
		||||
	g.lastControl.Steering = message.Steering
 | 
			
		||||
	g.writeContent()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) WriteThrottle(message *events.ThrottleMessage) {
 | 
			
		||||
	g.muControl.Lock()
 | 
			
		||||
	defer g.muControl.Unlock()
 | 
			
		||||
	g.initLastControlMsg()
 | 
			
		||||
 | 
			
		||||
	if message.Throttle > 0 {
 | 
			
		||||
		g.lastControl.Throttle = message.Throttle
 | 
			
		||||
		g.lastControl.Brake = 0.
 | 
			
		||||
	} else {
 | 
			
		||||
		g.lastControl.Throttle = 0.
 | 
			
		||||
		g.lastControl.Brake = -1 * message.Throttle
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	g.writeContent()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) writeContent() {
 | 
			
		||||
	w := bufio.NewWriter(g.conn)
 | 
			
		||||
	content, err := json.Marshal(g.lastControl)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to marshall control msg \"%#v\": %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = w.Write(append(content, '\n'))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to write control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	err = w.Flush()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to flush control msg \"%#v\" to simulator: %v", g.lastControl, err)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (g *Gateway) initLastControlMsg() {
 | 
			
		||||
	if g.lastControl != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	g.lastControl = &simulator.ControlMsg{
 | 
			
		||||
		MsgType:  "control",
 | 
			
		||||
		Steering: 0.,
 | 
			
		||||
		Throttle: 0.,
 | 
			
		||||
		Brake:    0.,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										281
									
								
								pkg/controls/controls_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										281
									
								
								pkg/controls/controls_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,281 @@
 | 
			
		||||
package controls
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/cyrilix/robocar-protobuf/go/events"
 | 
			
		||||
	"github.com/cyrilix/robocar-simulator/pkg/simulator"
 | 
			
		||||
	log "github.com/sirupsen/logrus"
 | 
			
		||||
	"io"
 | 
			
		||||
	"net"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestGateway_WriteSteering(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		name        string
 | 
			
		||||
		msg         *events.SteeringMessage
 | 
			
		||||
		previousMsg *simulator.ControlMsg
 | 
			
		||||
		expectedMsg simulator.ControlMsg
 | 
			
		||||
	}{
 | 
			
		||||
		{"First Message",
 | 
			
		||||
			&events.SteeringMessage{Steering: 0.5, Confidence: 1},
 | 
			
		||||
			nil,
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.5,
 | 
			
		||||
				Throttle: 0,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Update steering",
 | 
			
		||||
			&events.SteeringMessage{Steering: -0.5, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: -0.5,
 | 
			
		||||
				Throttle: 0,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Update steering shouldn't erase throttle value",
 | 
			
		||||
			&events.SteeringMessage{Steering: -0.3, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.6,
 | 
			
		||||
				Brake:    0.1,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: -0.3,
 | 
			
		||||
				Throttle: 0.6,
 | 
			
		||||
				Brake:    0.1,
 | 
			
		||||
			}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	simulatorMock := ConnMock{}
 | 
			
		||||
	err := simulatorMock.listen()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unable to start mock server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer func(){
 | 
			
		||||
		if err := simulatorMock.Close(); err != nil {
 | 
			
		||||
			t.Errorf("unable to stop simulator mock: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	server, err := New(simulatorMock.Addr())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to init simulator gateway: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, c := range cases {
 | 
			
		||||
		server.lastControl = c.previousMsg
 | 
			
		||||
 | 
			
		||||
		server.WriteSteering(c.msg)
 | 
			
		||||
 | 
			
		||||
		ctrlMsg := <- simulatorMock.Notify()
 | 
			
		||||
		if *ctrlMsg != c.expectedMsg {
 | 
			
		||||
			t.Errorf("[%v] bad messge received: %#v, wants %#v", c.name, ctrlMsg, c.expectedMsg)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGateway_WriteThrottle(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		name        string
 | 
			
		||||
		msg         *events.ThrottleMessage
 | 
			
		||||
		previousMsg *simulator.ControlMsg
 | 
			
		||||
		expectedMsg simulator.ControlMsg
 | 
			
		||||
	}{
 | 
			
		||||
		{"First Message",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.5, Confidence: 1},
 | 
			
		||||
			nil,
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0,
 | 
			
		||||
				Throttle: 0.5,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Update Throttle",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.6, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0,
 | 
			
		||||
				Throttle: 0.4,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0,
 | 
			
		||||
				Throttle: 0.6,
 | 
			
		||||
				Brake:    0,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Update steering shouldn't erase throttle value",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.3, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.6,
 | 
			
		||||
				Brake:    0.,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.3,
 | 
			
		||||
				Brake:    0.,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Throttle to brake",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: -0.7, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.6,
 | 
			
		||||
				Brake:    0.,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.,
 | 
			
		||||
				Brake:    0.7,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Update brake",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: -0.2, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.,
 | 
			
		||||
				Brake:    0.5,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.,
 | 
			
		||||
				Brake:    0.2,
 | 
			
		||||
			}},
 | 
			
		||||
		{"Brake to throttle",
 | 
			
		||||
			&events.ThrottleMessage{Throttle: 0.9, Confidence: 1},
 | 
			
		||||
			&simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.,
 | 
			
		||||
				Brake:    0.4,
 | 
			
		||||
			},
 | 
			
		||||
			simulator.ControlMsg{
 | 
			
		||||
				MsgType:  "control",
 | 
			
		||||
				Steering: 0.2,
 | 
			
		||||
				Throttle: 0.9,
 | 
			
		||||
				Brake:    0.,
 | 
			
		||||
			}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	simulatorMock := ConnMock{}
 | 
			
		||||
	err := simulatorMock.listen()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unable to start mock server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer func(){
 | 
			
		||||
		if err := simulatorMock.Close(); err != nil {
 | 
			
		||||
			t.Errorf("unable to stop simulator mock: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	server, err := New(simulatorMock.Addr())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("unable to init simulator gateway: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, c := range cases {
 | 
			
		||||
		server.lastControl = c.previousMsg
 | 
			
		||||
 | 
			
		||||
		server.WriteThrottle(c.msg)
 | 
			
		||||
 | 
			
		||||
		ctrlMsg := <- simulatorMock.Notify()
 | 
			
		||||
		if *ctrlMsg != c.expectedMsg {
 | 
			
		||||
			t.Errorf("[%v] bad messge received: %#v, wants %#v", c.name, ctrlMsg, c.expectedMsg)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ConnMock struct {
 | 
			
		||||
	initMsgsOnce sync.Once
 | 
			
		||||
 | 
			
		||||
	ln             net.Listener
 | 
			
		||||
	notifyChan     chan *simulator.ControlMsg
 | 
			
		||||
	initNotifyChan sync.Once
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnMock) Notify() <-chan *simulator.ControlMsg {
 | 
			
		||||
	c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) })
 | 
			
		||||
	return c.notifyChan
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnMock) listen() error {
 | 
			
		||||
	ln, err := net.Listen("tcp", "127.0.0.1:")
 | 
			
		||||
	c.ln = ln
 | 
			
		||||
	if err != nil {
 | 
			
		||||
 | 
			
		||||
		return fmt.Errorf("unable to listen on port: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for {
 | 
			
		||||
			conn, err := c.ln.Accept()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Infof("connection close: %v", err)
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			go c.handleConnection(conn)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnMock) Addr() string {
 | 
			
		||||
	return c.ln.Addr().String()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnMock) handleConnection(conn net.Conn) {
 | 
			
		||||
	c.initNotifyChan.Do(func() { c.notifyChan = make(chan *simulator.ControlMsg) })
 | 
			
		||||
	reader := bufio.NewReader(conn)
 | 
			
		||||
	for {
 | 
			
		||||
		rawCmd, err := reader.ReadBytes('\n')
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			if err == io.EOF {
 | 
			
		||||
				log.Info("connection closed")
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			log.Errorf("unable to read request: %v", err)
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		var msg simulator.ControlMsg
 | 
			
		||||
		err = json.Unmarshal(rawCmd, &msg)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Errorf("unable to unmarchal control msg \"%v\": %v", string(rawCmd), err)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		c.notifyChan <- &msg
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnMock) Close() error {
 | 
			
		||||
	log.Infof("close mock server")
 | 
			
		||||
	err := c.ln.Close()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("unable to close mock server: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	if c.notifyChan != nil {
 | 
			
		||||
		close(c.notifyChan)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user