Joining messages based on matching topic / value

Hi Team,

I'm new to both Node-RED and the forums, so please bear with me.

I have multiple devices with multiple sensors in the field and I need to join two messages. Sounds simple enough but my sensors report at different intervals.

Here's my setup. All devices publish to an MQTT broker on topics of the form DeviceName/Sensor. I have multiple MQTT nodes, each retrieving the messages for one sensor using the "+" wildcard (+/sensor name), which lets me store the sensor data in a database.

My goal is to display on a map the location of each device and the values of the sensor data. The problem is that my devices transmit GPS location every 15 seconds, but sensor data is transmitted at a higher frequency. If I use the Join node, I end up with a message that contains location data for one device but sensor data for another.

I tried to use the change node to set up a flow context based on the device name and then a split node to "decide" if the join should happen or not, but it's not working. The resulting message is either a merge of different devices (location for one and sensor data for another), a message with two locations (no idea why this happens), or a message with sensor data for two different devices.

The funny thing is that if I inject the data manually it works, but once I start using live data I get the errors described above.

The message structure is fairly simple: [Device name, sensor data] and [Device name, latitude, longitude]. Ideally I should be able to join the messages when the device names match, but I haven't been able to find a way to do this.

I'm more than happy to provide extra info you might need, just let me know what and how and I'll reply.

Thanks

Maybe something like this?


flow (use CTRL+I to import)

[{"id":"f8818d57c93d2b15","type":"inject","z":"d85dd4e7.e244b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"d1/SensorData","payload":"{\"value\":1 }","payloadType":"json","x":1870,"y":160,"wires":[["c34aea8aba15137e"]]},{"id":"8bbbddee7a8d6537","type":"inject","z":"d85dd4e7.e244b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"d1/gps","payload":"{\"lat\":1, \"lon\": 55 }","payloadType":"json","x":1870,"y":200,"wires":[["c34aea8aba15137e"]]},{"id":"34933e142378e9d4","type":"inject","z":"d85dd4e7.e244b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"d2/SensorData","payload":"{\"value\":2 }","payloadType":"json","x":1870,"y":240,"wires":[["c34aea8aba15137e"]]},{"id":"5c41d4ab8109a454","type":"inject","z":"d85dd4e7.e244b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"d2/gps","payload":"{\"lat\":22, \"lon\": 22 }","payloadType":"json","x":1870,"y":280,"wires":[["c34aea8aba15137e"]]},{"id":"de4faf47dfbdfd6e","type":"function","z":"d85dd4e7.e244b","name":"store in global.data","func":"const clone = RED.util.cloneMessage(msg.payload)\nconst parts = msg.topic.split('/')\nconst deviceName = parts[0]\nconst devicePath = \"data.\" + parts[0]\nconst path = \"data.\" + parts.join('.')\n//node.warn({parts,deviceName,devicePath});\nglobal.set(path, clone)\nmsg.topic = deviceName\nconst data = global.get(devicePath)\nif (data && data.gps && data.SensorData) {\n    msg.payload = data\n    return msg\n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":2290,"y":220,"wires":[["046b9cccacad7df7"]]},{"id":"046b9cccacad7df7","type":"debug","z":"d85dd4e7.e244b","name":"debug 15","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":2300,"y":280,"wires":[]},{"id":"c34aea8aba15137e","type":"junction","z":"d85dd4e7.e244b","x":2160,"y":220,"wires":[["de4faf47dfbdfd6e"]]}]

If that is no good, then please share a demo flow with injects etc to simulate your data.

Just a comment on your flow:

const clone = RED.util.cloneMessage(msg.payload)
const parts = msg.topic.split('/')
const deviceName = parts[0]
const devicePath = "data." + parts[0]
const path = "data." + parts.join('.')
//node.warn({parts,deviceName,devicePath});
global.set(path, clone)
msg.topic = deviceName
const data = global.get(devicePath)
if (data && data.gps && data.SensorData) {
    msg.payload = data
    return msg
}

That's what you're doing in the function node. As I understand it, you are simply collecting data points (gps and sensor data) until there is a pair for a device, i.e. both sensorData and gps data have been received for that device. Once a pair is available, a msg is sent with the data points combined in a single payload.

What happens if sensorData is sent once but multiple gps datapoints are sent (or vice versa)? Won't stale data be passed along, since once the sensorData has been set in the global hash it's never removed? Each new gps datapoint would then trigger another send of data (gps + sensorData).

Shouldn't datapoints be deleted in the global context once they have been sent out?
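
One way the delete-after-send idea could look, as a minimal sketch in plain JavaScript outside Node-RED. The `store` Map is a hypothetical stand-in for global context and `collectPair` is an illustrative name; neither is part of the posted flow. The key names gps and SensorData are taken from the demo topics.

```javascript
// Hypothetical stand-in for Node-RED global context: device name -> partial data.
const store = new Map();

// Stores one datapoint and returns a combined payload once both parts
// have arrived for the device; otherwise returns null.
function collectPair(topic, payload) {
    const [device, sensor] = topic.split('/');
    const entry = store.get(device) || {};
    entry[sensor] = payload;
    store.set(device, entry);

    if (entry.gps !== undefined && entry.SensorData !== undefined) {
        store.delete(device); // forget the pair so it is not sent again
        return { device, ...entry };
    }
    return null;
}
```

With this variant, every output needs a fresh value of each kind, so the faster sensor readings that arrive between two GPS fixes overwrite each other and only the last one is sent.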

nothing

yes

potentially.

There wasn't really a detailed specification; I gave the OP something to build on.


If you use the Join node in key/value mode with msg.topic as the key, set it to send after just 1 message part, and select "and every subsequent message", then you will get a message after every input message from MQTT, with the topic still set to tell you which message has just been received. Every message passed on will have the latest values from all sensors. So then you can match the new message up with its corresponding location or sensor value and pass on an appropriate message. You just need to make sure that on startup it ignores, for example, a sensor value when it has not yet got the location for that device.
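
The matching step after such a join could be sketched roughly as follows, in plain JavaScript rather than as a function node. The `latest` object plays the role of the accumulated key/value payload, `onMessage` is an illustrative name, and the topic suffixes /gps and /SensorData are assumptions taken from the demo flow.

```javascript
// Latest payload per full topic, similar in spirit to the object the
// Join node accumulates in key/value mode.
const latest = {};

// Called for every incoming message; returns a paired result or null.
function onMessage(topic, payload) {
    latest[topic] = payload;
    const device = topic.split('/')[0];
    const gps = latest[device + '/gps'];
    const sensor = latest[device + '/SensorData'];
    // On startup one half may still be missing: skip until both are known.
    if (gps === undefined || sensor === undefined) return null;
    return { device, gps, sensor };
}
```

Because the lookup uses the device name from the message that just arrived, values from different devices are never combined.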

Sure and a nice thing you did.

However, it would be nice to point these things out, otherwise the OP might well come back and ask why there is duplication in their data.

Also I wanted to understand what you were doing because this type of question pops up often here in the forum and perhaps there could be a best-practice solution for this type of question.

I suppose you only actually need to save the data set which arrives less frequently, i.e. GPS, then feed it into each sensor message as it arrives.

Might be a good idea to also store a timestamp so the flow can detect and handle data with stale location properties.
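
A minimal sketch of that idea in plain JavaScript, under stated assumptions: `lastFix` is a hypothetical cache standing in for context storage, `handle` is an illustrative name, the topic suffix /gps comes from the demo flow, and the 60 second staleness threshold is an arbitrary choice (four missed 15 second GPS reports).

```javascript
// Hypothetical cache: device name -> { gps, receivedAt }.
const lastFix = new Map();
const MAX_AGE_MS = 60000; // assumed threshold, not from the thread

// GPS messages update the cache; every other message is enriched with
// the cached location and a flag saying whether that location is stale.
function handle(topic, payload, now = Date.now()) {
    const [device, kind] = topic.split('/');
    if (kind === 'gps') {
        lastFix.set(device, { gps: payload, receivedAt: now });
        return null; // nothing to send for a bare location update
    }
    const fix = lastFix.get(device);
    if (!fix) return null; // no location known yet for this device
    return {
        device,
        sensor: payload,
        gps: fix.gps,
        stale: now - fix.receivedAt > MAX_AGE_MS,
    };
}
```

This sends one message per sensor reading, so the map updates at the sensor rate while the location is refreshed whenever a new fix arrives.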

Hi Team,

Ideally the latest sensor data would be paired with the latest gps, to avoid repeats or stale data. However we're trying to build something to monitor devices and give a heads up to people in the office that something might happen and to keep an eye open for the behavior of device X, so stale / repeat data is not a big issue.

I did try @Steve-Mcl's solution with live feed data and did not get the same output as the example. The function's output is the first message that arrives; it's not joining the messages. Maybe it's because we receive a string as the payload for the messages instead of a JSON object?

Then why not respond with the data requested?

Did my joining method work?

Hi Team,

I did try @Colin's method and I get an object with a bunch of GPS data and maybe one sensor data. It could be the way the data is being sent from the broker.

The solution from @Steve-Mcl does provide the result I need, I just haven't been able to get it working with live data from my feeds.

The data should contain an object or array (or any output really, I can extract the data I need afterwards) with the device name, sensor data and GPS data.

Here's a sample of the data I receive from my feeds with @Steve-Mcl's function:

[{"id":"1d8a976bf946155b","type":"tab","label":"Flow 3","disabled":false,"info":"","env":[]},{"id":"cd70febc3c30d96d","type":"junction","z":"1d8a976bf946155b","x":500,"y":160,"wires":[["f4b90ad461d6978a"]]},{"id":"ad222abfb8bce8de","type":"inject","z":"1d8a976bf946155b","name":"Device SensorData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device1/sensordata","payload":"80","payloadType":"str","x":330,"y":100,"wires":[["cd70febc3c30d96d"]]},{"id":"0d46f2f24ced83a3","type":"inject","z":"1d8a976bf946155b","name":"Device GPSData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device1/GPS","payload":"lat,N,long,W","payloadType":"str","x":320,"y":180,"wires":[["cd70febc3c30d96d"]]},{"id":"3bb5cbe6628018ad","type":"inject","z":"1d8a976bf946155b","name":"Device SensorData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device2/sensordata","payload":"85","payloadType":"str","x":330,"y":140,"wires":[["cd70febc3c30d96d"]]},{"id":"be8f52b4937ecedc","type":"inject","z":"1d8a976bf946155b","name":"Device GPSData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device2/GPS","payload":"lat,N,long,W","payloadType":"str","x":320,"y":220,"wires":[["cd70febc3c30d96d"]]},{"id":"f4b90ad461d6978a","type":"function","z":"1d8a976bf946155b","name":"store in global.data","func":"const clone = RED.util.cloneMessage(msg.payload)\nconst parts = msg.topic.split('/')\nconst deviceName = parts[0]\nconst devicePath = \"data.\" + parts[0]\nconst path = \"data.\" + parts.join('.')\n//node.warn({parts,deviceName,devicePath});\nglobal.set(path, clone)\nmsg.topic = deviceName\nconst data = global.get(devicePath)\nif (data && data.gps && data.SensorData) {\n msg.payload = data\n return msg\n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":650,"y":160,"wires":[["f3710a32edb9f9e8"]]},{"id":"f3710a32edb9f9e8","type":"debug","z":"1d8a976bf946155b","name":"debug 19","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":900,"y":160,"wires":[]}]

Thanks for the help.
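
A possible explanation, offered as an observation on the posted flows rather than anything confirmed in the thread: the function builds its storage keys from the topic segments and then tests data.gps and data.SensorData, and property names in JavaScript are case-sensitive. The demo used the topics d1/gps and d1/SensorData, while the sample above uses device1/GPS and device1/sensordata. The sketch below reproduces only the key handling in plain JavaScript; `keysFor` is an illustrative helper, not part of Node-RED.

```javascript
// Mirrors how the function node derives its storage keys from the topic:
// the first segment is the device, the second becomes the property name.
function keysFor(topics) {
    const data = {};
    for (const topic of topics) {
        const [device, sensor] = topic.split('/');
        data[device] = data[device] || {};
        data[device][sensor] = true;
    }
    return data;
}

const demo = keysFor(['d1/gps', 'd1/SensorData']);
const live = keysFor(['device1/GPS', 'device1/sensordata']);

// The function node's test is: data.gps && data.SensorData
const demoPasses = Boolean(demo.d1.gps && demo.d1.SensorData);
const livePasses = Boolean(live.device1.gps && live.device1.SensorData);
console.log(demoPasses, livePasses); // true false
```

If this is the cause, the check would need to use the same spelling as the second topic segment of the live feeds.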

Here is a low-code version of Steve's function that deals with your incoming data strings.

[{"id":"92d51f946870c4f4","type":"inject","z":"d1395164b4eec73e","name":"Device SensorData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device1/sensordata","payload":"80","payloadType":"str","x":170,"y":6640,"wires":[["4ea745b6215aa849"]]},{"id":"4ea745b6215aa849","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"topic","pt":"msg","to":"$split($$.topic, \"/\")","tot":"jsonata","dc":true},{"t":"move","p":"topic[0]","pt":"msg","to":"device","tot":"msg"},{"t":"move","p":"topic[0]","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":400,"y":6760,"wires":[["18816f75ffe05762"]]},{"id":"2ff2bd9110b624e2","type":"inject","z":"d1395164b4eec73e","name":"Device SensorData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device2/sensordata","payload":"85","payloadType":"str","x":170,"y":6680,"wires":[["4ea745b6215aa849"]]},{"id":"274cc3e95be6654b","type":"inject","z":"d1395164b4eec73e","name":"Device GPSData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device1/GPS","payload":"lat,N,long,W","payloadType":"str","x":180,"y":6720,"wires":[["4ea745b6215aa849"]]},{"id":"5fecee8a40b7953b","type":"inject","z":"d1395164b4eec73e","name":"Device GPSData","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"device2/GPS","payload":"lat,N,long,W","payloadType":"str","x":180,"y":6760,"wires":[["4ea745b6215aa849"]]},{"id":"18816f75ffe05762","type":"switch","z":"d1395164b4eec73e","name":"","property":"topic","propertyType":"msg","rules":[{"t":"eq","v":"sensordata","vt":"str"},{"t":"eq","v":"GPS","vt":"str"}],"checkall":"true","repair":false,"outputs":2,"x":550,"y":6760,"wires":[["8b7ffdf72914dc50"],["a903c0fed6559cb1"]]},{"id":"8b7ffdf72914dc50","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"data[msg.device][msg.topic]","pt":"flow","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":770,"y":6740,"wires":[["3641068119d446b6"]]},{"id":"a903c0fed6559cb1","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"data[msg.device][msg.topic]","pt":"flow","to":"(\t    $values := $split($$.payload, \",\").$trim($);\t    {\t        \"lat\": $values[0] & \" \" & $values[1],\t        \"lon\": $values[2] & \" \" & $values[3]\t    }\t)","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":770,"y":6780,"wires":[["3641068119d446b6"]]},{"id":"3641068119d446b6","type":"switch","z":"d1395164b4eec73e","name":"","property":"$count($flowContext(\"data.\"  & $$.device).*)","propertyType":"jsonata","rules":[{"t":"eq","v":"2","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":990,"y":6760,"wires":[["a9ede7b319b7b78e"]]},{"id":"a9ede7b319b7b78e","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"move","p":"data[msg.device]","pt":"flow","to":"payload","tot":"msg"},{"t":"move","p":"device","pt":"msg","to":"payload.device","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":1140,"y":6760,"wires":[["046b9cccacad7df7"]]},{"id":"046b9cccacad7df7","type":"debug","z":"d1395164b4eec73e","name":"debug 15","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":880,"y":6840,"wires":[]}]

It outputs whenever both values (GPS and sensordata) have been received for a device.

Output

{
  "sensordata":"80",
  "GPS":{
    "lat":"lat N",
    "lon":"long W"
  },
  "device":"device1"
}
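
For readers who prefer a function node, the GPS string handling done by the JSONata expression in the flow above corresponds roughly to the following plain JavaScript. `parseGps` is an illustrative name, and the sketch assumes the payload always has four comma-separated fields in the order shown in the sample.

```javascript
// Splits "lat,N,long,W" into the same lat/lon shape as the Output above.
function parseGps(payload) {
    const v = payload.split(',').map((s) => s.trim());
    return { lat: `${v[0]} ${v[1]}`, lon: `${v[2]} ${v[3]}` };
}

console.log(parseGps('lat,N,long,W')); // { lat: 'lat N', lon: 'long W' }
```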

Do the messages containing sensor data all have unique topics?
What key does the sensor that you do see have?

Hi Team,

Sorry for the late reply. The solution provided by @E1cid is exactly what I was looking for.

I spent these last couple of days testing and everything seems to be working.

Thanks again for all your help.
