First implementation
This commit is contained in:
69
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
Normal file
69
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
Normal file
@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (c) 2013 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*
|
||||
* Contributors:
|
||||
* Seth Hoenig
|
||||
* Allan Stockdill-Mander
|
||||
* Mike Robertson
|
||||
*/
|
||||
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang/packets"
|
||||
)
|
||||
|
||||
func keepalive(c *client) {
|
||||
defer c.workers.Done()
|
||||
DEBUG.Println(PNG, "keepalive starting")
|
||||
var checkInterval int64
|
||||
var pingSent time.Time
|
||||
|
||||
if c.options.KeepAlive > 10 {
|
||||
checkInterval = 5
|
||||
} else {
|
||||
checkInterval = c.options.KeepAlive / 2
|
||||
}
|
||||
|
||||
intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
|
||||
defer intervalTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.stop:
|
||||
DEBUG.Println(PNG, "keepalive stopped")
|
||||
return
|
||||
case <-intervalTicker.C:
|
||||
lastSent := c.lastSent.Load().(time.Time)
|
||||
lastReceived := c.lastReceived.Load().(time.Time)
|
||||
|
||||
DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds())
|
||||
if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) {
|
||||
if atomic.LoadInt32(&c.pingOutstanding) == 0 {
|
||||
DEBUG.Println(PNG, "keepalive sending ping")
|
||||
ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
|
||||
//We don't want to wait behind large messages being sent, the Write call
|
||||
//will block until it it able to send the packet.
|
||||
atomic.StoreInt32(&c.pingOutstanding, 1)
|
||||
ping.Write(c.conn)
|
||||
c.lastSent.Store(time.Now())
|
||||
pingSent = time.Now()
|
||||
}
|
||||
}
|
||||
if atomic.LoadInt32(&c.pingOutstanding) > 0 && time.Now().Sub(pingSent) >= c.options.PingTimeout {
|
||||
CRITICAL.Println(PNG, "pingresp not received, disconnecting")
|
||||
c.errors <- errors.New("pingresp not received, disconnecting")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user