120 lines
2.6 KiB
Go
120 lines
2.6 KiB
Go
|
package road
|
||
|
|
||
|
import (
|
||
|
"github.com/cyrilix/robocar-base/service"
|
||
|
"github.com/cyrilix/robocar-protobuf/go/events"
|
||
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||
|
"go.uber.org/zap"
|
||
|
"gocv.io/x/gocv"
|
||
|
"google.golang.org/protobuf/proto"
|
||
|
"log"
|
||
|
)
|
||
|
|
||
|
type RoadPart struct {
|
||
|
client mqtt.Client
|
||
|
frameChan chan frameToProcess
|
||
|
readyForNext chan interface{}
|
||
|
cancel chan interface{}
|
||
|
detector *Detector
|
||
|
cameraTopic, roadTopic string
|
||
|
}
|
||
|
|
||
|
func NewPart(client mqtt.Client, cameraTopic, roadTopic string, detector *Detector) *RoadPart {
|
||
|
return &RoadPart{
|
||
|
client: client,
|
||
|
frameChan: make(chan frameToProcess),
|
||
|
cancel: make(chan interface{}),
|
||
|
detector: detector,
|
||
|
cameraTopic: cameraTopic,
|
||
|
roadTopic: roadTopic,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *RoadPart) Start() error {
|
||
|
registerCallBacks(r)
|
||
|
|
||
|
var frame = frameToProcess{}
|
||
|
defer func() {
|
||
|
if err := frame.Close(); err != nil {
|
||
|
zap.S().Errorf("unable to close msg: %v", err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case f := <-r.frameChan:
|
||
|
oldFrame := frame
|
||
|
frame = f
|
||
|
if err := oldFrame.Close(); err != nil {
|
||
|
zap.S().Errorf("unable to close msg: %v", err)
|
||
|
}
|
||
|
go r.processFrame(&frame)
|
||
|
case <-r.cancel:
|
||
|
zap.S().Info("Stop service")
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var registerCallBacks = func(r *RoadPart) {
|
||
|
err := service.RegisterCallback(r.client, r.cameraTopic, r.OnFrame)
|
||
|
if err != nil {
|
||
|
log.Panicf("unable to register callback to topic %v:%v", r.cameraTopic, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *RoadPart) Stop() {
|
||
|
defer r.detector.Close()
|
||
|
close(r.readyForNext)
|
||
|
close(r.cancel)
|
||
|
service.StopService("road", r.client, r.roadTopic)
|
||
|
}
|
||
|
|
||
|
func (r *RoadPart) OnFrame(_ mqtt.Client, msg mqtt.Message) {
|
||
|
var frameMsg events.FrameMessage
|
||
|
err := proto.Unmarshal(msg.Payload(), &frameMsg)
|
||
|
if err != nil {
|
||
|
zap.S().Errorf("unable to unmarshal %T message: %v", frameMsg, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
img, err := gocv.IMDecode(frameMsg.GetFrame(), gocv.IMReadUnchanged)
|
||
|
if err != nil {
|
||
|
zap.S().Errorf("unable to decode image: %v", err)
|
||
|
return
|
||
|
}
|
||
|
frame := frameToProcess{
|
||
|
ref: frameMsg.GetId(),
|
||
|
Mat: img,
|
||
|
}
|
||
|
r.frameChan <- frame
|
||
|
}
|
||
|
|
||
|
type frameToProcess struct {
|
||
|
ref *events.FrameRef
|
||
|
gocv.Mat
|
||
|
}
|
||
|
|
||
|
func (r *RoadPart) processFrame(frame *frameToProcess) {
|
||
|
img := frame.Mat
|
||
|
|
||
|
cntr, ellipse := r.detector.Detect(&img)
|
||
|
|
||
|
msg := events.RoadMessage{
|
||
|
Contour: cntr,
|
||
|
Ellipse: ellipse,
|
||
|
FrameRef: frame.ref,
|
||
|
}
|
||
|
|
||
|
payload, err := proto.Marshal(&msg)
|
||
|
if err != nil {
|
||
|
zap.S().Errorf("unable to marshal %T to protobuf: %err", msg, err)
|
||
|
return
|
||
|
}
|
||
|
publish(r.client, r.roadTopic, &payload)
|
||
|
}
|
||
|
|
||
|
var publish = func(client mqtt.Client, topic string, payload *[]byte) {
|
||
|
client.Publish(topic, 0, false, *payload)
|
||
|
}
|