rewrite module
* use protobuf message * use directly paho api to publish message
This commit is contained in:
		@@ -1,10 +1,12 @@
 | 
			
		||||
package camera
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/cyrilix/robocar-base/mqttdevice"
 | 
			
		||||
	"github.com/cyrilix/robocar-protobuf/go/events"
 | 
			
		||||
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
			
		||||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	log "github.com/sirupsen/logrus"
 | 
			
		||||
	"gocv.io/x/gocv"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
@@ -15,16 +17,17 @@ type VideoSource interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type OpencvCameraPart struct {
 | 
			
		||||
	client           mqtt.Client
 | 
			
		||||
	vc               VideoSource
 | 
			
		||||
	pub              mqttdevice.Publisher
 | 
			
		||||
	topic            string
 | 
			
		||||
	publishFrequency int
 | 
			
		||||
	muImgBuffered    sync.Mutex
 | 
			
		||||
	imgBuffered      *gocv.Mat
 | 
			
		||||
	cancel           chan interface{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func New(topic string, publisher mqttdevice.Publisher, publishFrequency int, videoProperties map[gocv.VideoCaptureProperties]float64) *OpencvCameraPart {
 | 
			
		||||
	log.Printf("Run camera part")
 | 
			
		||||
func New(client mqtt.Client, topic string, publishFrequency int, videoProperties map[gocv.VideoCaptureProperties]float64) *OpencvCameraPart {
 | 
			
		||||
	log.Infof("run camera part")
 | 
			
		||||
 | 
			
		||||
	vc, err := gocv.OpenVideoCapture(0)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -36,8 +39,8 @@ func New(topic string, publisher mqttdevice.Publisher, publishFrequency int, vid
 | 
			
		||||
 | 
			
		||||
	img := gocv.NewMat()
 | 
			
		||||
	o := OpencvCameraPart{
 | 
			
		||||
		client:           client,
 | 
			
		||||
		vc:               vc,
 | 
			
		||||
		pub:              publisher,
 | 
			
		||||
		topic:            topic,
 | 
			
		||||
		publishFrequency: publishFrequency,
 | 
			
		||||
		imgBuffered:      &img,
 | 
			
		||||
@@ -47,17 +50,25 @@ func New(topic string, publisher mqttdevice.Publisher, publishFrequency int, vid
 | 
			
		||||
 | 
			
		||||
func (o *OpencvCameraPart) Start() error {
 | 
			
		||||
	log.Printf("start camera")
 | 
			
		||||
	o.cancel = make(chan interface{})
 | 
			
		||||
	ticker := time.NewTicker(1 * time.Second / time.Duration(o.publishFrequency))
 | 
			
		||||
	defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		go o.publishFrame()
 | 
			
		||||
		<-ticker.C
 | 
			
		||||
		select {
 | 
			
		||||
 | 
			
		||||
		case <-ticker.C:
 | 
			
		||||
			o.publishFrame()
 | 
			
		||||
		case <-o.cancel:
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (o *OpencvCameraPart) Stop() {
 | 
			
		||||
	log.Print("close video device")
 | 
			
		||||
	close(o.cancel)
 | 
			
		||||
 | 
			
		||||
	if err := o.vc.Close(); err != nil {
 | 
			
		||||
		log.Printf("unexpected error while VideoCapture is closed: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -77,5 +88,22 @@ func (o *OpencvCameraPart) publishFrame() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	o.pub.Publish(o.topic, img)
 | 
			
		||||
	msg := &events.FrameMessage{
 | 
			
		||||
		Id: &events.FrameRef{
 | 
			
		||||
			Name: "camera",
 | 
			
		||||
			Id:   "XX",
 | 
			
		||||
		},
 | 
			
		||||
		Frame: img,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	payload, err := proto.Marshal(msg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Errorf("unable to marshal protobuf message: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	publish(o.client, o.topic, &payload)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var publish = func(client mqtt.Client, topic string, payload *[]byte) {
 | 
			
		||||
	client.Publish(topic, 0, false, *payload)
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -2,17 +2,22 @@ package camera
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"github.com/cyrilix/robocar-base/testtools"
 | 
			
		||||
	"github.com/cyrilix/robocar-protobuf/go/events"
 | 
			
		||||
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
			
		||||
	"github.com/golang/protobuf/proto"
 | 
			
		||||
	"gocv.io/x/gocv"
 | 
			
		||||
	"image/jpeg"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type fakeVideoSource struct {
 | 
			
		||||
	io.Closer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f fakeVideoSource) Close() error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (f fakeVideoSource) Read(dest *gocv.Mat) bool {
 | 
			
		||||
@@ -25,32 +30,50 @@ func (f fakeVideoSource) Read(dest *gocv.Mat) bool {
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func TestOpencvCameraPart(t *testing.T) {
 | 
			
		||||
	p := testtools.NewFakePublisher()
 | 
			
		||||
	var muPubEvents sync.Mutex
 | 
			
		||||
	publishedEvents := make(map[string]*[]byte)
 | 
			
		||||
	oldPublish := publish
 | 
			
		||||
	defer func() {
 | 
			
		||||
		publish = oldPublish}()
 | 
			
		||||
	publish = func(_ mqtt.Client, topic string, payload *[]byte){
 | 
			
		||||
		muPubEvents.Lock()
 | 
			
		||||
		defer muPubEvents.Unlock()
 | 
			
		||||
		publishedEvents[topic] = payload
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	const topic = "topic/test/camera"
 | 
			
		||||
	imgBuffer := gocv.NewMat()
 | 
			
		||||
 | 
			
		||||
	part := OpencvCameraPart{
 | 
			
		||||
		client: nil,
 | 
			
		||||
		vc:               fakeVideoSource{},
 | 
			
		||||
		pub:              p,
 | 
			
		||||
		topic:            topic,
 | 
			
		||||
		publishFrequency: 1000,
 | 
			
		||||
		imgBuffered:      &imgBuffer,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	go part.Start()
 | 
			
		||||
	time.Sleep(1 * time.Millisecond)
 | 
			
		||||
	time.Sleep(5 * time.Millisecond)
 | 
			
		||||
 | 
			
		||||
	img := p.PublishedEvent(topic)
 | 
			
		||||
	if img == nil {
 | 
			
		||||
		t.Fatalf("event %s has not been published", topic)
 | 
			
		||||
	}
 | 
			
		||||
	content, err := img.ByteSliceValue()
 | 
			
		||||
	var frameMsg events.FrameMessage
 | 
			
		||||
	muPubEvents.Lock()
 | 
			
		||||
	err := proto.Unmarshal(*(publishedEvents[topic]), &frameMsg)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unexpected error: %v", err)
 | 
			
		||||
		t.Errorf("unable to unmarshal pubblished frame")
 | 
			
		||||
	}
 | 
			
		||||
	muPubEvents.Unlock()
 | 
			
		||||
 | 
			
		||||
	if frameMsg.GetId().GetName() != "camera" {
 | 
			
		||||
		t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetName(), "camera")
 | 
			
		||||
	}
 | 
			
		||||
	if frameMsg.GetId().GetId() != "XX" {
 | 
			
		||||
		t.Errorf("bad name frame: %v, wants %v", frameMsg.GetId().GetId(), "XX")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	_, err = jpeg.Decode(bytes.NewReader(content))
 | 
			
		||||
	_, err = jpeg.Decode(bytes.NewReader(frameMsg.GetFrame()))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("image published can't be decoded: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user