How can I control a relay using MQTT?

Guys, I use a soil moisture sensor to measure the volumetric water content of the soil, and I use MQTT to send the value. I have declared that if the value is under 30%, the soil is dry and needs watering. How can I control a relay to turn the water pump on/off automatically based on the moisture value?

Can you explain a bit more about your hardware setup.
For example, are you using Raspberry Pi, what is your moisture sensor connected to, how do you control your water-pump? This will enable people to give a response to your question.

It sounds fairly straightforward, if all you want to do is create a flow to take an input from MQTT, check its value, and then send an output via MQTT. But maybe I'm mistaken???

Yes, what I need is to take the moisture value and check whether the soil is dry or wet. If it's dry (less than 30), I need to turn on the relay (water pump) for, let's say, 3 seconds, and after that the relay turns back off.

i use ESP32 btw

OK - so where does Node-RED come into the picture?
Can you share your flow?
What firmware are you running on the ESP32 (as you could probably use a rule-set to perform the calculation locally and cut out the middle-man)?

Something along these lines might be a useful starting point for you...

1 Like

I agree with @dynamicdave, great example. Another example (with the same logic) is this one, using a switch node and a change node. If you already have the ESP32 code ready, you only need to change the basic configuration of the MQTT nodes (topic, payload, server, ...).

