robocar-road/road/part.go

120 lines
2.6 KiB
Go
Raw Normal View History

package road
import (
"github.com/cyrilix/robocar-base/service"
"github.com/cyrilix/robocar-protobuf/go/events"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
"gocv.io/x/gocv"
"google.golang.org/protobuf/proto"
"log"
)
type RoadPart struct {
client mqtt.Client
frameChan chan frameToProcess
readyForNext chan interface{}
cancel chan interface{}
detector *Detector
cameraTopic, roadTopic string
}
func NewPart(client mqtt.Client, cameraTopic, roadTopic string, detector *Detector) *RoadPart {
return &RoadPart{
client: client,
frameChan: make(chan frameToProcess),
cancel: make(chan interface{}),
detector: detector,
cameraTopic: cameraTopic,
roadTopic: roadTopic,
}
}
func (r *RoadPart) Start() error {
registerCallBacks(r)
var frame = frameToProcess{}
defer func() {
if err := frame.Close(); err != nil {
zap.S().Errorf("unable to close msg: %v", err)
}
}()
for {
select {
case f := <-r.frameChan:
oldFrame := frame
frame = f
if err := oldFrame.Close(); err != nil {
zap.S().Errorf("unable to close msg: %v", err)
}
go r.processFrame(&frame)
case <-r.cancel:
zap.S().Info("Stop service")
return nil
}
}
}
var registerCallBacks = func(r *RoadPart) {
err := service.RegisterCallback(r.client, r.cameraTopic, r.OnFrame)
if err != nil {
log.Panicf("unable to register callback to topic %v:%v", r.cameraTopic, err)
}
}
func (r *RoadPart) Stop() {
defer r.detector.Close()
close(r.readyForNext)
close(r.cancel)
service.StopService("road", r.client, r.roadTopic)
}
func (r *RoadPart) OnFrame(_ mqtt.Client, msg mqtt.Message) {
var frameMsg events.FrameMessage
err := proto.Unmarshal(msg.Payload(), &frameMsg)
if err != nil {
zap.S().Errorf("unable to unmarshal %T message: %v", frameMsg, err)
return
}
img, err := gocv.IMDecode(frameMsg.GetFrame(), gocv.IMReadUnchanged)
if err != nil {
zap.S().Errorf("unable to decode image: %v", err)
return
}
frame := frameToProcess{
ref: frameMsg.GetId(),
Mat: img,
}
r.frameChan <- frame
}
type frameToProcess struct {
ref *events.FrameRef
gocv.Mat
}
func (r *RoadPart) processFrame(frame *frameToProcess) {
img := frame.Mat
cntr, ellipse := r.detector.Detect(&img)
msg := events.RoadMessage{
Contour: cntr,
Ellipse: ellipse,
FrameRef: frame.ref,
}
payload, err := proto.Marshal(&msg)
if err != nil {
zap.S().Errorf("unable to marshal %T to protobuf: %err", msg, err)
return
}
publish(r.client, r.roadTopic, &payload)
}
var publish = func(client mqtt.Client, topic string, payload *[]byte) {
client.Publish(topic, 0, false, *payload)
}