Skip to content

[MQTT] Fix change in client.connected() behavior for pubsubclient #2811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 67 additions & 44 deletions lib/pubsubclient/src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,74 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
return len;
}

bool PubSubClient::loop_read() {
if (_client == nullptr) {
return false;
}
if (!_client->available()) {
return false;
}
uint8_t llen;
uint16_t len = readPacket(&llen);
if (len == 0) {
return false;
}
unsigned long t = millis();
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;

switch(type) {
case MQTTPUBLISH:
{
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
uint8_t *payload;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
const uint16_t msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);
if (_client->connected()) {
buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
if (_client->write(buffer,4) != 0) {
lastOutActivity = t;
}
}
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
break;
}
case MQTTPINGREQ:
{
if (_client->connected()) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
}
break;
}
case MQTTPINGRESP:
{
pingOutstanding = false;
break;
}
default:
return false;
}
return true;
}

boolean PubSubClient::loop() {
loop_read();
if (connected()) {
unsigned long t = millis();
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
Expand All @@ -328,50 +395,6 @@ boolean PubSubClient::loop() {
pingOutstanding = true;
}
}
if (_client->available()) {
uint8_t llen;
uint16_t len = readPacket(&llen);
uint16_t msgId = 0;
uint8_t *payload;
if (len > 0) {
lastInActivity = t;
uint8_t type = buffer[0]&0xF0;
if (type == MQTTPUBLISH) {
if (callback) {
uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2]; /* topic length in bytes */
memmove(buffer+llen+2,buffer+llen+3,tl); /* move topic inside buffer 1 byte to front */
buffer[llen+2+tl] = 0; /* end the topic as a 'C' string with \x00 */
char *topic = (char*) buffer+llen+2;
// msgId only present for QOS>0
if ((buffer[0]&0x06) == MQTTQOS1) {
msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
payload = buffer+llen+3+tl+2;
callback(topic,payload,len-llen-3-tl-2);

buffer[0] = MQTTPUBACK;
buffer[1] = 2;
buffer[2] = (msgId >> 8);
buffer[3] = (msgId & 0xFF);
if (_client->write(buffer,4) != 0) {
lastOutActivity = t;
}
} else {
payload = buffer+llen+3+tl;
callback(topic,payload,len-llen-3-tl);
}
}
} else if (type == MQTTPINGREQ) {
buffer[0] = MQTTPINGRESP;
buffer[1] = 0;
_client->write(buffer,2);
} else if (type == MQTTPINGRESP) {
pingOutstanding = false;
}
} else if (!connected()) {
// readPacket has closed the connection
return false;
}
}
return true;
}
return false;
Expand Down
2 changes: 2 additions & 0 deletions lib/pubsubclient/src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class PubSubClient : public Print {
unsigned long lastInActivity;
bool pingOutstanding;
MQTT_CALLBACK_SIGNATURE;
// Try to read from the client whatever is available.
bool loop_read();
uint16_t readPacket(uint8_t*);
boolean readByte(uint8_t * result);
boolean readByte(uint8_t * result, uint16_t * index);
Expand Down
3 changes: 2 additions & 1 deletion src/Controller.ino
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,9 @@ bool MQTTConnect(int controller_idx)
}
delay(0);

