Consume objects from mqtt topic

This commit is contained in:
Cyrille Nofficial
2022-08-22 13:12:12 +02:00
parent 2b31a3b7eb
commit 9af8f3a770
7 changed files with 57 additions and 25 deletions

View File

@ -9,13 +9,14 @@ import (
"sync"
)
func NewController(client mqtt.Client, steeringTopic, driveModeTopic, rcSteeringTopic, tfSteeringTopic string, debug bool) *Controller {
func NewController(client mqtt.Client, steeringTopic, driveModeTopic, rcSteeringTopic, tfSteeringTopic, objectsTopic string) *Controller {
return &Controller{
client: client,
steeringTopic: steeringTopic,
driveModeTopic: driveModeTopic,
rcSteeringTopic: rcSteeringTopic,
tfSteeringTopic: tfSteeringTopic,
objectsTopic: objectsTopic,
driveMode: events.DriveMode_USER,
}
@ -28,8 +29,11 @@ type Controller struct {
muDriveMode sync.RWMutex
driveMode events.DriveMode
cancel chan interface{}
driveModeTopic, rcSteeringTopic, tfSteeringTopic string
cancel chan interface{}
driveModeTopic, rcSteeringTopic, tfSteeringTopic, objectsTopic string
muObjects sync.RWMutex
objects []*events.Object
debug bool
}
@ -50,6 +54,19 @@ func (p *Controller) Stop() {
service.StopService("throttle", p.client, p.driveModeTopic, p.rcSteeringTopic, p.tfSteeringTopic)
}
func (p *Controller) onObjects(_ mqtt.Client, message mqtt.Message) {
var msg events.ObjectsMessage
err := proto.Unmarshal(message.Payload(), &msg)
if err != nil {
zap.S().Errorf("unable to unmarshal protobuf %T message: %v", msg, err)
return
}
p.muObjects.Lock()
defer p.muObjects.Unlock()
p.objects = msg.GetObjects()
}
func (p *Controller) onDriveMode(_ mqtt.Client, message mqtt.Message) {
var msg events.DriveModeMessage
err := proto.Unmarshal(message.Payload(), &msg)
@ -115,6 +132,11 @@ var registerCallbacks = func(p *Controller) error {
if err != nil {
return err
}
err = service.RegisterCallback(p.client, p.objectsTopic, p.onObjects)
if err != nil {
return err
}
return nil
}

View File

@ -33,50 +33,58 @@ func TestDefaultSteering(t *testing.T) {
driveModeTopic := "topic/driveMode"
rcSteeringTopic := "topic/rcSteering"
tfSteeringTopic := "topic/tfSteering"
objectsTopic := "topic/objects"
p := NewController(nil, steeringTopic, driveModeTopic, rcSteeringTopic, tfSteeringTopic, true)
p := NewController(nil, steeringTopic, driveModeTopic, rcSteeringTopic, tfSteeringTopic, objectsTopic)
cases := []struct {
driveMode events.DriveModeMessage
rcSteering events.SteeringMessage
tfSteering events.SteeringMessage
expectedSteering events.SteeringMessage
objects events.ObjectsMessage
}{
{
events.DriveModeMessage{DriveMode: events.DriveMode_USER},
events.SteeringMessage{Steering: 0.3, Confidence: 1.0},
events.SteeringMessage{Steering: 0.4, Confidence: 1.0},
events.SteeringMessage{Steering: 0.3, Confidence: 1.0},
events.ObjectsMessage{},
},
{
events.DriveModeMessage{DriveMode: events.DriveMode_PILOT},
events.SteeringMessage{Steering: 0.5, Confidence: 1.0},
events.SteeringMessage{Steering: 0.6, Confidence: 1.0},
events.SteeringMessage{Steering: 0.6, Confidence: 1.0},
events.ObjectsMessage{},
},
{
events.DriveModeMessage{DriveMode: events.DriveMode_PILOT},
events.SteeringMessage{Steering: 0.4, Confidence: 1.0},
events.SteeringMessage{Steering: 0.7, Confidence: 1.0},
events.SteeringMessage{Steering: 0.7, Confidence: 1.0},
events.ObjectsMessage{},
},
{
events.DriveModeMessage{DriveMode: events.DriveMode_USER},
events.SteeringMessage{Steering: 0.5, Confidence: 1.0},
events.SteeringMessage{Steering: 0.8, Confidence: 1.0},
events.SteeringMessage{Steering: 0.5, Confidence: 1.0},
events.ObjectsMessage{},
},
{
events.DriveModeMessage{DriveMode: events.DriveMode_USER},
events.SteeringMessage{Steering: 0.4, Confidence: 1.0},
events.SteeringMessage{Steering: 0.9, Confidence: 1.0},
events.SteeringMessage{Steering: 0.4, Confidence: 1.0},
events.ObjectsMessage{},
},
{
events.DriveModeMessage{DriveMode: events.DriveMode_USER},
events.SteeringMessage{Steering: 0.6, Confidence: 1.0},
events.SteeringMessage{Steering: -0.3, Confidence: 1.0},
events.SteeringMessage{Steering: 0.6, Confidence: 1.0},
events.ObjectsMessage{},
},
}
@ -88,6 +96,7 @@ func TestDefaultSteering(t *testing.T) {
p.onDriveMode(nil, testtools.NewFakeMessageFromProtobuf(driveModeTopic, &c.driveMode))
p.onRCSteering(nil, testtools.NewFakeMessageFromProtobuf(rcSteeringTopic, &c.rcSteering))
p.onTFSteering(nil, testtools.NewFakeMessageFromProtobuf(tfSteeringTopic, &c.tfSteering))
p.onObjects(nil, testtools.NewFakeMessageFromProtobuf(objectsTopic, &c.objects))
time.Sleep(10 * time.Millisecond)