build: upgrade to go 1.17 and dependencies
This commit is contained in:
		
							
								
								
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/README.md
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/README.md
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -104,8 +104,12 @@ func main() {
 | 
			
		||||
 | 
			
		||||
* Seemingly random disconnections may be caused by another client connecting to the broker with the same client 
 | 
			
		||||
identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800405).
 | 
			
		||||
* A `MessageHandler` (called when a new message is received) must not block. If you wish to perform a long-running task,
 | 
			
		||||
or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected `pingresp 
 | 
			
		||||
* Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g. 
 | 
			
		||||
  `max_inflight_messages=1` in mosquitto) then set `ClientOptions.SetOrderMatters(false)`. Doing so will avoid the 
 | 
			
		||||
  below issue (deadlocks due to blocking message handlers).
 | 
			
		||||
* A `MessageHandler` (called when a new message is received) must not block (unless 
 | 
			
		||||
  `ClientOptions.SetOrderMatters(false)` set). If you wish to perform a long-running task, or publish a message, then 
 | 
			
		||||
  please use a go routine (blocking in the handler is a common cause of unexpected `pingresp 
 | 
			
		||||
not received, disconnecting` errors). 
 | 
			
		||||
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible that the broker will deliver retained 
 | 
			
		||||
messages before `Subscribe` can be called. To process these messages either configure a handler with `AddRoute` or
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										112
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										112
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/client.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -55,6 +55,8 @@ const (
 | 
			
		||||
// information can be found in their respective documentation.
 | 
			
		||||
// Numerous connection options may be specified by configuring a
 | 
			
		||||
// and then supplying a ClientOptions type.
 | 
			
		||||
// Implementations of Client must be safe for concurrent use by multiple
 | 
			
		||||
// goroutines
 | 
			
		||||
type Client interface {
 | 
			
		||||
	// IsConnected returns a bool signifying whether
 | 
			
		||||
	// the client is connected or not.
 | 
			
		||||
@@ -75,11 +77,21 @@ type Client interface {
 | 
			
		||||
	// Returns a token to track delivery of the message to the broker
 | 
			
		||||
	Publish(topic string, qos byte, retained bool, payload interface{}) Token
 | 
			
		||||
	// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
 | 
			
		||||
	// a message is published on the topic provided, or nil for the default handler
 | 
			
		||||
	// a message is published on the topic provided, or nil for the default handler.
 | 
			
		||||
	//
 | 
			
		||||
	// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
	// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
	// a new go routine.
 | 
			
		||||
	// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
	Subscribe(topic string, qos byte, callback MessageHandler) Token
 | 
			
		||||
	// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
 | 
			
		||||
	// be executed when a message is published on one of the topics provided, or nil for the
 | 
			
		||||
	// default handler
 | 
			
		||||
	// default handler.
 | 
			
		||||
	//
 | 
			
		||||
	// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
	// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
	// a new go routine.
 | 
			
		||||
	// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
	SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
 | 
			
		||||
	// Unsubscribe will end the subscription from each of the topics provided.
 | 
			
		||||
	// Messages published to those topics from other clients will no longer be
 | 
			
		||||
@@ -87,7 +99,13 @@ type Client interface {
 | 
			
		||||
	Unsubscribe(topics ...string) Token
 | 
			
		||||
	// AddRoute allows you to add a handler for messages on a specific topic
 | 
			
		||||
	// without making a subscription. For example having a different handler
 | 
			
		||||
	// for parts of a wildcard subscription
 | 
			
		||||
	// for parts of a wildcard subscription or for receiving retained messages
 | 
			
		||||
	// upon connection (before Sub scribe can be processed).
 | 
			
		||||
	//
 | 
			
		||||
	// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
	// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
	// a new go routine.
 | 
			
		||||
	// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
	AddRoute(topic string, callback MessageHandler)
 | 
			
		||||
	// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
 | 
			
		||||
	// in use by the client.
 | 
			
		||||
@@ -95,6 +113,8 @@ type Client interface {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// client implements the Client interface
 | 
			
		||||
// clients are safe for concurrent use by multiple
 | 
			
		||||
// goroutines
 | 
			
		||||
type client struct {
 | 
			
		||||
	lastSent        atomic.Value // time.Time - the last time a packet was successfully sent to network
 | 
			
		||||
	lastReceived    atomic.Value // time.Time - the last time a packet was successfully received from network
 | 
			
		||||
@@ -153,6 +173,11 @@ func NewClient(o *ClientOptions) Client {
 | 
			
		||||
// AddRoute allows you to add a handler for messages on a specific topic
 | 
			
		||||
// without making a subscription. For example having a different handler
 | 
			
		||||
// for parts of a wildcard subscription
 | 
			
		||||
//
 | 
			
		||||
// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
// a new go routine.
 | 
			
		||||
// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
func (c *client) AddRoute(topic string, callback MessageHandler) {
 | 
			
		||||
	if callback != nil {
 | 
			
		||||
		c.msgRouter.addRoute(topic, callback)
 | 
			
		||||
@@ -354,8 +379,13 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
 | 
			
		||||
		cm := newConnectMsgFromOptions(&c.options, broker)
 | 
			
		||||
		DEBUG.Println(CLI, "about to write new connect msg")
 | 
			
		||||
	CONN:
 | 
			
		||||
		tlsCfg := c.options.TLSConfig
 | 
			
		||||
		if c.options.OnConnectAttempt != nil {
 | 
			
		||||
			DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
 | 
			
		||||
			tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
 | 
			
		||||
		}
 | 
			
		||||
		// Start by opening the network connection (tcp, tls, ws) etc
 | 
			
		||||
		conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
 | 
			
		||||
		conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			ERROR.Println(CLI, err.Error())
 | 
			
		||||
			WARN.Println(CLI, "failed to connect to broker, trying next")
 | 
			
		||||
@@ -372,7 +402,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
 | 
			
		||||
 | 
			
		||||
		// We may be have to attempt the connection with MQTT 3.1
 | 
			
		||||
		if conn != nil {
 | 
			
		||||
			conn.Close()
 | 
			
		||||
			_ = conn.Close()
 | 
			
		||||
		}
 | 
			
		||||
		if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
 | 
			
		||||
			DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
 | 
			
		||||
@@ -409,12 +439,22 @@ func (c *client) Disconnect(quiesce uint) {
 | 
			
		||||
 | 
			
		||||
		dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
 | 
			
		||||
		dt := newToken(packets.Disconnect)
 | 
			
		||||
		c.oboundP <- &PacketAndToken{p: dm, t: dt}
 | 
			
		||||
		disconnectSent := false
 | 
			
		||||
		select {
 | 
			
		||||
		case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
 | 
			
		||||
			disconnectSent = true
 | 
			
		||||
		case <-c.commsStopped:
 | 
			
		||||
			WARN.Println("Disconnect packet could not be sent because comms stopped")
 | 
			
		||||
		case <-time.After(time.Duration(quiesce) * time.Millisecond):
 | 
			
		||||
			WARN.Println("Disconnect packet not sent due to timeout")
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// wait for work to finish, or quiesce time consumed
 | 
			
		||||
		DEBUG.Println(CLI, "calling WaitTimeout")
 | 
			
		||||
		dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
 | 
			
		||||
		DEBUG.Println(CLI, "WaitTimeout done")
 | 
			
		||||
		if disconnectSent {
 | 
			
		||||
			DEBUG.Println(CLI, "calling WaitTimeout")
 | 
			
		||||
			dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
 | 
			
		||||
			DEBUG.Println(CLI, "WaitTimeout done")
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
 | 
			
		||||
		c.setConnected(disconnected)
 | 
			
		||||
@@ -459,10 +499,13 @@ func (c *client) internalConnLost(err error) {
 | 
			
		||||
			DEBUG.Println(CLI, "internalConnLost waiting on workers")
 | 
			
		||||
			<-stopDone
 | 
			
		||||
			DEBUG.Println(CLI, "internalConnLost workers stopped")
 | 
			
		||||
			if c.options.CleanSession && !c.options.AutoReconnect {
 | 
			
		||||
			// It is possible that Disconnect was called which led to this error so reconnection depends upon status
 | 
			
		||||
			reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
 | 
			
		||||
 | 
			
		||||
			if c.options.CleanSession && !reconnect {
 | 
			
		||||
				c.messageIds.cleanUp()
 | 
			
		||||
			}
 | 
			
		||||
			if c.options.AutoReconnect {
 | 
			
		||||
			if reconnect {
 | 
			
		||||
				c.setConnected(reconnecting)
 | 
			
		||||
				go c.reconnect()
 | 
			
		||||
			} else {
 | 
			
		||||
@@ -476,8 +519,8 @@ func (c *client) internalConnLost(err error) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
 | 
			
		||||
// outgoing messages.
 | 
			
		||||
// startCommsWorkers is called when the connection is up.
 | 
			
		||||
// It starts off all of the routines needed to process incoming and outgoing messages.
 | 
			
		||||
// Returns true if the comms workers were started (i.e. they were not already running)
 | 
			
		||||
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
 | 
			
		||||
	DEBUG.Println(CLI, "startCommsWorkers called")
 | 
			
		||||
@@ -564,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
 | 
			
		||||
					commsIncomingPub = nil
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				incomingPubChan <- pub
 | 
			
		||||
				// Care is needed here because an error elsewhere could trigger a deadlock
 | 
			
		||||
			sendPubLoop:
 | 
			
		||||
				for {
 | 
			
		||||
					select {
 | 
			
		||||
					case incomingPubChan <- pub:
 | 
			
		||||
						break sendPubLoop
 | 
			
		||||
					case err, ok := <-commsErrors:
 | 
			
		||||
						if !ok { // commsErrors has been closed so we can ignore it
 | 
			
		||||
							commsErrors = nil
 | 
			
		||||
							continue
 | 
			
		||||
						}
 | 
			
		||||
						ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
 | 
			
		||||
						c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
 | 
			
		||||
						continue
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			case err, ok := <-commsErrors:
 | 
			
		||||
				if !ok {
 | 
			
		||||
					commsErrors = nil
 | 
			
		||||
@@ -686,10 +744,10 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
 | 
			
		||||
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
 | 
			
		||||
// a message is published on the topic provided.
 | 
			
		||||
//
 | 
			
		||||
// Please note: you should try to keep the execution time of the callback to be
 | 
			
		||||
// as low as possible, especially when SetOrderMatters(true) (the default) is in
 | 
			
		||||
// place. Blocking calls in message handlers might otherwise delay delivery to
 | 
			
		||||
// other message handlers.
 | 
			
		||||
// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
// a new go routine.
 | 
			
		||||
// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
 | 
			
		||||
	token := newToken(packets.Subscribe).(*SubscribeToken)
 | 
			
		||||
	DEBUG.Println(CLI, "enter Subscribe")
 | 
			
		||||
@@ -766,6 +824,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
 | 
			
		||||
 | 
			
		||||
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
 | 
			
		||||
// be executed when a message is published on one of the topics provided.
 | 
			
		||||
//
 | 
			
		||||
// If options.OrderMatters is true (the default) then callback must not block or
 | 
			
		||||
// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
// a new go routine.
 | 
			
		||||
// callback must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
 | 
			
		||||
	var err error
 | 
			
		||||
	token := newToken(packets.Subscribe).(*SubscribeToken)
 | 
			
		||||
@@ -869,7 +932,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
 | 
			
		||||
		}
 | 
			
		||||
		details := packet.Details()
 | 
			
		||||
		if isKeyOutbound(key) {
 | 
			
		||||
			switch packet.(type) {
 | 
			
		||||
			switch p := packet.(type) {
 | 
			
		||||
			case *packets.SubscribePacket:
 | 
			
		||||
				if subscription {
 | 
			
		||||
					DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
 | 
			
		||||
@@ -909,13 +972,22 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
			case *packets.PublishPacket:
 | 
			
		||||
				// spec: If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or
 | 
			
		||||
				// Server has attempted to send this MQTT PUBLISH Packet. If the DUP flag is set to 1, it indicates that
 | 
			
		||||
				// this might be re-delivery of an earlier attempt to send the Packet.
 | 
			
		||||
				//
 | 
			
		||||
				// If the message is in the store than an attempt at delivery has been made (note that the message may
 | 
			
		||||
				// never have made it onto the wire but tracking that would be complicated!).
 | 
			
		||||
				if p.Qos != 0 { // spec: The DUP flag MUST be set to 0 for all QoS 0 messages
 | 
			
		||||
					p.Dup = true
 | 
			
		||||
				}
 | 
			
		||||
				token := newToken(packets.Publish).(*PublishToken)
 | 
			
		||||
				token.messageID = details.MessageID
 | 
			
		||||
				c.claimID(token, details.MessageID)
 | 
			
		||||
				DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
 | 
			
		||||
				DEBUG.Println(STR, details)
 | 
			
		||||
				select {
 | 
			
		||||
				case c.obound <- &PacketAndToken{p: packet, t: token}:
 | 
			
		||||
				case c.obound <- &PacketAndToken{p: p, t: token}:
 | 
			
		||||
				case <-c.stop:
 | 
			
		||||
					DEBUG.Println(STR, "resume exiting due to stop")
 | 
			
		||||
					return
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/go.mod
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/go.mod
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -1,8 +0,0 @@
 | 
			
		||||
module github.com/eclipse/paho.mqtt.golang
 | 
			
		||||
 | 
			
		||||
go 1.14
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/gorilla/websocket v1.4.2
 | 
			
		||||
	golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0
 | 
			
		||||
)
 | 
			
		||||
							
								
								
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/go.sum
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										8
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/go.sum
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -1,8 +0,0 @@
 | 
			
		||||
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
 | 
			
		||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 | 
			
		||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
 | 
			
		||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 | 
			
		||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 | 
			
		||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 | 
			
		||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 | 
			
		||||
							
								
								
									
										5
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -30,7 +30,8 @@ import (
 | 
			
		||||
// This just establishes the network connection; once established the type of connection should be irrelevant
 | 
			
		||||
//
 | 
			
		||||
 | 
			
		||||
// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes
 | 
			
		||||
// openConnection opens a network connection using the protocol indicated in the URL.
 | 
			
		||||
// Does not carry out any MQTT specific handshakes.
 | 
			
		||||
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) {
 | 
			
		||||
	switch uri.Scheme {
 | 
			
		||||
	case "ws":
 | 
			
		||||
@@ -81,7 +82,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
 | 
			
		||||
 | 
			
		||||
		err = tlsConn.Handshake()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			conn.Close()
 | 
			
		||||
			_ = conn.Close()
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										32
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/options.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										32
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/options.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -21,7 +21,6 @@ import (
 | 
			
		||||
	"crypto/tls"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
@@ -50,7 +49,11 @@ type OnConnectHandler func(Client)
 | 
			
		||||
// the initial connection is lost
 | 
			
		||||
type ReconnectHandler func(Client, *ClientOptions)
 | 
			
		||||
 | 
			
		||||
// ClientOptions contains configurable options for an Client.
 | 
			
		||||
// ConnectionAttemptHandler is invoked prior to making the initial connection.
 | 
			
		||||
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
 | 
			
		||||
 | 
			
		||||
// ClientOptions contains configurable options for an Client. Note that these should be set using the
 | 
			
		||||
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
 | 
			
		||||
type ClientOptions struct {
 | 
			
		||||
	Servers                 []*url.URL
 | 
			
		||||
	ClientID                string
 | 
			
		||||
@@ -79,6 +82,7 @@ type ClientOptions struct {
 | 
			
		||||
	OnConnect               OnConnectHandler
 | 
			
		||||
	OnConnectionLost        ConnectionLostHandler
 | 
			
		||||
	OnReconnecting          ReconnectHandler
 | 
			
		||||
	OnConnectAttempt        ConnectionAttemptHandler
 | 
			
		||||
	WriteTimeout            time.Duration
 | 
			
		||||
	MessageChannelDepth     uint
 | 
			
		||||
	ResumeSubs              bool
 | 
			
		||||
@@ -90,7 +94,7 @@ type ClientOptions struct {
 | 
			
		||||
// default values.
 | 
			
		||||
//   Port: 1883
 | 
			
		||||
//   CleanSession: True
 | 
			
		||||
//   Order: True
 | 
			
		||||
//   Order: True (note: it is recommended that this be set to FALSE unless order is important)
 | 
			
		||||
//   KeepAlive: 30 (seconds)
 | 
			
		||||
//   ConnectTimeout: 30 (seconds)
 | 
			
		||||
//   MaxReconnectInterval 10 (minutes)
 | 
			
		||||
@@ -120,6 +124,7 @@ func NewClientOptions() *ClientOptions {
 | 
			
		||||
		Store:                   nil,
 | 
			
		||||
		OnConnect:               nil,
 | 
			
		||||
		OnConnectionLost:        DefaultConnectionLostHandler,
 | 
			
		||||
		OnConnectAttempt:        nil,
 | 
			
		||||
		WriteTimeout:            0, // 0 represents timeout disabled
 | 
			
		||||
		ResumeSubs:              false,
 | 
			
		||||
		HTTPHeaders:             make(map[string][]string),
 | 
			
		||||
@@ -137,14 +142,12 @@ func NewClientOptions() *ClientOptions {
 | 
			
		||||
//
 | 
			
		||||
// An example broker URI would look like: tcp://foobar.com:1883
 | 
			
		||||
func (o *ClientOptions) AddBroker(server string) *ClientOptions {
 | 
			
		||||
	re := regexp.MustCompile(`%(25)?`)
 | 
			
		||||
	if len(server) > 0 && server[0] == ':' {
 | 
			
		||||
		server = "127.0.0.1" + server
 | 
			
		||||
	}
 | 
			
		||||
	if !strings.Contains(server, "://") {
 | 
			
		||||
		server = "tcp://" + server
 | 
			
		||||
	}
 | 
			
		||||
	server = re.ReplaceAllLiteralString(server, "%25")
 | 
			
		||||
	brokerURI, err := url.Parse(server)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
 | 
			
		||||
@@ -206,10 +209,13 @@ func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetOrderMatters will set the message routing to guarantee order within
 | 
			
		||||
// each QoS level. By default, this value is true. If set to false,
 | 
			
		||||
// each QoS level. By default, this value is true. If set to false (recommended),
 | 
			
		||||
// this flag indicates that messages can be delivered asynchronously
 | 
			
		||||
// from the client to the application and possibly arrive out of order.
 | 
			
		||||
// Specifically, the message handler is called in its own go routine.
 | 
			
		||||
// Note that setting this to true does not guarantee in-order delivery
 | 
			
		||||
// (this is subject to broker settings like "max_inflight_messages=1" in mosquitto)
 | 
			
		||||
// and if true then handlers must not block.
 | 
			
		||||
func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
 | 
			
		||||
	o.Order = order
 | 
			
		||||
	return o
 | 
			
		||||
@@ -289,6 +295,11 @@ func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, re
 | 
			
		||||
 | 
			
		||||
// SetDefaultPublishHandler sets the MessageHandler that will be called when a message
 | 
			
		||||
// is received that does not match any known subscriptions.
 | 
			
		||||
//
 | 
			
		||||
// If OrderMatters is true (the defaultHandler) then callback must not block or
 | 
			
		||||
// call functions within this package that may block (e.g. Publish) other than in
 | 
			
		||||
// a new go routine.
 | 
			
		||||
// defaultHandler must be safe for concurrent use by multiple goroutines.
 | 
			
		||||
func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
 | 
			
		||||
	o.DefaultPublishHandler = defaultHandler
 | 
			
		||||
	return o
 | 
			
		||||
@@ -315,6 +326,15 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio
 | 
			
		||||
	return o
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior
 | 
			
		||||
// to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing
 | 
			
		||||
// the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL).
 | 
			
		||||
// This allows connection specific changes to be made to the *tls.Config.
 | 
			
		||||
func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions {
 | 
			
		||||
	o.OnConnectAttempt = onConnectAttempt
 | 
			
		||||
	return o
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
 | 
			
		||||
// timeout error. A duration of 0 never times out. Default never times out
 | 
			
		||||
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										6
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										6
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -29,7 +29,11 @@ type ConnectPacket struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnectPacket) String() string {
 | 
			
		||||
	return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, c.Password)
 | 
			
		||||
	var password string
 | 
			
		||||
	if len(c.Password) > 0 {
 | 
			
		||||
		password = "<redacted>"
 | 
			
		||||
	}
 | 
			
		||||
	return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, password)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ConnectPacket) Write(w io.Writer) error {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										24
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										24
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -81,17 +81,27 @@ var ConnackReturnCodes = map[uint8]string{
 | 
			
		||||
	255: "Connection Refused: Protocol Violation",
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	ErrorRefusedBadProtocolVersion    = errors.New("unacceptable protocol version")
 | 
			
		||||
	ErrorRefusedIDRejected            = errors.New("identifier rejected")
 | 
			
		||||
	ErrorRefusedServerUnavailable     = errors.New("server Unavailable")
 | 
			
		||||
	ErrorRefusedBadUsernameOrPassword = errors.New("bad user name or password")
 | 
			
		||||
	ErrorRefusedNotAuthorised         = errors.New("not Authorized")
 | 
			
		||||
	ErrorNetworkError                 = errors.New("network Error")
 | 
			
		||||
	ErrorProtocolViolation            = errors.New("protocol Violation")
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ConnErrors is a map of the errors codes constants for Connect()
 | 
			
		||||
// to a Go error
 | 
			
		||||
var ConnErrors = map[byte]error{
 | 
			
		||||
	Accepted:                        nil,
 | 
			
		||||
	ErrRefusedBadProtocolVersion:    errors.New("unacceptable protocol version"),
 | 
			
		||||
	ErrRefusedIDRejected:            errors.New("identifier rejected"),
 | 
			
		||||
	ErrRefusedServerUnavailable:     errors.New("server Unavailable"),
 | 
			
		||||
	ErrRefusedBadUsernameOrPassword: errors.New("bad user name or password"),
 | 
			
		||||
	ErrRefusedNotAuthorised:         errors.New("not Authorized"),
 | 
			
		||||
	ErrNetworkError:                 errors.New("network Error"),
 | 
			
		||||
	ErrProtocolViolation:            errors.New("protocol Violation"),
 | 
			
		||||
	ErrRefusedBadProtocolVersion:    ErrorRefusedBadProtocolVersion,
 | 
			
		||||
	ErrRefusedIDRejected:            ErrorRefusedIDRejected,
 | 
			
		||||
	ErrRefusedServerUnavailable:     ErrorRefusedServerUnavailable,
 | 
			
		||||
	ErrRefusedBadUsernameOrPassword: ErrorRefusedBadUsernameOrPassword,
 | 
			
		||||
	ErrRefusedNotAuthorised:         ErrorRefusedNotAuthorised,
 | 
			
		||||
	ErrNetworkError:                 ErrorNetworkError,
 | 
			
		||||
	ErrProtocolViolation:            ErrorProtocolViolation,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ReadPacket takes an instance of an io.Reader (such as net.Conn) and attempts
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										57
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/router.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										57
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/router.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -132,13 +132,46 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
 | 
			
		||||
// associated callback (or the defaultHandler, if one exists and no other route matched). If
 | 
			
		||||
// anything is sent down the stop channel the function will end.
 | 
			
		||||
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
 | 
			
		||||
	ackChan := make(chan *PacketAndToken)
 | 
			
		||||
	go func() {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
 | 
			
		||||
	var ackInChan chan *PacketAndToken       // ACKs generated by ackFunc get put onto this channel
 | 
			
		||||
 | 
			
		||||
	stopAckCopy := make(chan struct{})    // Closure requests stop of go routine copying ackInChan to ackOutChan
 | 
			
		||||
	ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
 | 
			
		||||
	goRoutinesDone := make(chan struct{}) // closed on wg.Done()
 | 
			
		||||
	if order {
 | 
			
		||||
		ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
 | 
			
		||||
	} else {
 | 
			
		||||
		// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
 | 
			
		||||
		ackInChan = make(chan *PacketAndToken)
 | 
			
		||||
		go func() { // go routine to copy from ackInChan to ackOutChan until stopped
 | 
			
		||||
			for {
 | 
			
		||||
				select {
 | 
			
		||||
				case a := <-ackInChan:
 | 
			
		||||
					ackOutChan <- a
 | 
			
		||||
				case <-stopAckCopy:
 | 
			
		||||
					close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
 | 
			
		||||
					for {
 | 
			
		||||
						select {
 | 
			
		||||
						case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
 | 
			
		||||
							DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
 | 
			
		||||
						case <-goRoutinesDone:
 | 
			
		||||
							close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
 | 
			
		||||
							DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
 | 
			
		||||
							return
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() { // Main go routine handling inbound messages
 | 
			
		||||
		for message := range messages {
 | 
			
		||||
			// DEBUG.Println(ROU, "matchAndDispatch received message")
 | 
			
		||||
			sent := false
 | 
			
		||||
			r.RLock()
 | 
			
		||||
			m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
 | 
			
		||||
			m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
 | 
			
		||||
			var handlers []MessageHandler
 | 
			
		||||
			for e := r.routes.Front(); e != nil; e = e.Next() {
 | 
			
		||||
				if e.Value.(*route).match(message.TopicName) {
 | 
			
		||||
@@ -146,9 +179,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 | 
			
		||||
						handlers = append(handlers, e.Value.(*route).callback)
 | 
			
		||||
					} else {
 | 
			
		||||
						hd := e.Value.(*route).callback
 | 
			
		||||
						wg.Add(1)
 | 
			
		||||
						go func() {
 | 
			
		||||
							hd(client, m)
 | 
			
		||||
							m.Ack()
 | 
			
		||||
							wg.Done()
 | 
			
		||||
						}()
 | 
			
		||||
					}
 | 
			
		||||
					sent = true
 | 
			
		||||
@@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 | 
			
		||||
					if order {
 | 
			
		||||
						handlers = append(handlers, r.defaultHandler)
 | 
			
		||||
					} else {
 | 
			
		||||
						wg.Add(1)
 | 
			
		||||
						go func() {
 | 
			
		||||
							r.defaultHandler(client, m)
 | 
			
		||||
							m.Ack()
 | 
			
		||||
							wg.Done()
 | 
			
		||||
						}()
 | 
			
		||||
					}
 | 
			
		||||
				} else {
 | 
			
		||||
@@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
 | 
			
		||||
			}
 | 
			
		||||
			// DEBUG.Println(ROU, "matchAndDispatch handled message")
 | 
			
		||||
		}
 | 
			
		||||
		close(ackChan)
 | 
			
		||||
		if order {
 | 
			
		||||
			close(ackOutChan)
 | 
			
		||||
		} else { // Ensure that nothing further will be written to ackOutChan before closing it
 | 
			
		||||
			close(stopAckCopy)
 | 
			
		||||
			<-ackCopyStopped
 | 
			
		||||
			close(ackOutChan)
 | 
			
		||||
			go func() {
 | 
			
		||||
				wg.Wait() // Note: If this remains running then the user has handlers that are not returning
 | 
			
		||||
				close(goRoutinesDone)
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
		DEBUG.Println(ROU, "matchAndDispatch exiting")
 | 
			
		||||
	}()
 | 
			
		||||
	return ackChan
 | 
			
		||||
	return ackOutChan
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										16
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							
							
						
						
									
										16
									
								
								vendor/github.com/eclipse/paho.mqtt.golang/websocket.go
									
									
									
										generated
									
									
										vendored
									
									
								
							@@ -2,9 +2,11 @@ package mqtt
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"crypto/tls"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"net"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/url"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
@@ -15,8 +17,11 @@ import (
 | 
			
		||||
type WebsocketOptions struct {
 | 
			
		||||
	ReadBufferSize  int
 | 
			
		||||
	WriteBufferSize int
 | 
			
		||||
	Proxy           ProxyFunction
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ProxyFunction func(req *http.Request) (*url.URL, error)
 | 
			
		||||
 | 
			
		||||
// NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
 | 
			
		||||
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error) {
 | 
			
		||||
	if timeout == 0 {
 | 
			
		||||
@@ -27,9 +32,11 @@ func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestH
 | 
			
		||||
		// Apply default options
 | 
			
		||||
		options = &WebsocketOptions{}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if options.Proxy == nil {
 | 
			
		||||
		options.Proxy = http.ProxyFromEnvironment
 | 
			
		||||
	}
 | 
			
		||||
	dialer := &websocket.Dialer{
 | 
			
		||||
		Proxy:             http.ProxyFromEnvironment,
 | 
			
		||||
		Proxy:             options.Proxy,
 | 
			
		||||
		HandshakeTimeout:  timeout,
 | 
			
		||||
		EnableCompression: false,
 | 
			
		||||
		TLSClientConfig:   tlsc,
 | 
			
		||||
@@ -38,9 +45,12 @@ func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestH
 | 
			
		||||
		WriteBufferSize:   options.WriteBufferSize,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ws, _, err := dialer.Dial(host, requestHeader)
 | 
			
		||||
	ws, resp, err := dialer.Dial(host, requestHeader)
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if resp != nil {
 | 
			
		||||
			WARN.Println(CLI, fmt.Sprintf("Websocket handshake failure. StatusCode: %d. Body: %s", resp.StatusCode, resp.Body))
 | 
			
		||||
		}
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user