Combine multiple Payloads

Hey Guys,

I have about 30 Shelly 3EMs sending simultaneous MQTT messages to my MQTT broker. I want to combine their measurements into a single payload to hand over to InfluxDB, but so far I am failing :frowning:
The devices more or less send their information whenever they want. It seems like they send their energy in Watt/Minutes every minute and the Power/Voltage/Current/PF every second.
Now I have read a lot about the join node, but I cannot get it to work when I have multiple Shellys sending this data. Or can I?

Basically I want something like:

msg.payload = [{
    "Watt/Min": 12,
    "Current": 12,
    "Voltage": 12,
    "Power": 12,
    "PF": 12
},
{
    "tag1": "sensor1"
}];
return msg;

Do you know if that is possible or what would be the best way to do this?

This is my current experiment, but it's either sending every message or combining multiple devices into one join :frowning:

[
    {
        "id": "53f0c48e369e1106",
        "type": "mqtt in",
        "z": "5f5f00d7.65a72",
        "name": "MQTT IN",
        "topic": "shellies/+/emeter/+/+",
        "qos": "2",
        "datatype": "auto",
        "broker": "18443a8c.6ef695",
        "nl": false,
        "rap": false,
        "inputs": 0,
        "x": 320,
        "y": 520,
        "wires": [
            [
                "32c61e22a5dbcd7b"
            ]
        ]
    },
    {
        "id": "32c61e22a5dbcd7b",
        "type": "function",
        "z": "5f5f00d7.65a72",
        "name": "function 2",
        "func": "var Payload = msg.topic\nvar Machine = []\nMachine = Payload.split(\"/\")\nmsg.machine = Machine\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 460,
        "y": 520,
        "wires": [
            [
                "2bdb43e7ff879b5e"
            ]
        ]
    },
    {
        "id": "2bdb43e7ff879b5e",
        "type": "change",
        "z": "5f5f00d7.65a72",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "tag",
                "pt": "msg",
                "to": "machine.4",
                "tot": "msg"
            },
            {
                "t": "set",
                "p": "Maschine",
                "pt": "msg",
                "to": "machine.1",
                "tot": "msg"
            },
            {
                "t": "set",
                "p": "Phase",
                "pt": "msg",
                "to": "machine.3",
                "tot": "msg"
            },
            {
                "t": "change",
                "p": "topic",
                "pt": "msg",
                "from": "shellies/",
                "fromt": "str",
                "to": "",
                "tot": "str"
            },
            {
                "t": "change",
                "p": "topic",
                "pt": "msg",
                "from": "/emeter",
                "fromt": "str",
                "to": "",
                "tot": "str"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 630,
        "y": 520,
        "wires": [
            [
                "cdbf5a034242ba92"
            ]
        ]
    },
    {
        "id": "cdbf5a034242ba92",
        "type": "switch",
        "z": "5f5f00d7.65a72",
        "name": "",
        "property": "topic",
        "propertyType": "msg",
        "rules": [
            {
                "t": "cont",
                "v": "/energy",
                "vt": "str"
            },
            {
                "t": "cont",
                "v": "/power",
                "vt": "str"
            },
            {
                "t": "cont",
                "v": "/voltage",
                "vt": "str"
            },
            {
                "t": "cont",
                "v": "/current",
                "vt": "str"
            },
            {
                "t": "cont",
                "v": "/pf",
                "vt": "str"
            }
        ],
        "checkall": "true",
        "repair": false,
        "outputs": 5,
        "x": 790,
        "y": 520,
        "wires": [
            [
                "1d384af782366bed"
            ],
            [
                "1d384af782366bed"
            ],
            [
                "1d384af782366bed"
            ],
            [
                "1d384af782366bed"
            ],
            [
                "1d384af782366bed"
            ]
        ]
    },
    {
        "id": "1d384af782366bed",
        "type": "join",
        "z": "5f5f00d7.65a72",
        "name": "",
        "mode": "custom",
        "build": "object",
        "property": "Maschine",
        "propertyType": "msg",
        "key": "Maschine",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": false,
        "timeout": "",
        "count": "5",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "num",
        "reduceFixup": "",
        "x": 930,
        "y": 520,
        "wires": [
            [
                "36f8452feb77c470"
            ]
        ]
    },
    {
        "id": "36f8452feb77c470",
        "type": "debug",
        "z": "5f5f00d7.65a72",
        "name": "debug 12",
        "active": false,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 1080,
        "y": 520,
        "wires": []
    },
    {
        "id": "18443a8c.6ef695",
        "type": "mqtt-broker",
        "name": "Mosquitto",
        "broker": "10.1.1.200",
        "port": "1883",
        "clientid": "",
        "autoConnect": true,
        "usetls": false,
        "compatmode": false,
        "protocolVersion": 4,
        "keepalive": "60",
        "cleansession": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": "",
        "closeTopic": "",
        "closeQos": "0",
        "closePayload": "",
        "willTopic": "",
        "willQos": "0",
        "willPayload": ""
    }
]

Why? Just send each one to InfluxDB with an appropriate tag that identifies the device (actually, I prefer to tag by location but obviously it depends on your use-case).
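
For illustration, a minimal sketch (untested) of what that could look like in a function node in front of an influxdb out node, assuming node-red-contrib-influxdb (which accepts a two-element array of fields and tags) and the shellies/<device>/emeter/<phase>/<measurement> topic layout from your flow:

// Function node sketch: turn one Shelly MQTT message into a tagged InfluxDB point.
// Topic assumed to be shellies/<device>/emeter/<phase>/<measurement>
const parts = msg.topic.split('/');
const device = parts[1];
const phase = parts[3];
const measurement = parts[4];           // power, voltage, current, pf, energy, ...

msg.measurement = measurement;          // or set the measurement in the node config instead
msg.payload = [
    { value: Number(msg.payload) },     // fields
    { device: device, phase: phase }    // tags
];
return msg;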

Hi @luuki and welcome to the forum.

The code that you posted is in a format that makes it impossible to import into my Node-RED.
To post code:

  • Use the </> button above the post editor (or it may be hidden behind the gearwheel button)

  • Paste your code over the top of the highlighted words "type or paste code here"

About joining messages: msg.topic is critical; unfortunately you have not given us any information about msg.topic.
Basically though, since all MQTT messages already have a topic you need to ensure that each of your 30 Shellies uses a different topic.

I updated the code so now it should be visible, sorry!

As for the topic: I have always entered the topics the old way.
The only "issue" I have is that this amounts to tons of topics to select a measurement from for a graph (30 devices with 6 topics each = 180 different topics to search through).

Slimming this down a bit might help.

Alternatively I might just send them individually and pass the machine and measured category as tags, but keep the long measurement topic.

I'm just looking for best practice; there must be a solution somewhere between splitting it all up into 180 topics and combining it like I want to.

The problem will be the timing. The join node can join 30 messages together, but you won't be able to guarantee that they're from 30 different devices. (Edit: actually you could use msg.parts to make sure you get a message from each, but I'd still do the following)

You might be better off storing all of the values in a flow context variable, and then having a separate flow that runs every, say, 5 minutes and sends the current snapshot of what's in the variable. So for every incoming MQTT message you load the flow context variable, update it with this value, and then save it back again.
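
A rough sketch of the periodic part (untested; 'mydata' is just an assumed flow context key holding the latest value per device): an inject node firing every 5 minutes feeding a function node like this:

// Function node sketch, triggered by an inject node every 5 minutes.
// 'mydata' is an assumed flow context key holding the latest value per device.
const snapshot = flow.get('mydata') || {};

// Send one message per device so each can carry its own tag,
// or send the whole object in one go - whichever suits the InfluxDB side.
for (const device of Object.keys(snapshot)) {
    node.send({ topic: device, payload: snapshot[device] });
}
return null;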

I have played around with msg.parts (basically gave every category (power, voltage, etc.) an ID), but that didn't really change things, as it still wouldn't combine by device and then by each ID (it just took whatever was the last payload for the ID).

Storing it in flow context might be the best way; I haven't worked with that yet.

Instead of creating every device by hand:
Is there a way of saying that when there is a message on topic DeviceX/Current, the payload automatically gets stored in flow.DeviceX/Current?

Reading this thread, I think I would go the route of storing all data to Influx properly tagged.

If you collect it all and store it at once, you would lose the detail of the data that is sent at higher frequency.

To do the combining efficiently you could use the m-queue node, set it to a queue depth of 1 and set it to keep only the newest message. In that mode it collects only the last msg per topic. Then at predefined intervals, or whenever a certain topic arrives (or whatever trigger you prefer), you send a trigger command to the m-queue and it releases all msgs at once. Behind it you put a join node to join all messages. Voilà.

If the topic is, say "my/topic/devicex/current" then you could use a function node and split the topic by "/" and pull out the third array item (index 2). Then use that to set the context storage. Something along the lines of (untested):

// Pull the device name out of the topic and use it as the key in flow context
const topics = msg.topic.split('/');
const device = topics[2];
// default to an empty object the first time, otherwise context[device] would throw
let context = flow.get('mydata') || {};
context[device] = msg.payload;
flow.set('mydata', context);
return msg;

If you're not keen on a function node, you could alternatively do it with a change node set to JSONata and use $merge to merge the payload into the stored context:

(
    $topics := $split(topic, '/');
    $device := $topics[2];
    $newData := { $device: payload };
    $merge([$flowContext('mydata'), $newData])
)

If you do go the context route you might want to also use file-based context so it doesn't get cleared on a NR restart.
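
For reference, file-based context is enabled in settings.js through the contextStorage section; a minimal sketch (the store name "file" is just an example):

// settings.js (sketch): keep the default in-memory store and add a file-backed one
contextStorage: {
    default: { module: "memory" },
    file:    { module: "localfilesystem" }
},

The function node would then name the store explicitly, e.g. flow.get('mydata', 'file') and flow.set('mydata', context, 'file').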

Following up on my statement, here is a quick solution using the m-queue node.
It is fairly simple to collect things per topic with this node and release all at once.

[{"id":"d1a59530d0718bc5","type":"m-queue","z":"2a13b6d85cb5aa4b","name":"","queueSelect":"topic","controlFlag":"control","defaultQueue":"default","allQueues":"all","triggerCmd":"trigger","statusCmd":"status","pauseCmd":"pause","resumeCmd":"resume","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","maximumCmd":"maximum","newestCmd":"newest","protectCmd":"protect","deleteCmd":"delete","paused":false,"protect":false,"keepNewestDefault":true,"maxSizeDefault":"1","protectDefault":false,"persist":false,"newValue":"value","storeName":"memoryOnly","statusOutput":true,"outputs":2,"x":900,"y":180,"wires":[["28bc5f4bda4e2ad1"],["07baeb2b23577f7b"]]},{"id":"e2e828720f11f81b","type":"debug","z":"2a13b6d85cb5aa4b","name":"","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"","statusType":"counter","x":910,"y":120,"wires":[]},{"id":"48ac3d9585df3215","type":"inject","z":"2a13b6d85cb5aa4b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"status","payloadType":"str","x":720,"y":340,"wires":[["d1a59530d0718bc5"]]},{"id":"875a138f91887836","type":"inject","z":"2a13b6d85cb5aa4b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"reset","payloadType":"str","x":730,"y":380,"wires":[["d1a59530d0718bc5"]]},{"id":"d94755acd19a363f","type":"function","z":"2a13b6d85cb5aa4b","name":"random gen","func":"//function from https://stackoverflow.com/questions/1349404/generate-random-string-characters-in-javascript\nfunction makeid(length) {\n    let result = '';\n    //const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';\n    const characters = 'AQX';\n    const charactersLength = characters.length;\n    let counter = 0;\n    while (counter < length) {\n        result += characters.charAt(Math.floor(Math.random() * charactersLength));\n        counter += 1;\n    }\n    return result;\n}\n\n\nfor (let i = 0; i<50; i++) {\n    //let r = Math.floor(Math.random() * 20);\n    //let r = (Math.random() + 1).toString(10).substring(2,3);\n    let r = makeid(2);\n    node.send({\"topic\": r, \"payload\": i})\n}\n//return 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":690,"y":180,"wires":[["e2e828720f11f81b","d1a59530d0718bc5"]]},{"id":"779231c4ba564fcb","type":"inject","z":"2a13b6d85cb5aa4b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":520,"y":180,"wires":[["d94755acd19a363f"]]},{"id":"28bc5f4bda4e2ad1","type":"join","z":"2a13b6d85cb5aa4b","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"1","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1090,"y":180,"wires":[["d2fd8767c97a144a"]]},{"id":"7e8aed187f6bf5f7","type":"inject","z":"2a13b6d85cb5aa4b","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"control","v":"true","vt":"bool"}],"repeat":"5","crontab":"","once":false,"onceDelay":0.1,"topic":"all","payload":"trigger","payloadType":"str","x":660,"y":240,"wires":[["d1a59530d0718bc5"]]},{"id":"d2fd8767c97a144a","type":"debug","z":"2a13b6d85cb5aa4b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1270,"y":180,"wires":[]},{"id":"07baeb2b23577f7b","type":"debug","z":"2a13b6d85cb5aa4b","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1110,"y":340,"wires":[]},{"id":"ab5c9ba990b07cd6","type":"comment","z":"2a13b6d85cb5aa4b","name":"https://discourse.nodered.org/t/combine-multiple-payloads/78489/6","info":"https://discourse.nodered.org/t/combine-multiple-payloads/78489/6","x":920,"y":80,"wires":[]}]

[screenshot: output_filter_02]

@luuki I will add another +1 to the suggestion of saving the data to Influx as it is received, just limiting the rate on any data that comes in quicker than you want to save it. There is no significant benefit in going to the effort of merging the messages together, or at least if you think there is then please tell us what that is.

Don't include a / in the field name; you will just make additional work for yourself. Also, Watts per minute is a meaningless concept. Do you mean Watt Minutes?

If you want to merge incoming payloads under each device, this may help.

[{"id":"b9e01ca971d04f1d","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device1/current","payload":"10","payloadType":"num","x":180,"y":2560,"wires":[["9c664c4ff6bec283"]]},{"id":"9c664c4ff6bec283","type":"change","z":"b9860b4b9de8c8da","name":"","rules":[{"t":"set","p":"topic_parts","pt":"msg","to":"$split($$.topic, \"/\")","tot":"jsonata"},{"t":"set","p":"incoming[msg.topic_parts[2]][msg.topic_parts[3]]","pt":"flow","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":420,"y":2660,"wires":[["708c84b303250521"]]},{"id":"0d5842c013e19f66","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device1/voltage","payload":"240","payloadType":"num","x":180,"y":2600,"wires":[["9c664c4ff6bec283"]]},{"id":"9157e17a3aae0df8","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device1/power","payload":"2400","payloadType":"num","x":180,"y":2640,"wires":[["9c664c4ff6bec283"]]},{"id":"c524aa3356db055f","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device2/current","payload":"5","payloadType":"num","x":170,"y":2720,"wires":[["9c664c4ff6bec283"]]},{"id":"188c8ba8d6335447","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device2/voltage","payload":"230","payloadType":"num","x":180,"y":2760,"wires":[["9c664c4ff6bec283"]]},{"id":"a47a52b5e54ef194","type":"inject","z":"b9860b4b9de8c8da","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"my/topic/device2/power","payload":"1200","payloadType":"num","x":180,"y":2800,"wires":[["9c664c4ff6bec283"]]},{"id":"708c84b303250521","type":"switch","z":"b9860b4b9de8c8da","name":"","property":"$count($flowContext(\"incoming.\" & msg.topic_parts[2]).*)","propertyType":"jsonata","rules":[{"t":"eq","v":"3","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":570,"y":2660,"wires":[["ee0a882d69024f93"]]},{"id":"ee0a882d69024f93","type":"change","z":"b9860b4b9de8c8da","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"[]","tot":"json"},{"t":"move","p":"incoming[msg.topic_parts[2]]","pt":"flow","to":"payload[0]","tot":"msg"},{"t":"set","p":"payload[1].tag1","pt":"msg","to":"topic_parts[2]","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":720,"y":2660,"wires":[["50301e2627d49a2f"]]},{"id":"50301e2627d49a2f","type":"debug","z":"b9860b4b9de8c8da","name":"debug 295","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":630,"y":2720,"wires":[]}]

Or you could just write each incoming payload to the db directly, then combine them with a db query.