byte controller_number = Settings.Protocol[controller_idx];
count_connection_results(MQTTresult, F("MQTT : Broker "), controller_number, ControllerSettings);
if (!MQTTresult) {
addLog(LOG_LEVEL_ERROR, F("MQTT : Failed to connect to broker"));
MQTTclient.disconnect();
updateMQTTclient_connected();
return false;
Expand Down
56 changes: 39 additions & 17 deletions src/ESPEasyWifi.ino
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@
bool WiFiConnected() {
START_TIMER;

if (unprocessedWifiEvents()) { return false; }
if (unprocessedWifiEvents()) {
return false;
}

if ((timerAPstart != 0) && timeOutReached(timerAPstart)) {
// Timer reached, so enable AP mode.
Expand All @@ -88,6 +90,7 @@ bool WiFiConnected() {
if (wifiStatus != ESPEASY_WIFI_SERVICES_INITIALIZED) {
if (validWiFi) {
// Set internal wifiStatus and reset timer to disable AP mode
addLog(LOG_LEVEL_INFO, F("Has IP address"));
markWiFi_services_initialized();
}
}
Expand Down Expand Up @@ -267,6 +270,19 @@ void WifiDisconnect()
#endif // if defined(ESP32)
wifiStatus = ESPEASY_WIFI_DISCONNECTED;
processedDisconnect = false;
++WifiDisconnectCounter;
}


void evaluateConnectionFailures()
{
if (Settings.ConnectionFailuresThreshold) {
unsigned long threshold = WifiDisconnectCounter * Settings.ConnectionFailuresThreshold / 10;
if (connectionFailures > threshold && timePassedSince(lastConnectMoment) > WIFI_RECONNECT_WAIT)
{
cmd_within_mainloop = CMD_WIFI_DISCONNECT;
}
}
}

// ********************************************************************************
Expand Down Expand Up @@ -607,22 +623,28 @@ void setConnectionSpeed() {
void setupStaticIPconfig() {
setUseStaticIP(useStaticIP());

if (!useStaticIP()) { return; }
const IPAddress ip = Settings.IP;
const IPAddress gw = Settings.Gateway;
const IPAddress subnet = Settings.Subnet;
const IPAddress dns = Settings.DNS;

if (loglevelActiveFor(LOG_LEVEL_INFO)) {
String log = F("IP : Static IP : ");
log += formatIP(ip);
log += F(" GW: ");
log += formatIP(gw);
log += F(" SN: ");
log += formatIP(subnet);
log += F(" DNS: ");
log += formatIP(dns);
addLog(LOG_LEVEL_INFO, log);
IPAddress ip;
IPAddress gw;
IPAddress subnet;
IPAddress dns;


if (useStaticIP()) {
ip = Settings.IP;
gw = Settings.Gateway;
subnet = Settings.Subnet;
dns = Settings.DNS;
if (loglevelActiveFor(LOG_LEVEL_INFO)) {
String log = F("IP : Static IP : ");
log += formatIP(ip);
log += F(" GW: ");
log += formatIP(gw);
log += F(" SN: ");
log += formatIP(subnet);
log += F(" DNS: ");
log += formatIP(dns);
addLog(LOG_LEVEL_INFO, log);
}
}
WiFi.config(ip, gw, subnet, dns);
}
Expand Down
1 change: 1 addition & 0 deletions src/ESPEasy_fdwdecl.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void WifiScan(bool async, bool quick = false);
void WifiScan();
void WiFiConnectRelaxed();
void WifiDisconnect();
void evaluateConnectionFailures();
void setAP(bool enable);
void setSTA(bool enable);

Expand Down
33 changes: 17 additions & 16 deletions src/Networking.ino
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ void SSDP_update() {
}

# endif // ifdef USES_SSDP
#endif // if defined(ESP8266)
#endif // if defined(ESP8266)


// ********************************************************************************
Expand Down Expand Up @@ -778,20 +778,23 @@ bool getSubnetRange(IPAddress& low, IPAddress& high)

bool hasIPaddr() {
#ifdef CORE_POST_2_5_0
bool configured = false;

for (auto addr : addrList) {
if ((configured = (!addr.isLocal() && (addr.ifnumber() == STATION_IF)))) {
/*
Serial.printf("STA: IF='%s' hostname='%s' addr= %s\n",
addr.ifname().c_str(),
addr.ifhostname(),
addr.toString().c_str());
*/
break;

for (netif *interface = netif_list; interface != nullptr; interface = interface->next) {
if (
(interface->flags & NETIF_FLAG_LINK_UP)
&& (interface->flags & NETIF_FLAG_UP)
# if LWIP_VERSION_MAJOR == 1
&& interface == eagle_lwip_getif(STATION_IF) /* lwip1 does not set if->num properly */
&& (!ip_addr_isany(&interface->ip_addr))
# else // if LWIP_VERSION_MAJOR == 1
&& interface->num == STATION_IF
&& (!ip4_addr_isany_val(*netif_ip4_addr(interface)))
# endif // if LWIP_VERSION_MAJOR == 1
) {
return true;
}
}
return configured;
return false;
#else // ifdef CORE_POST_2_5_0
return WiFi.isConnected();
#endif // ifdef CORE_POST_2_5_0
Expand Down Expand Up @@ -936,9 +939,6 @@ bool beginWiFiUDP_randomPort(WiFiUDP& udp) {
}

void sendGratuitousARP() {
if (!WiFiConnected()) {
return;
}
#ifdef SUPPORT_ARP

// See https://github.com/letscontrolit/ESPEasy/issues/2374
Expand All @@ -954,6 +954,7 @@ void sendGratuitousARP() {
# else // ifdef ESP32
etharp_gratuitous(n);
# endif // ifdef ESP32
addLog(LOG_LEVEL_INFO, F("Send Gratuitous ARP"));
}
n = n->next;
}
Expand Down
8 changes: 1 addition & 7 deletions src/_C002.ino
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,7 @@ bool CPlugin_002(byte function, struct EventStruct *event, String& string)
String pubname = ControllerSettings.Publish;
parseControllerVariables(pubname, event, false);

if (!MQTTpublish(event->ControllerIndex, pubname.c_str(), json.c_str(), Settings.MQTTRetainFlag))
{
connectionFailures++;
}
else if (connectionFailures) {
connectionFailures--;
}
success = MQTTpublish(event->ControllerIndex, pubname.c_str(), json.c_str(), Settings.MQTTRetainFlag);
} // if ixd !=0
else
{
Expand Down
22 changes: 5 additions & 17 deletions src/_C003.ino
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,12 @@ bool do_process_c003_delay_queue(int controller_number, const C003_queue_element

bool do_process_c003_delay_queue(int controller_number, const C003_queue_element& element, ControllerSettingsStruct& ControllerSettings) {
bool success = false;
char log[80];
addLog(LOG_LEVEL_DEBUG, String(F("TELNT : connecting to ")) + ControllerSettings.getHostPortString());
// Use WiFiClient class to create TCP connections
WiFiClient client;
if (!ControllerSettings.connectToHost(client))
if (!try_connect_host(controller_number, client, ControllerSettings, F("TELNT: ")))
{
connectionFailures++;
strcpy_P(log, PSTR("TELNT: connection failed"));
addLog(LOG_LEVEL_ERROR, log);
return success;
}
statusLED(true);
if (connectionFailures)
connectionFailures--;

// strcpy_P(log, PSTR("TELNT: Sending enter"));
// addLog(LOG_LEVEL_ERROR, log);
Expand All @@ -101,28 +93,24 @@ bool do_process_c003_delay_queue(int controller_number, const C003_queue_element
if (line.startsWith(F("Enter your password:")))
{
success = true;
strcpy_P(log, PSTR("TELNT: Password request ok"));
addLog(LOG_LEVEL_DEBUG, log);
addLog(LOG_LEVEL_DEBUG, F("TELNT: Password request ok"));
}
delay(1);
}

strcpy_P(log, PSTR("TELNT: Sending pw"));
addLog(LOG_LEVEL_DEBUG, log);
addLog(LOG_LEVEL_DEBUG, F("TELNT: Sending pw"));
client.println(SecuritySettings.ControllerPassword[element.controller_idx]);
delay(100);
while (client_available(client))
client.read();

strcpy_P(log, PSTR("TELNT: Sending cmd"));
addLog(LOG_LEVEL_DEBUG, log);
addLog(LOG_LEVEL_DEBUG, F("TELNT: Sending cmd"));
client.print(element.txt);
delay(10);
while (client_available(client))
client.read();

strcpy_P(log, PSTR("TELNT: closing connection"));
addLog(LOG_LEVEL_DEBUG, log);
addLog(LOG_LEVEL_DEBUG, F("TELNT: closing connection"));

client.stop();
return success;
Expand Down
7 changes: 1 addition & 6 deletions src/_C017.ino
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,10 @@ bool do_process_c017_delay_queue(int controller_number, const C017_queue_element
}

WiFiClient client;
if (!ControllerSettings.connectToHost(client))
if (!try_connect_host(controller_number, client, ControllerSettings, F("ZBX : ")))
{
connectionFailures++;
addLog(LOG_LEVEL_ERROR, String(F("ZBX: Cannot connect")));
return false;
}
statusLED(true);
if (connectionFailures)
connectionFailures--;

LoadTaskSettings(element.TaskIndex);

Expand Down
Loading