robocar-steering/vendor/github.com/eclipse/paho.mqtt.golang/client.go

1187 lines
42 KiB
Go

/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Matt Brittan
*/
// Portions copyright © 2018 TIBCO Software Inc.
// Package mqtt provides an MQTT v3.1.1 client library.
package mqtt
import (
"bytes"
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/semaphore"
"github.com/eclipse/paho.mqtt.golang/packets"
)
const (
disconnected uint32 = iota
connecting
reconnecting
connected
)
// Client is the interface definition for a Client as used by this
// library, the interface is primarily to allow mocking tests.
//
// It is an MQTT v3.1.1 client for communicating
// with an MQTT server using non-blocking methods that allow work
// to be done in the background.
// An application may connect to an MQTT server using:
// A plain TCP socket
// A secure SSL/TLS socket
// A websocket
// To enable ensured message delivery at Quality of Service (QoS) levels
// described in the MQTT spec, a message persistence mechanism must be
// used. This is done by providing a type which implements the Store
// interface. For convenience, FileStore and MemoryStore are provided
// implementations that should be sufficient for most use cases. More
// information can be found in their respective documentation.
// Numerous connection options may be specified by configuring a
// and then supplying a ClientOptions type.
// Implementations of Client must be safe for concurrent use by multiple
// goroutines
type Client interface {
// IsConnected returns a bool signifying whether
// the client is connected or not.
IsConnected() bool
// IsConnectionOpen return a bool signifying whether the client has an active
// connection to mqtt broker, i.e not in disconnected or reconnect mode
IsConnectionOpen() bool
// Connect will create a connection to the message broker, by default
// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
// fails
Connect() Token
// Disconnect will end the connection with the server, but not before waiting
// the specified number of milliseconds to wait for existing work to be
// completed.
Disconnect(quiesce uint)
// Publish will publish a message with the specified QoS and content
// to the specified topic.
// Returns a token to track delivery of the message to the broker
Publish(topic string, qos byte, retained bool, payload interface{}) Token
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
// a message is published on the topic provided, or nil for the default handler.
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
Subscribe(topic string, qos byte, callback MessageHandler) Token
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
// be executed when a message is published on one of the topics provided, or nil for the
// default handler.
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
// Unsubscribe will end the subscription from each of the topics provided.
// Messages published to those topics from other clients will no longer be
// received.
Unsubscribe(topics ...string) Token
// AddRoute allows you to add a handler for messages on a specific topic
// without making a subscription. For example having a different handler
// for parts of a wildcard subscription or for receiving retained messages
// upon connection (before Sub scribe can be processed).
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
AddRoute(topic string, callback MessageHandler)
// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
// in use by the client.
OptionsReader() ClientOptionsReader
}
// client implements the Client interface
// clients are safe for concurrent use by multiple
// goroutines
type client struct {
lastSent atomic.Value // time.Time - the last time a packet was successfully sent to network
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received
status uint32 // see const definitions at top of file for possible values
sync.RWMutex // Protects the above two variables (note: atomic writes are also used somewhat inconsistently)
messageIds // effectively a map from message id to token completor
obound chan *PacketAndToken // outgoing publish packet
oboundP chan *PacketAndToken // outgoing 'priority' packet (anything other than publish)
msgRouter *router // routes topics to handlers
persist Store
options ClientOptions
optionsMu sync.Mutex // Protects the options in a few limited cases where needed for testing
conn net.Conn // the network connection, must only be set with connMu locked (only used when starting/stopping workers)
connMu sync.Mutex // mutex for the connection (again only used in two functions)
stop chan struct{} // Closed to request that workers stop
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
}
// NewClient will create an MQTT v3.1.1 client with all of the options specified
// in the provided ClientOptions. The client must have the Connect method called
// on it before it may be used. This is to make sure resources (such as a net
// connection) are created before the application is actually ready.
func NewClient(o *ClientOptions) Client {
c := &client{}
c.options = *o
if c.options.Store == nil {
c.options.Store = NewMemoryStore()
}
switch c.options.ProtocolVersion {
case 3, 4:
c.options.protocolVersionExplicit = true
case 0x83, 0x84:
c.options.protocolVersionExplicit = true
default:
c.options.ProtocolVersion = 4
c.options.protocolVersionExplicit = false
}
c.persist = c.options.Store
c.status = disconnected
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
c.msgRouter = newRouter()
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
c.obound = make(chan *PacketAndToken)
c.oboundP = make(chan *PacketAndToken)
return c
}
// AddRoute allows you to add a handler for messages on a specific topic
// without making a subscription. For example having a different handler
// for parts of a wildcard subscription
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
func (c *client) AddRoute(topic string, callback MessageHandler) {
if callback != nil {
c.msgRouter.addRoute(topic, callback)
}
}
// IsConnected returns a bool signifying whether
// the client is connected or not.
// connected means that the connection is up now OR it will
// be established/reestablished automatically when possible
func (c *client) IsConnected() bool {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
switch {
case status == connected:
return true
case c.options.AutoReconnect && status > connecting:
return true
case c.options.ConnectRetry && status == connecting:
return true
default:
return false
}
}
// IsConnectionOpen return a bool signifying whether the client has an active
// connection to mqtt broker, i.e not in disconnected or reconnect mode
func (c *client) IsConnectionOpen() bool {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
switch {
case status == connected:
return true
default:
return false
}
}
func (c *client) connectionStatus() uint32 {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
return status
}
func (c *client) setConnected(status uint32) {
c.Lock()
defer c.Unlock()
atomic.StoreUint32(&c.status, status)
}
// ErrNotConnected is the error returned from function calls that are
// made when the client is not connected to a broker
var ErrNotConnected = errors.New("not Connected")
// Connect will create a connection to the message broker, by default
// it will attempt to connect at v3.1.1 and auto retry at v3.1 if that
// fails
// Note: If using QOS1+ and CleanSession=false it is advisable to add
// routes (or a DefaultPublishHandler) prior to calling Connect()
// because queued messages may be delivered immediately post connection
func (c *client) Connect() Token {
t := newToken(packets.Connect).(*ConnectToken)
DEBUG.Println(CLI, "Connect()")
if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
// if in any state other than disconnected and ConnectRetry is
// enabled then the connection will come up automatically
// client can assume connection is up
WARN.Println(CLI, "Connect() called but not disconnected")
t.returnCode = packets.Accepted
t.flowComplete()
return t
}
c.persist.Open()
if c.options.ConnectRetry {
c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
}
c.setConnected(connecting)
go func() {
if len(c.options.Servers) == 0 {
t.setError(fmt.Errorf("no servers defined to connect to"))
return
}
RETRYCONN:
var conn net.Conn
var rc byte
var err error
conn, rc, t.sessionPresent, err = c.attemptConnection()
if err != nil {
if c.options.ConnectRetry {
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
time.Sleep(c.options.ConnectRetryInterval)
if atomic.LoadUint32(&c.status) == connecting {
goto RETRYCONN
}
}
ERROR.Println(CLI, "Failed to connect to a broker")
c.setConnected(disconnected)
c.persist.Close()
t.returnCode = rc
t.setError(err)
return
}
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
// Take care of any messages in the store
if !c.options.CleanSession {
c.resume(c.options.ResumeSubs, inboundFromStore)
} else {
c.persist.Reset()
}
} else {
WARN.Println(CLI, "Connect() called but connection established in another goroutine")
}
close(inboundFromStore)
t.flowComplete()
DEBUG.Println(CLI, "exit startClient")
}()
return t
}
// internal function used to reconnect the client when it loses its connection
func (c *client) reconnect() {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
conn net.Conn
)
for {
if nil != c.options.OnReconnecting {
c.options.OnReconnecting(c, &c.options)
}
var err error
conn, _, _, err = c.attemptConnection()
if err == nil {
break
}
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
time.Sleep(sleep)
if sleep < c.options.MaxReconnectInterval {
sleep *= 2
}
if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}
// Disconnect may have been called
if atomic.LoadUint32(&c.status) == disconnected {
break
}
}
// Disconnect() must have been called while we were trying to reconnect.
if c.connectionStatus() == disconnected {
if conn != nil {
conn.Close()
}
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
return
}
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
c.resume(c.options.ResumeSubs, inboundFromStore)
}
close(inboundFromStore)
}
// attemptConnection makes a single attempt to connect to each of the brokers
// the protocol version to use is passed in (as c.options.ProtocolVersion)
// Note: Does not set c.conn in order to minimise race conditions
// Returns:
// net.Conn - Connected network connection
// byte - Return code (packets.Accepted indicates a successful connection).
// bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
// err - Error (err != nil guarantees that conn has been set to active connection).
func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
protocolVersion := c.options.ProtocolVersion
var (
sessionPresent bool
conn net.Conn
err error
rc byte
)
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
brokers := c.options.Servers
c.optionsMu.Unlock()
for _, broker := range brokers {
cm := newConnectMsgFromOptions(&c.options, broker)
DEBUG.Println(CLI, "about to write new connect msg")
CONN:
tlsCfg := c.options.TLSConfig
if c.options.OnConnectAttempt != nil {
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
}
dialer := c.options.Dialer
if dialer == nil { //
WARN.Println(CLI, "dialer was nil, using default")
dialer = &net.Dialer{Timeout: 30 * time.Second}
}
// Start by opening the network connection (tcp, tls, ws) etc
if c.options.CustomOpenConnectionFn != nil {
conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
} else {
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, dialer)
}
if err != nil {
ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next")
rc = packets.ErrNetworkError
continue
}
DEBUG.Println(CLI, "socket connected to broker")
// Now we send the perform the MQTT connection handshake
rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion)
if rc == packets.Accepted {
break // successfully connected
}
// We may be have to attempt the connection with MQTT 3.1
if conn != nil {
_ = conn.Close()
}
if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
protocolVersion = 3
goto CONN
}
if c.options.protocolVersionExplicit { // to maintain logging from previous version
ERROR.Println(CLI, "Connecting to", broker, "CONNACK was not CONN_ACCEPTED, but rather", packets.ConnackReturnCodes[rc])
}
}
// If the connection was successful we set member variable and lock in the protocol version for future connection attempts (and users)
if rc == packets.Accepted {
c.options.ProtocolVersion = protocolVersion
c.options.protocolVersionExplicit = true
} else {
// Maintain same error format as used previously
if rc != packets.ErrNetworkError { // mqtt error
err = packets.ConnErrors[rc]
} else { // network error (if this occurred in ConnectMQTT then err will be nil)
err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
}
}
return conn, rc, sessionPresent, err
}
// Disconnect will end the connection with the server, but not before waiting
// the specified number of milliseconds to wait for existing work to be
// completed.
// WARNING: `Disconnect` may return before all activities (goroutines) have completed. This means that
// reusing the `client` may lead to panics. If you want to reconnect when the connection drops then use
// `SetAutoReconnect` and/or `SetConnectRetry`options instead of implementing this yourself.
func (c *client) Disconnect(quiesce uint) {
defer c.disconnect()
status := atomic.LoadUint32(&c.status)
c.setConnected(disconnected)
if status != connected {
WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
return
}
DEBUG.Println(CLI, "disconnecting")
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
dt := newToken(packets.Disconnect)
select {
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
// wait for work to finish, or quiesce time consumed
DEBUG.Println(CLI, "calling WaitTimeout")
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
DEBUG.Println(CLI, "WaitTimeout done")
// Let's comment this chunk of code until we are able to safely read this variable
// without data races.
// case <-c.commsStopped:
// WARN.Println("Disconnect packet could not be sent because comms stopped")
case <-time.After(time.Duration(quiesce) * time.Millisecond):
WARN.Println("Disconnect packet not sent due to timeout")
}
}
// forceDisconnect will end the connection with the mqtt broker immediately (used for tests only)
func (c *client) forceDisconnect() {
if !c.IsConnected() {
WARN.Println(CLI, "already disconnected")
return
}
c.setConnected(disconnected)
DEBUG.Println(CLI, "forcefully disconnecting")
c.disconnect()
}
// disconnect cleans up after a final disconnection (user requested so no auto reconnection)
func (c *client) disconnect() {
done := c.stopCommsWorkers()
if done != nil {
<-done // Wait until the disconnect is complete (to limit chance that another connection will be started)
DEBUG.Println(CLI, "forcefully disconnecting")
c.messageIds.cleanUp()
DEBUG.Println(CLI, "disconnected")
c.persist.Close()
}
}
// internalConnLost cleanup when connection is lost or an error occurs
// Note: This function will not block
func (c *client) internalConnLost(err error) {
// It is possible that internalConnLost will be called multiple times simultaneously
// (including after sending a DisconnectPacket) as such we only do cleanup etc if the
// routines were actually running and are not being disconnected at users request
DEBUG.Println(CLI, "internalConnLost called")
stopDone := c.stopCommsWorkers()
if stopDone != nil { // stopDone will be nil if workers already in the process of stopping or stopped
go func() {
DEBUG.Println(CLI, "internalConnLost waiting on workers")
<-stopDone
DEBUG.Println(CLI, "internalConnLost workers stopped")
// It is possible that Disconnect was called which led to this error so reconnection depends upon status
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
if c.options.CleanSession && !reconnect {
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
} else if !c.options.ResumeSubs {
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
}
if reconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
DEBUG.Println(CLI, "internalConnLost complete")
}()
}
}
// startCommsWorkers is called when the connection is up.
// It starts off all of the routines needed to process incoming and outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running)
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
DEBUG.Println(CLI, "startCommsWorkers called")
c.connMu.Lock()
defer c.connMu.Unlock()
if c.conn != nil {
WARN.Println(CLI, "startCommsWorkers called when commsworkers already running")
conn.Close() // No use for the new network connection
return false
}
c.conn = conn // Store the connection
c.stop = make(chan struct{})
if c.options.KeepAlive != 0 {
atomic.StoreInt32(&c.pingOutstanding, 0)
c.lastReceived.Store(time.Now())
c.lastSent.Store(time.Now())
c.workers.Add(1)
go keepalive(c, conn)
}
// matchAndDispatch will process messages received from the network. It may generate acknowledgements
// It will complete when incomingPubChan is closed and will close ackOut prior to exiting
incomingPubChan := make(chan *packets.PublishPacket)
c.workers.Add(1) // Done will be called when ackOut is closed
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
c.setConnected(connected)
DEBUG.Println(CLI, "client is connected/reconnected")
if c.options.OnConnect != nil {
go c.options.OnConnect(c)
}
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
// to keep the comms routines clean we want to shutdown the input messages it uses so create out own channels
// and copy data across.
commsobound := make(chan *PacketAndToken) // outgoing publish packets
commsoboundP := make(chan *PacketAndToken) // outgoing 'priority' packet
c.workers.Add(1)
go func() {
defer c.workers.Done()
for {
select {
case msg := <-c.oboundP:
commsoboundP <- msg
case msg := <-c.obound:
commsobound <- msg
case msg, ok := <-ackOut:
if !ok {
ackOut = nil // ignore channel going forward
c.workers.Done() // matchAndDispatch has completed
continue // await next message
}
commsoboundP <- msg
case <-c.stop:
// Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect)
if ackOut != nil {
for msg := range ackOut {
commsoboundP <- msg
}
c.workers.Done() // matchAndDispatch has completed
}
close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit
close(commsobound)
DEBUG.Println(CLI, "startCommsWorkers output redirector finished")
return
}
}
}()
commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound)
c.commsStopped = make(chan struct{})
go func() {
for {
if commsIncomingPub == nil && commsErrors == nil {
break
}
select {
case pub, ok := <-commsIncomingPub:
if !ok {
// Incoming comms has shutdown
close(incomingPubChan) // stop the router
commsIncomingPub = nil
continue
}
// Care is needed here because an error elsewhere could trigger a deadlock
sendPubLoop:
for {
select {
case incomingPubChan <- pub:
break sendPubLoop
case err, ok := <-commsErrors:
if !ok { // commsErrors has been closed so we can ignore it
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
case err, ok := <-commsErrors:
if !ok {
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
DEBUG.Println(CLI, "incoming comms goroutine done")
close(c.commsStopped)
}()
DEBUG.Println(CLI, "startCommsWorkers done")
return true
}
// stopWorkersAndComms - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
// Returns nil it workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
// Note: This may block so run as a go routine if calling from any of the comms routines
func (c *client) stopCommsWorkers() chan struct{} {
DEBUG.Println(CLI, "stopCommsWorkers called")
// It is possible that this function will be called multiple times simultaneously due to the way things get shutdown
c.connMu.Lock()
if c.conn == nil {
DEBUG.Println(CLI, "stopCommsWorkers done (not running)")
c.connMu.Unlock()
return nil
}
// It is important that everything is stopped in the correct order to avoid deadlocks. The main issue here is
// the router because it both receives incoming publish messages and also sends outgoing acknowledgements. To
// avoid issues we signal the workers to stop and close the connection (it is probably already closed but
// there is no harm in being sure). We can then wait for the workers to finnish before closing outbound comms
// channels which will allow the comms routines to exit.
// We stop all non-comms related workers first (ping, keepalive, errwatch, resume etc) so they don't get blocked waiting on comms
close(c.stop) // Signal for workers to stop
c.conn.Close() // Possible that this is already closed but no harm in closing again
c.conn = nil // Important that this is the only place that this is set to nil
c.connMu.Unlock() // As the connection is now nil we can unlock the mu (allowing subsequent calls to exit immediately)
doneChan := make(chan struct{})
go func() {
DEBUG.Println(CLI, "stopCommsWorkers waiting for workers")
c.workers.Wait()
// Stopping the workers will allow the comms routines to exit; we wait for these to complete
DEBUG.Println(CLI, "stopCommsWorkers waiting for comms")
<-c.commsStopped // wait for comms routine to stop
DEBUG.Println(CLI, "stopCommsWorkers done")
close(doneChan)
}()
return doneChan
}
// Publish will publish a message with the specified QoS and content
// to the specified topic.
// Returns a token to track delivery of the message to the broker
func (c *client) Publish(topic string, qos byte, retained bool, payload interface{}) Token {
token := newToken(packets.Publish).(*PublishToken)
DEBUG.Println(CLI, "enter Publish")
switch {
case !c.IsConnected():
token.setError(ErrNotConnected)
return token
case c.connectionStatus() == reconnecting && qos == 0:
token.flowComplete()
return token
}
pub := packets.NewControlPacket(packets.Publish).(*packets.PublishPacket)
pub.Qos = qos
pub.TopicName = topic
pub.Retain = retained
switch p := payload.(type) {
case string:
pub.Payload = []byte(p)
case []byte:
pub.Payload = p
case bytes.Buffer:
pub.Payload = p.Bytes()
default:
token.setError(fmt.Errorf("unknown payload type"))
return token
}
if pub.Qos != 0 && pub.MessageID == 0 {
mID := c.getID(token)
if mID == 0 {
token.setError(fmt.Errorf("no message IDs available"))
return token
}
pub.MessageID = mID
token.messageID = mID
}
persistOutbound(c.persist, pub)
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending publish message, topic:", topic)
publishWaitTimeout := c.options.WriteTimeout
if publishWaitTimeout == 0 {
publishWaitTimeout = time.Second * 30
}
select {
case c.obound <- &PacketAndToken{p: pub, t: token}:
case <-time.After(publishWaitTimeout):
token.setError(errors.New("publish was broken by timeout"))
}
}
return token
}
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
// a message is published on the topic provided.
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
token := newToken(packets.Subscribe).(*SubscribeToken)
DEBUG.Println(CLI, "enter Subscribe")
if !c.IsConnected() {
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
if err := validateTopicAndQos(topic, qos); err != nil {
token.setError(err)
return token
}
sub.Topics = append(sub.Topics, topic)
sub.Qoss = append(sub.Qoss, qos)
if strings.HasPrefix(topic, "$share/") {
topic = strings.Join(strings.Split(topic, "/")[2:], "/")
}
if strings.HasPrefix(topic, "$queue/") {
topic = strings.TrimPrefix(topic, "$queue/")
}
if callback != nil {
c.msgRouter.addRoute(topic, callback)
}
token.subs = append(token.subs, topic)
if sub.MessageID == 0 {
mID := c.getID(token)
if mID == 0 {
token.setError(fmt.Errorf("no message IDs available"))
return token
}
sub.MessageID = mID
token.messageID = mID
}
DEBUG.Println(CLI, sub.String())
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit Subscribe")
return token
}
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
// be executed when a message is published on one of the topics provided.
//
// If options.OrderMatters is true (the default) then callback must not block or
// call functions within this package that may block (e.g. Publish) other than in
// a new go routine.
// callback must be safe for concurrent use by multiple goroutines.
func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
var err error
token := newToken(packets.Subscribe).(*SubscribeToken)
DEBUG.Println(CLI, "enter SubscribeMultiple")
if !c.IsConnected() {
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
sub := packets.NewControlPacket(packets.Subscribe).(*packets.SubscribePacket)
if sub.Topics, sub.Qoss, err = validateSubscribeMap(filters); err != nil {
token.setError(err)
return token
}
if callback != nil {
for topic := range filters {
c.msgRouter.addRoute(topic, callback)
}
}
token.subs = make([]string, len(sub.Topics))
copy(token.subs, sub.Topics)
if sub.MessageID == 0 {
mID := c.getID(token)
if mID == 0 {
token.setError(fmt.Errorf("no message IDs available"))
return token
}
sub.MessageID = mID
token.messageID = mID
}
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
default:
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: sub, t: token}:
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("subscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit SubscribeMultiple")
return token
}
// reserveStoredPublishIDs reserves the ids for publish packets in the persistent store to ensure these are not duplicated
func (c *client) reserveStoredPublishIDs() {
// The resume function sets the stored id for publish packets only (some other packets
// will get new ids in net code). This means that the only keys we need to ensure are
// unique are the publish ones (and these will completed/replaced in resume() )
if !c.options.CleanSession {
storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
continue
}
switch packet.(type) {
case *packets.PublishPacket:
details := packet.Details()
token := &PlaceHolderToken{id: details.MessageID}
c.claimID(token, details.MessageID)
}
}
}
}
// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
// Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
// other than that it does not return until all messages in the store have been sent (connect() does not complete its
// token before this completes)
func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
DEBUG.Println(STR, "enter Resume")
// Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called
// with the token (so semaphore can be released when ACK received if applicable).
// Using a weighted semaphore rather than channels because this retains ordering
getSemaphore := func() {} // Default = do nothing
releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing
var sem *semaphore.Weighted
if c.options.MaxResumePubInFlight > 0 {
sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight))
ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore
defer cancel() // ensure context gets cancelled
go func() {
select {
case <-c.stop: // Request to stop (due to comm error etc)
cancel()
case <-ctx.Done(): // resume completed normally
}
}()
getSemaphore = func() { sem.Acquire(ctx, 1) }
releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done())
go func() {
select {
case <-token.Done():
case <-ctx.Done():
}
sem.Release(1)
}()
}
}
storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
if packet == nil {
DEBUG.Println(STR, fmt.Sprintf("resume found NIL packet (%s)", key))
continue
}
details := packet.Details()
if isKeyOutbound(key) {
switch p := packet.(type) {
case *packets.SubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
subPacket := packet.(*packets.SubscribePacket)
token := newToken(packets.Subscribe).(*SubscribeToken)
token.messageID = details.MessageID
token.subs = append(token.subs, subPacket.Topics...)
c.claimID(token, details.MessageID)
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
}
case *packets.UnsubscribePacket:
if subscription {
DEBUG.Println(STR, fmt.Sprintf("loaded pending unsubscribe (%d)", details.MessageID))
token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
select {
case c.oboundP <- &PacketAndToken{p: packet, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
} else {
c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
}
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending pubrel (%d)", details.MessageID))
select {
case c.oboundP <- &PacketAndToken{p: packet, t: nil}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
case *packets.PublishPacket:
// spec: If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or
// Server has attempted to send this MQTT PUBLISH Packet. If the DUP flag is set to 1, it indicates that
// this might be re-delivery of an earlier attempt to send the Packet.
//
// If the message is in the store than an attempt at delivery has been made (note that the message may
// never have made it onto the wire but tracking that would be complicated!).
if p.Qos != 0 { // spec: The DUP flag MUST be set to 0 for all QoS 0 messages
p.Dup = true
}
token := newToken(packets.Publish).(*PublishToken)
token.messageID = details.MessageID
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
getSemaphore()
select {
case c.obound <- &PacketAndToken{p: p, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
}
} else {
switch packet.(type) {
case *packets.PubrelPacket:
DEBUG.Println(STR, fmt.Sprintf("loaded pending incomming (%d)", details.MessageID))
select {
case ibound <- packet:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop (ibound <- packet)")
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
c.persist.Del(key)
}
}
}
DEBUG.Println(STR, "exit resume")
}
// Unsubscribe will end the subscription from each of the topics provided.
// Messages published to those topics from other clients will no longer be
// received.
func (c *client) Unsubscribe(topics ...string) Token {
token := newToken(packets.Unsubscribe).(*UnsubscribeToken)
DEBUG.Println(CLI, "enter Unsubscribe")
if !c.IsConnected() {
token.setError(ErrNotConnected)
return token
}
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this unsub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this unsub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
}
unsub := packets.NewControlPacket(packets.Unsubscribe).(*packets.UnsubscribePacket)
unsub.Topics = make([]string, len(topics))
copy(unsub.Topics, topics)
if unsub.MessageID == 0 {
mID := c.getID(token)
if mID == 0 {
token.setError(fmt.Errorf("no message IDs available"))
return token
}
unsub.MessageID = mID
token.messageID = mID
}
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, unsub)
}
switch c.connectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics)
case reconnecting:
DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
default:
DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics)
subscribeWaitTimeout := c.options.WriteTimeout
if subscribeWaitTimeout == 0 {
subscribeWaitTimeout = time.Second * 30
}
select {
case c.oboundP <- &PacketAndToken{p: unsub, t: token}:
for _, topic := range topics {
c.msgRouter.deleteRoute(topic)
}
case <-time.After(subscribeWaitTimeout):
token.setError(errors.New("unsubscribe was broken by timeout"))
}
}
DEBUG.Println(CLI, "exit Unsubscribe")
return token
}
// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
// in use by the client.
func (c *client) OptionsReader() ClientOptionsReader {
r := ClientOptionsReader{options: &c.options}
return r
}
// DefaultConnectionLostHandler is a definition of a function that simply
// reports to the DEBUG log the reason for the client losing a connection.
func DefaultConnectionLostHandler(client Client, reason error) {
DEBUG.Println("Connection lost:", reason.Error())
}
// UpdateLastReceived - Will be called whenever a packet is received off the network
// This is used by the keepalive routine to
func (c *client) UpdateLastReceived() {
if c.options.KeepAlive != 0 {
c.lastReceived.Store(time.Now())
}
}
// UpdateLastReceived - Will be called whenever a packet is successfully transmitted to the network
func (c *client) UpdateLastSent() {
if c.options.KeepAlive != 0 {
c.lastSent.Store(time.Now())
}
}
// getWriteTimeOut returns the writetimeout (duration to wait when writing to the connection) or 0 if none
func (c *client) getWriteTimeOut() time.Duration {
return c.options.WriteTimeout
}
// persistOutbound adds the packet to the outbound store
func (c *client) persistOutbound(m packets.ControlPacket) {
persistOutbound(c.persist, m)
}
// persistInbound adds the packet to the inbound store
func (c *client) persistInbound(m packets.ControlPacket) {
persistInbound(c.persist, m)
}
// pingRespReceived will be called by the network routines when a ping response is received
func (c *client) pingRespReceived() {
atomic.StoreInt32(&c.pingOutstanding, 0)
}