[{"id":"6728746bc51b1bf2","type":"tab","label":"Flow 2","disabled":false,"info":"","env":[]},{"id":"d4b21c5f6dc81a96","type":"mqtt in","z":"6728746bc51b1bf2","name":"sensor","topic":"","qos":"2","datatype":"auto","broker":"57a374b82ac79302","nl":false,"rap":true,"rh":0,"inputs":0,"x":110,"y":280,"wires":[["e460a63c44694df0"]]},{"id":"e460a63c44694df0","type":"switch","z":"6728746bc51b1bf2","name":"if measure < 30","property":"payload","propertyType":"msg","rules":[{"t":"lt","v":"30","vt":"num"},{"t":"gte","v":"30","vt":"num"}],"checkall":"true","repair":false,"outputs":2,"x":260,"y":280,"wires":[["1bffdcd2ee06088c","939e281413b2f3f3"],[]]},{"id":"1bffdcd2ee06088c","type":"change","z":"6728746bc51b1bf2","name":"set new payload to enable relay","rules":[{"t":"set","p":"payload","pt":"msg","to":"on","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":530,"y":220,"wires":[["b9da0961b477b0ab"]]},{"id":"b9da0961b477b0ab","type":"mqtt out","z":"6728746bc51b1bf2","name":"relay","topic":"","qos":"","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"57a374b82ac79302","x":870,"y":220,"wires":[]},{"id":"4a60d32022405689","type":"change","z":"6728746bc51b1bf2","name":"set new payload to disable 
relay","rules":[{"t":"set","p":"payload","pt":"msg","to":"off","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":670,"y":260,"wires":[["b9da0961b477b0ab"]]},{"id":"939e281413b2f3f3","type":"delay","z":"6728746bc51b1bf2","name":"","pauseType":"delay","timeout":"3","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":460,"y":260,"wires":[["4a60d32022405689"]]},{"id":"57a374b82ac79302","type":"mqtt-broker","name":"","broker":"broker.hivemq.com","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""}]

I hope it helps you!

1 Like

Have you tried either of the above answers/suggestions?

Sorry, I don't really understand this code (it is from my lecturer, by the way).

// Excerpt from the MQTT message handler: reacts to a relay command received over MQTT.
Serial.println();
  // Only messages published on the configured subscribe topic are treated as commands.
  if (strcmp(topic, DeviceConfig.MqttSub) == 0)
  {
    Serial.print("Recvd relay command parse code: ");
    StaticJsonDocument<100> doc;
    // NOTE(review): the payload is cast to char* and parsed as a C string —
    // confirm the buffer is NUL-terminated, otherwise parsing can run past the message.
    DeserializationError error = deserializeJson(doc, (char *)payload);
    Serial.println(error.code());
    if (error == DeserializationError::Ok)
    {
      // The command must carry an integer field named "state".
      if (doc.containsKey("state") && doc["state"].is<int>())
      {
        // 1 switches the relay on; any other integer switches it off.
        DeviceConfig.RelayOn = (doc["state"].as<int>() == 1);
        Serial.print("Changing state: ");
        Serial.print(DeviceConfig.RelayOn );
        Serial.println();
      }
    }
  }

MqttSub is defined as "/command".

Is this the code that is used to receive the command from Node-RED?

#include <Arduino.h>
#include "ESPAsyncWebServer.h"
// #include <ESPAsyncTCP.h>
#include <definitions.h>
#include <FS.h>
#include <storage.hpp>
#include <auth.hpp>
#include <Hash.h>
#include <Adafruit_Sensor.h>
#include <DHT.h>
#include <DHT_U.h>
#include <Simple_HCSR04.h>
#include <PubSubClient.h>
#include <I2CSoilMoistureSensor.h>

// Create AsyncWebServer object on port 80
AsyncWebServer server(80);
// Forward declarations, so the functions below can be referenced before they are defined.
// NOTE(review): channelState() has no definition in the visible part of this file — confirm it is still needed.
String channelState(int numCh);
String htmlProcess(const String &var);
void deviceInit();
void mqttCallback(char *topic, byte *payload, unsigned int length);
void mqttInit();
void mqttReconnect();
String composeSensorJson();
// Template processor handed to the web server when it serves an HTML file:
// receives the name of a placeholder and returns the text to substitute for it.
// Placeholders that are not recognised are replaced by an empty string.
String htmlProcess(const String &var)
{
  // Echo every requested placeholder name on the serial console.
  Serial.println(var);

  // Yields "selected" when the given comparison holds, otherwise an empty string.
  const auto selectedIf = [](bool isCurrent) -> String {
    return isCurrent ? "selected" : "";
  };

  // Device, access point and portal settings
  if (var == "DNM")      return String(DeviceConfig.DeviceName);
  if (var == "APSSID")   return String(DeviceConfig.ApSSID);
  if (var == "APPWD")    return String(DeviceConfig.ApPwd);
  if (var == "TETPWD")   return String(DeviceConfig.TetherPwd);
  if (var == "PUSER")    return String(DeviceConfig.PortalUser);
  if (var == "PPWD")     return String(DeviceConfig.PortalPwd);

  // Limits and firmware information
  if (var == "MAXSTR")   return String(MAX_CONF_STRING);
  if (var == "MAXSSID")  return String(MAX_CONF_SSID_NAME);
  if (var == "SHAPWD")   return sha1(DeviceConfig.PortalPwd);
  if (var == "FWVER")    return FW_VER;
  if (var == "PRODNAME") return PRODUCT_NAME;

  // Marks the entry that corresponds to the configured device type
  if (var == "SELDHT")   return selectedIf(DeviceConfig.Device == DeviceSelect::DHT_11);
  if (var == "SELSOIL")  return selectedIf(DeviceConfig.Device == SoilMoisture);
  if (var == "SELPIR")   return selectedIf(DeviceConfig.Device == PIR);
  if (var == "SELLDR")   return selectedIf(DeviceConfig.Device == LDR);
  if (var == "SELRELAY") return selectedIf(DeviceConfig.Device == Relay);

  // MQTT settings
  if (var == "MQTTSRVR")    return String(DeviceConfig.MqttServer);
  if (var == "MQTTPORT")    return String(DeviceConfig.MqttPort);
  if (var == "MQTTPORTMIN") return "3";
  if (var == "MQTTPORTMAX") return "6";
  if (var == "MQTTUSER")    return String(DeviceConfig.MqttUser);
  if (var == "MPWD")        return String(DeviceConfig.MqttPwd);
  if (var == "MQTTPUB")     return String(DeviceConfig.MqttPub);
  if (var == "MQTTSUB")     return String(DeviceConfig.MqttSub);
  if (var == "MQTTPUBSEC")  return String(DeviceConfig.MqttPubSec);

  // Unknown placeholder
  return String();
}

// Network settings applied with WiFi.softAPConfig() when the device falls back to access-point mode
IPAddress apLocalIp(192, 168, 1, 1);
IPAddress apLocalIpGateway(192, 168, 1, 1);
IPAddress apLocalIpSubnet(255, 255, 255, 0);

// millis() timestamps: last time the station link was seen up / last heartbeat LED toggle
uint64_t lastMsWiFiSta, lastMsHb;


// DHT driver; allocated by deviceInit() when the DHT device type is selected
DHT_Unified *dht = nullptr;
// NOTE(review): hcsr is never assigned or used in the visible part of this file — confirm it is still needed.
Simple_HCSR04 *hcsr = nullptr;
// TCP client used as the transport of the MQTT client below
WiFiClient espClient;
PubSubClient client(espClient);


// One-time boot sequence: serial console, file system, stored configuration,
// attached device, Wi-Fi (station mode), HTTP routes and finally the MQTT client.
void setup()
{

  //Serial port for debugging purposes
  Serial.begin(115200);

  // Initialize SPIFFS
  if (!SPIFFS.begin())
  {
    Serial.println("An Error has occurred while mounting SPIFFS");
    // loadConfig() is skipped on this path, so the device is initialised from
    // whatever DeviceConfig currently holds (this is what the boot sequence
    // did before the reordering below).
    deviceInit();
    return;
  }

  // The configuration must be loaded BEFORE the device is initialised:
  // deviceInit() picks the pin mode / driver from DeviceConfig.Device, so
  // running it first would configure the hardware for the not-yet-loaded
  // device type (e.g. a relay pin that is never switched to OUTPUT).
  loadConfig();
  deviceInit();

  WiFi.setHostname(DeviceConfig.DeviceName);
  WiFi.begin();

  pinMode(LED_HB, OUTPUT);

  // Connect to Wi-Fi
  WiFi.mode(WIFI_STA);
  WiFi.begin(DeviceConfig.ApSSID, DeviceConfig.ApPwd);

  // Route for web pages

  server.on(WEB_PAGE_ROOT, HTTP_GET, [](AsyncWebServerRequest *request) {
    handleRedirect(request);
  });
  server.on(WEB_PAGE_INDEX, HTTP_GET, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    request->send(SPIFFS, WEB_FILE_INDEX, CONTENT_TYPE_HTML, false, htmlProcess);
  });
  server.on(WEB_FAVICON, HTTP_GET, [](AsyncWebServerRequest *request) {
    request->send(SPIFFS, WEB_FILE_FAVICON, CONTENT_TYPE_PNG, false);
  });
  server.on(WEB_PAGE_LOGIN, HTTP_ANY, [](AsyncWebServerRequest *request) {
    handleLoginPage(request);
  });
  server.on(WEB_PAGE_SETTINGS, HTTP_GET, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    request->send(SPIFFS, WEB_FILE_SETTINGS, CONTENT_TYPE_HTML, false, htmlProcess);
  });
  server.on(WEB_PAGE_LOGOUT, HTTP_GET, [](AsyncWebServerRequest *request) {
    handleLogout(request);
  });
  server.on(WEB_PAGE_REBOOT, HTTP_GET, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    request->send(SPIFFS, WEB_FILE_REBOOT, CONTENT_TYPE_HTML, false, htmlProcess);
  });
  server.on(WEB_PAGE_REBOOT_NOW, HTTP_GET, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    ESP.restart();
  });

  // Send a GET request to <ESP_IP>/updt
  server.on(WEB_PAGE_UPDATE, HTTP_GET, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    String resp = composeSensorJson();
    Serial.print("RESP: ");
    Serial.println(resp);
    request->send(200, CONTENT_TYPE_JSON, resp);
  });

  // Settings form: every recognised parameter that passes the length check is
  // written into DeviceConfig, then the configuration is persisted and applied.
  server.on(WEB_PAGE_SETTINGS_SAVE, HTTP_POST, [](AsyncWebServerRequest *request) {
    if (!handleLoginAuth(request))
      return;
    for (size_t i = 0; i < request->params(); i++)
    {
      const auto *param = request->getParam(i);
      const String &name = param->name();
      const String &value = param->value();
      Serial.print("Prm:");
      Serial.println(value);

      // A value is accepted only when it is non-empty and at most maxLen characters long.
      const auto accepted = [&value](size_t maxLen) {
        return value.length() > 0 && value.length() <= maxLen;
      };
      // Copies an accepted value into one of the text fields of DeviceConfig.
      const auto storeText = [&value, &accepted](char *dest, size_t maxLen) {
        if (accepted(maxLen))
        {
          strcpy(dest, value.c_str());
        }
      };

      if (name.equals(PARAM_INPUT_DEVICE_NAME))
        storeText(DeviceConfig.DeviceName, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_TETHERING_PWD))
        storeText(DeviceConfig.TetherPwd, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_AP_SSID))
        storeText(DeviceConfig.ApSSID, MAX_CONF_SSID_NAME);
      if (name.equals(PARAM_INPUT_AP_PWD))
        storeText(DeviceConfig.ApPwd, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_PORTAL_USERNAME))
        storeText(DeviceConfig.PortalUser, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_PORTAL_PWD))
        storeText(DeviceConfig.PortalPwd, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_SENSOR_SEL) && accepted(MAX_CONF_STRING))
        DeviceConfig.Device = static_cast<DeviceSelect>(value.toInt());

      //mqtt
      if (name.equals(PARAM_INPUT_MQTT_SERVER))
        storeText(DeviceConfig.MqttServer, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_MQTT_PORT) && accepted(MAX_CONF_STRING))
        DeviceConfig.MqttPort = value.toInt();
      if (name.equals(PARAM_INPUT_MQTT_USER))
        storeText(DeviceConfig.MqttUser, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_MQTT_PWD))
        storeText(DeviceConfig.MqttPwd, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_MQTT_PUB_TOPIC))
        storeText(DeviceConfig.MqttPub, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_MQTT_SUB_TOPIC))
        storeText(DeviceConfig.MqttSub, MAX_CONF_STRING);
      if (name.equals(PARAM_INPUT_MQTT_PUB_SEC) && accepted(MAX_CONF_STRING))
        DeviceConfig.MqttPubSec = value.toInt();
    }
    request->redirect(WEB_PAGE_ROOT);
    saveConfig();
    deviceInit();
    mqttInit();
    // Drop the current MQTT session; loop() reconnects via mqttReconnect()
    // using the settings that were just applied.
    if (client.connected())
    {
      client.disconnect();
    }
  });

  server.serveStatic(WEB_PAGE_ASSETS, SPIFFS, WEB_PAGE_ASSETS);

  // Start server
  server.begin();

  // Start of the period after which loop() gives up on station mode
  lastMsWiFiSta = millis();

  mqttInit();
  mqttReconnect();
  Serial.println("Started...");
}

