first implementation
This commit is contained in:
104
pkg/recorder/recorder.go
Normal file
104
pkg/recorder/recorder.go
Normal file
@ -0,0 +1,104 @@
|
||||
package recorder
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/cyrilix/robocar-base/service"
|
||||
"github.com/cyrilix/robocar-protobuf/go/events"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
func New(client mqtt.Client, recordsDir, recordTopic string) (*Recorder, error) {
|
||||
err := os.MkdirAll(recordsDir, os.FileMode(0755))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create %v directory: %v", recordsDir, err)
|
||||
}
|
||||
return &Recorder{
|
||||
client: client,
|
||||
recordsDir: recordsDir,
|
||||
recordTopic: recordTopic,
|
||||
cancel: make(chan interface{}),
|
||||
}, nil
|
||||
|
||||
}
|
||||
|
||||
type Recorder struct {
|
||||
client mqtt.Client
|
||||
recordsDir string
|
||||
recordTopic string
|
||||
cancel chan interface{}
|
||||
}
|
||||
|
||||
var FileNameFormat = "record_%s.json"
|
||||
|
||||
func (r *Recorder) Start() error {
|
||||
err := service.RegisterCallback(r.client, r.recordTopic, r.onRecordMsg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to start rc-recorder part: %v", err)
|
||||
}
|
||||
<-r.cancel
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Recorder) Stop() {
|
||||
service.StopService("record", r.client, r.recordTopic)
|
||||
close(r.cancel)
|
||||
}
|
||||
|
||||
func (r *Recorder) onRecordMsg(_ mqtt.Client, message mqtt.Message) {
|
||||
l := zap.S()
|
||||
var msg events.RecordMessage
|
||||
err := proto.Unmarshal(message.Payload(), &msg)
|
||||
if err != nil {
|
||||
zap.S().Errorf("unable to unmarshal protobuf %T: %v", msg, err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("record %s: %s\r", msg.GetRecordSet(), msg.GetFrame().GetId().GetId())
|
||||
|
||||
recordDir := fmt.Sprintf("%s/%s", r.recordsDir, msg.GetRecordSet())
|
||||
|
||||
imgDir := fmt.Sprintf("%s/cam", recordDir)
|
||||
imgRef := fmt.Sprintf("cam/cam-image_array_%s.jpg", msg.GetFrame().GetId().GetId())
|
||||
imgName := fmt.Sprintf("%s/cam-image_array_%s.jpg", imgDir, msg.GetFrame().GetId().GetId())
|
||||
err = os.MkdirAll(imgDir, os.FileMode(0755))
|
||||
if err != nil {
|
||||
l.Errorf("unable to create %v directory: %v", imgDir, err)
|
||||
return
|
||||
}
|
||||
err = ioutil.WriteFile(imgName, msg.GetFrame().GetFrame(), os.FileMode(0755))
|
||||
if err != nil {
|
||||
l.Errorf("unable to write img file %v: %v", imgName, err)
|
||||
return
|
||||
}
|
||||
|
||||
jsonDir := fmt.Sprintf("%s/", recordDir)
|
||||
recordName := fmt.Sprintf("%s/%s", jsonDir, fmt.Sprintf(FileNameFormat, msg.GetFrame().GetId().GetId()))
|
||||
err = os.MkdirAll(jsonDir, os.FileMode(0755))
|
||||
if err != nil {
|
||||
l.Errorf("unable to create %v directory: %v", jsonDir, err)
|
||||
return
|
||||
}
|
||||
record := Record{
|
||||
UserAngle: msg.GetSteering().GetSteering(),
|
||||
CamImageArray: imgRef,
|
||||
}
|
||||
jsonBytes, err := json.Marshal(&record)
|
||||
if err != nil {
|
||||
l.Errorf("unable to marshal json content: %v", err)
|
||||
return
|
||||
}
|
||||
err = ioutil.WriteFile(recordName, jsonBytes, 0755)
|
||||
if err != nil {
|
||||
l.Errorf("unable to write json file %v: %v", recordName, err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type Record struct {
|
||||
UserAngle float32 `json:"user/angle,"`
|
||||
CamImageArray string `json:"cam/image_array,"`
|
||||
}
|
146
pkg/recorder/recorder_test.go
Normal file
146
pkg/recorder/recorder_test.go
Normal file
@ -0,0 +1,146 @@
|
||||
package recorder
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/cyrilix/robocar-base/testtools"
|
||||
"github.com/cyrilix/robocar-protobuf/go/events"
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
timestamp "github.com/golang/protobuf/ptypes/timestamp"
|
||||
"io/ioutil"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRecorder_onRecordMsg(t *testing.T) {
|
||||
type fields struct {
|
||||
recordsDir string
|
||||
recordSet string
|
||||
}
|
||||
type args struct {
|
||||
message mqtt.Message
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantJsonFileName string
|
||||
wantRecord Record
|
||||
}{
|
||||
{
|
||||
name: "default",
|
||||
fields: fields{
|
||||
recordsDir: t.TempDir(),
|
||||
recordSet: "default",
|
||||
},
|
||||
args: args{
|
||||
message: generateMessage("1", "default", -0.5),
|
||||
},
|
||||
wantJsonFileName: "record_1.json",
|
||||
wantRecord: Record{
|
||||
UserAngle: -0.5,
|
||||
CamImageArray: "cam/cam-image_array_1.jpg",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &Recorder{
|
||||
recordsDir: tt.fields.recordsDir,
|
||||
}
|
||||
r.onRecordMsg(nil, tt.args.message)
|
||||
fis, err := ioutil.ReadDir(tt.fields.recordsDir)
|
||||
if err != nil {
|
||||
t.Errorf("unable to list files: %v", err)
|
||||
return
|
||||
}
|
||||
if len(fis) != 1 {
|
||||
t.Errorf("bad number of entry into %v: %v, want %v", tt.fields.recordsDir, len(fis), 1)
|
||||
}
|
||||
if !fis[0].IsDir() {
|
||||
t.Errorf("target record is not a directory")
|
||||
}
|
||||
if fis[0].Name() != tt.name {
|
||||
t.Errorf("bad directory name '%v', want '%v'", fis[0].Name(), tt.fields.recordSet)
|
||||
}
|
||||
records, err := ioutil.ReadDir(path.Join(tt.fields.recordsDir, fis[0].Name()))
|
||||
if err != nil {
|
||||
t.Errorf("unable to list record files")
|
||||
return
|
||||
}
|
||||
if len(records) != 2 {
|
||||
t.Errorf("records have %d entry, want %d", len(records), 2)
|
||||
return
|
||||
}
|
||||
|
||||
var record Record
|
||||
for _, r := range records {
|
||||
if r.IsDir() {
|
||||
if r.Name() != "cam" {
|
||||
t.Errorf("bad name for cam records '%v', want %v", r.Name(), "cam")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if r.Name() != tt.wantJsonFileName {
|
||||
t.Errorf("bad json filename '%v', want '%v'", r.Name(), tt.wantJsonFileName)
|
||||
}
|
||||
jsonContent, err := ioutil.ReadFile(path.Join(tt.fields.recordsDir, tt.fields.recordSet, r.Name()))
|
||||
if err != nil {
|
||||
t.Errorf("unable to read json record: %v", err)
|
||||
}
|
||||
err = json.Unmarshal(jsonContent, &record)
|
||||
if err != nil {
|
||||
t.Errorf("unable to unmarshal record: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if record != tt.wantRecord {
|
||||
t.Errorf("bad json record '%v', want '%v'", record, tt.wantRecord)
|
||||
}
|
||||
|
||||
img, err := ioutil.ReadFile(path.Join(tt.fields.recordsDir, tt.fields.recordSet, record.CamImageArray))
|
||||
if err != nil {
|
||||
t.Errorf("unable to read image: %v", err)
|
||||
return
|
||||
}
|
||||
if string(img) != "frame content" {
|
||||
t.Errorf("bad image content")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func generateMessage(id string, recordSet string, userAngle float32) mqtt.Message {
|
||||
now := time.Now()
|
||||
msg := events.RecordMessage{
|
||||
Frame: &events.FrameMessage{
|
||||
Id: &events.FrameRef{
|
||||
Name: fmt.Sprintf("framie-%s", id),
|
||||
Id: id,
|
||||
CreatedAt: ×tamp.Timestamp{
|
||||
Seconds: now.Unix(),
|
||||
Nanos: int32(now.Nanosecond()),
|
||||
},
|
||||
},
|
||||
Frame: []byte("frame content"),
|
||||
},
|
||||
Steering: &events.SteeringMessage{
|
||||
Steering: userAngle,
|
||||
Confidence: 1.0,
|
||||
FrameRef: &events.FrameRef{
|
||||
Name: fmt.Sprintf("framie-%s", id),
|
||||
Id: id,
|
||||
CreatedAt: ×tamp.Timestamp{
|
||||
Seconds: now.Unix(),
|
||||
Nanos: int32(now.Nanosecond()),
|
||||
},
|
||||
},
|
||||
},
|
||||
RecordSet: recordSet,
|
||||
}
|
||||
|
||||
return testtools.NewFakeMessageFromProtobuf("topic", &msg)
|
||||
}
|
Reference in New Issue
Block a user