build: upgrade dependency and build with go 1.21
This commit is contained in:
6
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
6
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
@ -113,7 +113,9 @@ identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1
|
||||
not received, disconnecting` errors).
|
||||
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible
|
||||
that the broker will deliver retained messages before `Subscribe` can be called. To process these messages either
|
||||
configure a handler with `AddRoute` or set a `DefaultPublishHandler`.
|
||||
configure a handler with `AddRoute` or set a `DefaultPublishHandler`. If there is no handler (or `DefaultPublishHandler`)
|
||||
then inbound messages will not be acknowledged. Adding a handler (even if it's `opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})`)
|
||||
is highly recommended to avoid inadvertently hitting inflight message limits.
|
||||
* Loss of network connectivity may not be detected immediately. If this is an issue then consider setting
|
||||
`ClientOptions.KeepAlive` (sends regular messages to check the link is active).
|
||||
* Reusing a `Client` is not completely safe. After calling `Disconnect` please create a new Client (`NewClient()`) rather
|
||||
@ -193,4 +195,4 @@ Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list
|
||||
|
||||
General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt).
|
||||
|
||||
There is much more information available via the [MQTT community site](http://mqtt.org).
|
||||
There is much more information available via the [MQTT community site](http://mqtt.org).
|
||||
|
104
vendor/github.com/eclipse/paho.mqtt.golang/backoff.go
generated
vendored
Normal file
104
vendor/github.com/eclipse/paho.mqtt.golang/backoff.go
generated
vendored
Normal file
@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Copyright (c) 2021 IBM Corp and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v2.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* https://www.eclipse.org/legal/epl-2.0/
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Matt Brittan
|
||||
* Daichi Tomaru
|
||||
*/
|
||||
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Controller for sleep with backoff when the client attempts reconnection
|
||||
// It has statuses for each situations cause reconnection.
|
||||
type backoffController struct {
|
||||
sync.RWMutex
|
||||
statusMap map[string]*backoffStatus
|
||||
}
|
||||
|
||||
type backoffStatus struct {
|
||||
lastSleepPeriod time.Duration
|
||||
lastErrorTime time.Time
|
||||
}
|
||||
|
||||
func newBackoffController() *backoffController {
|
||||
return &backoffController{
|
||||
statusMap: map[string]*backoffStatus{},
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate next sleep period from the specified parameters.
|
||||
// Returned values are next sleep period and whether the error situation is continual.
|
||||
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
|
||||
// Also if there is a lot of time between last and this error, sleep period is initialized.
|
||||
func (b *backoffController) getBackoffSleepTime(
|
||||
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
|
||||
) (time.Duration, bool) {
|
||||
// Decide first sleep time if the situation is not continual.
|
||||
var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
|
||||
if skip {
|
||||
status.lastSleepPeriod = 0
|
||||
return 0, false
|
||||
}
|
||||
status.lastSleepPeriod = init
|
||||
return init, false
|
||||
}
|
||||
|
||||
// Prioritize maxSleep.
|
||||
if initSleepPeriod > maxSleepPeriod {
|
||||
initSleepPeriod = maxSleepPeriod
|
||||
}
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
status, exist := b.statusMap[situation]
|
||||
if !exist {
|
||||
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
|
||||
return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
|
||||
}
|
||||
|
||||
oldTime := status.lastErrorTime
|
||||
status.lastErrorTime = time.Now()
|
||||
|
||||
// When there is a lot of time between last and this error, sleep period is initialized.
|
||||
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
|
||||
return firstProcess(status, initSleepPeriod, skipFirst)
|
||||
}
|
||||
|
||||
if status.lastSleepPeriod == 0 {
|
||||
status.lastSleepPeriod = initSleepPeriod
|
||||
return initSleepPeriod, true
|
||||
}
|
||||
|
||||
if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
|
||||
status.lastSleepPeriod = nextSleepPeriod
|
||||
} else {
|
||||
status.lastSleepPeriod = maxSleepPeriod
|
||||
}
|
||||
|
||||
return status.lastSleepPeriod, true
|
||||
}
|
||||
|
||||
// Execute sleep the time returned from getBackoffSleepTime.
|
||||
func (b *backoffController) sleepWithBackoff(
|
||||
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
|
||||
) (time.Duration, bool) {
|
||||
sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
|
||||
if sleep != 0 {
|
||||
time.Sleep(sleep)
|
||||
}
|
||||
return sleep, isFirst
|
||||
}
|
376
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
376
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
@ -38,13 +38,6 @@ import (
|
||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||
)
|
||||
|
||||
const (
|
||||
disconnected uint32 = iota
|
||||
connecting
|
||||
reconnecting
|
||||
connected
|
||||
)
|
||||
|
||||
// Client is the interface definition for a Client as used by this
|
||||
// library, the interface is primarily to allow mocking tests.
|
||||
//
|
||||
@ -52,9 +45,12 @@ const (
|
||||
// with an MQTT server using non-blocking methods that allow work
|
||||
// to be done in the background.
|
||||
// An application may connect to an MQTT server using:
|
||||
// A plain TCP socket
|
||||
// A secure SSL/TLS socket
|
||||
// A websocket
|
||||
//
|
||||
// A plain TCP socket (e.g. mqtt://test.mosquitto.org:1833)
|
||||
// A secure SSL/TLS socket (e.g. tls://test.mosquitto.org:8883)
|
||||
// A websocket (e.g ws://test.mosquitto.org:8080 or wss://test.mosquitto.org:8081)
|
||||
// Something else (using `options.CustomOpenConnectionFn`)
|
||||
//
|
||||
// To enable ensured message delivery at Quality of Service (QoS) levels
|
||||
// described in the MQTT spec, a message persistence mechanism must be
|
||||
// used. This is done by providing a type which implements the Store
|
||||
@ -128,8 +124,7 @@ type client struct {
|
||||
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
|
||||
pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received
|
||||
|
||||
status uint32 // see const definitions at top of file for possible values
|
||||
sync.RWMutex // Protects the above two variables (note: atomic writes are also used somewhat inconsistently)
|
||||
status connectionStatus // see constants in status.go for values
|
||||
|
||||
messageIds // effectively a map from message id to token completor
|
||||
|
||||
@ -146,6 +141,8 @@ type client struct {
|
||||
stop chan struct{} // Closed to request that workers stop
|
||||
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
|
||||
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
|
||||
|
||||
backoff *backoffController
|
||||
}
|
||||
|
||||
// NewClient will create an MQTT v3.1.1 client with all of the options specified
|
||||
@ -169,12 +166,12 @@ func NewClient(o *ClientOptions) Client {
|
||||
c.options.protocolVersionExplicit = false
|
||||
}
|
||||
c.persist = c.options.Store
|
||||
c.status = disconnected
|
||||
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
|
||||
c.msgRouter = newRouter()
|
||||
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
|
||||
c.obound = make(chan *PacketAndToken)
|
||||
c.oboundP = make(chan *PacketAndToken)
|
||||
c.backoff = newBackoffController()
|
||||
return c
|
||||
}
|
||||
|
||||
@ -196,47 +193,27 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {
|
||||
// the client is connected or not.
|
||||
// connected means that the connection is up now OR it will
|
||||
// be established/reestablished automatically when possible
|
||||
// Warning: The connection status may change at any time so use this with care!
|
||||
func (c *client) IsConnected() bool {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
status := atomic.LoadUint32(&c.status)
|
||||
// This will need to change if additional statuses are added
|
||||
s, r := c.status.ConnectionStatusRetry()
|
||||
switch {
|
||||
case status == connected:
|
||||
case s == connected:
|
||||
return true
|
||||
case c.options.AutoReconnect && status > connecting:
|
||||
return true
|
||||
case c.options.ConnectRetry && status == connecting:
|
||||
case c.options.ConnectRetry && s == connecting:
|
||||
return true
|
||||
case c.options.AutoReconnect:
|
||||
return s == reconnecting || (s == disconnecting && r) // r indicates we will reconnect
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsConnectionOpen return a bool signifying whether the client has an active
|
||||
// connection to mqtt broker, i.e not in disconnected or reconnect mode
|
||||
// connection to mqtt broker, i.e. not in disconnected or reconnect mode
|
||||
// Warning: The connection status may change at any time so use this with care!
|
||||
func (c *client) IsConnectionOpen() bool {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
status := atomic.LoadUint32(&c.status)
|
||||
switch {
|
||||
case status == connected:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) connectionStatus() uint32 {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
status := atomic.LoadUint32(&c.status)
|
||||
return status
|
||||
}
|
||||
|
||||
func (c *client) setConnected(status uint32) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
atomic.StoreUint32(&c.status, status)
|
||||
return c.status.ConnectionStatus() == connected
|
||||
}
|
||||
|
||||
// ErrNotConnected is the error returned from function calls that are
|
||||
@ -253,25 +230,31 @@ func (c *client) Connect() Token {
|
||||
t := newToken(packets.Connect).(*ConnectToken)
|
||||
DEBUG.Println(CLI, "Connect()")
|
||||
|
||||
if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
|
||||
// if in any state other than disconnected and ConnectRetry is
|
||||
// enabled then the connection will come up automatically
|
||||
// client can assume connection is up
|
||||
WARN.Println(CLI, "Connect() called but not disconnected")
|
||||
t.returnCode = packets.Accepted
|
||||
t.flowComplete()
|
||||
connectionUp, err := c.status.Connecting()
|
||||
if err != nil {
|
||||
if err == errAlreadyConnectedOrReconnecting && c.options.AutoReconnect {
|
||||
// When reconnection is active we don't consider calls tro Connect to ba an error (mainly for compatability)
|
||||
WARN.Println(CLI, "Connect() called but not disconnected")
|
||||
t.returnCode = packets.Accepted
|
||||
t.flowComplete()
|
||||
return t
|
||||
}
|
||||
ERROR.Println(CLI, err) // CONNECT should never be called unless we are disconnected
|
||||
t.setError(err)
|
||||
return t
|
||||
}
|
||||
|
||||
c.persist.Open()
|
||||
if c.options.ConnectRetry {
|
||||
c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
|
||||
c.reserveStoredPublishIDs() // Reserve IDs to allow publishing before connect complete
|
||||
}
|
||||
c.setConnected(connecting)
|
||||
|
||||
go func() {
|
||||
if len(c.options.Servers) == 0 {
|
||||
t.setError(fmt.Errorf("no servers defined to connect to"))
|
||||
if err := connectionUp(false); err != nil {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -285,26 +268,28 @@ func (c *client) Connect() Token {
|
||||
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
|
||||
time.Sleep(c.options.ConnectRetryInterval)
|
||||
|
||||
if atomic.LoadUint32(&c.status) == connecting {
|
||||
if c.status.ConnectionStatus() == connecting { // Possible connection aborted elsewhere
|
||||
goto RETRYCONN
|
||||
}
|
||||
}
|
||||
ERROR.Println(CLI, "Failed to connect to a broker")
|
||||
c.setConnected(disconnected)
|
||||
c.persist.Close()
|
||||
t.returnCode = rc
|
||||
t.setError(err)
|
||||
if err := connectionUp(false); err != nil {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
|
||||
if c.startCommsWorkers(conn, inboundFromStore) {
|
||||
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
|
||||
if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
|
||||
// Take care of any messages in the store
|
||||
if !c.options.CleanSession {
|
||||
c.resume(c.options.ResumeSubs, inboundFromStore)
|
||||
} else {
|
||||
c.persist.Reset()
|
||||
}
|
||||
} else {
|
||||
} else { // Note: With the new status subsystem this should only happen if Disconnect called simultaneously with the above
|
||||
WARN.Println(CLI, "Connect() called but connection established in another goroutine")
|
||||
}
|
||||
|
||||
@ -316,13 +301,20 @@ func (c *client) Connect() Token {
|
||||
}
|
||||
|
||||
// internal function used to reconnect the client when it loses its connection
|
||||
func (c *client) reconnect() {
|
||||
// The connection status MUST be reconnecting prior to calling this function (via call to status.connectionLost)
|
||||
func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
DEBUG.Println(CLI, "enter reconnect")
|
||||
var (
|
||||
sleep = 1 * time.Second
|
||||
initSleep = 1 * time.Second
|
||||
conn net.Conn
|
||||
)
|
||||
|
||||
// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
|
||||
// Sleep time is exponentially increased as the same situation continues
|
||||
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
|
||||
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
|
||||
}
|
||||
|
||||
for {
|
||||
if nil != c.options.OnReconnecting {
|
||||
c.options.OnReconnecting(c, &c.options)
|
||||
@ -332,32 +324,20 @@ func (c *client) reconnect() {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
|
||||
time.Sleep(sleep)
|
||||
if sleep < c.options.MaxReconnectInterval {
|
||||
sleep *= 2
|
||||
}
|
||||
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
|
||||
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
|
||||
|
||||
if sleep > c.options.MaxReconnectInterval {
|
||||
sleep = c.options.MaxReconnectInterval
|
||||
}
|
||||
// Disconnect may have been called
|
||||
if atomic.LoadUint32(&c.status) == disconnected {
|
||||
break
|
||||
if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
|
||||
if err := connectionUp(false); err != nil { // Should always return an error
|
||||
ERROR.Println(CLI, err.Error())
|
||||
}
|
||||
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnect() must have been called while we were trying to reconnect.
|
||||
if c.connectionStatus() == disconnected {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
|
||||
return
|
||||
}
|
||||
|
||||
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
|
||||
if c.startCommsWorkers(conn, inboundFromStore) {
|
||||
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
|
||||
if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
|
||||
c.resume(c.options.ResumeSubs, inboundFromStore)
|
||||
}
|
||||
close(inboundFromStore)
|
||||
@ -392,6 +372,7 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
|
||||
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
|
||||
}
|
||||
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
|
||||
dialer := c.options.Dialer
|
||||
if dialer == nil { //
|
||||
WARN.Println(CLI, "dialer was nil, using default")
|
||||
@ -411,16 +392,23 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
}
|
||||
DEBUG.Println(CLI, "socket connected to broker")
|
||||
|
||||
// Now we send the perform the MQTT connection handshake
|
||||
// Now we perform the MQTT connection handshake ensuring that it does not exceed the timeout
|
||||
if err := conn.SetDeadline(connDeadline); err != nil {
|
||||
ERROR.Println(CLI, "set deadline for handshake ", err)
|
||||
}
|
||||
|
||||
// Now we perform the MQTT connection handshake
|
||||
rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion)
|
||||
if rc == packets.Accepted {
|
||||
if err := conn.SetDeadline(time.Time{}); err != nil {
|
||||
ERROR.Println(CLI, "reset deadline following handshake ", err)
|
||||
}
|
||||
break // successfully connected
|
||||
}
|
||||
|
||||
// We may be have to attempt the connection with MQTT 3.1
|
||||
if conn != nil {
|
||||
_ = conn.Close()
|
||||
}
|
||||
// We may have to attempt the connection with MQTT 3.1
|
||||
_ = conn.Close()
|
||||
|
||||
if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
|
||||
DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
|
||||
protocolVersion = 3
|
||||
@ -452,43 +440,59 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
// reusing the `client` may lead to panics. If you want to reconnect when the connection drops then use
|
||||
// `SetAutoReconnect` and/or `SetConnectRetry`options instead of implementing this yourself.
|
||||
func (c *client) Disconnect(quiesce uint) {
|
||||
defer c.disconnect()
|
||||
done := make(chan struct{}) // Simplest way to ensure quiesce is always honoured
|
||||
go func() {
|
||||
defer close(done)
|
||||
disDone, err := c.status.Disconnecting()
|
||||
if err != nil {
|
||||
// Status has been set to disconnecting, but we had to wait for something else to complete
|
||||
WARN.Println(CLI, err.Error())
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
c.disconnect() // Force disconnection
|
||||
disDone() // Update status
|
||||
}()
|
||||
DEBUG.Println(CLI, "disconnecting")
|
||||
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
dt := newToken(packets.Disconnect)
|
||||
select {
|
||||
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
|
||||
// wait for work to finish, or quiesce time consumed
|
||||
DEBUG.Println(CLI, "calling WaitTimeout")
|
||||
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
|
||||
DEBUG.Println(CLI, "WaitTimeout done")
|
||||
// Below code causes a potential data race. Following status refactor it should no longer be required
|
||||
// but leaving in as need to check code further.
|
||||
// case <-c.commsStopped:
|
||||
// WARN.Println("Disconnect packet could not be sent because comms stopped")
|
||||
case <-time.After(time.Duration(quiesce) * time.Millisecond):
|
||||
WARN.Println("Disconnect packet not sent due to timeout")
|
||||
}
|
||||
}()
|
||||
|
||||
status := atomic.LoadUint32(&c.status)
|
||||
c.setConnected(disconnected)
|
||||
|
||||
if status != connected {
|
||||
WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
|
||||
return
|
||||
}
|
||||
|
||||
DEBUG.Println(CLI, "disconnecting")
|
||||
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
|
||||
dt := newToken(packets.Disconnect)
|
||||
// Return when done or after timeout expires (would like to change but this maintains compatibility)
|
||||
delay := time.NewTimer(time.Duration(quiesce) * time.Millisecond)
|
||||
select {
|
||||
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
|
||||
// wait for work to finish, or quiesce time consumed
|
||||
DEBUG.Println(CLI, "calling WaitTimeout")
|
||||
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
|
||||
DEBUG.Println(CLI, "WaitTimeout done")
|
||||
// Let's comment this chunk of code until we are able to safely read this variable
|
||||
// without data races.
|
||||
// case <-c.commsStopped:
|
||||
// WARN.Println("Disconnect packet could not be sent because comms stopped")
|
||||
case <-time.After(time.Duration(quiesce) * time.Millisecond):
|
||||
WARN.Println("Disconnect packet not sent due to timeout")
|
||||
case <-done:
|
||||
if !delay.Stop() {
|
||||
<-delay.C
|
||||
}
|
||||
case <-delay.C:
|
||||
}
|
||||
}
|
||||
|
||||
// forceDisconnect will end the connection with the mqtt broker immediately (used for tests only)
|
||||
func (c *client) forceDisconnect() {
|
||||
if !c.IsConnected() {
|
||||
WARN.Println(CLI, "already disconnected")
|
||||
disDone, err := c.status.Disconnecting()
|
||||
if err != nil {
|
||||
// Possible that we are not actually connected
|
||||
WARN.Println(CLI, err.Error())
|
||||
return
|
||||
}
|
||||
c.setConnected(disconnected)
|
||||
DEBUG.Println(CLI, "forcefully disconnecting")
|
||||
c.disconnect()
|
||||
disDone()
|
||||
}
|
||||
|
||||
// disconnect cleans up after a final disconnection (user requested so no auto reconnection)
|
||||
@ -505,49 +509,79 @@ func (c *client) disconnect() {
|
||||
|
||||
// internalConnLost cleanup when connection is lost or an error occurs
|
||||
// Note: This function will not block
|
||||
func (c *client) internalConnLost(err error) {
|
||||
func (c *client) internalConnLost(whyConnLost error) {
|
||||
// It is possible that internalConnLost will be called multiple times simultaneously
|
||||
// (including after sending a DisconnectPacket) as such we only do cleanup etc if the
|
||||
// routines were actually running and are not being disconnected at users request
|
||||
DEBUG.Println(CLI, "internalConnLost called")
|
||||
stopDone := c.stopCommsWorkers()
|
||||
if stopDone != nil { // stopDone will be nil if workers already in the process of stopping or stopped
|
||||
go func() {
|
||||
DEBUG.Println(CLI, "internalConnLost waiting on workers")
|
||||
<-stopDone
|
||||
DEBUG.Println(CLI, "internalConnLost workers stopped")
|
||||
// It is possible that Disconnect was called which led to this error so reconnection depends upon status
|
||||
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
|
||||
|
||||
if c.options.CleanSession && !reconnect {
|
||||
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
|
||||
} else if !c.options.ResumeSubs {
|
||||
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
|
||||
}
|
||||
if reconnect {
|
||||
c.setConnected(reconnecting)
|
||||
go c.reconnect()
|
||||
} else {
|
||||
c.setConnected(disconnected)
|
||||
}
|
||||
if c.options.OnConnectionLost != nil {
|
||||
go c.options.OnConnectionLost(c, err)
|
||||
}
|
||||
DEBUG.Println(CLI, "internalConnLost complete")
|
||||
}()
|
||||
disDone, err := c.status.ConnectionLost(c.options.AutoReconnect && c.status.ConnectionStatus() > connecting)
|
||||
if err != nil {
|
||||
if err == errConnLossWhileDisconnecting || err == errAlreadyHandlingConnectionLoss {
|
||||
return // Loss of connection is expected or already being handled
|
||||
}
|
||||
ERROR.Println(CLI, fmt.Sprintf("internalConnLost unexpected status: %s", err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
// c.stopCommsWorker returns a channel that is closed when the operation completes. This was required prior
|
||||
// to the implementation of proper status management but has been left in place, for now, to minimise change
|
||||
stopDone := c.stopCommsWorkers()
|
||||
// stopDone was required in previous versions because there was no connectionLost status (and there were
|
||||
// issues with status handling). This code has been left in place for the time being just in case the new
|
||||
// status handling contains bugs (refactoring required at some point).
|
||||
if stopDone == nil { // stopDone will be nil if workers already in the process of stopping or stopped
|
||||
ERROR.Println(CLI, "internalConnLost stopDone unexpectedly nil - BUG BUG")
|
||||
// Cannot really do anything other than leave things disconnected
|
||||
if _, err = disDone(false); err != nil { // Safest option - cannot leave status as connectionLost
|
||||
ERROR.Println(CLI, fmt.Sprintf("internalConnLost failed to set status to disconnected (stopDone): %s", err.Error()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// It may take a while for the disconnection to complete whatever called us needs to exit cleanly so finnish in goRoutine
|
||||
go func() {
|
||||
DEBUG.Println(CLI, "internalConnLost waiting on workers")
|
||||
<-stopDone
|
||||
DEBUG.Println(CLI, "internalConnLost workers stopped")
|
||||
|
||||
reConnDone, err := disDone(true)
|
||||
if err != nil {
|
||||
ERROR.Println(CLI, "failure whilst reporting completion of disconnect", err)
|
||||
} else if reConnDone == nil { // Should never happen
|
||||
ERROR.Println(CLI, "BUG BUG BUG reconnection function is nil", err)
|
||||
}
|
||||
|
||||
reconnect := err == nil && reConnDone != nil
|
||||
|
||||
if c.options.CleanSession && !reconnect {
|
||||
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
|
||||
} else if !c.options.ResumeSubs {
|
||||
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
|
||||
}
|
||||
if reconnect {
|
||||
go c.reconnect(reConnDone) // Will set connection status to reconnecting
|
||||
}
|
||||
if c.options.OnConnectionLost != nil {
|
||||
go c.options.OnConnectionLost(c, whyConnLost)
|
||||
}
|
||||
DEBUG.Println(CLI, "internalConnLost complete")
|
||||
}()
|
||||
}
|
||||
|
||||
// startCommsWorkers is called when the connection is up.
|
||||
// It starts off all of the routines needed to process incoming and outgoing messages.
|
||||
// Returns true if the comms workers were started (i.e. they were not already running)
|
||||
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
|
||||
// It starts off the routines needed to process incoming and outgoing messages.
|
||||
// Returns true if the comms workers were started (i.e. successful connection)
|
||||
// connectionUp(true) will be called once everything is up; connectionUp(false) will be called on failure
|
||||
func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn, inboundFromStore <-chan packets.ControlPacket) bool {
|
||||
DEBUG.Println(CLI, "startCommsWorkers called")
|
||||
c.connMu.Lock()
|
||||
defer c.connMu.Unlock()
|
||||
if c.conn != nil {
|
||||
WARN.Println(CLI, "startCommsWorkers called when commsworkers already running")
|
||||
conn.Close() // No use for the new network connection
|
||||
if c.conn != nil { // Should never happen due to new status handling; leaving in for safety for the time being
|
||||
WARN.Println(CLI, "startCommsWorkers called when commsworkers already running BUG BUG")
|
||||
_ = conn.Close() // No use for the new network connection
|
||||
if err := connectionUp(false); err != nil {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
}
|
||||
return false
|
||||
}
|
||||
c.conn = conn // Store the connection
|
||||
@ -567,7 +601,17 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
|
||||
c.workers.Add(1) // Done will be called when ackOut is closed
|
||||
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
|
||||
|
||||
c.setConnected(connected)
|
||||
// The connection is now ready for use (we spin up a few go routines below). It is possible that
|
||||
// Disconnect has been called in the interim...
|
||||
if err := connectionUp(true); err != nil {
|
||||
DEBUG.Println(CLI, err)
|
||||
close(c.stop) // Tidy up anything we have already started
|
||||
close(incomingPubChan)
|
||||
c.workers.Wait()
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
return false
|
||||
}
|
||||
DEBUG.Println(CLI, "client is connected/reconnected")
|
||||
if c.options.OnConnect != nil {
|
||||
go c.options.OnConnect(c)
|
||||
@ -660,8 +704,9 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
|
||||
}
|
||||
|
||||
// stopWorkersAndComms - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
|
||||
// Returns nil it workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
|
||||
// Returns nil if workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
|
||||
// Note: This may block so run as a go routine if calling from any of the comms routines
|
||||
// Note2: It should be possible to simplify this now that the new status management code is in place.
|
||||
func (c *client) stopCommsWorkers() chan struct{} {
|
||||
DEBUG.Println(CLI, "stopCommsWorkers called")
|
||||
// It is possible that this function will be called multiple times simultaneously due to the way things get shutdown
|
||||
@ -710,7 +755,8 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
|
||||
case !c.IsConnected():
|
||||
token.setError(ErrNotConnected)
|
||||
return token
|
||||
case c.connectionStatus() == reconnecting && qos == 0:
|
||||
case c.status.ConnectionStatus() == reconnecting && qos == 0:
|
||||
// message written to store and will be sent when connection comes up
|
||||
token.flowComplete()
|
||||
return token
|
||||
}
|
||||
@ -740,11 +786,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
|
||||
token.messageID = mID
|
||||
}
|
||||
persistOutbound(c.persist, pub)
|
||||
switch c.connectionStatus() {
|
||||
switch c.status.ConnectionStatus() {
|
||||
case connecting:
|
||||
DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
|
||||
case reconnecting:
|
||||
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
|
||||
case disconnecting:
|
||||
DEBUG.Println(CLI, "storing publish message (disconnecting), topic:", topic)
|
||||
default:
|
||||
DEBUG.Println(CLI, "sending publish message, topic:", topic)
|
||||
publishWaitTimeout := c.options.WriteTimeout
|
||||
@ -777,11 +825,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
|
||||
if !c.IsConnectionOpen() {
|
||||
switch {
|
||||
case !c.options.ResumeSubs:
|
||||
// if not connected and resumesubs not set this sub will be thrown away
|
||||
// if not connected and resumeSubs not set this sub will be thrown away
|
||||
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
|
||||
return token
|
||||
case c.options.CleanSession && c.connectionStatus() == reconnecting:
|
||||
// if reconnecting and cleansession is true this sub will be thrown away
|
||||
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
|
||||
// if reconnecting and cleanSession is true this sub will be thrown away
|
||||
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
|
||||
return token
|
||||
}
|
||||
@ -822,11 +870,13 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
|
||||
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
|
||||
persistOutbound(c.persist, sub)
|
||||
}
|
||||
switch c.connectionStatus() {
|
||||
switch c.status.ConnectionStatus() {
|
||||
case connecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
|
||||
case reconnecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
|
||||
case disconnecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (disconnecting), topic:", topic)
|
||||
default:
|
||||
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
|
||||
subscribeWaitTimeout := c.options.WriteTimeout
|
||||
@ -864,8 +914,8 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
|
||||
// if not connected and resumesubs not set this sub will be thrown away
|
||||
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
|
||||
return token
|
||||
case c.options.CleanSession && c.connectionStatus() == reconnecting:
|
||||
// if reconnecting and cleansession is true this sub will be thrown away
|
||||
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
|
||||
// if reconnecting and cleanSession is true this sub will be thrown away
|
||||
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
|
||||
return token
|
||||
}
|
||||
@ -896,11 +946,13 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
|
||||
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
|
||||
persistOutbound(c.persist, sub)
|
||||
}
|
||||
switch c.connectionStatus() {
|
||||
switch c.status.ConnectionStatus() {
|
||||
case connecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
|
||||
case reconnecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
|
||||
case disconnecting:
|
||||
DEBUG.Println(CLI, "storing subscribe message (disconnecting), topics:", sub.Topics)
|
||||
default:
|
||||
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
|
||||
subscribeWaitTimeout := c.options.WriteTimeout
|
||||
@ -1050,7 +1102,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
|
||||
}
|
||||
releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged
|
||||
default:
|
||||
ERROR.Println(STR, "invalid message type in store (discarded)")
|
||||
ERROR.Println(STR, fmt.Sprintf("invalid message type (inbound - %T) in store (discarded)", packet))
|
||||
c.persist.Del(key)
|
||||
}
|
||||
} else {
|
||||
@ -1064,7 +1116,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
|
||||
return
|
||||
}
|
||||
default:
|
||||
ERROR.Println(STR, "invalid message type in store (discarded)")
|
||||
ERROR.Println(STR, fmt.Sprintf("invalid message type (%T) in store (discarded)", packet))
|
||||
c.persist.Del(key)
|
||||
}
|
||||
}
|
||||
@ -1085,11 +1137,11 @@ func (c *client) Unsubscribe(topics ...string) Token {
|
||||
if !c.IsConnectionOpen() {
|
||||
switch {
|
||||
case !c.options.ResumeSubs:
|
||||
// if not connected and resumesubs not set this unsub will be thrown away
|
||||
// if not connected and resumeSubs not set this unsub will be thrown away
|
||||
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
|
||||
return token
|
||||
case c.options.CleanSession && c.connectionStatus() == reconnecting:
|
||||
// if reconnecting and cleansession is true this unsub will be thrown away
|
||||
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
|
||||
// if reconnecting and cleanSession is true this unsub will be thrown away
|
||||
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
|
||||
return token
|
||||
}
|
||||
@ -1112,11 +1164,13 @@ func (c *client) Unsubscribe(topics ...string) Token {
|
||||
persistOutbound(c.persist, unsub)
|
||||
}
|
||||
|
||||
switch c.connectionStatus() {
|
||||
switch c.status.ConnectionStatus() {
|
||||
case connecting:
|
||||
DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics)
|
||||
case reconnecting:
|
||||
DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
|
||||
case disconnecting:
|
||||
DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
|
||||
default:
|
||||
DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics)
|
||||
subscribeWaitTimeout := c.options.WriteTimeout
|
||||
|
26
vendor/github.com/eclipse/paho.mqtt.golang/messageids.go
generated
vendored
26
vendor/github.com/eclipse/paho.mqtt.golang/messageids.go
generated
vendored
@ -31,7 +31,7 @@ import (
|
||||
type MId uint16
|
||||
|
||||
type messageIds struct {
|
||||
sync.RWMutex
|
||||
mu sync.RWMutex // Named to prevent Mu from being accessible directly via client
|
||||
index map[uint16]tokenCompletor
|
||||
|
||||
lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier)
|
||||
@ -44,7 +44,7 @@ const (
|
||||
|
||||
// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens.
|
||||
func (mids *messageIds) cleanUp() {
|
||||
mids.Lock()
|
||||
mids.mu.Lock()
|
||||
for _, token := range mids.index {
|
||||
switch token.(type) {
|
||||
case *PublishToken:
|
||||
@ -59,14 +59,14 @@ func (mids *messageIds) cleanUp() {
|
||||
token.flowComplete()
|
||||
}
|
||||
mids.index = make(map[uint16]tokenCompletor)
|
||||
mids.Unlock()
|
||||
mids.mu.Unlock()
|
||||
DEBUG.Println(MID, "cleaned up")
|
||||
}
|
||||
|
||||
// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error)
|
||||
// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets
|
||||
func (mids *messageIds) cleanUpSubscribe() {
|
||||
mids.Lock()
|
||||
mids.mu.Lock()
|
||||
for mid, token := range mids.index {
|
||||
switch token.(type) {
|
||||
case *SubscribeToken:
|
||||
@ -77,19 +77,19 @@ func (mids *messageIds) cleanUpSubscribe() {
|
||||
delete(mids.index, mid)
|
||||
}
|
||||
}
|
||||
mids.Unlock()
|
||||
mids.mu.Unlock()
|
||||
DEBUG.Println(MID, "cleaned up subs")
|
||||
}
|
||||
|
||||
func (mids *messageIds) freeID(id uint16) {
|
||||
mids.Lock()
|
||||
mids.mu.Lock()
|
||||
delete(mids.index, id)
|
||||
mids.Unlock()
|
||||
mids.mu.Unlock()
|
||||
}
|
||||
|
||||
func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
|
||||
mids.Lock()
|
||||
defer mids.Unlock()
|
||||
mids.mu.Lock()
|
||||
defer mids.mu.Unlock()
|
||||
if _, ok := mids.index[id]; !ok {
|
||||
mids.index[id] = token
|
||||
} else {
|
||||
@ -105,8 +105,8 @@ func (mids *messageIds) claimID(token tokenCompletor, id uint16) {
|
||||
// getID will return an available id or 0 if none available
|
||||
// The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler)
|
||||
func (mids *messageIds) getID(t tokenCompletor) uint16 {
|
||||
mids.Lock()
|
||||
defer mids.Unlock()
|
||||
mids.mu.Lock()
|
||||
defer mids.mu.Unlock()
|
||||
i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty
|
||||
looped := false // uint16 will loop from 65535->0
|
||||
for {
|
||||
@ -127,8 +127,8 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 {
|
||||
}
|
||||
|
||||
func (mids *messageIds) getToken(id uint16) tokenCompletor {
|
||||
mids.RLock()
|
||||
defer mids.RUnlock()
|
||||
mids.mu.RLock()
|
||||
defer mids.mu.RUnlock()
|
||||
if token, ok := mids.index[id]; ok {
|
||||
return token
|
||||
}
|
||||
|
11
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
11
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
@ -150,7 +150,7 @@ type incomingComms struct {
|
||||
|
||||
// startIncomingComms initiates incoming communications; this includes starting a goroutine to process incoming
|
||||
// messages.
|
||||
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as the
|
||||
// Accepts a channel of inbound messages from the store (persisted messages); note this must be closed as soon as
|
||||
// everything in the store has been sent.
|
||||
// Returns a channel that will be passed any received packets; this will be closed on a network error (and inboundFromStore closed)
|
||||
func startIncomingComms(conn io.Reader,
|
||||
@ -332,7 +332,7 @@ func startOutgoingComms(conn net.Conn,
|
||||
DEBUG.Println(NET, "outbound wrote disconnect, closing connection")
|
||||
// As per the MQTT spec "After sending a DISCONNECT Packet the Client MUST close the Network Connection"
|
||||
// Closing the connection will cause the goroutines to end in sequence (starting with incoming comms)
|
||||
conn.Close()
|
||||
_ = conn.Close()
|
||||
}
|
||||
case msg, ok := <-oboundFromIncoming: // message triggered by an inbound message (PubrecPacket or PubrelPacket)
|
||||
if !ok {
|
||||
@ -370,9 +370,10 @@ type commsFns interface {
|
||||
// startComms initiates goroutines that handles communications over the network connection
|
||||
// Messages will be stored (via commsFns) and deleted from the store as necessary
|
||||
// It returns two channels:
|
||||
// packets.PublishPacket - Will receive publish packets received over the network.
|
||||
// Closed when incoming comms routines exit (on shutdown or if network link closed)
|
||||
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
|
||||
//
|
||||
// packets.PublishPacket - Will receive publish packets received over the network.
|
||||
// Closed when incoming comms routines exit (on shutdown or if network link closed)
|
||||
// error - Any errors will be sent on this channel. The channel is closed when all comms routines have shut down
|
||||
//
|
||||
// Note: The comms routines monitoring oboundp and obound will not shutdown until those channels are both closed. Any messages received between the
|
||||
// connection being closed and those channels being closed will generate errors (and nothing will be sent). That way the chance of a deadlock is
|
||||
|
8
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
8
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
@ -40,10 +40,14 @@ import (
|
||||
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) {
|
||||
switch uri.Scheme {
|
||||
case "ws":
|
||||
conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions)
|
||||
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
|
||||
dialURI.User = nil
|
||||
conn, err := NewWebsocket(dialURI.String(), nil, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "wss":
|
||||
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers, websocketOptions)
|
||||
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
|
||||
dialURI.User = nil
|
||||
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "mqtt", "tcp":
|
||||
allProxy := os.Getenv("all_proxy")
|
||||
|
9
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
9
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
@ -104,6 +104,7 @@ type ClientOptions struct {
|
||||
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
|
||||
Dialer *net.Dialer
|
||||
CustomOpenConnectionFn OpenConnectionFunc
|
||||
AutoAckDisabled bool
|
||||
}
|
||||
|
||||
// NewClientOptions will create a new ClientClientOptions type with some
|
||||
@ -147,6 +148,7 @@ func NewClientOptions() *ClientOptions {
|
||||
WebsocketOptions: &WebsocketOptions{},
|
||||
Dialer: &net.Dialer{Timeout: 30 * time.Second},
|
||||
CustomOpenConnectionFn: nil,
|
||||
AutoAckDisabled: false,
|
||||
}
|
||||
return o
|
||||
}
|
||||
@ -446,3 +448,10 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
|
||||
}
|
||||
return o
|
||||
}
|
||||
|
||||
// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
|
||||
// By default it is set to false. Setting it to true will disable the auto-ack globally.
|
||||
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
|
||||
o.AutoAckDisabled = autoAckDisabled
|
||||
return o
|
||||
}
|
||||
|
12
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
12
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
@ -32,16 +32,16 @@ import (
|
||||
func keepalive(c *client, conn io.Writer) {
|
||||
defer c.workers.Done()
|
||||
DEBUG.Println(PNG, "keepalive starting")
|
||||
var checkInterval int64
|
||||
var checkInterval time.Duration
|
||||
var pingSent time.Time
|
||||
|
||||
if c.options.KeepAlive > 10 {
|
||||
checkInterval = 5
|
||||
checkInterval = 5 * time.Second
|
||||
} else {
|
||||
checkInterval = c.options.KeepAlive / 2
|
||||
checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 2
|
||||
}
|
||||
|
||||
intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
|
||||
intervalTicker := time.NewTicker(checkInterval)
|
||||
defer intervalTicker.Stop()
|
||||
|
||||
for {
|
||||
@ -58,8 +58,8 @@ func keepalive(c *client, conn io.Writer) {
|
||||
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
|
||||
DEBUG.Println(PNG, "keepalive sending ping")
|
||||
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
|
||||
// We don't want to wait behind large messages being sent, the Write call
|
||||
// will block until it it able to send the packet.
|
||||
// We don't want to wait behind large messages being sent, the `Write` call
|
||||
// will block until it is able to send the packet.
|
||||
atomic.StoreInt32(&c.pingOutstanding, 1)
|
||||
if err := ping.Write(conn); err != nil {
|
||||
ERROR.Println(PNG, err)
|
||||
|
12
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
12
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
hd(client, m)
|
||||
m.Ack()
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
r.defaultHandler(client, m)
|
||||
m.Ack()
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
r.RUnlock()
|
||||
for _, handler := range handlers {
|
||||
handler(client, m)
|
||||
m.Ack()
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
}
|
||||
// DEBUG.Println(ROU, "matchAndDispatch handled message")
|
||||
}
|
||||
|
296
vendor/github.com/eclipse/paho.mqtt.golang/status.go
generated
vendored
Normal file
296
vendor/github.com/eclipse/paho.mqtt.golang/status.go
generated
vendored
Normal file
@ -0,0 +1,296 @@
|
||||
/*
|
||||
* Copyright (c) 2021 IBM Corp and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v2.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* https://www.eclipse.org/legal/epl-2.0/
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Seth Hoenig
|
||||
* Allan Stockdill-Mander
|
||||
* Mike Robertson
|
||||
* Matt Brittan
|
||||
*/
|
||||
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Status - Manage the connection status
|
||||
|
||||
// Multiple go routines will want to access/set this. Previously status was implemented as a `uint32` and updated
|
||||
// with a mixture of atomic functions and a mutex (leading to some deadlock type issues that were very hard to debug).
|
||||
|
||||
// In this new implementation `connectionStatus` takes over managing the state and provides functions that allow the
|
||||
// client to request a move to a particular state (it may reject these requests!). In some cases the 'state' is
|
||||
// transitory, for example `connecting`, in those cases a function will be returned that allows the client to move
|
||||
// to a more static state (`disconnected` or `connected`).
|
||||
|
||||
// This "belts-and-braces" may be a little over the top but issues with the status have caused a number of difficult
|
||||
// to trace bugs in the past and the likelihood that introducing a new system would introduce bugs seemed high!
|
||||
// I have written this in a way that should make it very difficult to misuse it (but it does make things a little
|
||||
// complex with functions returning functions that return functions!).
|
||||
|
||||
type status uint32
|
||||
|
||||
const (
|
||||
disconnected status = iota // default (nil) status is disconnected
|
||||
disconnecting // Transitioning from one of the below states back to disconnected
|
||||
connecting
|
||||
reconnecting
|
||||
connected
|
||||
)
|
||||
|
||||
// String simplify output of statuses
|
||||
func (s status) String() string {
|
||||
switch s {
|
||||
case disconnected:
|
||||
return "disconnected"
|
||||
case disconnecting:
|
||||
return "disconnecting"
|
||||
case connecting:
|
||||
return "connecting"
|
||||
case reconnecting:
|
||||
return "reconnecting"
|
||||
case connected:
|
||||
return "connected"
|
||||
default:
|
||||
return "invalid"
|
||||
}
|
||||
}
|
||||
|
||||
type connCompletedFn func(success bool) error
|
||||
type disconnectCompletedFn func()
|
||||
type connectionLostHandledFn func(bool) (connCompletedFn, error)
|
||||
|
||||
/* State transitions
|
||||
|
||||
static states are `disconnected` and `connected`. For all other states a process will hold a function that will move
|
||||
the state to one of those. That function effectively owns the state and any other changes must not proceed until it
|
||||
completes. One exception to that is that the state can always be moved to `disconnecting` which provides a signal that
|
||||
transitions to `connected` will be rejected (this is required because a Disconnect can be requested while in the
|
||||
Connecting state).
|
||||
|
||||
# Basic Operations
|
||||
|
||||
The standard workflows are:
|
||||
|
||||
disconnected -> `Connecting()` -> connecting -> `connCompletedFn(true)` -> connected
|
||||
connected -> `Disconnecting()` -> disconnecting -> `disconnectCompletedFn()` -> disconnected
|
||||
connected -> `ConnectionLost(false)` -> disconnecting -> `connectionLostHandledFn(true/false)` -> disconnected
|
||||
connected -> `ConnectionLost(true)` -> disconnecting -> `connectionLostHandledFn(true)` -> connected
|
||||
|
||||
Unfortunately the above workflows are complicated by the fact that `Disconnecting()` or `ConnectionLost()` may,
|
||||
potentially, be called at any time (i.e. whilst in the middle of transitioning between states). If this happens:
|
||||
|
||||
* The state will be set to disconnecting (which will prevent any request to move the status to connected)
|
||||
* The call to `Disconnecting()`/`ConnectionLost()` will block until the previously active call completes and then
|
||||
handle the disconnection.
|
||||
|
||||
Reading the tests (unit_status_test.go) might help understand these rules.
|
||||
*/
|
||||
|
||||
var (
|
||||
errAbortConnection = errors.New("disconnect called whist connection attempt in progress")
|
||||
errAlreadyConnectedOrReconnecting = errors.New("status is already connected or reconnecting")
|
||||
errStatusMustBeDisconnected = errors.New("status can only transition to connecting from disconnected")
|
||||
errAlreadyDisconnected = errors.New("status is already disconnected")
|
||||
errDisconnectionRequested = errors.New("disconnection was requested whilst the action was in progress")
|
||||
errDisconnectionInProgress = errors.New("disconnection already in progress")
|
||||
errAlreadyHandlingConnectionLoss = errors.New("status is already Connection Lost")
|
||||
errConnLossWhileDisconnecting = errors.New("connection status is disconnecting so loss of connection is expected")
|
||||
)
|
||||
|
||||
// connectionStatus encapsulates, and protects, the connection status.
|
||||
type connectionStatus struct {
|
||||
sync.RWMutex // Protects the variables below
|
||||
status status
|
||||
willReconnect bool // only used when status == disconnecting. Indicates that an attempt will be made to reconnect (allows us to abort that)
|
||||
|
||||
// Some statuses are transitional (e.g. connecting, connectionLost, reconnecting, disconnecting), that is, whatever
|
||||
// process moves us into that status will move us out of it when an action is complete. Sometimes other users
|
||||
// will need to know when the action is complete (e.g. the user calls `Disconnect()` whilst the status is
|
||||
// `connecting`). `actionCompleted` will be set whenever we move into one of the above statues and the channel
|
||||
// returned to anything else requesting a status change. The channel will be closed when the operation is complete.
|
||||
actionCompleted chan struct{} // Only valid whilst status is Connecting or Reconnecting; will be closed when connection completed (success or failure)
|
||||
}
|
||||
|
||||
// ConnectionStatus returns the connection status.
|
||||
// WARNING: the status may change at any time so users should not assume they are the only goroutine touching this
|
||||
func (c *connectionStatus) ConnectionStatus() status {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
return c.status
|
||||
}
|
||||
|
||||
// ConnectionStatusRetry returns the connection status and retry flag (indicates that we expect to reconnect).
|
||||
// WARNING: the status may change at any time so users should not assume they are the only goroutine touching this
|
||||
func (c *connectionStatus) ConnectionStatusRetry() (status, bool) {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
return c.status, c.willReconnect
|
||||
}
|
||||
|
||||
// Connecting - Changes the status to connecting if that is a permitted operation
|
||||
// Will do nothing unless the current status is disconnected
|
||||
// Returns a function that MUST be called when the operation is complete (pass in true if successful)
|
||||
func (c *connectionStatus) Connecting() (connCompletedFn, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// Calling Connect when already connecting (or if reconnecting) may not always be considered an error
|
||||
if c.status == connected || c.status == reconnecting {
|
||||
return nil, errAlreadyConnectedOrReconnecting
|
||||
}
|
||||
if c.status != disconnected {
|
||||
return nil, errStatusMustBeDisconnected
|
||||
}
|
||||
c.status = connecting
|
||||
c.actionCompleted = make(chan struct{})
|
||||
return c.connected, nil
|
||||
}
|
||||
|
||||
// connected is an internal function (it is returned by functions that set the status to connecting or reconnecting,
|
||||
// calling it completes the operation). `success` is used to indicate whether the operation was successfully completed.
|
||||
func (c *connectionStatus) connected(success bool) error {
|
||||
c.Lock()
|
||||
defer func() {
|
||||
close(c.actionCompleted) // Alert anything waiting on the connection process to complete
|
||||
c.actionCompleted = nil // Be tidy
|
||||
c.Unlock()
|
||||
}()
|
||||
|
||||
// Status may have moved to disconnecting in the interim (i.e. at users request)
|
||||
if c.status == disconnecting {
|
||||
return errAbortConnection
|
||||
}
|
||||
if success {
|
||||
c.status = connected
|
||||
} else {
|
||||
c.status = disconnected
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Disconnecting - should be called when beginning the disconnection process (cleanup etc.).
|
||||
// Can be called from ANY status and the end result will always be a status of disconnected
|
||||
// Note that if a connection/reconnection attempt is in progress this function will set the status to `disconnecting`
|
||||
// then block until the connection process completes (or aborts).
|
||||
// Returns a function that MUST be called when the operation is complete (assumed to always be successful!)
|
||||
func (c *connectionStatus) Disconnecting() (disconnectCompletedFn, error) {
|
||||
c.Lock()
|
||||
if c.status == disconnected {
|
||||
c.Unlock()
|
||||
return nil, errAlreadyDisconnected // May not always be treated as an error
|
||||
}
|
||||
if c.status == disconnecting { // Need to wait for existing process to complete
|
||||
c.willReconnect = false // Ensure that the existing disconnect process will not reconnect
|
||||
disConnectDone := c.actionCompleted
|
||||
c.Unlock()
|
||||
<-disConnectDone // Wait for existing operation to complete
|
||||
return nil, errAlreadyDisconnected // Well we are now!
|
||||
}
|
||||
|
||||
prevStatus := c.status
|
||||
c.status = disconnecting
|
||||
|
||||
// We may need to wait for connection/reconnection process to complete (they should regularly check the status)
|
||||
if prevStatus == connecting || prevStatus == reconnecting {
|
||||
connectDone := c.actionCompleted
|
||||
c.Unlock() // Safe because the only way to leave the disconnecting status is via this function
|
||||
<-connectDone
|
||||
|
||||
if prevStatus == reconnecting && !c.willReconnect {
|
||||
return nil, errAlreadyDisconnected // Following connectionLost process we will be disconnected
|
||||
}
|
||||
c.Lock()
|
||||
}
|
||||
c.actionCompleted = make(chan struct{})
|
||||
c.Unlock()
|
||||
return c.disconnectionCompleted, nil
|
||||
}
|
||||
|
||||
// disconnectionCompleted is an internal function (it is returned by functions that set the status to disconnecting)
|
||||
func (c *connectionStatus) disconnectionCompleted() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.status = disconnected
|
||||
close(c.actionCompleted) // Alert anything waiting on the connection process to complete
|
||||
c.actionCompleted = nil
|
||||
}
|
||||
|
||||
// ConnectionLost - should be called when the connection is lost.
|
||||
// This really only differs from Disconnecting in that we may transition into a reconnection (but that could be
|
||||
// cancelled something else calls Disconnecting in the meantime).
|
||||
// The returned function should be called when cleanup is completed. It will return a function to be called when
|
||||
// reconnect completes (or nil if no reconnect requested/disconnect called in the interim).
|
||||
// Note: This function may block if a connection is in progress (the move to connected will be rejected)
|
||||
func (c *connectionStatus) ConnectionLost(willReconnect bool) (connectionLostHandledFn, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.status == disconnected {
|
||||
return nil, errAlreadyDisconnected
|
||||
}
|
||||
if c.status == disconnecting { // its expected that connection lost will be called during the disconnection process
|
||||
return nil, errDisconnectionInProgress
|
||||
}
|
||||
|
||||
c.willReconnect = willReconnect
|
||||
prevStatus := c.status
|
||||
c.status = disconnecting
|
||||
|
||||
// There is a slight possibility that a connection attempt is in progress (connection up and goroutines started but
|
||||
// status not yet changed). By changing the status we ensure that process will exit cleanly
|
||||
if prevStatus == connecting || prevStatus == reconnecting {
|
||||
connectDone := c.actionCompleted
|
||||
c.Unlock() // Safe because the only way to leave the disconnecting status is via this function
|
||||
<-connectDone
|
||||
c.Lock()
|
||||
if !willReconnect {
|
||||
// In this case the connection will always be aborted so there is nothing more for us to do
|
||||
return nil, errAlreadyDisconnected
|
||||
}
|
||||
}
|
||||
c.actionCompleted = make(chan struct{})
|
||||
|
||||
return c.getConnectionLostHandler(willReconnect), nil
|
||||
}
|
||||
|
||||
// getConnectionLostHandler is an internal function. It returns the function to be returned by ConnectionLost
|
||||
func (c *connectionStatus) getConnectionLostHandler(reconnectRequested bool) connectionLostHandledFn {
|
||||
return func(proceed bool) (connCompletedFn, error) {
|
||||
// Note that connCompletedFn will only be provided if both reconnectRequested and proceed are true
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// `Disconnecting()` may have been called while the disconnection was being processed (this makes it permanent!)
|
||||
if !c.willReconnect || !proceed {
|
||||
c.status = disconnected
|
||||
close(c.actionCompleted) // Alert anything waiting on the connection process to complete
|
||||
c.actionCompleted = nil
|
||||
if !reconnectRequested || !proceed {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, errDisconnectionRequested
|
||||
}
|
||||
|
||||
c.status = reconnecting
|
||||
return c.connected, nil // Note that c.actionCompleted is still live and will be closed in connected
|
||||
}
|
||||
}
|
||||
|
||||
// forceConnectionStatus - forces the connection status to the specified value.
|
||||
// This should only be used when there is no alternative (i.e. only in tests and to recover from situations that
|
||||
// are unexpected)
|
||||
func (c *connectionStatus) forceConnectionStatus(s status) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.status = s
|
||||
}
|
Reference in New Issue
Block a user