Import MQTT to Influxdb

Hi,

i got a mqtt-server with several numbers of topics. A big part of the topics i try to get to influxdb.

The topics are organized liked
foo/bar/001
foo/bar/002
foo/john/001
foo/john/002
and so on.....

Everything from foo/bar/ should be in to one bucket, everything from foo/john/ into a other buket.

From mqtt i get the data via foo/bar/#, that means i get Temp, Voltage and Humidity. The payload looks for example like this

{"ArrivalTime": "2023-03-24T18:57:16+02:00","TP": "Temp-Feuchte-CO2_002","Tag": "TMP","Val": 15.800}

How i can get for each topic an entry in my bucket? At the moment i get all value in the same VAL field :frowning:

...use a named topic as bucket.
Split the msg from mqtt into msgs with the desired topic, like "bar" and "john".
Then use the msg.payload as value and the msg.topic as topic when feeding these in the influxDB node.

Alternatively, look into telegraf, as another tool/option outside NodeRed.

If I understand correctly, you are talking about grouping both 001 and 002 ( or temp,volt,hum) readings from bar.

This example may help. It stores incoming topic messages from bar, john, etc
and will only output once it has received three messages (temp,volt,hum).

[{"id":"ea801c4379fcf1a2","type":"inject","z":"65617ffeb779f51c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/bar/Temp","payload":"","payloadType":"date","x":270,"y":4580,"wires":[["8aae3691017739b8"]]},{"id":"8aae3691017739b8","type":"change","z":"65617ffeb779f51c","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"{    \"ArrivalTime\": $now(),    \"TP\": \"Temp-Feuchte-CO2_002\",    \"Tag\": \"TMP\",    \"Val\": $round($random()*6+10, 1)}","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":4680,"wires":[["a7c385c3a8417dad"]]},{"id":"4e4233dc603787ea","type":"inject","z":"65617ffeb779f51c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/bar/Voltage","payload":"","payloadType":"date","x":280,"y":4620,"wires":[["8aae3691017739b8"]]},{"id":"12bae7f80deda7b3","type":"inject","z":"65617ffeb779f51c","name":"john Temp","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/john/Temp","payload":"","payloadType":"date","x":220,"y":4700,"wires":[["8aae3691017739b8"]]},{"id":"6936fc017d07a139","type":"inject","z":"65617ffeb779f51c","name":"john Volt","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/john/Voltage","payload":"","payloadType":"date","x":220,"y":4740,"wires":[["8aae3691017739b8"]]},{"id":"5308d01d0797909d","type":"inject","z":"65617ffeb779f51c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/john/another","payload":"","payloadType":"date","x":600,"y":4600,"wires":[["8aae3691017739b8"]]},{"id":"5821e56a18523f02","type":"inject","z":"65617ffeb779f51c","name":"john Hum","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/john/Humidity","payload":"","payloadType":"date","x":220,"y":4780,"wires":[["8aae3691017739b8"]]},{"id":"55f2ef1ee96632b6","type":"inject","z":"65617ffeb779f51c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"foo/bar/Humidity","payload":"","payloadType":"date","x":280,"y":4660,"wires":[["8aae3691017739b8"]]},{"id":"a7c385c3a8417dad","type":"mqtt out","z":"65617ffeb779f51c","name":"","topic":"","qos":"","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e8ba3ef5.22f4a8","x":670,"y":4680,"wires":[]},{"id":"4235b6534949180b","type":"mqtt in","z":"65617ffeb779f51c","name":"","topic":"foo/+/+","qos":"2","datatype":"auto-detect","broker":"e8ba3ef5.22f4a8","nl":false,"rap":true,"rh":0,"inputs":0,"x":150,"y":4860,"wires":[["83d53f3b33bda1c7","eb9778148fdbdeb0"]]},{"id":"eb9778148fdbdeb0","type":"change","z":"65617ffeb779f51c","name":"","rules":[{"t":"set","p":"topic_array","pt":"msg","to":"$split($$.topic, \"/\")","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":330,"y":4860,"wires":[["5feb7b9a84d38c0b"]]},{"id":"5feb7b9a84d38c0b","type":"switch","z":"65617ffeb779f51c","name":"","property":"$$.topic_array[2] in [\"Temp\", \"Voltage\", \"Humidity\"]","propertyType":"jsonata","rules":[{"t":"true"}],"checkall":"true","repair":false,"outputs":1,"x":490,"y":4860,"wires":[["76bdf0a51223d976"]]},{"id":"9462d9cc801e1bba","type":"debug","z":"65617ffeb779f51c","name":"debug 250","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":690,"y":4860,"wires":[]},{"id":"9fc07bc776bb3f6d","type":"change","z":"65617ffeb779f51c","name":"","rules":[{"t":"delete","p":"foo[msg.topic_array[1]]","pt":"flow"},{"t":"move","p":"payload","pt":"msg","to":"payload[0]","tot":"msg"},{"t":"set","p":"payload[1].tag1","pt":"msg","to":"topic_array[1]","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":4900,"wires":[["9462d9cc801e1bba"]]},{"id":"0bba405707acd54d","type":"join","z":"65617ffeb779f51c","name":"","mode":"custom","build":"merged","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":390,"y":4900,"wires":[["9fc07bc776bb3f6d"]]},{"id":"76bdf0a51223d976","type":"change","z":"65617ffeb779f51c","name":"","rules":[{"t":"set","p":"foo[msg.topic_array[1]][msg.topic_array[2]]","pt":"flow","to":"payload.Val","tot":"msg"},{"t":"set","p":"payload","pt":"msg","to":"foo[msg.topic_array[1]]","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":240,"y":4900,"wires":[["0bba405707acd54d"]]},{"id":"e8ba3ef5.22f4a8","type":"mqtt-broker","name":"testb","broker":"192.168.1.25","port":"1883","clientid":"node-red-test","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

Have a play.

The output would be for something like below, for six random incoming message you get an output for bar and john grouped.


[edit] added fuller example with some filtering and output for influx.

What out of that do you want in the database, and as what fields and tags?

[Edit] You say you want to use different buckets. Buckets are equivalent to a database in terms of a conventional database. I wonder whether actually you mean Measurements.

First of all, thanks for the hints, i will try the next days when i back in office and will keep you up to date.

@hominidae
Telegraf i got on the scope, but thought that nodered will be easier :slight_smile: maybe i will try it in a later point of the project.

@Colin
The topics look for example like
foo/bar/xxx/001/temp
foo/bar/xxx/001/co2
foo/bar/xxx/002/temp

foo/john/xxx/001/temp
foo/john/xxx/002/temp

foo/doe/yyy/001/temp
and so on.....

Each number are a single sensor with different value. From each sensor i want to have the tag and the value in the bucket and if possilbe tp also to know the name of the sensor.... If i understand correct, tp and tag are tags and val is a field. So now i only need to match it in the right way :upside_down_face:

Depend of the sensor group, everything with john and bar should be in a separete bucket. And evrything with yyy also in an separte bucket.

Why do you want them in separate buckets (that is effectively in different databases)? My initial suggestion would be to put all the temperature values (assuming that is what temp means) in one Influxdb Measurement (Measurements are similar to tables in a conventional database) and the co2 value in a different Measurement. Then, in addition to tp and tag being Tags, have the topic elements bar/john/doe and xxx/yyy as two more tags. I don't know what those actually mean in the real world so that might not be exactly what you want.

Just to add to what @Colin said, it sounds like you are confusing the term Bucket with Measurement. This video from 7 years ago helped me understand the concepts of measurements, tags and fields. For example (from the video)...

Hi,

so here is the real world - maybe this help a little

The topic is LoRa/Klima/CO2/xxx (xxx is number of sensor)

The idea is, to get everything from a single sensor (CO2, TMP, Humidity and V) in to influx and keep the informatin fo the sensor-id. Each number (002, 003, ...) are a separate sensor. In NodeRed i only want to have this with one flow for each kind of topic. So one flow for LoRa/Klima/CO2/xxx, a seperate flow for LoRa/FW/Counter/xxx

Bucket and Measurment i think i figuerd out - but i have no idea how to handle the json object which mqtt provide.

LoRa/Klima/CO2/002/CO2
{"ArrivalTime": "2023-03-31T13:56:59+02:00","TP": "Temp-Feuchte-CO2_002","Tag": "CD","Val": 355}

If i try the flow i get this messages - "Cannot merge non-object types"

any ideas why?

At a guess as you haven't given a lot to go on.
I would think you are passing in json strings an not objects. Show us what you are passing into the flow. and an example flow that you used.

Thanks for the tip with the json string and objects. After i convert the string to objects it looks good in the test which you provide.

I test it now with my orginal mqtt broker and will keep you up to date.

This i got via mqtt:

LoRa/Klima/CO2/002/CO2
{"ArrivalTime": "2023-03-31T13:56:59+02:00","TP": "Temp-Feuchte-CO2_002","Tag": "CD","Val": 355}

Hi all,

finaly i got it solved out. Thanks to the example from E1cid - this point me to the right direction. I only needed to changed the arrays and the topics

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.