Parse and transform MQTT message cluster to influx measurements

Hi,

I want to store incoming MQTT messages as InfluxDB measurements. The incoming messages look like this:

solaranzeige/wr1/ac-power 220
solaranzeige/wr1/ac-frequency 50.01
solaranzeige/wr2/ac-power 225
solaranzeige/wr2/ac-frequency 50.06

These MQTT messages arrive every 10 seconds. I can already receive them and join them into one msg.payload, resulting in this:

{"solaranzeige/wr1/ac-power":220,"solaranzeige/wr1/ac-frequency":50.01,"solaranzeige/wr2/ac-power":225,"solaranzeige/wr2/ac-frequency":50.06}

What I want is to store this in an InfluxDB database named solaranzeige, either as two measurements wr1 and wr2 with ac-power and ac-frequency as field keys, or as one measurement (name to be discussed) with ac-power and ac-frequency as fields and the device (wr1 or wr2) as a tag.

I might also want to filter the incoming messages so that only certain topics (or key/value fields) are passed on.

Any hint would be greatly appreciated.

Something like this?
Splits your msg in two (wr1 and wr2) and writes them into influxdb.

[{"id":"347f100875322db2","type":"influxdb out","z":"dfd70fafb4d381bc","influxdb":"e10beff6461faf32","name":"write solaranzeige to sensordb","measurement":"solaranzeige","precision":"","retentionPolicy":"","database":"database","precisionV18FluxV20":"ms","retentionPolicyV18Flux":"","org":"organisation","bucket":"bucket","x":870,"y":600,"wires":[]},{"id":"0e41344b8b2f75a0","type":"debug","z":"dfd70fafb4d381bc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":510,"y":520,"wires":[]},{"id":"ecc928dc611867c3","type":"debug","z":"dfd70fafb4d381bc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":810,"y":560,"wires":[]},{"id":"0d009ea810f4a5b8","type":"inject","z":"dfd70fafb4d381bc","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{\t    \"solaranzeige/wr1/ac-power\":220,\t    \"solaranzeige/wr1/ac-frequency\":50.01,\t    \"solaranzeige/wr2/ac-power\":225,\t    \"solaranzeige/wr2/ac-frequency\":50.06\t}","payloadType":"jsonata","x":290,"y":600,"wires":[["0e41344b8b2f75a0","619e05b66f0bf779","46874677c76b3f17"]]},{"id":"619e05b66f0bf779","type":"change","z":"dfd70fafb4d381bc","name":"prepare data for influx","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t   \"sensor\": 'wr1',\t   \"ac-power\": msg.payload.'solaranzeige/wr1/ac-power',\t   \"ac-frequency\": msg.payload.'solaranzeige/wr1/ac-frequency'\t} ","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":560,"wires":[["e2d106bd391b0d05"]]},{"id":"46874677c76b3f17","type":"change","z":"dfd70fafb4d381bc","name":"prepare data for influx","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t   \"sensor\": 'wr2',\t   \"ac-power\": msg.payload.'solaranzeige/wr2/ac-power',\t   \"ac-frequency\": 
msg.payload.'solaranzeige/wr2/ac-frequency'\t} ","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":600,"wires":[["e2d106bd391b0d05"]]},{"id":"e2d106bd391b0d05","type":"junction","z":"dfd70fafb4d381bc","x":700,"y":580,"wires":[["ecc928dc611867c3","347f100875322db2"]]},{"id":"e10beff6461faf32","type":"influxdb","hostname":"influxdb","port":"8086","protocol":"http","database":"sensordb","name":"","usetls":false,"tls":"","influxdbVersion":"1.x","url":"http://influxdb:8086","rejectUnauthorized":false}]

@cameo69

Very cool. Thanks for the fast reply.

I forgot to mention that this is a very stripped-down example. In fact, it is more like 40 measurements per device. I hoped for some brilliant magic function that would automatically convert

x/y/z value

to a

tag: x
field: z
value

object ready to inject in Influx regardless of how many x/y/z triples I feed it.

This might be more than I can do with my limited JSONata skills, but I suggest you post the real messages here. I am sure that makes it easier for people to help.

There are several ways to filter the topics:

  1. You can use dynamic subscriptions and only subscribe to the topics required.
  2. You can use a switch node with the property field set to the JSONata expression (J:) $$.topic in ["topic/1","topic/2","topic/etc"] and the rule "is true".
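
For readers who prefer a function node over a switch node, the second option could be sketched in JavaScript roughly as follows. The allow-list contents and the helper name isAllowedTopic are illustrative assumptions, not something taken from this thread.

```javascript
// Hypothetical allow-list; replace with the topics you actually want to keep.
const allowedTopics = new Set([
  "solaranzeige/wr1/ac-power",
  "solaranzeige/wr2/ac-power",
]);

// Returns true when a message with this topic should be passed on.
function isAllowedTopic(topic) {
  return allowedTopics.has(topic);
}

// Inside a Node-RED function node the body would be roughly:
//   return isAllowedTopic(msg.topic) ? msg : null;
console.log(isAllowedTopic("solaranzeige/wr1/ac-power"));     // true
console.log(isAllowedTopic("solaranzeige/wr1/ac-frequency")); // false
```

Returning null from a function node drops the message, so only the listed topics continue down the flow.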

As for a dynamic way to set the fields and tags, please supply the output format(s) you want for the input you have shown, as guessing is not the best way to do this.

Hi @cameo69,

the real messages have the same structure. Excerpt:

solaranzeige/pv-garage/zentralertimestamp 1678873357
solaranzeige/pv-garage/wattstundengesamtheute 1489.23
solaranzeige/pv-garage/firmware 1
solaranzeige/pv-garage/ac_ausgangsfrequenz 49.99747467041
solaranzeige/pv-garage/solarspannung_string_2 308.56726074219
solaranzeige/pv-garage/solarstrom_string_2 1.9309170246124
solaranzeige/pv-garage/ac_wirkleistung 271.63934326172
solaranzeige/pv-garage/ac_ausgangsstrom 1.1686055958271
solaranzeige/pv-garage/ac_ausgangsspannung 233.96688842773
solaranzeige/pv-garage/solarstrom 1.2068809270859
solaranzeige/pv-garage/solarspannung 368.55401611328
solaranzeige/pv-garage/solarspannung_string_1 368.55401611328
solaranzeige/pv-garage/solarstrom_string_1 1.2068809270859
solaranzeige/pv-garage/geraetestatus 0
solaranzeige/pv-garage/errorcodes 0
solaranzeige/pv-garage/solarleistung_string_1 444.80081264802
solaranzeige/pv-garage/solarleistung_string_2 595.81777700511
solaranzeige/pv-garage/gen24 1
solaranzeige/pv-garage/temperatur 0
solaranzeige/pv-garage/wattstundengesamtjahr 0
solaranzeige/pv-garage/wattstundengesamt 0
solaranzeige/pv-garage/modulpvleistung 6270

Approximately 60 messages arrive every 10 seconds per device (the device in this case being pv-garage). The individual measurements (like modulpvleistung) may vary from device to device.

@E1cid thanks!

I would like to transform:

solaranzeige/pv-garage/zentralertimestamp 1678873357
solaranzeige/pv-garage/wattstundengesamtheute 1489.23
solaranzeige/pv-garage/firmware 1
solaranzeige/pv-garage/ac_ausgangsfrequenz 49.99747467041
solaranzeige/pv-garage/solarspannung_string_2 308.56726074219
solaranzeige/pv-garage/solarstrom_string_2 1.9309170246124
solaranzeige/pv-garage/ac_wirkleistung 271.63934326172
solaranzeige/pv-garage/ac_ausgangsstrom 1.1686055958271
solaranzeige/pv-garage/ac_ausgangsspannung 233.96688842773
solaranzeige/pv-garage/solarstrom 1.2068809270859
solaranzeige/pv-garage/solarspannung 368.55401611328
solaranzeige/pv-garage/solarspannung_string_1 368.55401611328
solaranzeige/pv-garage/solarstrom_string_1 1.2068809270859
solaranzeige/pv-garage/geraetestatus 0
solaranzeige/pv-garage/errorcodes 0
solaranzeige/pv-garage/solarleistung_string_1 444.80081264802
solaranzeige/pv-garage/solarleistung_string_2 595.81777700511
solaranzeige/pv-garage/gen24 1
solaranzeige/pv-garage/temperatur 0
solaranzeige/pv-garage/wattstundengesamtjahr 0
solaranzeige/pv-garage/wattstundengesamt 0
solaranzeige/pv-garage/modulpvleistung 6270

Into an Influx Measurement solaranzeige:

Timestamp, Device (Tag), wattstundengesamtheute (Field), firmware (Field), ac_ausgangsfrequenz (field) etc.
1678873357, pv-garage, 1489.23, 1, 49.99747467041

Alternatively, maybe into one measurement per device, i.e. a measurement pv-garage with

Timestamp, wattstundengesamtheute (Field), firmware (Field), ac_ausgangsfrequenz (field) etc.
1678873357, 1489.23, 1, 49.99747467041

I am not sure about this. Is this one message per second-level topic (the device) containing all third-level topics, or one message per triple?

I would strongly suggest posting a real message verbatim; then it should become clear which field is populated and how.

Hi @cameo69,

before I do the join it looks like this:

image

After the join:

{"solaranzeige/pv-garage/zentralertimestamp":1678881301,"solaranzeige/pv-garage/wattstundengesamtheute":4096.35,"solaranzeige/pv-garage/firmware":1,"solaranzeige/pv-garage/ac_ausgangsfrequenz":50.001197814941,"solaranzeige/pv-garage/solarspannung_string_2":311.28298950195}

Regards
JP

If values are coming in separately, then unless you have a good reason, send them to separate Measurements in Influx. There is generally no significant benefit in complicating the system by joining them together into one measurement. Use the KISS principle.
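
One way to follow this advice in a function node is to derive the measurement name from the last topic level and keep the device as a tag. This is only a sketch: toPoint is a hypothetical helper, the tag name device is an assumption, and it presumes the Measurement field of the influxdb out node is left empty so that msg.measurement is used instead.

```javascript
// Sketch: map one MQTT message (topic "root/device/reading") to a
// measurement name plus a [fields, tags] payload.
function toPoint(topic, rawValue) {
  const [, device, reading] = topic.split("/");
  return {
    measurement: reading,                               // e.g. "ac_wirkleistung"
    payload: [{ value: Number(rawValue) }, { device }], // [fields, tags]
  };
}

// Inside a Node-RED function node the body would be roughly:
//   Object.assign(msg, toPoint(msg.topic, msg.payload)); return msg;
console.log(JSON.stringify(
  toPoint("solaranzeige/pv-garage/ac_wirkleistung", "271.5")
));
```

Keeping the device as a tag means readings with the same name from wr1 and wr2 can share a measurement and still be told apart.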

From your example I deduce that it is all solaranzeige.

This should do the trick more flexibly:

[{"id":"14a022506648ecd8","type":"debug","z":"dfd70fafb4d381bc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":450,"y":680,"wires":[]},{"id":"b8e85eaee907f68a","type":"debug","z":"dfd70fafb4d381bc","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":690,"y":680,"wires":[]},{"id":"9239fcc8dd1f03ab","type":"inject","z":"dfd70fafb4d381bc","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{\t   \"solaranzeige/wr1/ac-power\":220,\t   \"solaranzeige/wr1/ac-frequency\":50.01,\t   \"solaranzeige/wr2/ac-power\":225,\t   \"solaranzeige/wr2/ac-frequency\":50.06,\t   \"solaranzeige/pv-garage/zentralertimestamp\": 1678881301,\t   \"solaranzeige/pv-garage/wattstundengesamtheute\": 4096.35,\t   \"solaranzeige/pv-garage/firmware\": 1,\t   \"solaranzeige/pv-garage/ac_ausgangsfrequenz\": 50.001197814941,\t   \"solaranzeige/pv-garage/solarspannung_string_2\": 311.28298950195,\t   \"solaranzeige/dumdidum/solarspannung_string_2\": 311.28298950195\t}","payloadType":"jsonata","x":290,"y":640,"wires":[["14a022506648ecd8","5c1c8ce1664b6345"]]},{"id":"5c1c8ce1664b6345","type":"change","z":"dfd70fafb4d381bc","name":"prepare data for influx","rules":[{"t":"set","p":"payload","pt":"msg","to":"(\t    $divided := $spread(msg.payload).(\t        {\t            \"sensor\": $split($keys($)[0],\"/\")[1],\t            $split($keys($)[0],\"/\")[2] : $lookup($, $keys($)[0])\t        }\t    );\t\t    $distinct($divided.sensor).([\t        $filter($divided, function($v, $i, $a) {$v.sensor = $}  ) ~> $merge()  \t    
])\t)","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":480,"y":640,"wires":[["b8e85eaee907f68a","b6c8086220147b43"]]},{"id":"b6c8086220147b43","type":"split","z":"dfd70fafb4d381bc","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":670,"y":640,"wires":[["8093dce92045ed90"]]},{"id":"8093dce92045ed90","type":"debug","z":"dfd70fafb4d381bc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":910,"y":680,"wires":[]},{"id":"8a55489d629c5941","type":"influxdb out","z":"dfd70fafb4d381bc","influxdb":"e10beff6461faf32","name":"write solaranzeige to sensordb","measurement":"solaranzeige","precision":"","retentionPolicy":"","database":"database","precisionV18FluxV20":"ms","retentionPolicyV18Flux":"","org":"organisation","bucket":"bucket","x":970,"y":640,"wires":[]},{"id":"e10beff6461faf32","type":"influxdb","hostname":"influxdb","port":"8086","protocol":"http","database":"sensordb","name":"","usetls":false,"tls":"","influxdbVersion":"1.x","url":"http://influxdb:8086","rejectUnauthorized":false}]

The result looks like this and can easily be sent to InfluxDB.

JSONata for reference: https://try.jsonata.org/K_ak_MUhp

Hi @Colin ,

agreed, this is a solution, but I come from the SQL world rather than the NoSQL world, and in SQL you would most likely never do this. :slight_smile:

All the measurements are taken from one device (sensor) at one point in time. It is just the process of translating these into separate MQTT messages (instead of putting all of them in one JSON object and sending that as one message via MQTT) that creates the problem.

To me it feels natural to put all measurements taken from one device at one point in time (every 10 seconds) into one measurement (table) in Influx as well. Agreed, this is not absolutely necessary.

Note that if you want to send fields and tags to the influxdb out node, then you have to send an array of two objects, the first containing the fields and the second containing the tags. Something like

[
  {
    "ac-power": 220,
    "ac-frequency": 50.01
  },
  {
    "sensor": "wr1"
  }
]

Also I recommend not using names with special characters such as a minus sign. Otherwise it just causes problems. They could perhaps be changed to underscores.
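
Both points, the [fields, tags] array and the underscore renaming, could be handled in a function node along these lines. This is a sketch under assumptions: sanitizeKey and toFieldsAndTags are hypothetical helper names, and the input is assumed to be a flat object that carries the device name in a sensor property.

```javascript
// Replace every character that is not a letter, digit or underscore.
function sanitizeKey(key) {
  return key.replace(/[^A-Za-z0-9_]/g, "_");
}

// Split a flat reading into [fields, tags]: "sensor" becomes the tag,
// everything else becomes a field with a sanitized name.
function toFieldsAndTags(reading) {
  const { sensor, ...rest } = reading;
  const fields = {};
  for (const [key, value] of Object.entries(rest)) {
    fields[sanitizeKey(key)] = value;
  }
  return [fields, { sensor }];
}

console.log(JSON.stringify(
  toFieldsAndTags({ sensor: "wr1", "ac-power": 220, "ac-frequency": 50.01 })
));
// [{"ac_power":220,"ac_frequency":50.01},{"sensor":"wr1"}]
```

In a function node this would be used as msg.payload = toFieldsAndTags(msg.payload); before the message reaches the influxdb out node.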

I was just looking into that.

@cameo69 your proposal is awesome and extremely close to the final solution. The icing on the cake would be to extract the sensor field and move things from

{"sensor":"pv-garage","zentralertimestamp":1678895197,"wattstundengesamtheute":7839.21,"firmware":1,"ac_ausgangsfrequenz":49.959522247314,"solarspannung_string_2":299.48822021484,"solarstrom_string_2":0.38880056142807,"ac_wirkleistung":283.07800292969}

to

[
   {
      "zentralertimestamp":1678895197,
      "wattstundengesamtheute":7839.21,
      "firmware":1,
      "ac_ausgangsfrequenz":49.959522247314,
      "solarspannung_string_2":299.48822021484,
      "solarstrom_string_2":0.38880056142807,
      "ac_wirkleistung":283.07800292969
   },
   {
      "sensor":"pv-garage"
   }
]

I will try to find a solution, but maybe this is also something where you guys simply laugh and point me to a one-line solution. :slight_smile:

You have been a tremendous help so far and I cannot thank you enough!

OK, in that case I agree, it is worth putting them back together. Can you change the sending device code so that it doesn't split them up, or is it something you don't have control over?

I would still need the exact format you wish to send to Influx; I have assumed what I think it should be, so you may have to adjust it.
Example:

[{"id":"9239fcc8dd1f03ab","type":"inject","z":"65617ffeb779f51c","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{\t   \"solaranzeige/wr1/ac-power\":220,\t   \"solaranzeige/wr1/ac-frequency\":50.01,\t   \"solaranzeige/wr2/ac-power\":225,\t   \"solaranzeige/wr2/ac-frequency\":50.06,\t   \"solaranzeige/pv-garage/zentralertimestamp\": 1678881301,\t   \"solaranzeige/pv-garage/wattstundengesamtheute\": 4096.35,\t   \"solaranzeige/pv-garage/firmware\": 1,\t   \"solaranzeige/pv-garage/ac_ausgangsfrequenz\": 50.001197814941,\t   \"solaranzeige/pv-garage/solarspannung_string_2\": 311.28298950195,\t   \"solaranzeige/dumdidum/solarspannung_string_2\": 311.28298950195\t}","payloadType":"json","x":130,"y":3520,"wires":[["14a022506648ecd8","5c1c8ce1664b6345"]]},{"id":"14a022506648ecd8","type":"debug","z":"65617ffeb779f51c","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":290,"y":3560,"wires":[]},{"id":"5c1c8ce1664b6345","type":"change","z":"65617ffeb779f51c","name":"prepare data for influx","rules":[{"t":"set","p":"payload","pt":"msg","to":"$keys($$.payload).(\t   $topic := $split($, \"/\");\t   {\t       $topic[2]: $lookup($$.payload,$),\t       \"sensor\": $topic[1]\t   }\t){sensor:$merge($)} ~> |$.*|{},[\"sensor\",\"firmware\"]|","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":3520,"wires":[["b6c8086220147b43"]]},{"id":"b6c8086220147b43","type":"split","z":"65617ffeb779f51c","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"topic","x":490,"y":3460,"wires":[["d2b826ea4a6a4a4b"]]},{"id":"d2b826ea4a6a4a4b","type":"change","z":"65617ffeb779f51c","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"[\t  $$.payload,\t  {\t    \"sensor\": $$.topic\t  
}\t]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":620,"y":3520,"wires":[["8093dce92045ed90"]]},{"id":"8093dce92045ed90","type":"debug","z":"65617ffeb779f51c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":750,"y":3560,"wires":[]}]

example output per device

[
  {
    "ac-power":220,
    "ac-frequency":50.01
  },
  {
    "sensor":"wr1"
  }
]

The expression that prepares the data to split

$keys($$.payload).(
   $topic := $split($, "/");
   {
       $topic[2]: $lookup($$.payload,$),
       "sensor": $topic[1]
   }
){sensor:$merge($)} ~> |$.*|{},["sensor","firmware"]|

[edit] Added "firmware" to the delete section. Add any readings you do not want sent to the delete array; if you do want firmware, remove it from the delete array.
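
For comparison, the same grouping step could be written in a function node. The following is a rough JavaScript counterpart of the JSONata expression, not a replacement the thread endorses: groupBySensor and droppedReadings are hypothetical names, and the keys are assumed to have exactly the form root/sensor/reading.

```javascript
// Readings that should not be written to InfluxDB; this plays the role
// of the delete array in the JSONata expression.
const droppedReadings = new Set(["firmware"]);

// Returns { sensor: { reading: value, ... }, ... } for a joined payload.
function groupBySensor(payload) {
  const grouped = {};
  for (const [topic, value] of Object.entries(payload)) {
    const [, sensor, reading] = topic.split("/");
    if (droppedReadings.has(reading)) continue; // skip unwanted readings
    if (!Object.prototype.hasOwnProperty.call(grouped, sensor)) {
      grouped[sensor] = {};
    }
    grouped[sensor][reading] = value;
  }
  return grouped;
}

const groupedExample = groupBySensor({
  "solaranzeige/wr1/ac-power": 220,
  "solaranzeige/wr1/ac-frequency": 50.01,
  "solaranzeige/pv-garage/firmware": 1,
  "solaranzeige/pv-garage/ac_ausgangsfrequenz": 50.001197814941,
});
console.log(JSON.stringify(groupedExample));
// {"wr1":{"ac-power":220,"ac-frequency":50.01},"pv-garage":{"ac_ausgangsfrequenz":50.001197814941}}
```

The resulting object has one key per sensor, which is the shape the split node in the flow above turns into one message per device.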

:slight_smile:

[{"id":"3d353a08b767cc7b","type":"debug","z":"dfd70fafb4d381bc","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":450,"y":900,"wires":[]},{"id":"4ae3f61cab8f50f3","type":"debug","z":"dfd70fafb4d381bc","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":690,"y":900,"wires":[]},{"id":"69de696ea3fd0d39","type":"inject","z":"dfd70fafb4d381bc","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{\t   \"solaranzeige/wr1/ac-power\":220,\t   \"solaranzeige/wr1/ac-frequency\":50.01,\t   \"solaranzeige/wr2/ac-power\":225,\t   \"solaranzeige/wr2/ac-frequency\":50.06,\t   \"solaranzeige/pv-garage/zentralertimestamp\": 1678881301,\t   \"solaranzeige/pv-garage/wattstundengesamtheute\": 4096.35,\t   \"solaranzeige/pv-garage/firmware\": 1,\t   \"solaranzeige/pv-garage/ac_ausgangsfrequenz\": 50.001197814941,\t   \"solaranzeige/pv-garage/solarspannung_string_2\": 311.28298950195,\t   \"solaranzeige/dumdidum/solarspannung_string_2\": 311.28298950195\t}","payloadType":"jsonata","x":290,"y":860,"wires":[["3d353a08b767cc7b","fd95d439d95a5be3"]]},{"id":"fd95d439d95a5be3","type":"change","z":"dfd70fafb4d381bc","name":"prepare data for influx","rules":[{"t":"set","p":"payload","pt":"msg","to":"(\t   $divided := $spread(msg.payload).(\t       {\t           \"sensor\": $split($keys($)[0],\"/\")[1],\t           $split($keys($)[0],\"/\")[2] : $lookup($, $keys($)[0])\t        }\t    );\t   $vals := $distinct($divided.sensor).(\t       [\t           $filter(\t               $divided,\t               function($v, $i, $a) {$v.sensor = $}  \t           ) ~> $merge()   \t    ]\t   );\t   $vals.(\t       [\t           [\t               $ ~> |$|{},\t               [\"sensor\"]|,\t               {\"sensor\": $.sensor}\t            ]\t        
]\t    )\t\t)","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":480,"y":860,"wires":[["4ae3f61cab8f50f3","732cb9631b2b35b2"]]},{"id":"732cb9631b2b35b2","type":"split","z":"dfd70fafb4d381bc","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":670,"y":860,"wires":[["8ec7fe81cd1ea23f"]]},{"id":"8ec7fe81cd1ea23f","type":"debug","z":"dfd70fafb4d381bc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":910,"y":900,"wires":[]},{"id":"2bb4918a48999c5e","type":"influxdb out","z":"dfd70fafb4d381bc","influxdb":"e10beff6461faf32","name":"write solaranzeige to sensordb","measurement":"solaranzeige","precision":"","retentionPolicy":"","database":"database","precisionV18FluxV20":"ms","retentionPolicyV18Flux":"","org":"organisation","bucket":"bucket","x":970,"y":860,"wires":[]},{"id":"e10beff6461faf32","type":"influxdb","hostname":"influxdb","port":"8086","protocol":"http","database":"sensordb","name":"","usetls":false,"tls":"","influxdbVersion":"1.x","url":"http://influxdb:8086","rejectUnauthorized":false}]

The sending software is not mine, but I will investigate. You all have been a tremendous help. Thank you guys so much.
