build: upgrade dependencies
This commit is contained in:
8
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
8
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
@ -104,8 +104,12 @@ func main() {
|
||||
|
||||
* Seemingly random disconnections may be caused by another client connecting to the broker with the same client
|
||||
identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800405).
|
||||
* A `MessageHandler` (called when a new message is received) must not block. If you wish to perform a long-running task,
|
||||
or publish a message, then please use a go routine (blocking in the handler is a common cause of unexpected `pingresp
|
||||
* Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g.
|
||||
`max_inflight_messages=1` in mosquitto) then set `ClientOptions.SetOrderMatters(false)`. Doing so will avoid the
|
||||
below issue (deadlocks due to blocking message handlers).
|
||||
* A `MessageHandler` (called when a new message is received) must not block (unless
|
||||
`ClientOptions.SetOrderMatters(false)` set). If you wish to perform a long-running task, or publish a message, then
|
||||
please use a go routine (blocking in the handler is a common cause of unexpected `pingresp
|
||||
not received, disconnecting` errors).
|
||||
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible that the broker will deliver retained
|
||||
messages before `Subscribe` can be called. To process these messages either configure a handler with `AddRoute` or
|
||||
|
112
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
112
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
@ -55,6 +55,8 @@ const (
|
||||
// information can be found in their respective documentation.
|
||||
// Numerous connection options may be specified by configuring a
|
||||
// and then supplying a ClientOptions type.
|
||||
// Implementations of Client must be safe for concurrent use by multiple
|
||||
// goroutines
|
||||
type Client interface {
|
||||
// IsConnected returns a bool signifying whether
|
||||
// the client is connected or not.
|
||||
@ -75,11 +77,21 @@ type Client interface {
|
||||
// Returns a token to track delivery of the message to the broker
|
||||
Publish(topic string, qos byte, retained bool, payload interface{}) Token
|
||||
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
|
||||
// a message is published on the topic provided, or nil for the default handler
|
||||
// a message is published on the topic provided, or nil for the default handler.
|
||||
//
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
Subscribe(topic string, qos byte, callback MessageHandler) Token
|
||||
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
|
||||
// be executed when a message is published on one of the topics provided, or nil for the
|
||||
// default handler
|
||||
// default handler.
|
||||
//
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token
|
||||
// Unsubscribe will end the subscription from each of the topics provided.
|
||||
// Messages published to those topics from other clients will no longer be
|
||||
@ -87,7 +99,13 @@ type Client interface {
|
||||
Unsubscribe(topics ...string) Token
|
||||
// AddRoute allows you to add a handler for messages on a specific topic
|
||||
// without making a subscription. For example having a different handler
|
||||
// for parts of a wildcard subscription
|
||||
// for parts of a wildcard subscription or for receiving retained messages
|
||||
// upon connection (before Sub scribe can be processed).
|
||||
//
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
AddRoute(topic string, callback MessageHandler)
|
||||
// OptionsReader returns a ClientOptionsReader which is a copy of the clientoptions
|
||||
// in use by the client.
|
||||
@ -95,6 +113,8 @@ type Client interface {
|
||||
}
|
||||
|
||||
// client implements the Client interface
|
||||
// clients are safe for concurrent use by multiple
|
||||
// goroutines
|
||||
type client struct {
|
||||
lastSent atomic.Value // time.Time - the last time a packet was successfully sent to network
|
||||
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
|
||||
@ -153,6 +173,11 @@ func NewClient(o *ClientOptions) Client {
|
||||
// AddRoute allows you to add a handler for messages on a specific topic
|
||||
// without making a subscription. For example having a different handler
|
||||
// for parts of a wildcard subscription
|
||||
//
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
func (c *client) AddRoute(topic string, callback MessageHandler) {
|
||||
if callback != nil {
|
||||
c.msgRouter.addRoute(topic, callback)
|
||||
@ -354,8 +379,13 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
cm := newConnectMsgFromOptions(&c.options, broker)
|
||||
DEBUG.Println(CLI, "about to write new connect msg")
|
||||
CONN:
|
||||
tlsCfg := c.options.TLSConfig
|
||||
if c.options.OnConnectAttempt != nil {
|
||||
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
|
||||
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
|
||||
}
|
||||
// Start by opening the network connection (tcp, tls, ws) etc
|
||||
conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
|
||||
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
|
||||
if err != nil {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
WARN.Println(CLI, "failed to connect to broker, trying next")
|
||||
@ -372,7 +402,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
|
||||
// We may be have to attempt the connection with MQTT 3.1
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
_ = conn.Close()
|
||||
}
|
||||
if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
|
||||
DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
|
||||
@ -409,12 +439,22 @@ func (c *client) Disconnect(quiesce uint) {
|
||||
|
||||
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
dt := newToken(packets.Disconnect)
|
||||
c.oboundP <- &PacketAndToken{p: dm, t: dt}
|
||||
disconnectSent := false
|
||||
select {
|
||||
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
|
||||
disconnectSent = true
|
||||
case <-c.commsStopped:
|
||||
WARN.Println("Disconnect packet could not be sent because comms stopped")
|
||||
case <-time.After(time.Duration(quiesce) * time.Millisecond):
|
||||
WARN.Println("Disconnect packet not sent due to timeout")
|
||||
}
|
||||
|
||||
// wait for work to finish, or quiesce time consumed
|
||||
DEBUG.Println(CLI, "calling WaitTimeout")
|
||||
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
|
||||
DEBUG.Println(CLI, "WaitTimeout done")
|
||||
if disconnectSent {
|
||||
DEBUG.Println(CLI, "calling WaitTimeout")
|
||||
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
|
||||
DEBUG.Println(CLI, "WaitTimeout done")
|
||||
}
|
||||
} else {
|
||||
WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
|
||||
c.setConnected(disconnected)
|
||||
@ -459,10 +499,13 @@ func (c *client) internalConnLost(err error) {
|
||||
DEBUG.Println(CLI, "internalConnLost waiting on workers")
|
||||
<-stopDone
|
||||
DEBUG.Println(CLI, "internalConnLost workers stopped")
|
||||
if c.options.CleanSession && !c.options.AutoReconnect {
|
||||
// It is possible that Disconnect was called which led to this error so reconnection depends upon status
|
||||
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
|
||||
|
||||
if c.options.CleanSession && !reconnect {
|
||||
c.messageIds.cleanUp()
|
||||
}
|
||||
if c.options.AutoReconnect {
|
||||
if reconnect {
|
||||
c.setConnected(reconnecting)
|
||||
go c.reconnect()
|
||||
} else {
|
||||
@ -476,8 +519,8 @@ func (c *client) internalConnLost(err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
|
||||
// outgoing messages.
|
||||
// startCommsWorkers is called when the connection is up.
|
||||
// It starts off all of the routines needed to process incoming and outgoing messages.
|
||||
// Returns true if the comms workers were started (i.e. they were not already running)
|
||||
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
|
||||
DEBUG.Println(CLI, "startCommsWorkers called")
|
||||
@ -564,7 +607,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
|
||||
commsIncomingPub = nil
|
||||
continue
|
||||
}
|
||||
incomingPubChan <- pub
|
||||
// Care is needed here because an error elsewhere could trigger a deadlock
|
||||
sendPubLoop:
|
||||
for {
|
||||
select {
|
||||
case incomingPubChan <- pub:
|
||||
break sendPubLoop
|
||||
case err, ok := <-commsErrors:
|
||||
if !ok { // commsErrors has been closed so we can ignore it
|
||||
commsErrors = nil
|
||||
continue
|
||||
}
|
||||
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
|
||||
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
|
||||
continue
|
||||
}
|
||||
}
|
||||
case err, ok := <-commsErrors:
|
||||
if !ok {
|
||||
commsErrors = nil
|
||||
@ -686,10 +744,10 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
|
||||
// Subscribe starts a new subscription. Provide a MessageHandler to be executed when
|
||||
// a message is published on the topic provided.
|
||||
//
|
||||
// Please note: you should try to keep the execution time of the callback to be
|
||||
// as low as possible, especially when SetOrderMatters(true) (the default) is in
|
||||
// place. Blocking calls in message handlers might otherwise delay delivery to
|
||||
// other message handlers.
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
|
||||
token := newToken(packets.Subscribe).(*SubscribeToken)
|
||||
DEBUG.Println(CLI, "enter Subscribe")
|
||||
@ -766,6 +824,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
|
||||
|
||||
// SubscribeMultiple starts a new subscription for multiple topics. Provide a MessageHandler to
|
||||
// be executed when a message is published on one of the topics provided.
|
||||
//
|
||||
// If options.OrderMatters is true (the default) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// callback must be safe for concurrent use by multiple goroutines.
|
||||
func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHandler) Token {
|
||||
var err error
|
||||
token := newToken(packets.Subscribe).(*SubscribeToken)
|
||||
@ -869,7 +932,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
|
||||
}
|
||||
details := packet.Details()
|
||||
if isKeyOutbound(key) {
|
||||
switch packet.(type) {
|
||||
switch p := packet.(type) {
|
||||
case *packets.SubscribePacket:
|
||||
if subscription {
|
||||
DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
|
||||
@ -909,13 +972,22 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
|
||||
return
|
||||
}
|
||||
case *packets.PublishPacket:
|
||||
// spec: If the DUP flag is set to 0, it indicates that this is the first occasion that the Client or
|
||||
// Server has attempted to send this MQTT PUBLISH Packet. If the DUP flag is set to 1, it indicates that
|
||||
// this might be re-delivery of an earlier attempt to send the Packet.
|
||||
//
|
||||
// If the message is in the store than an attempt at delivery has been made (note that the message may
|
||||
// never have made it onto the wire but tracking that would be complicated!).
|
||||
if p.Qos != 0 { // spec: The DUP flag MUST be set to 0 for all QoS 0 messages
|
||||
p.Dup = true
|
||||
}
|
||||
token := newToken(packets.Publish).(*PublishToken)
|
||||
token.messageID = details.MessageID
|
||||
c.claimID(token, details.MessageID)
|
||||
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
|
||||
DEBUG.Println(STR, details)
|
||||
select {
|
||||
case c.obound <- &PacketAndToken{p: packet, t: token}:
|
||||
case c.obound <- &PacketAndToken{p: p, t: token}:
|
||||
case <-c.stop:
|
||||
DEBUG.Println(STR, "resume exiting due to stop")
|
||||
return
|
||||
|
5
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
5
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
@ -30,7 +30,8 @@ import (
|
||||
// This just establishes the network connection; once established the type of connection should be irrelevant
|
||||
//
|
||||
|
||||
// openConnection opens a network connection using the protocol indicated in the URL. Does not carry out any MQTT specific handshakes
|
||||
// openConnection opens a network connection using the protocol indicated in the URL.
|
||||
// Does not carry out any MQTT specific handshakes.
|
||||
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions) (net.Conn, error) {
|
||||
switch uri.Scheme {
|
||||
case "ws":
|
||||
@ -81,7 +82,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
|
||||
|
||||
err = tlsConn.Handshake()
|
||||
if err != nil {
|
||||
conn.Close()
|
||||
_ = conn.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
32
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
32
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
@ -21,7 +21,6 @@ import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
@ -50,7 +49,11 @@ type OnConnectHandler func(Client)
|
||||
// the initial connection is lost
|
||||
type ReconnectHandler func(Client, *ClientOptions)
|
||||
|
||||
// ClientOptions contains configurable options for an Client.
|
||||
// ConnectionAttemptHandler is invoked prior to making the initial connection.
|
||||
type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config
|
||||
|
||||
// ClientOptions contains configurable options for an Client. Note that these should be set using the
|
||||
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
|
||||
type ClientOptions struct {
|
||||
Servers []*url.URL
|
||||
ClientID string
|
||||
@ -79,6 +82,7 @@ type ClientOptions struct {
|
||||
OnConnect OnConnectHandler
|
||||
OnConnectionLost ConnectionLostHandler
|
||||
OnReconnecting ReconnectHandler
|
||||
OnConnectAttempt ConnectionAttemptHandler
|
||||
WriteTimeout time.Duration
|
||||
MessageChannelDepth uint
|
||||
ResumeSubs bool
|
||||
@ -90,7 +94,7 @@ type ClientOptions struct {
|
||||
// default values.
|
||||
// Port: 1883
|
||||
// CleanSession: True
|
||||
// Order: True
|
||||
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
|
||||
// KeepAlive: 30 (seconds)
|
||||
// ConnectTimeout: 30 (seconds)
|
||||
// MaxReconnectInterval 10 (minutes)
|
||||
@ -120,6 +124,7 @@ func NewClientOptions() *ClientOptions {
|
||||
Store: nil,
|
||||
OnConnect: nil,
|
||||
OnConnectionLost: DefaultConnectionLostHandler,
|
||||
OnConnectAttempt: nil,
|
||||
WriteTimeout: 0, // 0 represents timeout disabled
|
||||
ResumeSubs: false,
|
||||
HTTPHeaders: make(map[string][]string),
|
||||
@ -137,14 +142,12 @@ func NewClientOptions() *ClientOptions {
|
||||
//
|
||||
// An example broker URI would look like: tcp://foobar.com:1883
|
||||
func (o *ClientOptions) AddBroker(server string) *ClientOptions {
|
||||
re := regexp.MustCompile(`%(25)?`)
|
||||
if len(server) > 0 && server[0] == ':' {
|
||||
server = "127.0.0.1" + server
|
||||
}
|
||||
if !strings.Contains(server, "://") {
|
||||
server = "tcp://" + server
|
||||
}
|
||||
server = re.ReplaceAllLiteralString(server, "%25")
|
||||
brokerURI, err := url.Parse(server)
|
||||
if err != nil {
|
||||
ERROR.Println(CLI, "Failed to parse %q broker address: %s", server, err)
|
||||
@ -206,10 +209,13 @@ func (o *ClientOptions) SetCleanSession(clean bool) *ClientOptions {
|
||||
}
|
||||
|
||||
// SetOrderMatters will set the message routing to guarantee order within
|
||||
// each QoS level. By default, this value is true. If set to false,
|
||||
// each QoS level. By default, this value is true. If set to false (recommended),
|
||||
// this flag indicates that messages can be delivered asynchronously
|
||||
// from the client to the application and possibly arrive out of order.
|
||||
// Specifically, the message handler is called in its own go routine.
|
||||
// Note that setting this to true does not guarantee in-order delivery
|
||||
// (this is subject to broker settings like "max_inflight_messages=1" in mosquitto)
|
||||
// and if true then handlers must not block.
|
||||
func (o *ClientOptions) SetOrderMatters(order bool) *ClientOptions {
|
||||
o.Order = order
|
||||
return o
|
||||
@ -289,6 +295,11 @@ func (o *ClientOptions) SetBinaryWill(topic string, payload []byte, qos byte, re
|
||||
|
||||
// SetDefaultPublishHandler sets the MessageHandler that will be called when a message
|
||||
// is received that does not match any known subscriptions.
|
||||
//
|
||||
// If OrderMatters is true (the defaultHandler) then callback must not block or
|
||||
// call functions within this package that may block (e.g. Publish) other than in
|
||||
// a new go routine.
|
||||
// defaultHandler must be safe for concurrent use by multiple goroutines.
|
||||
func (o *ClientOptions) SetDefaultPublishHandler(defaultHandler MessageHandler) *ClientOptions {
|
||||
o.DefaultPublishHandler = defaultHandler
|
||||
return o
|
||||
@ -315,6 +326,15 @@ func (o *ClientOptions) SetReconnectingHandler(cb ReconnectHandler) *ClientOptio
|
||||
return o
|
||||
}
|
||||
|
||||
// SetConnectionAttemptHandler sets the ConnectionAttemptHandler callback to be executed prior
|
||||
// to each attempt to connect to an MQTT broker. Returns the *tls.Config that will be used when establishing
|
||||
// the connection (a copy of the tls.Config from ClientOptions will be passed in along with the broker URL).
|
||||
// This allows connection specific changes to be made to the *tls.Config.
|
||||
func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionAttemptHandler) *ClientOptions {
|
||||
o.OnConnectAttempt = onConnectAttempt
|
||||
return o
|
||||
}
|
||||
|
||||
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
|
||||
// timeout error. A duration of 0 never times out. Default never times out
|
||||
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
|
||||
|
6
vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go
generated
vendored
6
vendor/github.com/eclipse/paho.mqtt.golang/packets/connect.go
generated
vendored
@ -29,7 +29,11 @@ type ConnectPacket struct {
|
||||
}
|
||||
|
||||
func (c *ConnectPacket) String() string {
|
||||
return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, c.Password)
|
||||
var password string
|
||||
if len(c.Password) > 0 {
|
||||
password = "<redacted>"
|
||||
}
|
||||
return fmt.Sprintf("%s protocolversion: %d protocolname: %s cleansession: %t willflag: %t WillQos: %d WillRetain: %t Usernameflag: %t Passwordflag: %t keepalive: %d clientId: %s willtopic: %s willmessage: %s Username: %s Password: %s", c.FixedHeader, c.ProtocolVersion, c.ProtocolName, c.CleanSession, c.WillFlag, c.WillQos, c.WillRetain, c.UsernameFlag, c.PasswordFlag, c.Keepalive, c.ClientIdentifier, c.WillTopic, c.WillMessage, c.Username, password)
|
||||
}
|
||||
|
||||
func (c *ConnectPacket) Write(w io.Writer) error {
|
||||
|
24
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
24
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
@ -81,17 +81,27 @@ var ConnackReturnCodes = map[uint8]string{
|
||||
255: "Connection Refused: Protocol Violation",
|
||||
}
|
||||
|
||||
var (
|
||||
ErrorRefusedBadProtocolVersion = errors.New("unacceptable protocol version")
|
||||
ErrorRefusedIDRejected = errors.New("identifier rejected")
|
||||
ErrorRefusedServerUnavailable = errors.New("server Unavailable")
|
||||
ErrorRefusedBadUsernameOrPassword = errors.New("bad user name or password")
|
||||
ErrorRefusedNotAuthorised = errors.New("not Authorized")
|
||||
ErrorNetworkError = errors.New("network Error")
|
||||
ErrorProtocolViolation = errors.New("protocol Violation")
|
||||
)
|
||||
|
||||
// ConnErrors is a map of the errors codes constants for Connect()
|
||||
// to a Go error
|
||||
var ConnErrors = map[byte]error{
|
||||
Accepted: nil,
|
||||
ErrRefusedBadProtocolVersion: errors.New("unacceptable protocol version"),
|
||||
ErrRefusedIDRejected: errors.New("identifier rejected"),
|
||||
ErrRefusedServerUnavailable: errors.New("server Unavailable"),
|
||||
ErrRefusedBadUsernameOrPassword: errors.New("bad user name or password"),
|
||||
ErrRefusedNotAuthorised: errors.New("not Authorized"),
|
||||
ErrNetworkError: errors.New("network Error"),
|
||||
ErrProtocolViolation: errors.New("protocol Violation"),
|
||||
ErrRefusedBadProtocolVersion: ErrorRefusedBadProtocolVersion,
|
||||
ErrRefusedIDRejected: ErrorRefusedIDRejected,
|
||||
ErrRefusedServerUnavailable: ErrorRefusedServerUnavailable,
|
||||
ErrRefusedBadUsernameOrPassword: ErrorRefusedBadUsernameOrPassword,
|
||||
ErrRefusedNotAuthorised: ErrorRefusedNotAuthorised,
|
||||
ErrNetworkError: ErrorNetworkError,
|
||||
ErrProtocolViolation: ErrorProtocolViolation,
|
||||
}
|
||||
|
||||
// ReadPacket takes an instance of an io.Reader (such as net.Conn) and attempts
|
||||
|
57
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
57
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
@ -132,13 +132,46 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
|
||||
// associated callback (or the defaultHandler, if one exists and no other route matched). If
|
||||
// anything is sent down the stop channel the function will end.
|
||||
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
|
||||
ackChan := make(chan *PacketAndToken)
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
|
||||
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
|
||||
|
||||
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
|
||||
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
|
||||
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
|
||||
if order {
|
||||
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
|
||||
} else {
|
||||
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
|
||||
ackInChan = make(chan *PacketAndToken)
|
||||
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
|
||||
for {
|
||||
select {
|
||||
case a := <-ackInChan:
|
||||
ackOutChan <- a
|
||||
case <-stopAckCopy:
|
||||
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
|
||||
for {
|
||||
select {
|
||||
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
|
||||
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
|
||||
case <-goRoutinesDone:
|
||||
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
|
||||
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() { // Main go routine handling inbound messages
|
||||
for message := range messages {
|
||||
// DEBUG.Println(ROU, "matchAndDispatch received message")
|
||||
sent := false
|
||||
r.RLock()
|
||||
m := messageFromPublish(message, ackFunc(ackChan, client.persist, message))
|
||||
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
|
||||
var handlers []MessageHandler
|
||||
for e := r.routes.Front(); e != nil; e = e.Next() {
|
||||
if e.Value.(*route).match(message.TopicName) {
|
||||
@ -146,9 +179,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
handlers = append(handlers, e.Value.(*route).callback)
|
||||
} else {
|
||||
hd := e.Value.(*route).callback
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
hd(client, m)
|
||||
m.Ack()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
sent = true
|
||||
@ -159,9 +194,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
if order {
|
||||
handlers = append(handlers, r.defaultHandler)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
r.defaultHandler(client, m)
|
||||
m.Ack()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
@ -175,8 +212,18 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
}
|
||||
// DEBUG.Println(ROU, "matchAndDispatch handled message")
|
||||
}
|
||||
close(ackChan)
|
||||
if order {
|
||||
close(ackOutChan)
|
||||
} else { // Ensure that nothing further will be written to ackOutChan before closing it
|
||||
close(stopAckCopy)
|
||||
<-ackCopyStopped
|
||||
close(ackOutChan)
|
||||
go func() {
|
||||
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
|
||||
close(goRoutinesDone)
|
||||
}()
|
||||
}
|
||||
DEBUG.Println(ROU, "matchAndDispatch exiting")
|
||||
}()
|
||||
return ackChan
|
||||
return ackOutChan
|
||||
}
|
||||
|
16
vendor/github.com/eclipse/paho.mqtt.golang/websocket.go
generated
vendored
16
vendor/github.com/eclipse/paho.mqtt.golang/websocket.go
generated
vendored
@ -2,9 +2,11 @@ package mqtt
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -15,8 +17,11 @@ import (
|
||||
type WebsocketOptions struct {
|
||||
ReadBufferSize int
|
||||
WriteBufferSize int
|
||||
Proxy ProxyFunction
|
||||
}
|
||||
|
||||
type ProxyFunction func(req *http.Request) (*url.URL, error)
|
||||
|
||||
// NewWebsocket returns a new websocket and returns a net.Conn compatible interface using the gorilla/websocket package
|
||||
func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestHeader http.Header, options *WebsocketOptions) (net.Conn, error) {
|
||||
if timeout == 0 {
|
||||
@ -27,9 +32,11 @@ func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestH
|
||||
// Apply default options
|
||||
options = &WebsocketOptions{}
|
||||
}
|
||||
|
||||
if options.Proxy == nil {
|
||||
options.Proxy = http.ProxyFromEnvironment
|
||||
}
|
||||
dialer := &websocket.Dialer{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Proxy: options.Proxy,
|
||||
HandshakeTimeout: timeout,
|
||||
EnableCompression: false,
|
||||
TLSClientConfig: tlsc,
|
||||
@ -38,9 +45,12 @@ func NewWebsocket(host string, tlsc *tls.Config, timeout time.Duration, requestH
|
||||
WriteBufferSize: options.WriteBufferSize,
|
||||
}
|
||||
|
||||
ws, _, err := dialer.Dial(host, requestHeader)
|
||||
ws, resp, err := dialer.Dial(host, requestHeader)
|
||||
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
WARN.Println(CLI, fmt.Sprintf("Websocket handshake failure. StatusCode: %d. Body: %s", resp.StatusCode, resp.Body))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user