// Configures the hardware on AIO_SENSOR_PIN_3 for the device type currently
// selected in DeviceConfig.Device. The function is called more than once
// during the lifetime of the firmware (at boot, after the settings are saved
// and from composeSensorJson() when the DHT driver is missing), so it must be
// safe to run repeatedly.
void deviceInit()
{
  Serial.println("Device init...");
  if (DeviceConfig.Device == DeviceSelect::DHT_11)
  {
    // Allocate the driver only once. A fresh `new` on every call would leak
    // the previous instance each time the settings are saved; the constructor
    // arguments are the same on every call, so the existing object is reused.
    if (dht == nullptr)
    {
      dht = new DHT_Unified(AIO_SENSOR_PIN_3, DHT11);
    }
    Serial.print("Using DHT...");
    dht->begin();
    sensor_t sensor;
    dht->temperature().getSensor(&sensor);
    dht->humidity().getSensor(&sensor);
  }
  else if (DeviceConfig.Device == PIR)
  {
    Serial.print("Using digital sensor...");
    pinMode(AIO_SENSOR_PIN_3, INPUT);
  }
  else if (DeviceConfig.Device == Relay)
  {
    Serial.print("Using relay...");
    pinMode(AIO_SENSOR_PIN_3, OUTPUT);
  }
  else if (DeviceConfig.Device == LDR)
  {
    Serial.print("Using LDR");
    pinMode(AIO_SENSOR_PIN_3, INPUT);
  }
  else if (DeviceConfig.Device == SoilMoisture)
  {
    Serial.print("Using Soil Moisture sensor...");
    // The sensor is sampled with analogRead() in composeSensorJson(), so the
    // pin has to be an input; driving it as an output would fight the sensor.
    pinMode(AIO_SENSOR_PIN_3, INPUT);
  }
}

