Problem with join node and msg.parts

I am trying to discover all Tasmota MQTT devices on my system.
I am able to get all the topics with this flow, but I can't get them joined into a single object. I don't know how to correctly set the msg.parts for the join node:

[{"id":"bf137310810593ae","type":"mqtt in","z":"fd95058a.48e3e8","name":"","topic":"tele/+/LWT","qos":"2","datatype":"auto","broker":"e2295db.3547aa","nl":false,"rap":true,"rh":0,"inputs":0,"x":240,"y":1180,"wires":[["25712040044e3040"]]},{"id":"c24b7d393068ad09","type":"debug","z":"fd95058a.48e3e8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":630,"y":1180,"wires":[]},{"id":"25712040044e3040","type":"function","z":"fd95058a.48e3e8","name":"","func":"a = msg.topic\nb= a.split(\"/\");\nmsg.payload = b[1];\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":420,"y":1180,"wires":[["10e206036a4b4df8"]]},{"id":"10e206036a4b4df8","type":"join","z":"fd95058a.48e3e8","name":"","mode":"reduce","build":"string","property":"payload","propertyType":"msg","key":"topic","joiner":",","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"$append($A,[{\t  \"name\": payload,\t  \"index\": $I\t}])","reduceInit":"[]","reduceInitType":"json","reduceFixup":"","x":510,"y":1240,"wires":[["c24b7d393068ad09"]]},{"id":"e2295db.3547aa","type":"mqtt-broker","name":"SaltyMQTT","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"protocolVersion":4,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Try moving the device name to the topic.

Did it. Same. Messages are not joined.

Ahhh, I didn't look at your join. What do you want the end result object to look like?
do you know howmany MQTT msgs there will be?

I don't know how many they are, that's the scope of the flow (discovery).
The resulting object is what configured in the join node:

$append($A,[{
  "name": payload,
  "index": $I
}])

an array of objects. But an array of strings (payload) could be acceptable too.

Since you are using the LWT from different devices, all the messages aren't going to arrive at the same time and you will never know when to do the join.

It seems to me what you want to do is create a global to hold the status of each device. That way you could check it an any time.

Do the devices send out a birth msg too?

I want to create a global to hold all the MQTT topic of devices (and then get peculiar informations).
This is the complete message received from a device:

{"topic":"tele/PAB-plug/LWT","payload":"Online","qos":0,"retain":true,"_topic":"tele/PAB-plug/LWT","_msgid":"00c8acda9b5708ff"}

Maybe the join node is not the better choice.

Give me a few minutes to try and convert a flow I have that builds an object to check doors.

Here is the 'Door' flow using injects if you want to see what I'm changing.

[{"id":"cc38f2c70173beeb","type":"tab","label":"Flow 1","disabled":false,"info":"","env":[]},{"id":"b946b6d6f1384bbd","type":"inject","z":"cc38f2c70173beeb","name":"Front door unlocked","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Front door","payload":"unlocked","payloadType":"str","x":170,"y":240,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"5bdafdb6d195bc3f","type":"inject","z":"cc38f2c70173beeb","name":"Front door locked","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Front door","payload":"locked","payloadType":"str","x":170,"y":280,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"093316f2d930fb91","type":"inject","z":"cc38f2c70173beeb","name":"gym back door left open","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"gym back door left","payload":"open","payloadType":"str","x":190,"y":340,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"0794bc9e30ea9991","type":"inject","z":"cc38f2c70173beeb","name":"gym back door left closed","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"gym back door left","payload":"closed","payloadType":"str","x":190,"y":380,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"a8ae0244601876fb","type":"inject","z":"cc38f2c70173beeb","name":"gym back door right open","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"gym back door right","payload":"open","payloadType":"str","x":190,"y":440,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"e7886559a21e6046","type":"inject","z":"cc38f2c70173beeb","name":"gym back door right closed","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"gym back door right","payload":"closed","payloadType":"str","x":200,"y":480,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"801517e98c9ba6fa","type":"inject","z":"cc38f2c70173beeb","name":" jarvis door unlocked","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":" jarvis door","payload":"unlocked","payloadType":"str","x":170,"y":540,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"153ce5f9350b6398","type":"inject","z":"cc38f2c70173beeb","name":" jarvis door locked","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":" jarvis door","payload":"locked","payloadType":"str","x":170,"y":580,"wires":[["ebe70eb3bfe15eeb"]]},{"id":"feb31e1bca38aa94","type":"inject","z":"cc38f2c70173beeb","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"doors","payloadType":"global","x":410,"y":160,"wires":[["b554871717464e74"]]},{"id":"ebe70eb3bfe15eeb","type":"link call","z":"cc38f2c70173beeb","name":"Set status in global","links":["d40b527dcbde48df"],"linkType":"static","timeout":"30","x":550,"y":380,"wires":[["a8f6c839a13a0c28"]]},{"id":"d40b527dcbde48df","type":"link in","z":"cc38f2c70173beeb","name":"Set Status","links":[],"x":520,"y":520,"wires":[["252d653f926ccff9"]],"l":true},{"id":"ca2a8928d1fe6ca8","type":"link out","z":"cc38f2c70173beeb","name":"return","mode":"return","links":[],"x":870,"y":520,"wires":[],"l":true},{"id":"252d653f926ccff9","type":"function","z":"cc38f2c70173beeb","name":"","func":"// get the global 'doors' object or itinilize it if it doesn't exist\nlet doors = global.get('doors') || {}\n\n// get the door name from msg.topic\nlet door_name = msg.topic\n\n// get this doors object of initilize it\n//let this_door = doors.door_name ||{'name': msg.topic, 'status': msg.payload}\nlet this_door = doors.door_name ||{'status': msg.payload}\n\n// check the door status and set it in teh objct\nif ( (msg.payload == 'open') || (msg.payload == 'unlocked') ) {\n    this_door.state = 0\n    this_door.status = msg.payload\n} else if ( (msg.payload == 'closed') || (msg.payload == 'locked') ) {\n    this_door.state = 1    \n    this_door.status = msg.payload\n} else {\n    // an unknown status has arrived - send a warning\n    node.warn('unknown status for '+ msg.topic)\n    return null\n}\n\n// update the 'doors' variable\ndoors[door_name] = this_door\n// update the 'doors' global\nglobal.set('doors', doors)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":700,"y":520,"wires":[["ca2a8928d1fe6ca8"]]},{"id":"a8f6c839a13a0c28","type":"debug","z":"cc38f2c70173beeb","name":"","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":780,"y":380,"wires":[]},{"id":"3c5671db2c18c284","type":"debug","z":"cc38f2c70173beeb","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":790,"y":160,"wires":[]},{"id":"ddd7971942421d43","type":"comment","z":"cc38f2c70173beeb","name":"This flow runs at a set interval and checks to sse that all doors are locked/closed ","info":"","x":610,"y":100,"wires":[]},{"id":"ab37f81aefc3e35f","type":"comment","z":"cc38f2c70173beeb","name":"'Set Status' This flow sets the global with the status of each door ","info":"","x":690,"y":460,"wires":[]},{"id":"51b224860c6ee555","type":"comment","z":"cc38f2c70173beeb","name":"This flow takes the input status of the doors and calls the 'Set Status' flow","info":"","x":730,"y":300,"wires":[]},{"id":"b554871717464e74","type":"function","z":"cc38f2c70173beeb","name":"","func":"function delay(time) {\n  return new Promise(resolve => setTimeout(resolve, time));\n}\n\n\nconst doors =  msg.payload\nmsg.payload = \"system secure\"\n\nfor (let key in doors) {\n//    node.warn(\"processing\" key);\n//    node.warn(\"door: \" + key + \" state=\" + doors[key][\"state\"]);\n    if (doors[key][\"state\"] == 0) {\n        msg.payload = \"security breach: \"+ key;\n        node.send(msg);\n    }\n}\n\nreturn;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":620,"y":160,"wires":[["3c5671db2c18c284"]]}]

Try this flow. I think it will do just what you want.

[{"id":"bf137310810593ae","type":"mqtt in","z":"c717bb01fa7ff12b","name":"","topic":"tele/+/LWT","qos":"2","datatype":"auto","broker":"5f0a4eca.22adb","nl":false,"rap":true,"rh":0,"inputs":0,"x":160,"y":400,"wires":[["25712040044e3040"]]},{"id":"25712040044e3040","type":"function","z":"c717bb01fa7ff12b","name":"","func":"a = msg.topic\nb= a.split(\"/\");\nmsg.device = b[1];\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":320,"y":400,"wires":[["4630a8a9d06331da"]]},{"id":"4630a8a9d06331da","type":"link call","z":"c717bb01fa7ff12b","name":"Set status in global","links":["1f65bee5cc773d63"],"linkType":"static","timeout":"30","x":530,"y":400,"wires":[["d49cee5b602af457"]]},{"id":"d49cee5b602af457","type":"debug","z":"c717bb01fa7ff12b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":750,"y":400,"wires":[]},{"id":"1f65bee5cc773d63","type":"link in","z":"c717bb01fa7ff12b","name":"Set Status","links":[],"x":280,"y":540,"wires":[["7710d3568d33c096"]],"l":true},{"id":"aee1d5b3d1975eb0","type":"link out","z":"c717bb01fa7ff12b","name":"return","mode":"return","links":[],"x":630,"y":540,"wires":[],"l":true},{"id":"7710d3568d33c096","type":"function","z":"c717bb01fa7ff12b","name":"","func":"// get the global 'devices' object or itinilize it if it doesn't exist\nlet devices = global.get('devices') || {}\n\n// get the device name from msg.topic\nlet device_name = msg.topic\n\n// get this devices object of initilize it\n//let this_device = devices.device_name ||{'name': msg.topic, 'status': msg.payload}\nlet this_device = devices.device_name ||{'status': msg.device}\n\n// check the device status and set it in teh objct\nif ( (msg.payload == 'Online') || (msg.payload == 'Offline') ) {\n    this_device.state = 0\n    this_device.status = msg.payload\n} else if ( (msg.payload == 'closed') || (msg.payload == 'locked') ) {\n    this_device.state = 1    \n    this_device.status = msg.payload\n} else {\n    // an unknown status has arrived - send a warning\n    node.warn('unknown status for '+ msg.topic)\n    return null\n}\n\n// update the 'devices' variable\ndevices[device_name] = this_device\n// update the 'devices' global\nglobal.set('devices', devices)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":460,"y":540,"wires":[["aee1d5b3d1975eb0"]]},{"id":"044eeea04192da73","type":"comment","z":"c717bb01fa7ff12b","name":"'Set Status' This flow sets the global with the status of each device ","info":"","x":460,"y":480,"wires":[]},{"id":"5f0a4eca.22adb","type":"mqtt-broker","name":"mqttpizw.local","broker":"mqttpizw.local","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","birthMsg":{},"closeTopic":"","closeRetain":"false","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willRetain":"false","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]
1 Like

Or you can set the global storage of mqtt subs with a change node and JSONata
e.g.

[{"id":"78ddf6b3.498b","type":"inject","z":"e67b04df66bd0437","name":"online pab plug","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"tele/PAB-plug/LWT","payload":"Online","payloadType":"str","x":150,"y":680,"wires":[["55fabf0.e18fb4"]]},{"id":"55fabf0.e18fb4","type":"change","z":"e67b04df66bd0437","name":"","rules":[{"t":"set","p":"mqtt_subs","pt":"global","to":"($index := $split($$.topic,\"/\")[1];\t$append([$globalContext(\"mqtt_subs\")][index != $index],[{\"index\": $index, \"value\": $$.payload}])\t)","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":450,"y":740,"wires":[["c24b7d393068ad09"]]},{"id":"56fe5a95.282094","type":"inject","z":"e67b04df66bd0437","name":"offline pab plug","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"tele/PAB-plug/LWT","payload":"Offline","payloadType":"str","x":150,"y":720,"wires":[["55fabf0.e18fb4"]]},{"id":"242333bf.9001cc","type":"inject","z":"e67b04df66bd0437","name":"online another plug","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"tele/TEST-plug/LWT","payload":"Online","payloadType":"str","x":170,"y":800,"wires":[["55fabf0.e18fb4"]]},{"id":"814db66c.7525d","type":"inject","z":"e67b04df66bd0437","name":"offline another plug","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"tele/TEST-plug/LWT","payload":"Offline","payloadType":"str","x":170,"y":840,"wires":[["55fabf0.e18fb4"]]},{"id":"c24b7d393068ad09","type":"debug","z":"e67b04df66bd0437","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":650,"y":740,"wires":[]},{"id":"50f0afea.2c24a8","type":"change","z":"e67b04df66bd0437","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"$globalContext(\"mqtt_subs\")[value = \"Online\"].index","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":380,"y":940,"wires":[["c24b7d393068ad09"]]},{"id":"e330a198.2d14e","type":"inject","z":"e67b04df66bd0437","name":"return online","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":170,"y":940,"wires":[["50f0afea.2c24a8"]]}]
1 Like

Yes!
You made my day! Thank you very much for your time

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.