/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */
package mqtt
import (
"errors"
2021-01-17 18:11:37 +00:00
"io"
2020-09-20 21:16:06 +00:00
"sync/atomic"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
)
2021-01-17 18:11:37 +00:00
// keepalive - Send ping when connection unused for set period
// connection passed in to avoid race condition on shutdown
func keepalive ( c * client , conn io . Writer ) {
2020-09-20 21:16:06 +00:00
defer c . workers . Done ( )
DEBUG . Println ( PNG , "keepalive starting" )
var checkInterval int64
var pingSent time . Time
if c . options . KeepAlive > 10 {
checkInterval = 5
} else {
checkInterval = c . options . KeepAlive / 2
}
intervalTicker := time . NewTicker ( time . Duration ( checkInterval * int64 ( time . Second ) ) )
defer intervalTicker . Stop ( )
for {
select {
case <- c . stop :
DEBUG . Println ( PNG , "keepalive stopped" )
return
case <- intervalTicker . C :
lastSent := c . lastSent . Load ( ) . ( time . Time )
lastReceived := c . lastReceived . Load ( ) . ( time . Time )
DEBUG . Println ( PNG , "ping check" , time . Since ( lastSent ) . Seconds ( ) )
if time . Since ( lastSent ) >= time . Duration ( c . options . KeepAlive * int64 ( time . Second ) ) || time . Since ( lastReceived ) >= time . Duration ( c . options . KeepAlive * int64 ( time . Second ) ) {
if atomic . LoadInt32 ( & c . pingOutstanding ) == 0 {
DEBUG . Println ( PNG , "keepalive sending ping" )
ping := packets . NewControlPacket ( packets . Pingreq ) . ( * packets . PingreqPacket )
2021-01-17 18:11:37 +00:00
// We don't want to wait behind large messages being sent, the Write call
// will block until it it able to send the packet.
2020-09-20 21:16:06 +00:00
atomic . StoreInt32 ( & c . pingOutstanding , 1 )
2021-01-17 18:11:37 +00:00
if err := ping . Write ( conn ) ; err != nil {
ERROR . Println ( PNG , err )
}
2020-09-20 21:16:06 +00:00
c . lastSent . Store ( time . Now ( ) )
pingSent = time . Now ( )
}
}
2021-01-17 18:11:37 +00:00
if atomic . LoadInt32 ( & c . pingOutstanding ) > 0 && time . Since ( pingSent ) >= c . options . PingTimeout {
2020-09-20 21:16:06 +00:00
CRITICAL . Println ( PNG , "pingresp not received, disconnecting" )
2021-01-17 18:11:37 +00:00
c . internalConnLost ( errors . New ( "pingresp not received, disconnecting" ) ) // no harm in calling this if the connection is already down (or shutdown is in progress)
2020-09-20 21:16:06 +00:00
return
}
}
}
}