// Points the MQTT client at the configured broker and registers the handler
// that is executed whenever the client receives an incoming message.
void mqttInit()
{
  client.setServer(DeviceConfig.MqttServer, DeviceConfig.MqttPort)
      .setCallback(mqttCallback);
}

void mqttCallback(char *topic, byte *payload, unsigned int length)
{
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  for (int i = 0; i < length; i++)
  {
    Serial.print((char)payload[i]);
  }
  Serial.println();
  if (strcmp(topic, DeviceConfig.MqttSub) == 0)
  {
    Serial.print("Recvd relay command parse code: ");
    StaticJsonDocument<100> doc;
    DeserializationError error = deserializeJson(doc, (char *)payload);
    Serial.println(error.code());
    if (error == DeserializationError::Ok)
    {
      if (doc.containsKey("state") && doc["state"].is<int>())
      {
        DeviceConfig.RelayOn = (doc["state"].as<int>() == 1);
        Serial.print("Changing state: ");
        Serial.print(DeviceConfig.RelayOn );
        Serial.println();
      }
    }
  }
}

uint32_t lastReconnectMs = 0;
void mqttReconnect()
{
  if (!client.connected() && WiFi.getMode() == WIFI_STA && WiFi.status() == WL_CONNECTED &&
       millis() - lastReconnectMs > 5000)
  {
    Serial.print("Attempting MQTT connection...");
    String clientId = String(DeviceConfig.DeviceName) + "-";
    clientId += String(random(0xffff), HEX);
    if (client.connect(clientId.c_str(), DeviceConfig.MqttUser, DeviceConfig.MqttPwd))
    {
      Serial.println("connected");
      if (DeviceConfig.Device == Relay)
      {
        client.setCallback(mqttCallback);
        Serial.println("Subscribing " + String(DeviceConfig.MqttSub) + (client.subscribe(DeviceConfig.MqttSub) ? " OK" : " FAIL"));
      }
    }
    else
    {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
    }
    lastReconnectMs = millis();
  }
}

