You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Farm-Data-Relay-System/src/fdrs_gateway_mqtt.h

180 lines
4.2 KiB
C

#include <PubSubClient.h>
#include <ArduinoJson.h>
// select MQTT server address
#if defined(MQTT_ADDR)
#define FDRS_MQTT_ADDR MQTT_ADDR
#elif defined(GLOBAL_MQTT_ADDR)
#define FDRS_MQTT_ADDR GLOBAL_MQTT_ADDR
#else
// ASSERT("NO MQTT address defined! Please define in fdrs_globals.h (recommended) or in fdrs_node_config.h");
#endif // MQTT_ADDR
// select MQTT server port
#if defined(MQTT_PORT)
#define FDRS_MQTT_PORT MQTT_PORT
#elif defined(GLOBAL_MQTT_PORT)
#define FDRS_MQTT_PORT GLOBAL_MQTT_PORT
#else
#define FDRS_MQTT_PORT 1883
#endif // MQTT_PORT
// select MQTT user name
#if defined(MQTT_USER)
#define FDRS_MQTT_USER MQTT_USER
#elif defined(GLOBAL_MQTT_USER)
#define FDRS_MQTT_USER GLOBAL_MQTT_USER
#else
// ASSERT("NO MQTT user defined! Please define in fdrs_globals.h (recommended) or in fdrs_node_config.h");
#endif // MQTT_USER
// select MQTT user password
#if defined(MQTT_PASS)
#define FDRS_MQTT_PASS MQTT_PASS
#elif defined(GLOBAL_MQTT_PASS)
#define FDRS_MQTT_PASS GLOBAL_MQTT_PASS
#else
// ASSERT("NO MQTT password defined! Please define in fdrs_globals.h (recommended) or in fdrs_node_config.h");
#endif // MQTT_PASS
#if defined(MQTT_AUTH) || defined(GLOBAL_MQTT_AUTH)
#define FDRS_MQTT_AUTH
#endif // MQTT_AUTH
#define MQTT_MAX_BUFF_SIZE 1024
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMqttConnectAttempt = 0;
const char *mqtt_server = FDRS_MQTT_ADDR;
const int mqtt_port = FDRS_MQTT_PORT;
#ifdef FDRS_MQTT_AUTH
const char *mqtt_user = FDRS_MQTT_USER;
const char *mqtt_pass = FDRS_MQTT_PASS;
#else
const char *mqtt_user = NULL;
const char *mqtt_pass = NULL;
#endif // FDRS_MQTT_AUTH
void reconnect_mqtt(short int attempts, bool silent)
{
if (!silent)
DBG("Connecting MQTT...");
for (short int i = 1; i <= attempts; i++)
{
// Attempt to connect
if (client.connect("FDRS_GATEWAY", mqtt_user, mqtt_pass))
{
// Subscribe
client.subscribe(TOPIC_COMMAND);
if (!silent)
DBG(" MQTT Connected");
return;
}
else
{
if (!silent)
{
char msg[23];
sprintf(msg, " Attempt %d/%d", i, attempts);
DBG(msg);
}
if ((attempts != 1))
{
delay(3000);
}
}
}
if (!silent)
DBG(" Connecting MQTT failed.");
}
void reconnect_mqtt(int attempts)
{
reconnect_mqtt(attempts, false);
}
// Handles MQTT in loop()
void handleMQTT()
{
if (!client.connected())
{
if(TDIFF(lastMqttConnectAttempt,5000)) {
reconnect_mqtt(1, true);
lastMqttConnectAttempt = millis();
}
}
client.loop(); // for recieving incoming messages and maintaining connection
}
void mqtt_callback(char *topic, byte *message, unsigned int length)
{
String incomingString;
DBG(topic);
for (unsigned int i = 0; i < length; i++)
{
incomingString += (char)message[i];
}
JsonDocument doc;
DeserializationError error = deserializeJson(doc, incomingString);
if (error)
{ // Test if parsing succeeds.
DBG2("json parse err");
DBG2(incomingString);
return;
}
else
{
int s = doc.size();
// UART_IF.println(s);
for (int i = 0; i < s; i++)
{
theData[i].id = doc[i]["id"];
theData[i].t = doc[i]["type"];
theData[i].d = doc[i]["data"];
}
ln = s;
newData = event_mqtt;
DBG("Incoming MQTT.");
}
}
void begin_mqtt()
{
client.setServer(mqtt_server, mqtt_port);
client.setBufferSize(MQTT_MAX_BUFF_SIZE);
if (!client.connected())
{
reconnect_mqtt(5);
}
client.setCallback(mqtt_callback);
}
void mqtt_publish(const char *payload)
{
if (!client.publish(TOPIC_DATA, payload))
{
DBG(" Error on sending MQTT");
}
}
void sendMQTT()
{
DBG("Sending MQTT.");
JsonDocument doc;
for (int i = 0; i < ln; i++)
{
doc[i]["id"] = theData[i].id;
doc[i]["type"] = theData[i].t;
doc[i]["data"] = theData[i].d;
}
String outgoingString;
serializeJson(doc, outgoingString);
mqtt_publish((char *)outgoingString.c_str());
}