Joining MQTT messages when using a wildcard

Hi,

I am relatively new to Node-Red, and have been plugging away for around 12 months with limited coding knowledge.

I have a system that sends out MQTT messages for each of the sensor readings in separate MQTT messages. These are in a custom form, but the basis of which uses the serial number and a suffix to differentiate between them. I am using a wildcard on the MQTT input to draw them all in using one node.

For example, where the "+" is used in place of the serialNum
building1/event/type/fixture/id/+/name
building1/event/type/fixture/id/+/motion
building1/event/type/fixture/id/+/temperature

Each message has a unique topic, however they both contain the same path in the msg object "SerialNum".

This building has over 900 of these sensors, so the number of MQTT messages is over 5000, and these are being sent very, very regularly, and I have no control over any of this, with the exception of the topic format. I can adjust the part, and <data - motion/name/etc), position in the topic.

Previously, we had used separate flows for each sensor, merging together the data and so on. However, I am looking for a way to scale this up. I have it working on other systems, but I have not tried to do it with two or more messages coming in.

I am looking for a way in which you can join the data using this. I don't care how it is presented afterwards, as I will use a template/function node to format into a reasonable manner for influx out. Ideally naming each data point using the name captured in the /name topic.

I tried using a join node, using msg.payload to create a key/value with msg.payload.fixture[0].serialNum as the key. But this did not function as I expected.

Almost certain I am missing something obvious here, can anyone advise?

Use the topic as the key. Then you will get an entry in msg.payload for each topic

However, I don't think you should join them at all. To recommend the best way, what is in name and in motion? I assume that the temperature is in that one. Are there other values?

Can you publish all the values for one sensor in one MQTT value - so an object containing {name: ..., motion: ..., temperature: ..}? That would make it even easier.

This is all the data from a single sensor. There is brightness, motion, name, power, temp and type.

There are 990 of these sensors, each with exactly the same output (there are also locations which combine all sub-sensors together, but if I can crack this one, they're an easier fit.

I have tried doing a change node to make msg.topic = msg.payload.SerialNum (using correct paths), but this still did not merge based on exact same serial numbers. It keeps joining say CFV00000000001J030150600059 with CFV00000000001J030150600060

Screenshot 2022-10-28 at 4.09.59 pm
Screenshot 2022-10-28 at 4.09.51 pm

FYI node information at the moment.

This is essentially what I am trying to construct, there might be some variation in time formats, using motion as a difference between current time and now for example, but I have that already on other flows.

{ "fixture":[
{
"currentTime":1666952825,
"serialNum":"CFV00000000001J030150600059",
"name":"4-2-D-006",
"sensorStats":{
"illuminance":238.0,
"power":0.55,
"motion":1666704959,
"temperature":21.54
}
}
}
]
}

So using topic all the data is there and available.

To pick the serial number out you could, in a Function node split it on / and then use element 5.

However, as I suggested, that isn't necessary. You haven't said whether, in the sensor, you could combine everything into one object to be published, but if you can't then I suggest not having an Influxdb Measurement for each serial number, but have one measurement for, for example, temperature. Then have an MQTT node subscribing to building1/event/type/fixture/id/+/temperature. Then split out the serial number and the temperature value and write it to the temperature measurement with the value as an Influx Field and the serial number as an Influx Tag. Then when you want to fetch the temperature values for a particular sensor you would use a query with a WHERE serial_no = CFV00000000001J030150600060, for example. Tags are indexed in the database so fetching like that is efficient, and you don't end up with a database with hundreds of measurements, just hundreds of different values of the tag.

You can do that for each of the sensor values that are like temperature - a number that changes regularly.

The reason I asked about name is that if it is effectively a constant then writing it to the database every few minutes (or whatever) may not be a very efficient way of handling it.

OK, I think I am understanding your suggestion. My issue was trying to navigate away from relying on serial number as a TAG and using the name. Hence needing to merge the messages together, or have some form of lookup of serial number to reference the messages containing name to pull a name.

Yes it is fixed, so this may not get stored with every message, but I would like each message to be tagged with the name at very least. That way if a sensor was ever replaced, but the device named the same, I would not have to worry about redoing any code on visuals.

Am I making any sense?

And no, we cannot change any message format, just the topic format.

OK, so the name is actually the one that matters, and the serial number identifies a particular device, which may get changed out if it fails or something. I suggest probably putting both name and serial in as tags in that case. Then you can query for data for a particular name, or for a particular serial number if that is what you need,

In that case, then yes, you need to join the name in. So I suggest feeding all the messages into a join, using the full mqtt topic as the key (as it has everything you need to identify the content). In the join node use 1 as the message count and tick And Every Subsequent Message. Then you will get a message out every time you get one in, containing the latest values of everything. The message out will have the topic of the input message that triggered it, so then you can split the messages down different wires dependent on the last element of the topic, so all temperature values go down one wire, motion down another, etc. Then you can build the influx query incorporating, in the temperature case, the temperature as a field, and name and serial as tags.

If you need to access the data in a Function node, then because the key includes '/' characters you will need to use msg.payload["the/long/topic/name"] as if you try to use dot syntax it will think the slashes are divide operators.

On the time, do you need to use the time from the data, or can you get away with using the current time? If so then don't put the time in explicitly and influx will use the current time.

And herein lies my issue I believe, I can add both name and serial as tags from the message "Name", but not from message "motion".

So by joining by single messages, and then splitting afterwards, am I not in the exact same position as pulling in each MQTT in? I need to somehow "vlookup" the serial number to get the name.