#define FIRE_THRESH 1000

// Builds the status report of the configured device as pretty-printed JSON.
// The document holds a "data" object with the device-specific values plus the
// top-level fields "device_type" and "local_ip".
String composeSensorJson()
{
  DynamicJsonDocument doc(2048);
  JsonObject devData = doc.createNestedObject("data");

  switch (DeviceConfig.Device)
  {
  case DeviceSelect::DHT_11:
  {
#ifdef DUMMY_SENSOR
    // Fixed values, no hardware access
    devData["humidity"] = "40 %";
    devData["temperature"] = "33.4 celcius";
#else
    // Create the driver on demand if it has not been set up yet
    if (dht == nullptr)
    {
      deviceInit();
    }
    sensors_event_t reading;

    // Temperature
    dht->temperature().getEvent(&reading);
    if (!isnan(reading.temperature))
    {
      Serial.print(F("Temperature: "));
      Serial.print(reading.temperature);
      Serial.println(F("°C"));
      devData["temperature"] = String(reading.temperature);
    }
    else
    {
      Serial.println(F("Error reading temperature!"));
      devData["temperature"] = "error";
    }

    // Relative humidity
    dht->humidity().getEvent(&reading);
    if (!isnan(reading.relative_humidity))
    {
      Serial.print(F("Humidity: "));
      Serial.print(reading.relative_humidity);
      Serial.println(F("%"));
      devData["humidity"] = String(reading.relative_humidity);
    }
    else
    {
      Serial.println(F("Error reading humidity!"));
      devData["humidity"] = "error";
    }
#endif
    break;
  }

  case DeviceSelect::PIR:
  {
    const bool motion = (digitalRead(AIO_SENSOR_PIN_3) == PIR_ACTIVE);
    devData["detection"] = motion ? "Motion Detected" : "No Motion Detected";
    break;
  }

  case DeviceSelect::LDR:
    devData["intensity"] = String(analogRead(AIO_SENSOR_PIN_3));
    break;

  case DeviceSelect::SoilMoisture:
  {
    // The raw reading is scaled against 4095 and inverted, so a higher raw
    // value gives a lower percentage; the result is truncated to an integer.
    const int rawReading = analogRead(AIO_SENSOR_PIN_3);
    const int moisturePct = static_cast<int>(100 - (rawReading / 4095.00) * 100);
    devData["moisture"] = moisturePct;
    // Above 30 % is reported as wet, 30 % and below as dry
    devData["status"] = String(moisturePct > 30 ? "Wet" : "Dry");
    break;
  }

  case DeviceSelect::Relay:
    devData["state"] = (DeviceConfig.RelayOn ? "ON" : "OFF");
    break;

  default:
    break;
  }

  doc["device_type"] = DEVICE_NAME[DeviceConfig.Device - 1];
  doc["local_ip"] = WiFi.localIP().toString();

  String json;
  serializeJsonPretty(doc, json);
  return json;
}

