[mqtt] Implements mqtt publish tooling
This commit is contained in:
		
							
								
								
									
										146
									
								
								mqttdevice/mqttdevice.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										146
									
								
								mqttdevice/mqttdevice.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,146 @@
 | 
			
		||||
package mqttdevice
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"github.com/cyrilix/robocar-base/mode"
 | 
			
		||||
	MQTT "github.com/eclipse/paho.mqtt.golang"
 | 
			
		||||
	"io"
 | 
			
		||||
	"log"
 | 
			
		||||
	"strconv"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Publisher interface {
 | 
			
		||||
	Publish(topic string, payload MqttValue)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Subscriber interface {
 | 
			
		||||
	Subscribe(topic string, mh MQTT.MessageHandler)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MQTTPubSub interface {
 | 
			
		||||
	Publisher
 | 
			
		||||
	Subscriber
 | 
			
		||||
	io.Closer
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type pahoMqttPubSub struct {
 | 
			
		||||
	Uri      string
 | 
			
		||||
	Username string
 | 
			
		||||
	Password string
 | 
			
		||||
	ClientId string
 | 
			
		||||
	Qos      int
 | 
			
		||||
	Retain   bool
 | 
			
		||||
	client   MQTT.Client
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewPahoMqttPubSub(uri string, username string, password string, clientId string, qos int, retain bool) MQTTPubSub {
 | 
			
		||||
	p := pahoMqttPubSub{Uri: uri, Username: username, Password: password, ClientId: clientId, Qos: qos, Retain: retain}
 | 
			
		||||
	p.Connect()
 | 
			
		||||
	return &p
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Publish message to broker
 | 
			
		||||
func (p *pahoMqttPubSub) Publish(topic string, payload MqttValue) {
 | 
			
		||||
	tokenResp := p.client.Publish(topic, byte(p.Qos), p.Retain, string(payload))
 | 
			
		||||
	if tokenResp.Error() != nil {
 | 
			
		||||
		log.Fatalf("%+v\n", tokenResp.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Register func to execute on message
 | 
			
		||||
func (p *pahoMqttPubSub) Subscribe(topic string, callback MQTT.MessageHandler) {
 | 
			
		||||
	tokenResp := p.client.Subscribe(topic, byte(p.Qos), callback)
 | 
			
		||||
	if tokenResp.Error() != nil {
 | 
			
		||||
		log.Fatalf("%+v\n", tokenResp.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Close connection to broker
 | 
			
		||||
func (p *pahoMqttPubSub) Close() error {
 | 
			
		||||
	p.client.Disconnect(500)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *pahoMqttPubSub) Connect() {
 | 
			
		||||
	if p.client != nil && p.client.IsConnected() {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
	//create a ClientOptions struct setting the broker address, clientid, turn
 | 
			
		||||
	//off trace output and set the default message handler
 | 
			
		||||
	opts := MQTT.NewClientOptions().AddBroker(p.Uri)
 | 
			
		||||
	opts.SetUsername(p.Username)
 | 
			
		||||
	opts.SetPassword(p.Password)
 | 
			
		||||
	opts.SetClientID(p.ClientId)
 | 
			
		||||
	opts.SetAutoReconnect(true)
 | 
			
		||||
	opts.SetDefaultPublishHandler(
 | 
			
		||||
		//define a function for the default message handler
 | 
			
		||||
		func(client MQTT.Client, msg MQTT.Message) {
 | 
			
		||||
			fmt.Printf("TOPIC: %s\n", msg.Topic())
 | 
			
		||||
			fmt.Printf("MSG: %s\n", msg.Payload())
 | 
			
		||||
		})
 | 
			
		||||
 | 
			
		||||
	//create and start a client using the above ClientOptions
 | 
			
		||||
	p.client = MQTT.NewClient(opts)
 | 
			
		||||
	if token := p.client.Connect(); token.Wait() && token.Error() != nil {
 | 
			
		||||
		panic(token.Error())
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type MqttValue []byte
 | 
			
		||||
 | 
			
		||||
func NewMqttValue(v interface{}) MqttValue {
 | 
			
		||||
	switch val := v.(type) {
 | 
			
		||||
	case string:
 | 
			
		||||
		return MqttValue(val)
 | 
			
		||||
	case float32, float64:
 | 
			
		||||
		return MqttValue(fmt.Sprintf("%0.2f", val))
 | 
			
		||||
	case int, int8, int16, int32, int64:
 | 
			
		||||
		return MqttValue(fmt.Sprintf("%d", val))
 | 
			
		||||
	case mode.DriveMode:
 | 
			
		||||
		return MqttValue(mode.ToString(val))
 | 
			
		||||
	case bool:
 | 
			
		||||
		if val {
 | 
			
		||||
			return []byte("ON")
 | 
			
		||||
		} else {
 | 
			
		||||
			return []byte("OFF")
 | 
			
		||||
		}
 | 
			
		||||
	case []byte:
 | 
			
		||||
		return val
 | 
			
		||||
	case MqttValue:
 | 
			
		||||
		return val
 | 
			
		||||
	default:
 | 
			
		||||
		log.Printf("invalid mqtt value: %v", val)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *MqttValue) IntValue() (int, error) {
 | 
			
		||||
	return strconv.Atoi(string(*m))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (m *MqttValue) Float32Value() (float32, error) {
 | 
			
		||||
	val := string(*m)
 | 
			
		||||
	r, err := strconv.ParseFloat(val, 32)
 | 
			
		||||
	return float32(r), err
 | 
			
		||||
}
 | 
			
		||||
func (m *MqttValue) Float64Value() (float64, error) {
 | 
			
		||||
	val := string(*m)
 | 
			
		||||
	return strconv.ParseFloat(val, 64)
 | 
			
		||||
}
 | 
			
		||||
func (m *MqttValue) StringValue() (string, error) {
 | 
			
		||||
	return string(*m), nil
 | 
			
		||||
}
 | 
			
		||||
func (m *MqttValue) ByteSliceValue() ([]byte, error) {
 | 
			
		||||
	return *m, nil
 | 
			
		||||
}
 | 
			
		||||
func (m *MqttValue) BoolValue() (bool, error) {
 | 
			
		||||
	val := string(*m)
 | 
			
		||||
	switch val {
 | 
			
		||||
	case "ON":
 | 
			
		||||
		return true, nil
 | 
			
		||||
	case "OFF":
 | 
			
		||||
		return false, nil
 | 
			
		||||
	default:
 | 
			
		||||
		return false, fmt.Errorf("value %v can't be converted to bool", val)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										189
									
								
								mqttdevice/mqttdevice_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										189
									
								
								mqttdevice/mqttdevice_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,189 @@
 | 
			
		||||
package mqttdevice
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"github.com/cyrilix/robocar-base/testtools"
 | 
			
		||||
	mqtt "github.com/eclipse/paho.mqtt.golang"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestIntegration(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	ctx, mqttC, mqttUri := testtools.MqttContainer(t)
 | 
			
		||||
	defer mqttC.Terminate(ctx)
 | 
			
		||||
 | 
			
		||||
	t.Run("ConnectAndClose", func(t *testing.T) {
 | 
			
		||||
		t.Logf("Mqtt connection %s ready", mqttUri)
 | 
			
		||||
 | 
			
		||||
		p := pahoMqttPubSub{Uri: mqttUri, ClientId: "TestMqtt", Username: "guest", Password: "guest"}
 | 
			
		||||
		p.Connect()
 | 
			
		||||
		p.Close()
 | 
			
		||||
	})
 | 
			
		||||
	t.Run("Publish", func(t *testing.T) {
 | 
			
		||||
		options := mqtt.NewClientOptions().AddBroker(mqttUri)
 | 
			
		||||
		options.SetUsername("guest")
 | 
			
		||||
		options.SetPassword("guest")
 | 
			
		||||
 | 
			
		||||
		client := mqtt.NewClient(options)
 | 
			
		||||
		token := client.Connect()
 | 
			
		||||
		defer client.Disconnect(100)
 | 
			
		||||
		token.Wait()
 | 
			
		||||
		if token.Error() != nil {
 | 
			
		||||
			t.Fatalf("unable to connect to mqtt broker: %v\n", token.Error())
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		c := make(chan string)
 | 
			
		||||
		defer close(c)
 | 
			
		||||
		client.Subscribe("test/publish", 0, func(client mqtt.Client, message mqtt.Message) {
 | 
			
		||||
			c <- string(message.Payload())
 | 
			
		||||
		}).Wait()
 | 
			
		||||
 | 
			
		||||
		p := pahoMqttPubSub{Uri: mqttUri, ClientId: "TestMqtt", Username: "guest", Password: "guest"}
 | 
			
		||||
		p.Connect()
 | 
			
		||||
		defer p.Close()
 | 
			
		||||
 | 
			
		||||
		p.Publish("test/publish", []byte("Test1234"))
 | 
			
		||||
		result := <-c
 | 
			
		||||
		if result != "Test1234" {
 | 
			
		||||
			t.Fatalf("bad message: %v\n", result)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestNewMqttValue(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    interface{}
 | 
			
		||||
		expected MqttValue
 | 
			
		||||
	}{
 | 
			
		||||
		{"text", []byte("text")},
 | 
			
		||||
		{float32(2.0123), []byte("2.01")},
 | 
			
		||||
		{3.12345, []byte("3.12")},
 | 
			
		||||
		{12, []byte("12")},
 | 
			
		||||
		{true, []byte("ON")},
 | 
			
		||||
		{false, []byte("OFF")},
 | 
			
		||||
		{MqttValue("13"), []byte("13")},
 | 
			
		||||
		{[]byte("test bytes"), []byte("test bytes")},
 | 
			
		||||
 | 
			
		||||
		{struct {
 | 
			
		||||
			content string
 | 
			
		||||
		}{"invalid"}, nil},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, c := range cases {
 | 
			
		||||
		val := NewMqttValue(c.value)
 | 
			
		||||
		if string(val) != string(c.expected) {
 | 
			
		||||
			t.Errorf("NewMqttValue(%v): %v, wants %v", c.value, val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMqttValue_BoolValue(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected bool
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue("ON"), true},
 | 
			
		||||
		{NewMqttValue("OFF"), false},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases {
 | 
			
		||||
		val, err := c.value.BoolValue()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if c.expected != val {
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMqttValue_ByteSliceValue(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected []byte
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue([]byte("content")), []byte("content")},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases{
 | 
			
		||||
		val, err := c.value.ByteSliceValue()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if string(c.expected) != string(val){
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMqttValue_Float32Value(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected float32
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue("32.0123"), float32(32.0123)},
 | 
			
		||||
		{NewMqttValue("33"), float32(33.)},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases{
 | 
			
		||||
		val, err := c.value.Float32Value()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if c.expected != val{
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestMqttValue_Float64Value(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected float64
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue("32.0123"), 32.0123},
 | 
			
		||||
		{NewMqttValue("33"), 33.},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases{
 | 
			
		||||
		val, err := c.value.Float64Value()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if c.expected != val{
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
func TestMqttValue_IntValue(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected int
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue("1"), 1},
 | 
			
		||||
		{NewMqttValue("-10"), -10},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases{
 | 
			
		||||
		val, err := c.value.IntValue()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if c.expected != val{
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
func TestMqttValue_StringValue(t *testing.T) {
 | 
			
		||||
	cases := []struct {
 | 
			
		||||
		value    MqttValue
 | 
			
		||||
		expected string
 | 
			
		||||
	}{
 | 
			
		||||
		{NewMqttValue("ON"), "ON"},
 | 
			
		||||
		{NewMqttValue("OFF"), "OFF"},
 | 
			
		||||
	}
 | 
			
		||||
	for _, c := range cases{
 | 
			
		||||
		val, err := c.value.StringValue()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Errorf("unexpected conversion error: %v", err)
 | 
			
		||||
		}
 | 
			
		||||
		if c.expected != val{
 | 
			
		||||
			t.Errorf("MqttValue.BoolValue(): %v, wants %v", val, c.expected)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user