Once you have joined them all using the MQTT topic then you will have the name for that serial number available. So, for example, the name for serial number 1234 will be in something like msg.payload["building1/event/type/fixture/1234/name"]. That is true whether the latest message was a temperature message or a motion message or anything else.

I am really sorry, but I am not sure I understand.

It might be the sheer volume of data being sent, but when I run this configuration

Using a join node, in manual with combine each msg.payload to create a key/value object, using the value of msg.topic as the key. sent after 1 and subsequent.

I get the attached,

Which has added all of the names under the same topic, using different serial numbers.

~~admin~~https://capture.dropbox.com/MLLw1lNKFTjZ0TUR~~edit~~

Admin edit: Please paste screenshots into your reply instead of advertising external services

It would not allow me to input a video - sorry admin?

Posting videos is almost always a bad idea and pointless.

If you look carefully you will see that message.payload is an object containing a lot of other objects. The first one is called "now/smartengine/event/type/fixture/id/CVF000...045/name" which is one of your topics, as are all the other objects in msg.payload. That is what the Join node does. Since that is a name topic, you can see that the name for that id is "Pendants". So if you want the name for that id you can look in msg.payload["now/smartengine/event/type/fixture/id/CVF000...045/name"][0].name
or if you have the id in a javascript variable called id then you can access its name by

let name = msg.payload[`now/smartengine/event/type/fixture/id/${id}/name`][0].name

I have used the template literal string format to inject the variable id into the expression.

There’s a great page in the docs (Working with messages : Node-RED) that will explain how to use the debug panel to find the right path to any data item.

Pay particular attention to the part about the buttons that appear under your mouse pointer when you over hover a debug message property in the sidebar.

BX00Cy7yHi

I have read this topic a number of times, it is unfortunate that it is not easy to combine wildcard topics/messages

Gave it some thought with a different approach: use context to store what has been received per topic, if all 3 exist, build new msg, send it and remove it from the context.

Input should be send per topic (no joining).

Example:

// if topics context does not exist, write it first before assigning to variable
if (context.get("topics") === undefined) { context.set("topics", {}) }

const topics = context.get("topics") 
const base_topic = 'building1/event/type/fixture/id'

const topic_input = msg.topic

// add topic with payload to context, object reference is sometimes handy.
topics[topic_input] =  msg.payload

// get serial number
const topic_serial = topic_input.split("/")[5]

// required topics
const name_topic = `${base_topic}/${topic_serial}/name`
const motion_topic = `${base_topic}/${topic_serial}/motion`
const temperature_topic = `${base_topic}/${topic_serial}/temperature`

// if all 3 elements exist, build new payload, send it and remove them from context
if(topics[name_topic] !== undefined && topics[motion_topic] !== undefined && topics[temperature_topic] !== undefined){
    
    // new payload
    const pl = {
        serial: `${base_topic}/${topic_serial}`,
        name: topics[name_topic],
        temperature: topics[temperature_topic],
        motion: topics[motion_topic],
    }

    node.send({ payload: pl })

    // delete topics from context wait for new cycle
    delete topics[name_topic]
    delete topics[motion_topic]
    delete topics[temperature_topic]
}

return null

example flow:

[{"id":"40d7453917e0f5cc","type":"debug","z":"54a4ccbc8858cca3","name":"debug 134","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":550,"y":220,"wires":[]},{"id":"b5da338512fec5c5","type":"function","z":"54a4ccbc8858cca3","name":"function 33","func":"\nconst names = [\"abc\",\"def\",\"vuw\"]\n\nnames.forEach(name => {\n   node.send({ topic: `building1/event/type/fixture/id/${name}/motion`, payload: true })\n    node.send({ topic: `building1/event/type/fixture/id/${name}/name`, payload: `name ${name}` })\n    node.send({ topic: `building1/event/type/fixture/id/${name}/temperature`, payload: 21.2 })\n})\n\nreturn null ","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":255,"y":220,"wires":[["d2cd963e0357e18c"]],"l":false},{"id":"9a607ccad70c207a","type":"inject","z":"54a4ccbc8858cca3","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":160,"y":220,"wires":[["b5da338512fec5c5"]]},{"id":"d2cd963e0357e18c","type":"function","z":"54a4ccbc8858cca3","name":"function 34","func":"// if context does not exist, write it first before assigning to variable\nif (context.get(\"topics\") === undefined) { context.set(\"topics\", {}) }\n\nconst topics = context.get(\"topics\") \nconst base_topic = 'building1/event/type/fixture/id'\n\nconst topic_input = msg.topic\n\n// add topic with payload to context, object reference is sometimes handy.\ntopics[topic_input] =  msg.payload\n\n// get serial number\nconst topic_serial = topic_input.split(\"/\")[5]\n\n// required topics\nconst name_topic = `${base_topic}/${topic_serial}/name`\nconst motion_topic = `${base_topic}/${topic_serial}/motion`\nconst temperature_topic = `${base_topic}/${topic_serial}/temperature`\n\n// if all 3 elements exist, build new payload, send it and remove them from context\nif(topics[name_topic] !== undefined && topics[motion_topic] !== undefined && topics[temperature_topic] !== undefined){\n    \n    // new payload\n    const pl = {\n        serial: `${base_topic}/${topic_serial}`,\n        name: topics[name_topic],\n        temperature: topics[temperature_topic],\n        motion: topics[motion_topic],\n    }\n\n    node.send({ payload: pl })\n\n    // delete topics from context wait for new cycle\n    delete topics[name_topic]\n    delete topics[motion_topic]\n    delete topics[temperature_topic]\n}\n\nreturn null","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":370,"y":220,"wires":[["40d7453917e0f5cc"]]}]

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.