bool toggleHb = true;
bool staConnected = false;
uint64_t lastSendMs = 0;
void loop()
{
  if (WiFi.getMode() == WIFI_STA && WiFi.status() != WL_CONNECTED && millis() - lastMsWiFiSta > CONNECT_STA_TIMEOUT)
  {
    Serial.print("Dropping AP mode ");
    WiFi.mode(WIFI_AP);
    WiFi.softAPConfig(apLocalIp, apLocalIpGateway, apLocalIpSubnet);
    WiFi.softAP(DeviceConfig.DeviceName, DeviceConfig.TetherPwd);
    WiFi.setHostname(DeviceConfig.DeviceName);
    staConnected = false;
    Serial.println("OK");
  }
  else if (WiFi.getMode() == WIFI_STA && WiFi.status() == WL_CONNECTED)
  {
    lastMsWiFiSta = millis();
    if (!staConnected)
    {
      staConnected = true;
      Serial.println(String("Connected to [") + WiFi.SSID() + "] IP: " + WiFi.localIP().toString());
      WiFi.setHostname(DeviceConfig.DeviceName);
    }
  }

  if (millis() - lastMsHb > HB_PERIOD_MS)
  {
    lastMsHb = millis();
    digitalWrite(LED_HB, !digitalRead(LED_HB));
  }

  mqttReconnect();
  if (client.connected() && millis() - lastSendMs >= (DeviceConfig.MqttPubSec * 1000))
  {
    Serial.print("Publishing:");
    String json = composeSensorJson();
    Serial.println(json);
    Serial.println(client.publish(DeviceConfig.MqttPub, json.c_str()) ? "OK" : "FAIL");
    lastSendMs = millis();
  }
  if (DeviceConfig.Device == Relay) {
    digitalWrite(AIO_SENSOR_PIN_3, DeviceConfig.RelayOn ? RELAY_ACTIVE : RELAY_INACTIVE);
  }
  client.loop();
}

I assumed, from your original quote, you had the sensing part working and just needed some help with the Node-RED end.

Sorry, but I don't have the spare time to debug your Arduino code. I suggest you discuss your issue with your lecturer as I'm sure she/he has an incremental plan for this assignment/project and how to deal with any problems that students might encounter.

Quickly looking at your code, it would seem you are performing a check on 'moisture_percentage', so I don't really see how Node-RED comes into this (as it would seem to just duplicate this 'test').

    if(moisture_percentage > 30.00){
      devData["status"] = String("Wet");
    }
    else{
      devData["status"] = String("Dry");
    }