robocar-simulator/pkg/events/events.go

155 lines
3.2 KiB
Go

package events
import (
"fmt"
"github.com/cyrilix/robocar-simulator/pkg/gateway"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"sync"
"time"
)
func NewMsgPublisher(srcEvents gateway.SimulatorSource, p Publisher, topicFrame, topicSteering, topicThrottle string) *MsgPublisher {
return &MsgPublisher{
p: p,
topicFrame: topicFrame,
topicSteering: topicSteering,
topicThrottle: topicThrottle,
srcEvents: srcEvents,
muCancel: sync.Mutex{},
cancel: nil,
}
}
type MsgPublisher struct {
p Publisher
topicFrame string
topicSteering string
topicThrottle string
srcEvents gateway.SimulatorSource
muCancel sync.Mutex
cancel chan interface{}
}
func (m *MsgPublisher) Start() {
m.muCancel.Lock()
defer m.muCancel.Unlock()
m.cancel = make(chan interface{})
if m.topicThrottle != "" {
go m.listenThrottle()
}
if m.topicSteering != "" {
go m.listenSteering()
}
if m.topicFrame != "" {
go m.listenFrame()
}
}
func (m *MsgPublisher) Stop() {
m.muCancel.Lock()
defer m.muCancel.Unlock()
close(m.cancel)
m.cancel = nil
}
func (m *MsgPublisher) listenThrottle() {
logr := log.WithField("msg_type", "throttleChan")
msgChan := m.srcEvents.SubscribeThrottle()
for {
select {
case <-m.cancel:
logr.Debug("exit listen throttleChan loop")
return
case msg := <-msgChan:
payload, err := proto.Marshal(msg)
if err != nil {
logr.Errorf("unable to marshal protobuf message: %v", err)
} else {
err = m.p.Publish(m.topicThrottle, payload)
if err != nil {
logr.Errorf("unable to publish events message: %v", err)
}
}
}
}
}
func (m *MsgPublisher) listenSteering() {
logr := log.WithField("msg_type", "steeringChan")
msgChan := m.srcEvents.SubscribeSteering()
for {
select {
case msg := <-msgChan:
if m.topicSteering == "" {
return
}
payload, err := proto.Marshal(msg)
if err != nil {
logr.Errorf("unable to marshal protobuf message: %v", err)
} else {
err = m.p.Publish(m.topicSteering, payload)
if err != nil {
logr.Errorf("unable to publish events message: %v", err)
}
}
}
}
}
func (m *MsgPublisher) listenFrame() {
logr := log.WithField("msg_type", "frame")
msgChan := m.srcEvents.SubscribeFrame()
for {
msg := <-msgChan
if msg == nil {
// channel closed
break
}
logr.Debugf("new frame %v", msg.Id)
if m.topicFrame == "" {
return
}
payload, err := proto.Marshal(msg)
if err != nil {
logr.Errorf("unable to marshal protobuf message: %v", err)
continue
}
err = m.p.Publish(m.topicFrame, payload)
if err != nil {
logr.Errorf("unable to publish events message: %v", err)
}
}
}
type Publisher interface {
Publish(topic string, payload []byte) error
}
func NewMqttPublisher(client mqtt.Client) *MqttPublisher {
return &MqttPublisher{client: client}
}
type MqttPublisher struct {
client mqtt.Client
}
func (m *MqttPublisher) Publish(topic string, payload []byte) error {
token := m.client.Publish(topic, 0, false, payload)
token.WaitTimeout(10 * time.Millisecond)
if err := token.Error(); err != nil {
return fmt.Errorf("unable to events to topic: %v", err)
}
return nil
}