robocar-steering-tflite-edg.../pkg/steering/steering.go

263 lines
6.5 KiB
Go
Raw Normal View History

2021-10-10 19:03:57 +00:00
package steering
import (
"bytes"
"context"
2021-10-10 19:03:57 +00:00
"fmt"
"github.com/cyrilix/robocar-base/service"
"github.com/cyrilix/robocar-protobuf/go/events"
"github.com/cyrilix/robocar-steering-tflite-edgetpu/pkg/metrics"
"github.com/disintegration/imaging"
2021-10-10 19:03:57 +00:00
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/golang/protobuf/proto"
"github.com/mattn/go-tflite"
"github.com/mattn/go-tflite/delegates/edgetpu"
"go.uber.org/zap"
"image"
2021-10-12 15:30:37 +00:00
_ "image/jpeg"
2021-10-10 19:03:57 +00:00
"sort"
"time"
2021-10-10 19:03:57 +00:00
)
func NewPart(client mqtt.Client, modelPath, steeringTopic, cameraTopic string, edgeVerbosity int, imgWidth, imgHeight, horizon int) *Part {
2021-10-10 19:03:57 +00:00
return &Part{
client: client,
2021-10-12 15:32:03 +00:00
modelPath: modelPath,
2021-10-10 19:03:57 +00:00
steeringTopic: steeringTopic,
cameraTopic: cameraTopic,
edgeVebosity: edgeVerbosity,
imgWidth: imgWidth,
imgHeight: imgHeight,
horizon: horizon,
2021-10-10 19:03:57 +00:00
}
}
type Part struct {
client mqtt.Client
steeringTopic string
cameraTopic string
cancel chan interface{}
options *tflite.InterpreterOptions
interpreter *tflite.Interpreter
modelPath string
model *tflite.Model
edgeVebosity int
imgWidth int
imgHeight int
horizon int
2021-10-10 19:03:57 +00:00
}
// Start prepares and runs the part:
//   - loads the model from modelPath;
//   - configures interpreter options with the first EdgeTPU device as a delegate;
//   - builds the interpreter;
//   - registers the camera callback.
//
// On success it blocks until Stop closes the cancel channel, then returns nil.
//
// If any initialisation step fails, the interpreter, options and model created
// so far are deleted and their fields reset to nil before the error is
// returned. A later call to Stop therefore cannot free them a second time.
func (p *Part) Start() error {
	// Create the cancel channel exactly once, before anything can fail. Stop
	// closes whatever channel p.cancel holds, so re-creating it later would let
	// a Stop issued during initialisation close a channel nobody waits on.
	p.cancel = make(chan interface{})

	started := false
	defer func() {
		if started {
			// Ownership of the native objects has passed to Stop.
			return
		}
		// The interpreter was built from the model and the options, so it is
		// released first.
		if p.interpreter != nil {
			p.interpreter.Delete()
			p.interpreter = nil
		}
		if p.options != nil {
			p.options.Delete()
			p.options = nil
		}
		if p.model != nil {
			p.model.Delete()
			p.model = nil
		}
	}()

	p.model = tflite.NewModelFromFile(p.modelPath)
	if p.model == nil {
		return fmt.Errorf("cannot load model %v", p.modelPath)
	}

	// Get the list of devices
	devices, err := edgetpu.DeviceList()
	if err != nil {
		return fmt.Errorf("could not get EdgeTPU devices: %w", err)
	}
	if len(devices) == 0 {
		return fmt.Errorf("no edge TPU devices found")
	}

	// Print the EdgeTPU version
	edgetpuVersion, err := edgetpu.Version()
	if err != nil {
		return fmt.Errorf("cannot get EdgeTPU version: %w", err)
	}
	zap.S().Infof("EdgeTPU Version: %s", edgetpuVersion)
	edgetpu.Verbosity(p.edgeVebosity)

	p.options = tflite.NewInterpreterOptions()
	p.options.SetNumThread(4)
	p.options.SetErrorReporter(func(msg string, userData interface{}) {
		zap.S().Errorw(msg,
			"userData", userData,
		)
	}, nil)

	zap.S().Infof("find %d edgetpu devices", len(devices))
	zap.S().Infow("configure edgetpu",
		"path", devices[0].Path,
		"type", uint32(devices[0].Type),
	)

	// Only the first EdgeTPU device is used.
	// NOTE(review): the delegate is not kept on Part, so it is never deleted
	// (neither here on failure nor in Stop); consider storing it.
	d := edgetpu.New(devices[0])
	if d == nil {
		return fmt.Errorf("unable to create new EdgeTpu delegate")
	}
	p.options.AddDelegate(d)

	p.interpreter = tflite.NewInterpreter(p.model, p.options)
	if p.interpreter == nil {
		return fmt.Errorf("cannot create interpreter")
	}

	if err := registerCallbacks(p); err != nil {
		zap.S().Errorw("unable to register callbacks", "error", err)
		return err
	}

	started = true
	<-p.cancel
	return nil
}
// Stop unblocks Start, stops the "steering" service on cameraTopic and
// releases the TFLite interpreter, options and model.
//
// Each native object is deleted at most once, and its field is reset to nil
// afterwards. Calling Stop before Start does not panic on the not-yet-created
// cancel channel.
func (p *Part) Stop() {
	if p.cancel != nil {
		close(p.cancel)
	}
	service.StopService("steering", p.client, p.cameraTopic)

	// The interpreter was created from the model and the options, so it is
	// released before them. It must be deleted only once: a second Delete
	// would free the same native object twice.
	if p.interpreter != nil {
		p.interpreter.Delete()
		p.interpreter = nil
	}
	if p.options != nil {
		p.options.Delete()
		p.options = nil
	}
	if p.model != nil {
		p.model.Delete()
		p.model = nil
	}
}
// onFrame is the MQTT handler registered on cameraTopic. It:
//   - unmarshals the FrameMessage;
//   - records frame-age and inference-duration metrics;
//   - decodes the image and runs the model on it;
//   - publishes a SteeringMessage referencing the frame on steeringTopic.
//
// A frame that fails to unmarshal, decode, evaluate or re-marshal is logged
// and dropped. Nothing is published for it.
func (p *Part) onFrame(_ mqtt.Client, message mqtt.Message) {
	var msg events.FrameMessage
	if err := proto.Unmarshal(message.Payload(), &msg); err != nil {
		zap.S().Errorf("unable to unmarshal protobuf %T message: %v", &msg, err)
		return
	}

	now := time.Now().UnixMilli()
	// The generated getters are nil-safe. Dereferencing msg.Id directly would
	// panic on a frame without an id. A missing timestamp would otherwise
	// report an age measured from the Unix epoch, so the metric is skipped.
	if createdAt := msg.GetId().GetCreatedAt(); createdAt != nil {
		frameAge := now - createdAt.AsTime().UnixMilli()
		go metrics.FrameAge.Record(context.Background(), frameAge)
	}

	img, _, err := image.Decode(bytes.NewReader(msg.GetFrame()))
	if err != nil {
		zap.L().Error("unable to decode frame, skip frame", zap.Error(err))
		return
	}

	steering, confidence, err := p.Value(img)
	// Measured from `now`, so this includes image decoding as well as the model run.
	inferenceDuration := time.Now().UnixMilli() - now
	go metrics.InferenceDuration.Record(context.Background(), inferenceDuration)
	if err != nil {
		zap.S().Errorw("unable to compute steering",
			"frame", msg.GetId().GetId(),
			"error", err,
		)
		return
	}

	zap.L().Debug("new steering value",
		zap.Float32("steering", steering),
		zap.Float32("confidence", confidence),
	)

	payload, err := proto.Marshal(&events.SteeringMessage{
		Steering:   steering,
		Confidence: confidence,
		FrameRef:   msg.Id,
	})
	if err != nil {
		// Publishing an unmarshalled/empty payload would only confuse consumers.
		zap.L().Error("unable to marshal protobuf message", zap.Error(err))
		return
	}
	publish(p.client, p.steeringTopic, payload)
}
// Value runs the steering model on img and returns (steering, confidence, err).
//
// Preprocessing:
//   - img is resized to imgWidth x imgHeight with nearest-neighbour sampling
//     when its size differs;
//   - when horizon > 0, the top `horizon` rows are cropped away;
//   - the remaining pixels are packed row by row as 8-bit R, G, B bytes and
//     copied into the first input tensor.
//
// Postprocessing: the last dimension of the first output tensor is read as one
// byte per steering bin and scaled to [0, 1]. Bins scoring below minScore are
// ignored. The best remaining bin provides the confidence and is mapped
// linearly to a steering value starting at -1. When no bin reaches minScore,
// (0, 0, nil) is returned.
func (p *Part) Value(img image.Image) (float32, float32, error) {
	// NOTE(review): tensors are (re)allocated for every frame; this presumably
	// only needs to happen once after the interpreter is created.
	if status := p.interpreter.AllocateTensors(); status != tflite.OK {
		return 0., 0., fmt.Errorf("tensor allocate failed: %v", status)
	}
	input := p.interpreter.GetInputTensor(0)

	if bounds := img.Bounds(); bounds.Dx() != p.imgWidth || bounds.Dy() != p.imgHeight {
		img = imaging.Resize(img, p.imgWidth, p.imgHeight, imaging.NearestNeighbor)
	}
	if p.horizon > 0 {
		img = imaging.Crop(img, image.Rect(0, p.horizon, img.Bounds().Dx(), img.Bounds().Dy()))
	}

	// Walk the actual bounds so images whose origin is not (0, 0) are read correctly.
	bounds := img.Bounds()
	rgb := make([]byte, 0, bounds.Dx()*bounds.Dy()*3)
	for y := bounds.Min.Y; y < bounds.Max.Y; y++ {
		for x := bounds.Min.X; x < bounds.Max.X; x++ {
			r, g, b, _ := img.At(x, y).RGBA()
			// RGBA yields 16-bit channels (0..0xffff); dividing by 257 maps them to 0..255.
			rgb = append(rgb,
				byte(float64(r)/257.0),
				byte(float64(g)/257.0),
				byte(float64(b)/257.0),
			)
		}
	}
	if status := input.CopyFromBuffer(rgb); status != tflite.OK {
		return 0., 0., fmt.Errorf("input copy from buffer failed: %v", status)
	}

	if status := p.interpreter.Invoke(); status != tflite.OK {
		return 0., 0., fmt.Errorf("invoke failed: %v", status)
	}

	output := p.interpreter.GetOutputTensor(0)
	numDims := output.NumDims()
	if numDims == 0 {
		return 0., 0., fmt.Errorf("output tensor has no dimension")
	}
	outputSize := output.Dim(numDims - 1)
	if outputSize <= 0 {
		// Guards the &scores[0] below, which would panic on an empty slice.
		return 0., 0., fmt.Errorf("unexpected output size: %d", outputSize)
	}
	scores := make([]byte, outputSize)
	if status := output.CopyToBuffer(&scores[0]); status != tflite.OK {
		return 0., 0., fmt.Errorf("output failed: %v", status)
	}

	type result struct {
		score float64
		index int
	}
	minScore := 0.2
	var results []result
	for i, raw := range scores {
		score := float64(raw) / 255.0
		if score < minScore {
			continue
		}
		results = append(results, result{score: score, index: i})
	}
	if len(results) == 0 {
		zap.L().Warn(fmt.Sprintf("none steering with score > %0.2f found", minScore))
		return 0., 0., nil
	}

	// results is built in bin order. A stable sort therefore keeps the lowest
	// bin first among equal scores, so ties resolve deterministically.
	sort.SliceStable(results, func(i, j int) bool {
		return results[i].score > results[j].score
	})
	best := results[0]

	// NOTE(review): bin i maps to i*2/outputSize - 1, so the last bin yields
	// 1 - 2/outputSize rather than +1. If training encoded steering with bins
	// spanning [-1, 1] inclusive, the divisor should be outputSize-1. Confirm
	// against the training-side encoding before changing it.
	steering := float64(best.index)*(2./float64(outputSize)) - 1
	zap.L().Debug("found steering",
		zap.Float64("steering", steering),
		zap.Float64("score", best.score),
	)
	return float32(steering), float32(best.score), nil
}
var registerCallbacks = func(p *Part) error {
err := service.RegisterCallback(p.client, p.cameraTopic, p.onFrame)
if err != nil {
2021-10-12 15:32:03 +00:00
return fmt.Errorf("unable to register callback: %w", err)
2021-10-10 19:03:57 +00:00
}
return nil
}
// publish sends payload to topic with QoS 0 and without the retained flag. The
// token returned by the client is not inspected, so delivery failures go
// unreported. It is a package variable, presumably so it can be swapped out
// (e.g. in tests).
var publish = func(client mqtt.Client, topic string, payload []byte) {
	const (
		qos      = 0
		retained = false
	)
	client.Publish(topic, qos, retained, payload)
}