Extract the same mqtt topic value from 20 devices

Hi All
I have 20 units and i am trying to find a "better" way to sum all the payload values of the topics instead of having 20 mqtt in nodes.
Here is an example of one topic:
topic: "xyz/1/kW"
payload: 25
qos: 2
retain: false
_topic: "xyz/1/kW"
_msgid: "98cc6a1a.85e158"

Z69

Hi, have a look at the "Wildcards" section here:

Hi ghayne
I should have indicated that i understand wild card.
I can see all the kW topics, i am trying to get to the next point, not that good with code - still learning, on how do i add all the payloads together?

image
switch - looks at msg.topic and filter = contains "kW"
function - just convert payload from text to number

If i do them one-by-one, i can just assign a var and sum them, but not sure how to do that in this case

The 1 in the middle - is it the number (id) of the unit from where the measurement is coming?

Hi hotNipi

Yes, that is the unit "ID"

The messages come form mqtt one by one. So if you need to sum up certain measurements of all units, you'll need to have those units somehow represented in node-red. (Most probably stored in global context) And then by the ID of incoming message, find that unit and update the measured value. (and store the unit state back to context)
Then the incoming message is also an event, indicating that measured value is changed (maybe even not changed but at least the unit has send something so it is alive).
Taking that event as the inject for following (separate) flow where you get all the units from (global) context and do the math (add up all values to find out total).

Is this what you are trying to achieve?

1 Like

You can use wild card to filter the MQTT message using xyz/+/KW
then to sum them using context store and id each message via its topic
example below.

