refactor: move package to pkg
This commit is contained in:
186
pkg/part/record.go
Normal file
186
pkg/part/record.go
Normal file
@ -0,0 +1,186 @@
|
||||
package part
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/cyrilix/robocar-base/service"
|
||||
"github.com/cyrilix/robocar-protobuf/go/events"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewRecorder(client mqtt.Client, recordTopic, cameraTopic, steeringTopic, switchRecordTopic string) *Recorder {
|
||||
return &Recorder{
|
||||
client: client,
|
||||
recordTopic: recordTopic,
|
||||
cameraTopic: cameraTopic,
|
||||
steeringTopic: steeringTopic,
|
||||
switchRecordTopic: switchRecordTopic,
|
||||
enabled: false,
|
||||
idGenerator: NewDateBasedGenerator(),
|
||||
recordSet: "",
|
||||
cancel: make(chan interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
type Recorder struct {
|
||||
client mqtt.Client
|
||||
recordTopic string
|
||||
cameraTopic, steeringTopic, switchRecordTopic string
|
||||
|
||||
muSteeringMsg sync.Mutex
|
||||
currentSteering *events.SteeringMessage
|
||||
|
||||
muEnabled sync.RWMutex
|
||||
enabled bool
|
||||
|
||||
idGenerator IdGenerator
|
||||
recordSet string
|
||||
|
||||
cancel chan interface {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Recorder) Start() error {
|
||||
registerCallBacks(r)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.cancel:
|
||||
zap.S().Info("Stop service")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Recorder) Stop() {
|
||||
close(r.cancel)
|
||||
service.StopService("record", r.client, r.cameraTopic, r.steeringTopic)
|
||||
}
|
||||
|
||||
func (r *Recorder) onSwitchRecord(_ mqtt.Client, message mqtt.Message) {
|
||||
var msg events.SwitchRecordMessage
|
||||
err := proto.Unmarshal(message.Payload(), &msg)
|
||||
if err != nil {
|
||||
zap.S().Errorf("unable to unmarshal protobuf %T: %v", msg, err)
|
||||
return
|
||||
}
|
||||
|
||||
r.muEnabled.Lock()
|
||||
defer r.muEnabled.Unlock()
|
||||
|
||||
if !r.enabled && msg.GetEnabled() {
|
||||
r.recordSet = r.idGenerator.Next()
|
||||
}
|
||||
|
||||
r.enabled = msg.GetEnabled()
|
||||
}
|
||||
|
||||
func (r *Recorder) onSteering(_ mqtt.Client, message mqtt.Message) {
|
||||
var msg events.SteeringMessage
|
||||
err := proto.Unmarshal(message.Payload(), &msg)
|
||||
if err != nil {
|
||||
zap.S().Errorf("unable to unmarshal protobuf %T: %v", msg, err)
|
||||
return
|
||||
}
|
||||
|
||||
r.muSteeringMsg.Lock()
|
||||
defer r.muSteeringMsg.Unlock()
|
||||
r.currentSteering = &msg
|
||||
}
|
||||
|
||||
func (r *Recorder) onFrame(_ mqtt.Client, message mqtt.Message) {
|
||||
if !r.Enabled() {
|
||||
return
|
||||
}
|
||||
|
||||
var msg events.FrameMessage
|
||||
err := proto.Unmarshal(message.Payload(), &msg)
|
||||
if err != nil {
|
||||
zap.S().Errorf("unable to unmarshal protobuf FrameMessage: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
steering := r.CurrentSteering()
|
||||
if steering == nil {
|
||||
zap.S().Warnf("no current steeringMsg, skip frameMsg %v", msg.GetId().Id)
|
||||
return
|
||||
}
|
||||
|
||||
record := events.RecordMessage{
|
||||
Frame: &msg,
|
||||
Steering: steering,
|
||||
RecordSet: r.recordSet,
|
||||
}
|
||||
|
||||
payload, err := proto.Marshal(&record)
|
||||
if err != nil {
|
||||
zap.S().Errorf("unable to marshal message %v: %v", record, err)
|
||||
return
|
||||
}
|
||||
publish(r.client, r.recordTopic, &payload)
|
||||
}
|
||||
|
||||
var publish = func(client mqtt.Client, topic string, payload *[]byte) {
|
||||
client.Publish(topic, 0, false, *payload)
|
||||
}
|
||||
|
||||
func (r *Recorder) CurrentSteering() *events.SteeringMessage {
|
||||
r.muSteeringMsg.Lock()
|
||||
defer r.muSteeringMsg.Unlock()
|
||||
steering := r.currentSteering
|
||||
return steering
|
||||
}
|
||||
|
||||
func (r *Recorder) Enabled() bool {
|
||||
r.muEnabled.RLock()
|
||||
defer r.muEnabled.RUnlock()
|
||||
return r.enabled
|
||||
}
|
||||
|
||||
var registerCallBacks = func(r *Recorder) {
|
||||
err := service.RegisterCallback(r.client, r.cameraTopic, r.onFrame)
|
||||
if err != nil {
|
||||
log.Panicf("unable to register callback to %v:%v", r.cameraTopic, err)
|
||||
}
|
||||
|
||||
err = service.RegisterCallback(r.client, r.steeringTopic, r.onSteering)
|
||||
if err != nil {
|
||||
log.Panicf("unable to register callback to %v:%v", r.steeringTopic, err)
|
||||
}
|
||||
|
||||
err = service.RegisterCallback(r.client, r.switchRecordTopic, r.onSwitchRecord)
|
||||
if err != nil {
|
||||
log.Panicf("unable to register callback to %v:%v", r.switchRecordTopic, err)
|
||||
}
|
||||
}
|
||||
|
||||
type IdGenerator interface {
|
||||
Next() string
|
||||
}
|
||||
|
||||
func NewDateBasedGenerator() *DateBasedGenerator {
|
||||
return &DateBasedGenerator{
|
||||
muCpt: sync.Mutex{},
|
||||
cpt: 0,
|
||||
idTemplate: "%s-%d",
|
||||
start: time.Now().Format("200601021504"),
|
||||
}
|
||||
}
|
||||
|
||||
type DateBasedGenerator struct {
|
||||
muCpt sync.Mutex
|
||||
cpt int
|
||||
idTemplate string
|
||||
start string
|
||||
}
|
||||
|
||||
func (d *DateBasedGenerator) Next() string {
|
||||
d.muCpt.Lock()
|
||||
defer d.muCpt.Unlock()
|
||||
d.cpt += 1
|
||||
return fmt.Sprintf(d.idTemplate, d.start, d.cpt)
|
||||
}
|
157
pkg/part/record_test.go
Normal file
157
pkg/part/record_test.go
Normal file
@ -0,0 +1,157 @@
|
||||
package part
|
||||
|
||||
import (
|
||||
"github.com/cyrilix/robocar-base/testtools"
|
||||
"github.com/cyrilix/robocar-protobuf/go/events"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/golang/protobuf/proto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"io/ioutil"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRecorder_Record(t *testing.T) {
|
||||
oldRegister := registerCallBacks
|
||||
oldPublish := publish
|
||||
defer func() {
|
||||
registerCallBacks = oldRegister
|
||||
publish = oldPublish
|
||||
}()
|
||||
|
||||
registerCallBacks = func(_ *Recorder) {}
|
||||
|
||||
recordTopic := "topic/record"
|
||||
cameraTopic := "topic/camera"
|
||||
steeringTopic := "topic/steeringMsg"
|
||||
switchRecord := "topic/switch/record"
|
||||
|
||||
var muEventsPublished sync.Mutex
|
||||
var eventsPublished *events.RecordMessage
|
||||
publish = func(client mqtt.Client, topic string, payload *[]byte) {
|
||||
if topic != recordTopic {
|
||||
t.Errorf("event published on bad topic: %v, wants %v", topic, recordTopic)
|
||||
return
|
||||
}
|
||||
muEventsPublished.Lock()
|
||||
defer muEventsPublished.Unlock()
|
||||
var msg events.RecordMessage
|
||||
err := proto.Unmarshal(*payload, &msg)
|
||||
if err != nil {
|
||||
t.Errorf("unable to record plublished event: %v", err)
|
||||
}
|
||||
eventsPublished = &msg
|
||||
}
|
||||
|
||||
recorder := NewRecorder(nil, recordTopic, cameraTopic, steeringTopic, switchRecord)
|
||||
recorder.idGenerator = &DateBasedGenerator{
|
||||
muCpt: sync.Mutex{},
|
||||
cpt: 0,
|
||||
idTemplate: "%s-%d",
|
||||
start: "record",
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := recorder.Start(); err == nil {
|
||||
t.Fatalf("unable to start recorder: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
frame1 := loadImage(t, "testdata/img.jpg", "01")
|
||||
frame2 := loadImage(t, "testdata/img.jpg", "02")
|
||||
steeringRight := events.SteeringMessage{Steering: 0.5, Confidence: 1.0}
|
||||
steeringLeft := events.SteeringMessage{Steering: -0.5, Confidence: 1.0}
|
||||
|
||||
cases := []struct {
|
||||
recordMsg *events.SwitchRecordMessage
|
||||
frameMsg *events.FrameMessage
|
||||
steeringMsg *events.SteeringMessage
|
||||
expectedRecordMsg *events.RecordMessage
|
||||
wait time.Duration
|
||||
}{
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: false}, frameMsg: nil, steeringMsg: nil, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: nil, steeringMsg: nil, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: frame1, steeringMsg: nil, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: nil, steeringMsg: &steeringRight, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: frame1, steeringMsg: &steeringRight, expectedRecordMsg: &events.RecordMessage{RecordSet: "record-1", Frame: frame1, Steering: &steeringRight}, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: nil, steeringMsg: &steeringLeft, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: frame2, steeringMsg: &steeringLeft, expectedRecordMsg: &events.RecordMessage{RecordSet: "record-1", Frame: frame2, Steering: &steeringLeft}, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: false}, frameMsg: nil, steeringMsg: nil, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: false}, frameMsg: nil, steeringMsg: nil, expectedRecordMsg: nil, wait: 5 * time.Millisecond},
|
||||
{recordMsg: &events.SwitchRecordMessage{Enabled: true}, frameMsg: frame1, steeringMsg: &steeringRight, expectedRecordMsg: &events.RecordMessage{RecordSet: "record-2", Frame: frame1, Steering: &steeringLeft}, wait: 5 * time.Millisecond},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
muEventsPublished.Lock()
|
||||
eventsPublished = nil
|
||||
muEventsPublished.Unlock()
|
||||
|
||||
if c.recordMsg != nil {
|
||||
recorder.onSwitchRecord(nil, testtools.NewFakeMessageFromProtobuf(recordTopic, c.recordMsg))
|
||||
}
|
||||
if c.frameMsg != nil {
|
||||
recorder.onFrame(nil, testtools.NewFakeMessageFromProtobuf(cameraTopic, c.frameMsg))
|
||||
}
|
||||
if c.steeringMsg != nil {
|
||||
recorder.onSteering(nil, testtools.NewFakeMessageFromProtobuf(steeringTopic, c.steeringMsg))
|
||||
}
|
||||
|
||||
time.Sleep(c.wait)
|
||||
|
||||
if c.expectedRecordMsg == nil && eventsPublished != nil {
|
||||
t.Errorf("unexpected published event: %v", eventsPublished)
|
||||
}
|
||||
if c.expectedRecordMsg.String() != eventsPublished.String() {
|
||||
t.Errorf("bad message published: %v, wants %v", eventsPublished, c.expectedRecordMsg)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func loadImage(t *testing.T, imgPath string, id string) *events.FrameMessage {
|
||||
jpegContent, err := ioutil.ReadFile(imgPath)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to load image: %v", err)
|
||||
}
|
||||
msg := &events.FrameMessage{
|
||||
Id: &events.FrameRef{
|
||||
Name: imgPath,
|
||||
Id: id,
|
||||
},
|
||||
Frame: jpegContent,
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func TestDateBasedGenerator_Next(t *testing.T) {
|
||||
log.SetLevel(log.InfoLevel)
|
||||
|
||||
expectedFmt := "[0-9]{4}[01][0-9][0-3][0-9][0-2][0-9][0-5][0-9]-[0-9]+"
|
||||
r, err := regexp.Compile(expectedFmt)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to compile expected regex: %v", err)
|
||||
}
|
||||
d := NewDateBasedGenerator()
|
||||
id1 := d.Next()
|
||||
log.Debugf("first id: %v", id1)
|
||||
if !r.MatchString(id1) {
|
||||
t.Errorf("Unexpected id format: %v, wants: %s", id1, expectedFmt)
|
||||
}
|
||||
|
||||
id2 := d.Next()
|
||||
log.Debugf("2nd id: %v", id2)
|
||||
|
||||
if strings.Split(id1, "-")[0] != strings.Split(id2, "-")[0] {
|
||||
t.Errorf("ids are differentt prefixes: %v - %v", strings.Split(id1, "-")[0], strings.Split(id2, "-")[0])
|
||||
}
|
||||
|
||||
if strings.Split(id1, "-")[1] != "1" {
|
||||
t.Errorf("unexpected suffix: %v, wants %v", strings.Split(id1, "-")[1], "1")
|
||||
}
|
||||
if strings.Split(id2, "-")[1] != "2" {
|
||||
t.Errorf("unexpected suffix: %v, wants %v", strings.Split(id2, "-")[1], "2")
|
||||
}
|
||||
}
|
BIN
pkg/part/testdata/img.jpg
vendored
Normal file
BIN
pkg/part/testdata/img.jpg
vendored
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.1 KiB |
Reference in New Issue
Block a user