Node-RED: sending multiple MQTT topics to a single InfluxDB bucket

Hello All,

I need help combining 2 MQTT topics (esp/temperature, esp/humidity) into a single InfluxDB bucket.
The source is a DHT22 sensor sending data via an ESP32 over WiFi to an MQTT broker (Mosquitto). From there it is sent to the Node-RED Dashboard and InfluxDB.

I tried using function nodes before the InfluxDB node, as shown in the flow.
It creates data in the bucket, but I don't know how to find the measurements/data.
The relevant flow code follows.

[
    {
        "id": "f6f2187d.f17ca8",
        "type": "tab",
        "label": "Flow 1",
        "disabled": false,
        "info": ""
    },
    {
        "id": "3cc11d24.ff01a2",
        "type": "comment",
        "z": "f6f2187d.f17ca8",
        "name": "WARNING: please check you have started this container with a volume that is mounted to /data\\n otherwise any flow changes are lost when you redeploy or upgrade the container\\n (e.g. upgrade to a more recent node-red docker image).\\n  If you are using named volumes you can ignore this warning.\\n Double click or see info side panel to learn how to start Node-RED in Docker to save your work",
        "info": "\nTo start docker with a bind mount volume (-v option), for example:\n\n```\ndocker run -it -p 1880:1880 -v /home/user/node_red_data:/data --name mynodered nodered/node-red\n```\n\nwhere `/home/user/node_red_data` is a directory on your host machine where you want to store your flows.\n\nIf you do not do this then you can experiment and redploy flows, but if you restart or upgrade the container the flows will be disconnected and lost. \n\nThey will still exist in a hidden data volume, which can be recovered using standard docker techniques, but that is much more complex than just starting with a named volume as described above.",
        "x": 350,
        "y": 80,
        "wires": []
    },
    {
        "id": "9e58624.7faaba",
        "type": "mqtt out",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "topic": "esp32/output",
        "qos": "",
        "retain": "",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "10e78a89.5b4fd5",
        "x": 550,
        "y": 640,
        "wires": []
    },
    {
        "id": "abf7079a.653be8",
        "type": "mqtt in",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "topic": "esp32/temperature",
        "qos": "2",
        "datatype": "auto",
        "broker": "10e78a89.5b4fd5",
        "nl": false,
        "rap": false,
        "inputs": 0,
        "x": 230,
        "y": 520,
        "wires": [
            [
                "298e118d1face171"
            ]
        ]
    },
    {
        "id": "cc79021b.9a751",
        "type": "debug",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "x": 530,
        "y": 520,
        "wires": []
    },
    {
        "id": "4aecba01.78ce64",
        "type": "mqtt in",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "topic": "esp32/humidity",
        "qos": "2",
        "datatype": "auto",
        "broker": "10e78a89.5b4fd5",
        "nl": false,
        "rap": false,
        "inputs": 0,
        "x": 240,
        "y": 280,
        "wires": [
            [
                "e0f5eaaa4e27102e"
            ]
        ]
    },
    {
        "id": "22efa7b7.544a28",
        "type": "debug",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "false",
        "x": 530,
        "y": 280,
        "wires": []
    },
    {
        "id": "83cf37cf.c76988",
        "type": "ui_switch",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "label": "Output",
        "group": "61285987.c20328",
        "order": 0,
        "width": 0,
        "height": 0,
        "passthru": true,
        "decouple": "false",
        "topic": "",
        "style": "",
        "onvalue": "on",
        "onvalueType": "str",
        "onicon": "",
        "oncolor": "",
        "offvalue": "off",
        "offvalueType": "str",
        "officon": "",
        "offcolor": "",
        "x": 230,
        "y": 620,
        "wires": [
            [
                "9e58624.7faaba"
            ]
        ]
    },
    {
        "id": "df37e6b7.64c1c8",
        "type": "ui_gauge",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "group": "61285987.c20328",
        "order": 0,
        "width": 0,
        "height": 0,
        "gtype": "gage",
        "title": "Humidity gauge",
        "label": "%",
        "format": "{{value}}",
        "min": 0,
        "max": "100",
        "colors": [
            "#00b3d9",
            "#0073e6",
            "#001bd7"
        ],
        "seg1": "33",
        "seg2": "66",
        "className": "",
        "x": 540,
        "y": 220,
        "wires": []
    },
    {
        "id": "67d02a417bccdab7",
        "type": "ui_gauge",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "group": "61285987.c20328",
        "order": 0,
        "width": 0,
        "height": 0,
        "gtype": "gage",
        "title": "Temperature Gauge",
        "label": "%",
        "format": "{{value}}",
        "min": 0,
        "max": "100",
        "colors": [
            "#00b3d9",
            "#0073e6",
            "#001bd7"
        ],
        "seg1": "33",
        "seg2": "66",
        "className": "",
        "x": 560,
        "y": 480,
        "wires": []
    },
    {
        "id": "21eae8f8.2971b8",
        "type": "ui_chart",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "group": "61285987.c20328",
        "order": 0,
        "width": 0,
        "height": 0,
        "label": "Temperature Chart",
        "chartType": "line",
        "legend": "false",
        "xformat": "HH:mm:ss",
        "interpolate": "linear",
        "nodata": "",
        "dot": false,
        "ymin": "",
        "ymax": "",
        "removeOlder": 1,
        "removeOlderPoints": "",
        "removeOlderUnit": "3600",
        "cutout": 0,
        "useOneColor": false,
        "useUTC": false,
        "colors": [
            "#1f77b4",
            "#aec7e8",
            "#ff7f0e",
            "#2ca02c",
            "#98df8a",
            "#d62728",
            "#ff9896",
            "#9467bd",
            "#c5b0d5"
        ],
        "outputs": 1,
        "useDifferentColor": false,
        "className": "",
        "x": 570,
        "y": 560,
        "wires": [
            []
        ]
    },
    {
        "id": "b6e215eaca9456d4",
        "type": "ui_chart",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "group": "61285987.c20328",
        "order": 0,
        "width": 0,
        "height": 0,
        "label": "Humidity Chart",
        "chartType": "line",
        "legend": "false",
        "xformat": "HH:mm:ss",
        "interpolate": "linear",
        "nodata": "",
        "dot": false,
        "ymin": "",
        "ymax": "",
        "removeOlder": 1,
        "removeOlderPoints": "",
        "removeOlderUnit": "3600",
        "cutout": 0,
        "useOneColor": false,
        "useUTC": false,
        "colors": [
            "#1f77b4",
            "#aec7e8",
            "#ff7f0e",
            "#2ca02c",
            "#98df8a",
            "#d62728",
            "#ff9896",
            "#9467bd",
            "#c5b0d5"
        ],
        "outputs": 1,
        "useDifferentColor": false,
        "className": "",
        "x": 540,
        "y": 340,
        "wires": [
            []
        ]
    },
    {
        "id": "d257d6e2c66806b9",
        "type": "influxdb out",
        "z": "f6f2187d.f17ca8",
        "influxdb": "bc4ab5cb2a050021",
        "name": "Influx Out",
        "measurement": "",
        "precision": "",
        "retentionPolicy": "",
        "database": "database",
        "precisionV18FluxV20": "s",
        "retentionPolicyV18Flux": "",
        "org": "NCRTC",
        "bucket": "ESP",
        "x": 1140,
        "y": 420,
        "wires": []
    },
    {
        "id": "e0f5eaaa4e27102e",
        "type": "change",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "payload",
                "pt": "msg",
                "to": "$number(payload)",
                "tot": "jsonata"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 260,
        "y": 360,
        "wires": [
            [
                "b6e215eaca9456d4",
                "df37e6b7.64c1c8",
                "22efa7b7.544a28",
                "38ba8f5c43079dd6"
            ]
        ]
    },
    {
        "id": "298e118d1face171",
        "type": "change",
        "z": "f6f2187d.f17ca8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "payload",
                "pt": "msg",
                "to": "$number(payload)",
                "tot": "jsonata"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 260,
        "y": 460,
        "wires": [
            [
                "67d02a417bccdab7",
                "cc79021b.9a751",
                "21eae8f8.2971b8",
                "18e6ce72279a551e"
            ]
        ]
    },
    {
        "id": "38ba8f5c43079dd6",
        "type": "function",
        "z": "f6f2187d.f17ca8",
        "name": "Hfunction",
        "func": "msg.payload = Number(msg.payload)\nreturn msg;\n\n//msg.topic = msg.topic.split(\"/\")[3];\n//return msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 540,
        "y": 400,
        "wires": [
            [
                "b896eab750476d93"
            ]
        ]
    },
    {
        "id": "18e6ce72279a551e",
        "type": "function",
        "z": "f6f2187d.f17ca8",
        "name": "Tfunction",
        "func": "msg.payload = Number(msg.payload)\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 540,
        "y": 440,
        "wires": [
            [
                "b896eab750476d93"
            ]
        ]
    },
    {
        "id": "528fa145949c52ae",
        "type": "join",
        "z": "f6f2187d.f17ca8",
        "name": "JOINer for Influx Bucket",
        "mode": "custom",
        "build": "object",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": false,
        "timeout": "",
        "count": "2",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "num",
        "reduceFixup": "",
        "x": 930,
        "y": 420,
        "wires": [
            [
                "d257d6e2c66806b9"
            ]
        ]
    },
    {
        "id": "b896eab750476d93",
        "type": "function",
        "z": "f6f2187d.f17ca8",
        "name": "Joinfunction",
        "func": "msg.topic = msg.topic.split(\"/\")[3];\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 730,
        "y": 420,
        "wires": [
            [
                "528fa145949c52ae"
            ]
        ]
    },
    {
        "id": "10e78a89.5b4fd5",
        "type": "mqtt-broker",
        "name": "",
        "broker": "192.101.202.119",
        "port": "1883",
        "clientid": "",
        "autoConnect": true,
        "usetls": false,
        "protocolVersion": "4",
        "keepalive": "60",
        "cleansession": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willPayload": "",
        "willMsg": {},
        "sessionExpiry": ""
    },
    {
        "id": "61285987.c20328",
        "type": "ui_group",
        "name": "Main",
        "tab": "e7c46d5e.a1283",
        "order": 1,
        "disp": true,
        "width": "6",
        "collapse": false
    },
    {
        "id": "bc4ab5cb2a050021",
        "type": "influxdb",
        "hostname": "127.0.0.1",
        "port": "8086",
        "protocol": "http",
        "database": "test",
        "name": "InfluxOut",
        "usetls": true,
        "tls": "d50d0c9f.31e858",
        "influxdbVersion": "2.0",
        "url": "http://localhost:8086",
        "rejectUnauthorized": false
    },
    {
        "id": "e7c46d5e.a1283",
        "type": "ui_tab",
        "name": "Dashboard",
        "icon": "dashboard"
    },
    {
        "id": "d50d0c9f.31e858",
        "type": "tls-config",
        "name": "",
        "cert": "",
        "key": "",
        "ca": "",
        "certname": "",
        "keyname": "",
        "caname": "",
        "servername": "",
        "verifyservercert": false
    }
]

Thanks for the help in advance.

Do you mean that you want to send them to the same Measurement? There is no problem sending them to the same bucket, you do not need to join them into one message. If you want to send them to one measurement then you do need to join them. However, if they arrive separately it is generally considered best to send them to different measurements.

How is it getting from the wifi to the MQTT? Do they arrive in one message via wifi? If so then it would be better to keep them together when sending to MQTT.

Hi Colin,
Thanks for the prompt reply.
I am using 2 different MQTT nodes for 2 different MQTT topics.

  1. esp/humidity
  2. esp/temperature

The source is one ESP32. There are 2 topics.
I am not sure about the name of the measurement, or about the code written in Hfunction, Tfunction and Joinfunction.

You did not answer those questions.

You must know what measurement you want to write them to. If not then how are you ever going to fetch them from influx?

From WiFi to MQTT there are 2 separate topics/messages.

I am sending to InfluxDB, not reading from InfluxDB.

Where do I define the measurement?

  1. At InfluxDB instance?
  2. At NodeRED Instance (InfluxDB Node Connector).

This image shows me defining a measurement for the first time in Node-RED.

  1. Do you mean they come in to the system reading the wifi as separate messages?
  2. You still have not told us how they get from wifi to MQTT

Either in the influx node or in the message, as described in the influx node help text.
Do you understand what the Measurement is?

There is not much point writing it to influx if you are never going to read the data back anywhere. You cannot read it back without knowing the measurement name where you have written it.

It comes from WiFi to MQTT Broker using 2 topics.
esp/humidity
esp/temperature

Please check my next comment for the debug messages and further details


Running Flow


Source nodes defined.
2 nodes for 2 mqtt topics

  1. esp/humidity
  2. esp/temperature


Function to process the data before joiner function.


  1. Join Function
  2. Joiner Node.

There is some problem in the Joinfunction code, but I cannot work out what it is.

Do you mean that the DHT sends it direct to the MQTT broker via wifi? If so then I now understand.

If it comes in on two topics then you should treat them as separate items and send them separately to different measurements in influx. Don't try and join them together. There is no point.
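
A rough sketch of what "send them separately" could look like in a single function node placed before the influxdb out node, assuming two-segment topics such as esp32/temperature and a blank Measurement field in the influx node so that msg.measurement is used. The function name is made up for illustration, and the body is wrapped in a function only so it can be run outside Node-RED. Note that for a two-segment topic, `split("/")[3]` is undefined, so this sketch takes the last segment instead.

```javascript
// Hypothetical helper; in a function node you would use the body and `return` the new msg.
// Assumes topics of the form "<device>/<quantity>", e.g. "esp32/temperature".
function toMeasurementMessage(msg) {
    const parts = String(msg.topic).split("/");
    // Last segment of the topic. parts[3] would be undefined for "esp32/temperature".
    const quantity = parts[parts.length - 1];
    return {
        ...msg,
        measurement: quantity,        // read by the influxdb out node when its Measurement field is blank
        payload: Number(msg.payload)  // a single number is written as one field
    };
}

console.log(toMeasurementMessage({ topic: "esp32/temperature", payload: "23.5" }));
console.log(toMeasurementMessage({ topic: "esp32/humidity", payload: "41.2" }));
```

With something like this, both MQTT branches can feed the same influx node directly and the join node is not needed.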


  1. Debug Message showing the error.
  2. Out Node for InfluxDB connection.

I presume our posts crossed. As I said, don't try and join them.

The reason I was confused about the wifi/mqtt issue is that the topics start with esp. I assumed there was an ESP device in the system somewhere.

I got your point.
So 2 different measurements can be in 1 bucket?
I will try this.
Thanks for the suggestion.
Please help, I am a newbie with Node-RED and InfluxDB.

How do I send the 2 measurements separately in my current flow?
A rough sketch might also help.

There is 1 sensor with 2 variables.
Also, if I add more sensors, how should I handle that?
Separate measurements/buckets for each sensor and variable, or can we combine something here?


If you set the MQTT nodes to output parsed JSON then they should output numbers so you don't need to convert to numbers as you are doing at the moment.
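
To illustrate why parsing as JSON yields numbers, here is a small sketch of roughly what that parsing amounts to for a payload such as "23.5". The helper name is hypothetical, and returning null for anything that is not a JSON number is a choice made for this example, not a description of how the MQTT node itself handles bad input.

```javascript
// Hypothetical helper: returns a number if the raw payload is a JSON number, otherwise null.
function parseReading(raw) {
    try {
        const value = JSON.parse(String(raw));
        return typeof value === "number" ? value : null;
    } catch (err) {
        return null; // not valid JSON, e.g. the text "nan"
    }
}

console.log(parseReading("23.5")); // 23.5
console.log(parseReading("nan"));  // null
```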

Why do the topics start with esp32?

Send all temperatures to one measurement and all humidities to another, and use InfluxDB tags to identify which device they come from. If you don't know what InfluxDB tags are then you need to do some research on InfluxDB.
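
As a sketch of the tag idea: the influxdb out node accepts msg.payload as an array of two objects, the first holding fields and the second holding tags. The example below assumes the first topic segment identifies the device and the last one names the quantity. The helper name and the tag name `device` are made up for illustration.

```javascript
// Hypothetical helper, assuming topics of the form "<device>/<quantity>".
function withDeviceTag(msg) {
    const parts = String(msg.topic).split("/");
    const device = parts[0];                  // e.g. "esp32"
    const quantity = parts[parts.length - 1]; // e.g. "temperature"
    return {
        measurement: quantity,
        // Two-element array: [fields, tags]
        payload: [
            { value: Number(msg.payload) },
            { device: device }
        ]
    };
}

console.log(JSON.stringify(withDeviceTag({ topic: "esp32/temperature", payload: "23.5" })));
```

Adding a second sensor then means a new tag value rather than a new measurement or bucket.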

This is how I do some of my MQTT stuff from sensors.

Could you not subscribe to the topic esp/# and see what that returns?

It would then be possible to split the topic string for esp/temperature and esp/humidity and build the payload to put them in the same measurement in InfluxDB at the same time.

MQTT Explorer is good for seeing complete messages at the Broker before deciding how best to approach the problem.

The measurement is actually in msg.measurement, and it won't add them at the same time, but yes, that would be my suggestion after @pgncr gets it working using the simple method.
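
For later, once the simple method works, a minimal sketch of combining the two topics into one measurement. It keeps the latest value per quantity and only emits once both have arrived, which is why the two readings are not written at the moment each one arrives. The measurement name `dht22` and the function names are assumptions for this example. An object payload with several properties is written by the influx node as several fields.

```javascript
// Hypothetical combiner: collects one reading per expected quantity, then emits them together.
function createCombiner(expected) {
    let pending = {};
    return function combine(msg) {
        const parts = String(msg.topic).split("/");
        const key = parts[parts.length - 1];   // "temperature" or "humidity"
        pending[key] = Number(msg.payload);
        if (!expected.every((name) => name in pending)) {
            return null;                       // still waiting for the other topic
        }
        const fields = pending;
        pending = {};                          // start collecting the next pair
        return { measurement: "dht22", payload: fields };
    };
}

const combine = createCombiner(["temperature", "humidity"]);
console.log(combine({ topic: "esp/temperature", payload: "23.5" })); // null
console.log(combine({ topic: "esp/humidity", payload: "41.2" }));
```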

Do you have one esp device sending the temperature and humidity on separate topics?

What are you using to do the measurements?

@pgncr I am still confused about the wifi and mqtt. You said in the first post that the DHT sends the data to an esp32, then to MQTT. When the ESP receives the data are both temp and humidity in the same message? If so then don't split them up, send them both to one topic together so you can send them to influx together.

[Edit] And include in the object you send to mqtt a device id so you can identify which device it is from and put that in a tag in the database.
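
A sketch of how that could look on the Node-RED side, assuming the ESP32 publishes one JSON object per reading such as `{"device":"esp32-1","temperature":23.5,"humidity":41.2}`. The property names, the device id value and the measurement name `dht22` are all assumptions for this example, not taken from the flow above.

```javascript
// Hypothetical helper: turns a combined reading into [fields, tags] for the influxdb out node.
function fromCombinedPayload(msg) {
    // The payload may already be an object if the MQTT node is set to output parsed JSON.
    const data = typeof msg.payload === "string" ? JSON.parse(msg.payload) : msg.payload;
    const { device, ...fields } = data;       // everything except the device id becomes a field
    return {
        measurement: "dht22",
        payload: [fields, { device: String(device) }]
    };
}

const example = '{"device":"esp32-1","temperature":23.5,"humidity":41.2}';
console.log(JSON.stringify(fromCombinedPayload({ payload: example })));
```

Both values then share one timestamp, and no join is needed.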

How does the DHT communicate with the ESP?