[{"id":"9c489037.d1e608","type":"inject","z":"8d22ae29.7df6d","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Mypi/1/KW","payload":"25","payloadType":"num","x":150,"y":120,"wires":[["b62594de.3589a"]]},{"id":"b62594de.3589a","type":"mqtt out","z":"8d22ae29.7df6d","name":"","topic":"","qos":"","retain":"","broker":"35ccc936.fc2256","x":440,"y":160,"wires":[]},{"id":"808d0bf0.9b8d88","type":"inject","z":"8d22ae29.7df6d","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Mypi/2/KW","payload":"23","payloadType":"str","x":150,"y":160,"wires":[["b62594de.3589a"]]},{"id":"5a77f3e6.771824","type":"inject","z":"8d22ae29.7df6d","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Mypi/3/KW","payload":"22","payloadType":"num","x":150,"y":200,"wires":[["b62594de.3589a"]]},{"id":"8524dd60.04a128","type":"mqtt in","z":"8d22ae29.7df6d","name":"","topic":"Mypi/+/KW","qos":"2","datatype":"auto","broker":"35ccc936.fc2256","x":390,"y":40,"wires":[["55d19da0.61d88c","1d5fdaf3.808655"]]},{"id":"55d19da0.61d88c","type":"function","z":"8d22ae29.7df6d","name":"","func":"let sumObject = context.get(\"sumObject\") || {};\nsumObject[msg.topic] = parseInt(msg.payload);\ncontext.set(\"sumObject\", sumObject);\nmsg.payload = Object.values(sumObject).reduce( (accumulator, currentValue) => accumulator + currentValue );\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":580,"y":40,"wires":[["1d5fdaf3.808655"]]},{"id":"1d5fdaf3.808655","type":"debug","z":"8d22ae29.7df6d","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":650,"y":120,"wires":[]},{"id":"35ccc936.fc2256","type":"mqtt-broker","z":"","name":"free","broker":"broker.hivemq.com","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Edit/ for total dynamic, i have set random ID's and input in inject nodes, so each press will be a a random id and value. All standard nodes

[{"id":"9c489037.d1e608","type":"inject","z":"8d22ae29.7df6d","name":"random KW","props":[{"p":"payload"},{"p":"topic","v":"\"Mypi/\" & $ceil($random()*20) &  \"/KW\"","vt":"jsonata"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$floor($random() *15) +15","payloadType":"jsonata","x":150,"y":120,"wires":[["b62594de.3589a"]]},{"id":"b62594de.3589a","type":"mqtt out","z":"8d22ae29.7df6d","name":"","topic":"","qos":"","retain":"","broker":"35ccc936.fc2256","x":440,"y":160,"wires":[]},{"id":"808d0bf0.9b8d88","type":"inject","z":"8d22ae29.7df6d","name":"random ohm","props":[{"p":"payload"},{"p":"topic","v":"\"Mypi/\" & $ceil($random()*20) &  \"/OHM\"","vt":"jsonata"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$random()  ","payloadType":"jsonata","x":150,"y":160,"wires":[["b62594de.3589a"]]},{"id":"5a77f3e6.771824","type":"inject","z":"8d22ae29.7df6d","name":"random amp","props":[{"p":"payload"},{"p":"topic","v":"\"Mypi/\" & $ceil($random()*20) &  \"/AMP\"","vt":"jsonata"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$ceil($random() *2) ","payloadType":"jsonata","x":150,"y":200,"wires":[["b62594de.3589a"]]},{"id":"8524dd60.04a128","type":"mqtt in","z":"8d22ae29.7df6d","name":"","topic":"Mypi/#","qos":"2","datatype":"auto","broker":"35ccc936.fc2256","x":290,"y":40,"wires":[["55d19da0.61d88c"]]},{"id":"55d19da0.61d88c","type":"function","z":"8d22ae29.7df6d","name":"","func":"let topics = msg.topic.split(\"/\");\nif (![\"KW\",\"AMP\",\"OHM\"].includes(topics[2])){\n    return;\n}\nlet sumObject = context.get(\"sumObject\") || {};\nif(!sumObject[topics[2]]){\n    sumObject[topics[2]] ={};\n}\nsumObject[topics[2]][topics[1]] = parseFloat(msg.payload);\ncontext.set(\"sumObject\", sumObject);\nmsg.payload ={};\nmsg.payload[topics[2]+\"_Total\"] = Object.values(sumObject[topics[2]]).reduce( (accumulator, currentValue) => accumulator + currentValue );\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":440,"y":40,"wires":[["1d5fdaf3.808655"]]},{"id":"1d5fdaf3.808655","type":"debug","z":"8d22ae29.7df6d","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"payload","statusType":"auto","x":570,"y":40,"wires":[]},{"id":"35ccc936.fc2256","type":"mqtt-broker","z":"","name":"free","broker":"broker.hivemq.com","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Edit/ added filter in function node , only items in array line 2 are logged and summed

Or may be a bit more dynamic (object oriented) way
(flow contains node-red-node-random)

[{"id":"f6a645f4.6c51f8","type":"change","z":"1ea32064.47b71","name":"parse topic","rules":[{"t":"set","p":"targetUnit","pt":"msg","to":"$split(msg.topic, \"/\")[1]","tot":"jsonata"},{"t":"set","p":"measurement","pt":"msg","to":"$split(msg.topic, \"/\")[2]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":780,"wires":[["476acacd.dc40c4"]]},{"id":"6f482439.78c80c","type":"mqtt out","z":"1ea32064.47b71","name":"","topic":"","qos":"","retain":"","broker":"ec674c1c.f707b","x":730,"y":540,"wires":[]},{"id":"fef3ae43.cb5a3","type":"inject","z":"1ea32064.47b71","name":"","topic":"","payload":"","payloadType":"date","repeat":"2","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":540,"wires":[["3af39d2a.512272"]]},{"id":"3af39d2a.512272","type":"random","z":"1ea32064.47b71","name":"","low":"0","high":"10","inte":"true","property":"payload","x":320,"y":540,"wires":[["3bc1afb.1e5ed5"]]},{"id":"3bc1afb.1e5ed5","type":"function","z":"1ea32064.47b71","name":"fake kW","func":"msg.topic=\"xyz/\"+msg.payload+\"/kW\"\nreturn msg;","outputs":1,"noerr":0,"x":460,"y":540,"wires":[["8ef85a9e.28ca98"]]},{"id":"8ef85a9e.28ca98","type":"random","z":"1ea32064.47b71","name":"","low":"1","high":"10","inte":"true","property":"payload","x":600,"y":540,"wires":[["6f482439.78c80c"]]},{"id":"c06f5b2d.b22898","type":"mqtt in","z":"1ea32064.47b71","name":"","topic":"xyz/#","qos":"2","datatype":"auto","broker":"ec674c1c.f707b","x":120,"y":780,"wires":[["f6a645f4.6c51f8"]]},{"id":"91faf7d3.b92608","type":"debug","z":"1ea32064.47b71","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":610,"y":920,"wires":[]},{"id":"476acacd.dc40c4","type":"function","z":"1ea32064.47b71","name":"store","func":"var units = global.get('units')||[] \nvar unit = units[msg.targetUnit] || {}\nvar measurement = unit[msg.measurement] || {[`${msg.measurement}`]:0}\nvar value = parseFloat(msg.payload)\nif(measurement != value){\n    measurement = value\n    unit[msg.measurement] = measurement\n    units[msg.targetUnit] = unit\n    units[msg.targetUnit].timestamp = new Date().getTime()\n    global.set('units',units)\n    msg.payload = msg.targetUnit\n    msg.topic = \"unitUpdate\"\n    return msg;\n}\n","outputs":1,"noerr":0,"x":430,"y":780,"wires":[["f1601565.a713f8"]]},{"id":"9a4ec3e.2a01c4","type":"mqtt out","z":"1ea32064.47b71","name":"","topic":"","qos":"","retain":"","broker":"ec674c1c.f707b","x":730,"y":600,"wires":[]},{"id":"b036345a.ac84c8","type":"inject","z":"1ea32064.47b71","name":"","topic":"","payload":"","payloadType":"date","repeat":"3","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":600,"wires":[["9712ec5d.69bec"]]},{"id":"9712ec5d.69bec","type":"random","z":"1ea32064.47b71","name":"","low":"0","high":"10","inte":"true","property":"payload","x":320,"y":600,"wires":[["8b5d0f59.682cc"]]},{"id":"8b5d0f59.682cc","type":"function","z":"1ea32064.47b71","name":"fake ohm","func":"msg.topic=\"xyz/\"+msg.payload+\"/ohm\"\nreturn msg;","outputs":1,"noerr":0,"x":460,"y":600,"wires":[["686da0e8.dc2e3"]]},{"id":"686da0e8.dc2e3","type":"random","z":"1ea32064.47b71","name":"","low":"1000","high":"10000","inte":"true","property":"payload","x":600,"y":600,"wires":[["9a4ec3e.2a01c4"]]},{"id":"f1601565.a713f8","type":"link out","z":"1ea32064.47b71","name":"unit-update","links":["7e91c635.3f6168"],"x":515,"y":780,"wires":[]},{"id":"7e91c635.3f6168","type":"link in","z":"1ea32064.47b71","name":"calculate","links":["f1601565.a713f8"],"x":200,"y":920,"wires":[["ab3c7808.3f4c38"]]},{"id":"ab3c7808.3f4c38","type":"switch","z":"1ea32064.47b71","name":"","property":"measurement","propertyType":"msg","rules":[{"t":"eq","v":"kW","vt":"str"},{"t":"eq","v":"ohm","vt":"str"},{"t":"eq","v":"temp","vt":"str"}],"checkall":"true","repair":false,"outputs":3,"x":290,"y":920,"wires":[["807d846c.274a58"],["34ebfe03.b97bd2"],["ee3d6b24.fa4948"]]},{"id":"2834760c.a1600a","type":"mqtt out","z":"1ea32064.47b71","name":"","topic":"","qos":"","retain":"","broker":"ec674c1c.f707b","x":710,"y":660,"wires":[]},{"id":"edf0a5ba.fa5218","type":"inject","z":"1ea32064.47b71","name":"","topic":"","payload":"","payloadType":"date","repeat":"4","crontab":"","once":false,"onceDelay":0.1,"x":150,"y":660,"wires":[["ba1354ff.f5b7c8"]]},{"id":"ba1354ff.f5b7c8","type":"random","z":"1ea32064.47b71","name":"","low":"0","high":"10","inte":"true","property":"payload","x":300,"y":660,"wires":[["6a2f127c.2f62ac"]]},{"id":"6a2f127c.2f62ac","type":"function","z":"1ea32064.47b71","name":"fake temp","func":"msg.topic=\"xyz/\"+msg.payload+\"/temp\"\nreturn msg;","outputs":1,"noerr":0,"x":440,"y":660,"wires":[["19858194.59d89e"]]},{"id":"19858194.59d89e","type":"random","z":"1ea32064.47b71","name":"","low":"15","high":"35","inte":"false","property":"payload","x":580,"y":660,"wires":[["2834760c.a1600a"]]},{"id":"807d846c.274a58","type":"function","z":"1ea32064.47b71","name":"sum kW","func":"var units = global.get('units')\nvar sum = units.reduce((a, b) => a + (b.kW ? b.kW : 0), 0)\nmsg.payload = sum\nmsg.topic = 'sum of kW of all units'\nreturn msg;","outputs":1,"noerr":0,"x":440,"y":880,"wires":[["91faf7d3.b92608"]]},{"id":"34ebfe03.b97bd2","type":"function","z":"1ea32064.47b71","name":"sum ohm","func":"var units = global.get('units')\nvar sum = units.reduce((a, b) => a + (b.ohm ? b.ohm : 0), 0)\nmsg.payload = sum\nmsg.topic = 'sum of ohm of all units'\nreturn msg;","outputs":1,"noerr":0,"x":440,"y":920,"wires":[["91faf7d3.b92608"]]},{"id":"ee3d6b24.fa4948","type":"function","z":"1ea32064.47b71","name":"sum temp","func":"var units = global.get('units')\nvar sum = units.reduce((a, b) => a + (b.temp ? b.temp : 0), 0)\nmsg.payload = sum\nmsg.topic = 'sum of temp of all units'\nreturn msg;","outputs":1,"noerr":0,"x":440,"y":960,"wires":[["91faf7d3.b92608"]]},{"id":"a73ef9e4.f4b828","type":"comment","z":"1ea32064.47b71","name":"fake measurements from 10 units","info":"","x":230,"y":500,"wires":[]},{"id":"5cd54264.19bc6c","type":"comment","z":"1ea32064.47b71","name":"store incoming measurements","info":"it creates the object to hold the data if it does not exist.\nit creates properties for object by the type of measurement\nit adds/adjusts timestamp of latest update\n\nit sends out the message as event indicating that some property of the unit has been changed\n","x":500,"y":740,"wires":[]},{"id":"b67da41e.884fc8","type":"comment","z":"1ea32064.47b71","name":"parse topic","info":"it parses out the unit id from where the message originates\n\nit parses out the measurement type\n","x":240,"y":740,"wires":[]},{"id":"29d73c2.544d2c4","type":"comment","z":"1ea32064.47b71","name":"switch by measurement type","info":"","x":220,"y":860,"wires":[]},{"id":"b1ecf37d.576b5","type":"comment","z":"1ea32064.47b71","name":"sum by type","info":"","x":450,"y":840,"wires":[]},{"id":"ec674c1c.f707b","type":"mqtt-broker","z":"","name":"nipi","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]
1 Like

Hi hotNipi

Thank you for your code, i tested it with three units for now, and it is working 100%.
I get the just of your code, I now just have to go thru your code and do some "Google is your Friend" to try to understand and learn what you have done.

Again, thank you.

I actually made couple of changes after I shared initially.
As I did it quickly I didn't think too deep. It works but can be done better.
For example to hold units (renamed to devices) in array is not the best idea.
Even if you currently have device index in topic, this is not always the case.
It can be freely the name of device.
xyz/device_name/measurement
And then the stored object will be much more human readable that way. But it works for indexes also.

Also you can use it for another group of devices without need to do any changes.


So may be a bit better version of it:

[{"id":"f6a645f4.6c51f8","type":"change","z":"1ea32064.47b71","name":"parse topic","rules":[{"t":"set","p":"devices","pt":"msg","to":"$split(msg.topic, \"/\")[0]","tot":"jsonata"},{"t":"set","p":"device","pt":"msg","to":"$split(msg.topic, \"/\")[1]","tot":"jsonata"},{"t":"set","p":"measurement","pt":"msg","to":"$split(msg.topic, \"/\")[2]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":780,"wires":[["476acacd.dc40c4","c4af2a2a.d1efe8"]]},{"id":"c06f5b2d.b22898","type":"mqtt in","z":"1ea32064.47b71","name":"","topic":"xyz/#","qos":"2","datatype":"auto","broker":"ec674c1c.f707b","x":120,"y":780,"wires":[["f6a645f4.6c51f8"]]},{"id":"91faf7d3.b92608","type":"debug","z":"1ea32064.47b71","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":450,"y":880,"wires":[]},{"id":"476acacd.dc40c4","type":"function","z":"1ea32064.47b71","name":"store","func":"var devices = global.get(msg.devices)|| {}\nvar device = devices[msg.device] || {}\nvar measurement = device[msg.measurement] || {[`${msg.measurement}`]:0}\nvar value = parseFloat(msg.payload)\nif(measurement != value){\n    device[msg.measurement] = value\n    devices[msg.device] = device\n    devices[msg.device].timestamp = new Date().getTime()\n    global.set(msg.devices,devices)\n    msg.payload = msg.device\n    msg.topic = \"deviceUpdate\"\n    return msg;\n}\n","outputs":1,"noerr":0,"x":430,"y":780,"wires":[["f1601565.a713f8"]]},{"id":"f1601565.a713f8","type":"link out","z":"1ea32064.47b71","name":"unit-update","links":["7e91c635.3f6168"],"x":515,"y":780,"wires":[]},{"id":"7e91c635.3f6168","type":"link in","z":"1ea32064.47b71","name":"calculate","links":["f1601565.a713f8"],"x":135,"y":880,"wires":[["807d846c.274a58"]]},{"id":"807d846c.274a58","type":"function","z":"1ea32064.47b71","name":"sum of measurement","func":"msg.payload = Object.values(global.get(msg.devices)).reduce((a, b) => a + (b[msg.measurement] ? b[msg.measurement] : 0), 0)\nmsg.topic = 'sum of '+msg.measurement+' of all devices'\nreturn msg;","outputs":1,"noerr":0,"x":280,"y":880,"wires":[["91faf7d3.b92608"]]},{"id":"5cd54264.19bc6c","type":"comment","z":"1ea32064.47b71","name":"store incoming measurements","info":"it creates the object to hold the data if it does not exist.\nit creates properties for object by the type of measurement\nit adds/adjusts timestamp of latest update\n\nit sends out the message as event indicating that some property of the unit has been changed\n","x":500,"y":740,"wires":[]},{"id":"b67da41e.884fc8","type":"comment","z":"1ea32064.47b71","name":"parse topic","info":"it parses out the unit id from where the message originates\n\nit parses out the measurement type\n","x":240,"y":740,"wires":[]},{"id":"b1ecf37d.576b5","type":"comment","z":"1ea32064.47b71","name":"sum by type","info":"","x":270,"y":840,"wires":[]},{"id":"c4af2a2a.d1efe8","type":"debug","z":"1ea32064.47b71","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":430,"y":820,"wires":[]},{"id":"c913f9a7.f00808","type":"inject","z":"1ea32064.47b71","name":"","topic":"","payload":"kW","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":150,"y":980,"wires":[["febe248d.b007d8"]]},{"id":"febe248d.b007d8","type":"function","z":"1ea32064.47b71","name":"get sum of specific measurements ","func":"var devices = global.get('xyz')\nmsg.payload = Object.values(devices).reduce((a, b) => a + (b[msg.payload] ? b[msg.payload] : 0), 0)\n\nreturn msg;","outputs":1,"noerr":0,"x":360,"y":980,"wires":[["bc644140.d51eb"]]},{"id":"bc644140.d51eb","type":"debug","z":"1ea32064.47b71","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":570,"y":980,"wires":[]},{"id":"dba26b95.93d948","type":"inject","z":"1ea32064.47b71","name":"","topic":"4","payload":"kW","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":130,"y":1020,"wires":[["fefb14dc.dff178"]]},{"id":"fefb14dc.dff178","type":"function","z":"1ea32064.47b71","name":"get specific measurements of one device","func":"var device = global.get('xyz')[msg.topic]\nmsg.payload = device[msg.payload] ? device[msg.payload] : 0\nreturn msg;","outputs":1,"noerr":0,"x":360,"y":1020,"wires":[["188af27c.8eb44e"]]},{"id":"188af27c.8eb44e","type":"debug","z":"1ea32064.47b71","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","x":590,"y":1020,"wires":[]},{"id":"92276c7a.0c4fa","type":"comment","z":"1ea32064.47b71","name":"Usage of stored data as needed","info":"","x":350,"y":940,"wires":[]},{"id":"ec674c1c.f707b","type":"mqtt-broker","z":"","name":"nipi","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.