Help Optimize MQTT to influx functions


I could use another set of eyes on my nodes. It works, but I feel like this can't be the most efficient way. There are many places in the function where things are hard-coded that I am typing over and over. I would like to be able to just name the mqtt input node, and have that name carry on all the way to the values at the end, so that I can drop the function in more generically, and connect how ever many nodes to it as I want.

I'm receiving several strings via MQTT that look like this:

"{"value": 1}"

each mqtt address provides a single string, and all the fields are named "value"

I want to group them up into an array that I can pass to the influxdb output plugin. The influx db plugin specifies:

If msg.payload is an object containing multiple properties, the fields will be written to the measurement.

So, I first run it through a json not to end up with:


Then I run it through a change node:


Here I'm setting the payload to just the value "1", and setting a msg.topic so that I can tell the measurements apart. In this example, the topic is "mains".

The output of the change module of course is just:


Now for my main function. I borrowed bits and pieces of this code from different places, so I have very little confidence in it, but it does work: = || {};

switch (msg.topic) {

case "mains": = msg.payload;
    msg = null;
case "bulk": = msg.payload;
    msg = null;
case "absorption": = msg.payload;
    msg = null;
case "float": = msg.payload;
    msg = null;
case "inverter": = msg.payload;
    msg = null;
case "overload": = msg.payload;
    msg = null;
case "lowbattery": = msg.payload;
    msg = null;
case "temperature": = msg.payload;
    msg = null;

    msg = null;


if( != null && != null && != null && != null && != null && != null && != null && != null ) {
	msg2 = {};
msg2.payload = {
	return msg2;

My issue with this code is I am repeating parts over and over, it's tedious and can't be the most effective way to do things (but I don't know any better!) I connect the output of each change node to the function. The output is:



The whole spread looks like this:

And this is how it shows up at the other end, in grafana:

image image

Now, I wish I could make it say on/off instead of 1 or 0, but that's another matter, as it seems like influxdb doesn't like strings. :slight_smile:



Do you have any control over the MQTT messages? If you do, maybe make the topic be "LED" and then switch from a single MQTT node on the message payload "contains" absorption, etc. or could then switch on the msg.payload or other fields in your function instead of making msg.topic match the source.



I do not, and they aren't going to change it just for me, so this is what I'm stuck with!

Even if I could just type in the name once "main" and have it be used in all the various places, that would be good enough!



You could instead replace the Change nodes with ones configured similar to this and feed them all directly into the Influxdb node




(side note) - Move does the delete for you - so the delete is unnecessary



Thanks, thinking about it that is exactly what it should do of course. If it did not delete it then it would be Copy, not Move.



or set as we called it (as you can set a new value or set it to be a calculated value or copy an old one)



Is the idea of the function node to gather the values from each of the MQTT nodes and write out one record with all the data at that moment in time?

If so, as I understand what you have provided, your flow won't work. Each time a msg comes in via a MQTT node, it will pass to the function node and be processed and sent on.

Let's walk thru it.

  1. a messge comes from Mains LED
  2. you change the msg so you mnow have msg.topic = 'mains' and msg.payload = 1
  3. the function node runs Since it is the frist time you set to {}
  4. the switch causes the case 'mains' to be executed so you set to 1
  5. next you move the data to msg2,
  6. set to null
  7. and send the msg out - note it only contains the value from the Mains LED

If you get a message from any of the other mqtt nodes, the same thing happens only that value will be sent.

If you want an aggrate of all the mqtt nodes, you will need to add a join to your flow.



Here is a way to solve the mqtt -> database problem with a generic solution...

[{"id":"c4a25e8f.ac117","type":"mqtt in","z":"b473c044.f9da7","name":"","topic":"sensor/status/#","qos":"0","broker":"e36a332b.c61f3","x":112,"y":941,"wires":[["471c3086.0d641"]]},{"id":"b331f58c.b7c228","type":"inject","z":"b473c044.f9da7","name":"cfg","topic":"","payload":"[{\"topic\":\"sensor/status/heating/kg_tr/sol/vol\",\"min\":0,\"max\":700,\"Name\":\"Volumen\"},{\"topic\":\"sensor/status/heating/kg_tr/sol/wh\",\"min\":6581248,\"max\":99981248,\"Name\":\"Wärme\"},{\"topic\":\"sensor/status/heating/kg_tr/sol/t1\",\"min\":-30,\"max\":250,\"Name\":\"Kollektor\"},{\"topic\":\"sensor/status/heating/kg_tr/sol/t3\",\"min\":-30,\"max\":250,\"Name\":\"Vorlauf\"},{\"topic\":\"sensor/status/power/power/eme/balance\",\"nodatabase\":false,\"Name\":\"Energiebilanz akt.\"},{\"topic\":\"sensor/status/freshwater/kg_tr/water/cnt\",\"min\":1,\"max\":999999,\"nodatabase\":false,\"Name\":\"Wasserzähler\"}]","payloadType":"json","repeat":"","crontab":"","once":true,"onceDelay":0.1,"x":85,"y":990,"wires":[["3124dc85.b3edc4"]]},{"id":"3124dc85.b3edc4","type":"split","z":"b473c044.f9da7","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":168,"y":1040,"wires":[["3f2d1981.4e09c6"]]},{"id":"3f2d1981.4e09c6","type":"change","z":"b473c044.f9da7","name":"prep","rules":[{"t":"set","p":"topic","pt":"msg","to":"payload.topic","tot":"msg"},{"t":"set","p":"priority","pt":"msg","to":"0","tot":"num"},{"t":"move","p":"payload","pt":"msg","to":"cfg","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":273,"y":1088,"wires":[["ddd2e928.b09658"]]},{"id":"471c3086.0d641","type":"json","z":"b473c044.f9da7","name":"","x":265,"y":996,"wires":[["ddd2e928.b09658"]]},{"id":"ddd2e928.b09658","type":"function","z":"b473c044.f9da7","name":"AJV12","func":"var countmsg = 2;          //how many items to join?\n\nvar nodublicates = false;   //only join if payload unequal existing payload\nvar limit;                 //only join if payloads are on both sides of the limit\nvar hysteresis;      //hysteresis \nvar clearaftersend = false;//clear queue after sending\nvar sendinstantly = false;\nvar reverse = false;         //reverse output array\nvar priomode = false;\nvar storagemode = false;  \nvar addconfigmode = true;\nvar topicmatch = false;\nvar samplemode = false;\nvar sampleinterval = 60000;  //ms\n//------------ different topic mode ------------------\nvar diftopics = false;      //true: join only different topics; false: join only same topics\nvar alphasort = false;\n\nvar inmsg = msg;\nvar msgs = context.get(\"aggmsgs\")||{};\nvar topicmsgs = msgs[inmsg.topic]||[];\n\nif (inmsg.topic === \"\")\n    return null;\nif (typeof inmsg.reset != \"undefined\") {\n    msgs = {};\n    topicmsgs = [];\n    lastval = undefined;\n    context.set(\"aggmsgs\",msgs);\n    context.set(\"lastval\",lastval);\n    return null;\n}\nif (typeof inmsg.resettopic != \"undefined\") {\n    topicmsgs = [];\n    lastval = undefined;\n}\nif (typeof hysteresis !== 'undefined' && typeof lastval !== 'undefined') {\n    if (Math.abs(inmsg.payload - lastval) <= hysteresis) {\n        return null;\n    }\n}\nif (typeof inmsg.payload != \"undefined\") {\n    if (typeof inmsg.payload.arraystatus != \"undefined\") {\n        inmsg.payload = msgs;\n        return inmsg;\n    }\n}\nif (typeof lastval !== 'undefined') {\n    if (nodublicates && lastval == inmsg.payload)\n        return null;\n    if (typeof limit !== 'undefined' && countmsg == 2) {\n        if (!((lastval <= limit && inmsg.payload >= limit) || (lastval >= limit && inmsg.payload <= limit)))   \n            return null;\n    }\n} \n\nif (samplemode) {\n    var tmr = msgs[inmsg.topic + \"tmr\"];\n    if (typeof tmr === \"undefined\")\n        tmr = 1;\n//node.warn(msgs);\n\n    if (topicmsgs.length === 0) {\n        topicmsgs.push(inmsg);\n        setTimeout(timer,sampleinterval,inmsg.topic,false);\n    }\n    else if (topicmsgs.length === 1) { \n        topicmsgs.push(inmsg);\n//        if (tmr === 0) \n//            setTimeout(timer,sampleinterval,inmsg.topic,false); //normal timer\n    }\n    else {\n        topicmsgs[1] = inmsg;\n//        if (tmr === 0) \n//            setTimeout(timer,sampleinterval,inmsg.topic,false); //normal timer   \n    }\n\n    msgs[inmsg.topic] = topicmsgs;\n    context.set(\"aggmsgs\",msgs);\n    context.set(\"lastval\",inmsg.payload);\n    if (tmr === 0) {\n        setTimeout(timer,5,inmsg.topic,true);   //timer expired -> send asap\n    }\n\n    return null;\n}\n\nif (topicmatch) {\n    if (typeof inmsg.payload.jointopic != \"undefined\") {\n        if (topicmsgs.length === 0) {\n            topicmsgs.push(inmsg);\n            msgs[inmsg.topic] = topicmsgs;\n            context.set(\"aggmsgs\",msgs);\n            return null;   \n        }\n        else {\n            topicmsgs[0] = inmsg;\n        }\n    }\n    else {  //only accept topics specified by jointopics\n        for (var tmsgs in msgs) {\n            if (msgs.hasOwnProperty(tmsgs)) {   \n                var bfound = false;\n                var btopicfound = false;\n                var ar = msgs[tmsgs];\n                for (var i = 0; i < ar.length; i++) {    //search and replace\n                    if (i === 0) {\n                        if (ar[i].payload.jointopic == inmsg.topic)\n                            btopicfound = true;\n                    } else {\n                        if (ar[i].topic == inmsg.topic) {\n                            ar[i] = inmsg;\n                            bfound = true;\n                        }\n                    }\n                }\n                if (!bfound && btopicfound)        //add new\n                    msgs[tmsgs].push(inmsg);\n            }\n        }\n    }\n}\n\nvar outmsg = { \"topic\": inmsg.topic , \"payload\": \"\"};\nvar lastval = context.get(\"lastval\");\nvar inprio = typeof inmsg.priority != \"undefined\" ? inmsg.priority: countmsg-1 ;     //default prio is 1\n//node.warn(topicmsgs.length);\nif (addconfigmode) {\n    if (topicmsgs.length === 0) {\n        if (typeof inmsg.priority == \"undefined\")\n            return inmsg;\n        else {\n            topicmsgs.push(inmsg);\n            msgs[inmsg.topic] = topicmsgs;\n            context.set(\"aggmsgs\",msgs);\n            context.set(\"lastval\",inmsg.payload);\n            return null;\n            }\n        }\n    else {\n        if (typeof inmsg.priority == \"undefined\") {\n            for (var i = 0; i < topicmsgs.length; i++) //append attributes from prio 0..1 where higher overwrites\n                addAtr(topicmsgs[i],inmsg);\n            return inmsg;\n        }    \n        else {\n            for (var i = 0; i <= inmsg.priority; i++) {\n                if (typeof topicmsgs[i] == \"undefined\")\n                    topicmsgs.push({});\n            }\n            topicmsgs[inmsg.priority] = inmsg;\n            msgs[inmsg.topic] = topicmsgs;\n            context.set(\"aggmsgs\",msgs);\n            context.set(\"lastval\",inmsg.payload);\n            return null;\n        }\n    }\n}\n\nif (diftopics) {\n//node.warn(topicmsgs.length);\n    if (topicmsgs.length > 0) {     //msg with same topic -> replace old msg\n        topicmsgs.shift();\n    }\n    topicmsgs.push(inmsg);\n    msgs[inmsg.topic] = topicmsgs;    \n    if (Object.keys(msgs).length > countmsg) {\n        for (var tmsgs in msgs) {\n            if (msgs.hasOwnProperty(tmsgs)) {   \n                delete msgs[tmsgs];\n                break;\n            }\n        }\n    }\n    context.set(\"aggmsgs\",msgs);\n    context.set(\"lastval\",inmsg.payload);\n//node.warn(Object.keys(msgs).length);\n    if (Object.keys(msgs).length == countmsg) {\n        var aout = [];\n        outmsg.topic = \"\";\n        for (var tmsgs in msgs) {\n            if (msgs.hasOwnProperty(tmsgs)) \n                aout.push(msgs[tmsgs][0]);\n        }\n        if (clearaftersend) {\n            context.set(\"aggmsgs\",undefined);\n            context.set(\"lastval\",undefined);\n        }\n        if (alphasort) \n            aout.sort(function(a,b) {return (a.topic > b.topic) ? 1 : ((a.topic > b.topic) ? -1 : 0);});\n        if (reverse)\n            aout.reverse();\n        \n        for (var i =0; i<aout.length;i++) {outmsg.topic += aout[i].topic + \"_\"; }\n        outmsg.topic = outmsg.topic.substr(0,String(outmsg.topic).length-1);\n        outmsg.payload = aout;\n\n        return outmsg;\n    } else\n        return null;\n} \nelse \n{\n    if (topicmsgs.length == countmsg && !storagemode && !topicmatch){\n        topicmsgs.shift();\n    }\n    if (storagemode) {\n        if (topicmsgs.length < inprio)\n            topicmsgs.push(inmsg);\n        else\n            topicmsgs[inprio] = inmsg;\n        msgs[inmsg.topic] = topicmsgs;\n    } \n    if (!topicmatch && !storagemode) {\n        topicmsgs.push(inmsg);\n        msgs[inmsg.topic] = topicmsgs;\n    }\n    context.set(\"aggmsgs\",msgs);\n    context.set(\"lastval\",inmsg.payload);\n    \n    if (storagemode) {                          //check send msg\n        if (inprio < topicmsgs.length - 1)      //only send on trigger msg => no priority\n            return null;\n        for (var i = 0; i < topicmsgs.length; i++) { //only send if storage is full\n            if (topicmsgs[i] === undefined)\n                return null;\n        }    \n    }\n//node.warn(topicmsgs.length);\n    if (topicmatch) {\n        for (var tmsgs in msgs) {\n            if (msgs.hasOwnProperty(tmsgs)) {   \n                var ar = msgs[tmsgs];\n                if (ar.length == countmsg) {\n                    for (var i = 0; i < ar.length; i++) {\n                        if (ar[i].topic == inmsg.topic) {\n                            outmsg.payload = ar;\n                            outmsg.topic = ar[0].topic;\n                            node.send(RED.util.cloneMessage(outmsg));\n                        }\n                    }\n                }\n            }\n        }\n    } else {\n        if (topicmsgs.length == countmsg || sendinstantly) {\n            outmsg.payload = Object.assign({}, topicmsgs);\n            if (topicmatch) {\n                outmsg.topic = topicmsgs[0].topicori;\n                outmsg.payload[0].topic = outmsg.topic;\n            }\n            \n            if (clearaftersend) {\n                msgs[inmsg.topic] = undefined;\n                context.set(\"aggmsgs\",msgs);\n                context.set(\"lastval\",undefined);\n            }\n            if (reverse) {\n                var revArr = => Object.assign({}, a));\n                revArr.reverse();\n                outmsg.payload = revArr;\n            }\n            return outmsg;\n        }\n        else\n            return null;\n    }\n}\n\nfunction timer(topic,clear) {\n    var msgs = context.get(\"aggmsgs\")||{};\n    var topicmsgs = msgs[topic]||[];\n    var sent = false;\n    \n    var out = {\"topic\": topic, \"payload\": {\"value\": 0}};\n    if (topicmsgs.length > 1) {\n        out.payload.value = topicmsgs[1].payload - topicmsgs[0].payload;\n = topicmsgs;\n//        node.send(RED.util.cloneMessage(out));\n        node.send(out); \n        topicmsgs.shift();\n        sent = true;\n    }\n    if (clear || sent)\n        msgs[topic+\"tmr\"] = 1;\n    else \n        msgs[topic+\"tmr\"] = 0;\n    msgs[topic] = topicmsgs;\n    context.set(\"aggmsgs\",msgs);\n\n    if (sent || clear)\n        setTimeout(timer,sampleinterval,topic,false); //normal timer   \n}\n\nfunction addAtr(src, tgt) {\n    for (var atr in src) {\n        if (src.hasOwnProperty(atr)) {\n            if (atr != \"_msgid\" && typeof tgt[atr] == \"undefined\" && typeof src[atr] != \"object\" && atr != \"priority\") {   \n                tgt[atr] = src[atr];\n            }\n            if (typeof src[atr] == \"object\") {\n                if (typeof tgt[atr] == \"undefined\")\n                    tgt[atr] = {};\n                addAtr(src[atr],tgt[atr]);\n            }\n        }\n    }\n}","outputs":1,"noerr":0,"x":404,"y":1037,"wires":[["2a6d38dc.5eb678"]]},{"id":"2a6d38dc.5eb678","type":"switch","z":"b473c044.f9da7","name":"limit?","property":"payload","propertyType":"msg","rules":[{"t":"lt","v":"cfg.min","vt":"msg"},{"t":"gt","v":"cfg.max","vt":"msg"},{"t":"else"}],"checkall":"false","repair":false,"outputs":3,"x":534,"y":1034,"wires":[[],[],["fe38e85e.2aa798"]],"outputLabels":["under limit","over limit","ok"]},{"id":"fe38e85e.2aa798","type":"switch","z":"b473c044.f9da7","name":"nodb?","property":"true","propertyType":"jsonata","rules":[{"t":"eq","v":"cfg.nodatabase","vt":"msg"},{"t":"else"}],"checkall":"false","repair":false,"outputs":2,"x":662,"y":1074,"wires":[[],["c9f7c3f8.29e82"]],"outputLabels":["under limit","ok"]},{"id":"c9f7c3f8.29e82","type":"switch","z":"b473c044.f9da7","name":"","property":"topic","propertyType":"msg","rules":[{"t":"regex","v":"\\/heating\\/","vt":"str","case":false},{"t":"regex","v":"\\/kwl\\/","vt":"str","case":false},{"t":"else"}],"checkall":"true","outputs":3,"x":789,"y":1111,"wires":[[],[],["ee757e41.23398"]]},{"id":"ee757e41.23398","type":"change","z":"b473c044.f9da7","name":"no heating","rules":[{"t":"set","p":"","pt":"msg","to":"undefined","tot":"jsonata"},{"t":"set","p":"","pt":"msg","to":"undefined","tot":"jsonata"},{"t":"set","p":"measurement","pt":"msg","to":"$split(msg.topic, \"/\")[2]","tot":"jsonata"},{"t":"set","p":"device","pt":"msg","to":"$split(msg.topic, \"/\")[4] & $split(msg.topic, \"/\")[5]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":935,"y":1159,"wires":[["336db3a5.0af69c"]]},{"id":"336db3a5.0af69c","type":"function","z":"b473c044.f9da7","name":"prepare data","func":"newmsg = {}\nnewmsg.payload = {}\nnewmsg.measurement = msg.measurement;\n\nif (typeof msg.payload === \"object\") {\n    for (var key in msg.payload) {\n        if (isNumeric(msg.payload[key]) && msg.measurement != \"event\") {\n             newmsg.payload[key] = parseFloat(msg.payload[key]);\n        } else {\n            newmsg.payload[key] = msg.payload[key];\n        }\n    }\n} else {\n    if (isNumeric(msg.payload[key]) && msg.measurement != \"event\") {\n         newmsg.payload.value = parseFloat(msg.payload);\n    } else {\n        newmsg.payload.value = msg.payload;\n    }\n}\n\n//add tag \"Device\"\nnewmsg.payload = [ newmsg.payload, { \"Device\": msg.device} ];\n//add tag \"Beacon\"\nif (msg.beacon !== undefined)\n    newmsg.payload[1].Beacon = msg.beacon;\n//add tag \"Location\"    \nif (msg.location !== undefined)\n    newmsg.payload[1].Location = msg.location;\n//add tag \"function\"    \nif (msg.function !== undefined)\n    newmsg.payload[1].Function = msg.function;\n//add tag \"system\"    \nif (msg.system !== undefined)\n    newmsg.payload[1].system = msg.system;\n//add tag \"type\"    \nif (msg.type !== undefined)\n    newmsg.payload[1].type = msg.type;   \n//add tag \"time\"    \nif (msg.time !== undefined)\n    newmsg.payload[1].time = msg.time;   \n    \nreturn newmsg;\n\nfunction isNumeric(n) {\n  return !isNaN(parseFloat(n)) && isFinite(n);\n}\n","outputs":"1","noerr":0,"x":1106,"y":1189,"wires":[["9ab5114a.50fef"]]},{"id":"9ab5114a.50fef","type":"influxdb out","z":"b473c044.f9da7","influxdb":"4ffcfa0e.b43084","name":"","measurement":"","x":1302,"y":1231,"wires":[]},{"id":"e36a332b.c61f3","type":"mqtt-broker","z":"","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"15","cleansession":true,"willTopic":"","willQos":"0","willPayload":"","birthTopic":"","birthQos":"0","birthPayload":""},{"id":"4ffcfa0e.b43084","type":"influxdb","z":"","hostname":"","port":"8086","protocol":"http","database":"infra","name":"","usetls":false,"tls":""}]
1 Like


You can certainly use strings in InfluxDB values but then you lose all the benefits that timeseries processing brings. Like being able to max/min/average/etc aggregation for a specific time period (e.g. an hour/day/week/month/etc).