From 2641749e151990e08bf859334c75d9f5e59857cc Mon Sep 17 00:00:00 2001 From: Cyrille Nofficial Date: Tue, 31 Dec 2019 14:04:55 +0100 Subject: [PATCH] [mqttdevice] Refactor PubSub implementation Allow a cli program to subscribe and publish with same client --- cli/cli.go | 23 +---------- mqttdevice/mqttdevice.go | 73 ++++++++++++++--------------------- mqttdevice/mqttdevice_test.go | 21 ++++++---- 3 files changed, 44 insertions(+), 73 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 7cfaa8f..f0c6a7c 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -2,7 +2,7 @@ package cli import ( "flag" - "fmt" + "github.com/cyrilix/robocar-base/mqttdevice" "github.com/cyrilix/robocar-base/service" MQTT "github.com/eclipse/paho.mqtt.golang" "log" @@ -92,24 +92,5 @@ func InitFloat64Flag(key string, defValue float64) float64 { } func Connect(uri, username, password, clientId string) (MQTT.Client, error) { - //create a ClientOptions struct setting the broker address, clientid, turn - //off trace output and set the default message handler - opts := MQTT.NewClientOptions().AddBroker(uri) - opts.SetUsername(username) - opts.SetPassword(password) - opts.SetClientID(clientId) - opts.SetAutoReconnect(true) - opts.SetDefaultPublishHandler( - //define a function for the default message handler - func(client MQTT.Client, msg MQTT.Message) { - fmt.Printf("TOPIC: %s\n", msg.Topic()) - fmt.Printf("MSG: %s\n", msg.Payload()) - }) - - //create and start a client using the above ClientOptions - client := MQTT.NewClient(opts) - if token := client.Connect(); token.Wait() && token.Error() != nil { - return nil, fmt.Errorf("unable to connect to mqtt bus: %v", token.Error()) - } - return client, nil + return mqttdevice.Connect(uri, username, password, clientId) } diff --git a/mqttdevice/mqttdevice.go b/mqttdevice/mqttdevice.go index 5a5af8a..dba5368 100644 --- a/mqttdevice/mqttdevice.go +++ b/mqttdevice/mqttdevice.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/cyrilix/robocar-base/types" MQTT "github.com/eclipse/paho.mqtt.golang" - "io" "log" "strconv" ) @@ -21,57 +20,26 @@ type Subscriber interface { type MQTTPubSub interface { Publisher Subscriber - io.Closer } type pahoMqttPubSub struct { - Uri string - Username string - Password string - ClientId string - Qos int - Retain bool client MQTT.Client + qos int + retain bool } -func NewPahoMqttPubSub(uri string, username string, password string, clientId string, qos int, retain bool) MQTTPubSub { - p := pahoMqttPubSub{Uri: uri, Username: username, Password: password, ClientId: clientId, Qos: qos, Retain: retain} - p.Connect() +func NewPahoMqttPubSub(client MQTT.Client, qos int, retain bool) MQTTPubSub { + p := pahoMqttPubSub{client: client, qos: qos, retain: retain} return &p } -// Publish message to broker -func (p *pahoMqttPubSub) Publish(topic string, payload MqttValue) { - tokenResp := p.client.Publish(topic, byte(p.Qos), p.Retain, string(payload)) - if tokenResp.Error() != nil { - log.Fatalf("%+v\n", tokenResp.Error()) - } -} - -// Register func to execute on message -func (p *pahoMqttPubSub) Subscribe(topic string, callback MQTT.MessageHandler) { - tokenResp := p.client.Subscribe(topic, byte(p.Qos), callback) - if tokenResp.Error() != nil { - log.Fatalf("%+v\n", tokenResp.Error()) - } -} - -// Close connection to broker -func (p *pahoMqttPubSub) Close() error { - p.client.Disconnect(500) - return nil -} - -func (p *pahoMqttPubSub) Connect() { - if p.client != nil && p.client.IsConnected() { - return - } +func Connect(uri, username, password, clientId string) (MQTT.Client, error) { //create a ClientOptions struct setting the broker address, clientid, turn //off trace output and set the default message handler - opts := MQTT.NewClientOptions().AddBroker(p.Uri) - opts.SetUsername(p.Username) - opts.SetPassword(p.Password) - opts.SetClientID(p.ClientId) + opts := MQTT.NewClientOptions().AddBroker(uri) + opts.SetUsername(username) + opts.SetPassword(password) + opts.SetClientID(clientId) opts.SetAutoReconnect(true) opts.SetDefaultPublishHandler( //define a function for the default message handler @@ -81,9 +49,26 @@ func (p *pahoMqttPubSub) Connect() { }) //create and start a client using the above ClientOptions - p.client = MQTT.NewClient(opts) - if token := p.client.Connect(); token.Wait() && token.Error() != nil { - panic(token.Error()) + client := MQTT.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + return nil, fmt.Errorf("unable to connect to mqtt bus: %v", token.Error()) + } + return client, nil +} + +// Publish message to broker +func (p *pahoMqttPubSub) Publish(topic string, payload MqttValue) { + tokenResp := p.client.Publish(topic, byte(p.qos), p.retain, string(payload)) + if tokenResp.Error() != nil { + log.Fatalf("%+v\n", tokenResp.Error()) + } +} + +// Register func to execute on message +func (p *pahoMqttPubSub) Subscribe(topic string, callback MQTT.MessageHandler) { + tokenResp := p.client.Subscribe(topic, byte(p.qos), callback) + if tokenResp.Error() != nil { + log.Fatalf("%+v\n", tokenResp.Error()) } } diff --git a/mqttdevice/mqttdevice_test.go b/mqttdevice/mqttdevice_test.go index 195208c..5d26709 100644 --- a/mqttdevice/mqttdevice_test.go +++ b/mqttdevice/mqttdevice_test.go @@ -1,23 +1,29 @@ package mqttdevice import ( - "github.com/cyrilix/robocar-base/types" "github.com/cyrilix/robocar-base/testtools/docker" + "github.com/cyrilix/robocar-base/types" mqtt "github.com/eclipse/paho.mqtt.golang" + log "github.com/sirupsen/logrus" "testing" ) func TestIntegration(t *testing.T) { ctx, mqttC, mqttUri := docker.MqttContainer(t) - defer mqttC.Terminate(ctx) + defer func(){ + err := mqttC.Terminate(ctx) + log.Errorf("unable to terminate container: %v", err) + }() t.Run("ConnectAndClose", func(t *testing.T) { t.Logf("Mqtt connection %s ready", mqttUri) - p := pahoMqttPubSub{Uri: mqttUri, ClientId: "TestMqtt", Username: "guest", Password: "guest"} - p.Connect() - p.Close() + client, err := Connect(mqttUri, "TestMqtt", "guest", "guest") + if err != nil { + t.Errorf("unable to init mqtt connection: %v", err) + } + defer client.Disconnect(10) }) t.Run("Publish", func(t *testing.T) { options := mqtt.NewClientOptions().AddBroker(mqttUri) @@ -31,6 +37,7 @@ func TestIntegration(t *testing.T) { if token.Error() != nil { t.Fatalf("unable to connect to mqtt broker: %v\n", token.Error()) } + defer client.Disconnect(10) c := make(chan string) defer close(c) @@ -38,9 +45,7 @@ func TestIntegration(t *testing.T) { c <- string(message.Payload()) }).Wait() - p := pahoMqttPubSub{Uri: mqttUri, ClientId: "TestMqtt", Username: "guest", Password: "guest"} - p.Connect() - defer p.Close() + p := pahoMqttPubSub{client: client} p.Publish("test/publish", []byte("Test1234")) result := <-c