Dynamically changeable Node from "outside", handshake secured sending over MQTT

I need a solution for:

  • Receiving Data from ModbusRTU, ModbusTCP, maybe KNX. The parameters (Modbus ID, FC, Register / KNX Adress) should be configurable from external like MQTT Message or a config file.
  • The Data then gets a timestamp and is sent via MQTT. I need an extended Version of @Colin 's "guaranteed delivery flow" https://flows.nodered.org/flow/05e6d61f14ef6af763ec4cfd1049ab61
    A handshake over MQTT: something unique like the timestamp or aMsgID comes back with an "ok" state. (Not from the MQTT sending node or the MQTT Broker, but from a receiver that has checked the data over its MQTT subscription, then sending back an MQTT message.

What allready works:

  • Get the data over the Nodes Modbus-Flex-Getter or knxEasy-in (but with fixed parameters in the nodes)
  • sending the Data with the timestamp in a correct object to MQTT

Some questions appear for me:

  • What if the connection is lost for a long time - how to limit saving Data localy (I dont want a blocked system "run out of memory" or something like that)
  • How to set a limit "maximum messages per time". When from KNX (event driven) are commicg to much telegrams, I want to limit the sending to MQTT.
  • Have somebody allready done something similar?
  • Should this be done in a flow or does it need a new Node for doing all that. Can somebody do the programming for me?

Thanks for some hints

Something like this?

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"b58e837f.778bd","type":"change","z":"f613199d.00c9","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":810,"y":240,"wires":[["fd181d1c.1f5a3"]]},{"id":"716a7534.48bac4","type":"change","z":"f613199d.00c9","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":890,"y":360,"wires":[["fd181d1c.1f5a3"]]},{"id":"fd181d1c.1f5a3","type":"link out","z":"f613199d.00c9","name":"","links":["a699948f.6e0598"],"x":995,"y":240,"wires":[]},{"id":"a699948f.6e0598","type":"link in","z":"f613199d.00c9","name":"","links":["fd181d1c.1f5a3"],"x":175,"y":200,"wires":[["37f08ec1.d26a62"]]},{"id":"66644e37.fc0ad8","type":"link in","z":"f613199d.00c9","name":"Email delivery","links":["9f35964.b7ad9e8"],"x":115,"y":160,"wires":[["37f08ec1.d26a62"]]},{"id":"37f08ec1.d26a62","type":"subflow:149380c1.63e107","z":"f613199d.00c9","name":"Guaranteed delivery","env":[{"name":"retrySecs","value":"30","type":"num"},{"name":"maxQueue","value":"10","type":"num"}],"x":320,"y":160,"wires":[["788b62de.d224dc"]]},{"id":"f5f7f352.4ab6d8","type":"comment","z":"f613199d.00c9","name":"Send MQTT messages to this link","info":"","x":150,"y":120,"wires":[]},{"id":"788b62de.d224dc","type":"function","z":"f613199d.00c9","name":"Add timestamp","func":"// Add the current timestamp\n// This assumes that message turnaround through MQTT takes at least 1 msec\nmsg.payload = {timestamp: new Date().getTime(), oldPayload: msg.payload}\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":540,"y":160,"wires":[["1bd92bef.7abae4","65ab6794.354d98","ac3d08c0.3adb68"]]},{"id":"1bd92bef.7abae4","type":"mqtt out","z":"f613199d.00c9","name":"","topic":"test/guaranteed","qos":"1","retain":"","broker":"e3d45312.a3103","x":940,"y":160,"wires":[]},{"id":"91f12b17.df645","type":"inject","z":"f613199d.00c9","name":"Test message","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":230,"y":40,"wires":[["7eb796f0.df58b8"]]},{"id":"7eb796f0.df58b8","type":"function","z":"f613199d.00c9","name":"Build test message","func":"msg.payload = {data: \"some data\", moreData: new Date()}\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":430,"y":40,"wires":[["9f35964.b7ad9e8","28593b1.d771bc4"]]},{"id":"9f35964.b7ad9e8","type":"link out","z":"f613199d.00c9","name":"","links":["66644e37.fc0ad8"],"x":595,"y":40,"wires":[]},{"id":"efbe502f.9c581","type":"mqtt in","z":"f613199d.00c9","name":"","topic":"test/guaranteed_ack","qos":"1","datatype":"json","broker":"e3d45312.a3103","inputs":0,"x":130,"y":380,"wires":[["25ad5f77.0b3a58"]]},{"id":"cec8afe1.087ee8","type":"join","z":"f613199d.00c9","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"guarantee","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":490,"y":360,"wires":[["210ccba3.9e50b4"]]},{"id":"65ab6794.354d98","type":"change","z":"f613199d.00c9","name":"outgoing","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"outgoing","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":320,"wires":[["cec8afe1.087ee8"]]},{"id":"25ad5f77.0b3a58","type":"change","z":"f613199d.00c9","name":"incoming","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"incoming","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":380,"wires":[["cec8afe1.087ee8"]]},{"id":"210ccba3.9e50b4","type":"function","z":"f613199d.00c9","name":"Check for match","func":"let state = context.get(\"state\") || \"quiescent\"\n// quiescent means that it is waiting for a message to be published\n// waiting means waiting for an ack back\n//node.warn(state)\nif (msg.reset) {\n    // this is a reset fed back from the timeout node\n    //node.warn(\"reset, state to quiescent\")\n    state = \"quiescent\"\n    msg = null\n} else if (state == \"quiescent\") {\n    // waiting for a message to be sent\n    // ignore if this is not an outgoing message\n    if (msg.guarantee === \"incoming\") {\n        //node.warn(\"Ignoring unnexpected incoming\")\n    } else {\n        //node.warn(\"outgoing, switch state\")\n        state = \"waiting\"\n    }\n    // don't send anything\n    msg = null\n} else {\n    // waiting for response\n    // Ignore unless this is an incoming ack message \n    if (msg.guarantee === \"incoming\") {\n        //node.warn(\"incoming\")\n        // is the incoming timestamp the one we are expecting?\n        if (msg.payload.incoming.timestamp != msg.payload.outgoing.timestamp) {\n            // no, so this is presumably repeat of an earlier message, ignore it\n            //node.warn(\"no match\")\n            msg = null\n        } else  {\n            // exact match so pass it on\n            //node.warn(\"Match\")\n            // set msg.reset for resetting the timeout trigger\n            msg.reset = true\n            //node.warn(\"setting quiescent\")\n            state = \"quiescent\"\n        }\n    } else {\n        //node.warn(\"outgoing, ignored\")\n        msg = null\n    }\n}\ncontext.set(\"state\", state)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":680,"y":360,"wires":[["716a7534.48bac4","ac3d08c0.3adb68"]]},{"id":"ac3d08c0.3adb68","type":"trigger","z":"f613199d.00c9","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"json","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":640,"y":240,"wires":[["b58e837f.778bd","401c69b2.eaa2e"]]},{"id":"6bc3b00b.b52e08","type":"mqtt in","z":"f613199d.00c9","name":"","topic":"test/guaranteed","qos":"1","datatype":"json","broker":"e3d45312.a3103","inputs":0,"x":140,"y":560,"wires":[["9138a4a4.2ee658","d253b91a.0161e"]]},{"id":"9138a4a4.2ee658","type":"mqtt out","z":"f613199d.00c9","name":"","topic":"test/guaranteed_ack","qos":"1","retain":"","broker":"e3d45312.a3103","x":500,"y":560,"wires":[]},{"id":"80f13173.6b1df","type":"comment","z":"f613199d.00c9","name":"Put this at the receiving end","info":"","x":160,"y":480,"wires":[]},{"id":"401c69b2.eaa2e","type":"change","z":"f613199d.00c9","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":790,"y":280,"wires":[["210ccba3.9e50b4"]]},{"id":"d253b91a.0161e","type":"change","z":"f613199d.00c9","name":"Restore payload","rules":[{"t":"move","p":"payload.oldPayload","pt":"msg","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":660,"wires":[["897a9e4f.e0457"]]},{"id":"897a9e4f.e0457","type":"debug","z":"f613199d.00c9","name":"SUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":500,"y":700,"wires":[]},{"id":"28593b1.d771bc4","type":"debug","z":"f613199d.00c9","name":"PUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":630,"y":100,"wires":[]},{"id":"41de56ca.557958","type":"comment","z":"f613199d.00c9","name":"This is where the subscriber gets his message","info":"","x":610,"y":660,"wires":[]},{"id":"2c0ca871.4a9428","type":"comment","z":"f613199d.00c9","name":"Echo received messages back to the ACK topic","info":"","x":580,"y":520,"wires":[]},{"id":"34c6dd23.8da4d2","type":"comment","z":"f613199d.00c9","name":"Adjust timeout time if necessary","info":"","x":710,"y":200,"wires":[]},{"id":"e3d45312.a3103","type":"mqtt-broker","name":"","broker":"owl2.local","port":"1883","clientid":"tigger","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"autoUnsubscribe":true,"birthTopic":"tydwr/tigger/LWT","birthQos":"0","birthRetain":"true","birthPayload":"Online","birthMsg":{},"closeTopic":"tydwr/tigger/LWT","closeQos":"0","closePayload":"Offline","closeMsg":{},"willTopic":"tydwr/tigger/LWT","willQos":"0","willPayload":"Offline","willMsg":{},"sessionExpiry":""}]

Hi Colin

This looks, promising but I don't get it with the timeout option. If I disconnect "Echo received messages back to the ACK topic", your script tells "3 waiting for trigger"
image
This is this the maximum time the script is waiting for an ack from the other side, right?
What happens to the stored messages that never get an ack? Are those 3 still somewhere or are they delted and this is just a counter?
Does the order of the messages in the _ack topic matter? I think so, right?

Thanks for clarification

Yes, if the timeout triggers then a FAIL is signalled back to the subflow so it knows to retry that message later.

Did you read the explanation of the subflow here? It explains how messages that fail to get sent are retained in the subflow (backed to persistent storage if required) until they are successfully sent.
Note that messages are sent in order, so a later message will not be sent until all previous messages have been successfully transferred.

Not sure what you mean by that.

You allready answered it a question before, thanks :wink:

Your subflow works really well. I am now struggling a little with trying to reduce the amount of sended data:
the timestamp is already needed in my payload, so no need to wrap anotherone around the message
Is it possible to use this one for the handshake? And sending back in the ack topic only the timestamp without the rest of the data?

eg:
sender: {"temperature1":26.8, "humidity1":43,"timestamp":1709019768984}
answer in the ack topic: 1709019768984

Can somebody give me a helping hand?

How often are you sending it? Unless you are sending it hundreds of times a second or you are paying for your bandwidth then really don't worry about it.

If you do want to spend the time changing and debugging it then:

  1. Remove the Add Timestamp function in the flow I posted
  2. In the Check for Match function change the path to the timestamp to match your structure
  3. At the other end of the network remove the Restore Payload function.

But as I said, really, don't bother.

I just get it running myself :muscle:
No idea if its a good solution, or one can do this with much less workarounds...
but it's running

Maybe someone still can have a look at it and give me advice where to do things better...
Maybe the someone is @Colin ? :see_no_evil:

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"62560ae6ab8737dc"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"62560ae6ab8737dc","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["62560ae6ab8737dc"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["62560ae6ab8737dc"],"x":300,"y":160,"wires":[[]]},{"id":"62560ae6ab8737dc","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"f45d8c28502da533","type":"tab","label":"Flow 2","disabled":false,"info":"","env":[]},{"id":"b58e837f.778bd","type":"change","z":"f45d8c28502da533","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1030,"y":380,"wires":[["fd181d1c.1f5a3"]]},{"id":"716a7534.48bac4","type":"change","z":"f45d8c28502da533","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1110,"y":500,"wires":[["fd181d1c.1f5a3"]]},{"id":"fd181d1c.1f5a3","type":"link out","z":"f45d8c28502da533","name":"","links":["a699948f.6e0598"],"x":1215,"y":380,"wires":[]},{"id":"a699948f.6e0598","type":"link in","z":"f45d8c28502da533","name":"","links":["fd181d1c.1f5a3"],"x":215,"y":300,"wires":[["37f08ec1.d26a62"]]},{"id":"66644e37.fc0ad8","type":"link in","z":"f45d8c28502da533","name":"Email delivery","links":["9f35964.b7ad9e8"],"x":215,"y":260,"wires":[["37f08ec1.d26a62"]]},{"id":"37f08ec1.d26a62","type":"subflow:149380c1.63e107","z":"f45d8c28502da533","name":"Guaranteed delivery","env":[{"name":"retrySecs","value":"30","type":"num"},{"name":"maxQueue","value":"3000","type":"num"},{"name":"contextStore","value":"default","type":"env"}],"x":420,"y":260,"wires":[["1bd92bef.7abae4","ac3d08c0.3adb68","65ab6794.354d98","28593b1.d771bc4"]]},{"id":"f5f7f352.4ab6d8","type":"comment","z":"f45d8c28502da533","name":"Send MQTT messages to this link","info":"","x":250,"y":220,"wires":[]},{"id":"1bd92bef.7abae4","type":"mqtt out","z":"f45d8c28502da533","name":"","topic":"test/guaranteed","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"","x":1040,"y":260,"wires":[]},{"id":"9f35964.b7ad9e8","type":"link out","z":"f45d8c28502da533","name":"","links":["66644e37.fc0ad8"],"x":855,"y":100,"wires":[]},{"id":"efbe502f.9c581","type":"mqtt in","z":"f45d8c28502da533","name":"","topic":"test/guaranteed_ack","qos":"1","datatype":"json","broker":"","nl":false,"rap":false,"inputs":0,"x":110,"y":520,"wires":[["f79821a194243a2a"]]},{"id":"cec8afe1.087ee8","type":"join","z":"f45d8c28502da533","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"guarantee","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":710,"y":500,"wires":[["210ccba3.9e50b4"]]},{"id":"65ab6794.354d98","type":"change","z":"f45d8c28502da533","name":"outgoing","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"outgoing","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":460,"wires":[["cec8afe1.087ee8"]]},{"id":"25ad5f77.0b3a58","type":"change","z":"f45d8c28502da533","name":"incoming","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"incoming","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":520,"wires":[["cec8afe1.087ee8"]]},{"id":"210ccba3.9e50b4","type":"function","z":"f45d8c28502da533","name":"Check for match","func":"let state = context.get(\"state\") || \"quiescent\"\n// quiescent means that it is waiting for a message to be published\n// waiting means waiting for an ack back\n//node.warn(state)\nif (msg.reset) {\n    // this is a reset fed back from the timeout node\n    //node.warn(\"reset, state to quiescent\")\n    state = \"quiescent\"\n    msg = null\n} else if (state == \"quiescent\") {\n    // waiting for a message to be sent\n    // ignore if this is not an outgoing message\n    if (msg.guarantee === \"incoming\") {\n        //node.warn(\"Ignoring unnexpected incoming\")\n    } else {\n        //node.warn(\"outgoing, switch state\")\n        state = \"waiting\"\n    }\n    // don't send anything\n    msg = null\n} else {\n    // waiting for response\n    // Ignore unless this is an incoming ack message \n    if (msg.guarantee === \"incoming\") {\n        //node.warn(\"incoming\")\n        // is the incoming timestamp the one we are expecting?\n        if (msg.payload.incoming.timestamp != msg.payload.outgoing.timestamp) {\n            // no, so this is presumably repeat of an earlier message, ignore it\n            //node.warn(\"no match\")\n            msg = null\n        } else  {\n            // exact match so pass it on\n            //node.warn(\"Match\")\n            // set msg.reset for resetting the timeout trigger\n            msg.reset = true\n            //node.warn(\"setting quiescent\")\n            state = \"quiescent\"\n        }\n    } else {\n        //node.warn(\"outgoing, ignored\")\n        msg = null\n    }\n}\ncontext.set(\"state\", state)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":900,"y":500,"wires":[["716a7534.48bac4","ac3d08c0.3adb68"]]},{"id":"ac3d08c0.3adb68","type":"trigger","z":"f45d8c28502da533","name":"Timeout 20 s","op1":"","op2":"0","op1type":"nul","op2type":"json","duration":"20","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":850,"y":380,"wires":[["b58e837f.778bd","401c69b2.eaa2e"]]},{"id":"6bc3b00b.b52e08","type":"mqtt in","z":"f45d8c28502da533","name":"","topic":"test/guaranteed","qos":"1","datatype":"json","broker":"","nl":false,"rap":false,"inputs":0,"x":300,"y":820,"wires":[["fdcb0331395bdb18","897a9e4f.e0457"]]},{"id":"9138a4a4.2ee658","type":"mqtt out","z":"f45d8c28502da533","name":"","topic":"test/guaranteed_ack","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"","x":1180,"y":820,"wires":[]},{"id":"80f13173.6b1df","type":"comment","z":"f45d8c28502da533","name":"Put this at the receiving end","info":"","x":320,"y":760,"wires":[]},{"id":"401c69b2.eaa2e","type":"change","z":"f45d8c28502da533","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":1010,"y":420,"wires":[["210ccba3.9e50b4"]]},{"id":"897a9e4f.e0457","type":"debug","z":"f45d8c28502da533","name":"SUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":690,"y":960,"wires":[]},{"id":"28593b1.d771bc4","type":"debug","z":"f45d8c28502da533","name":"PUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1010,"y":220,"wires":[]},{"id":"41de56ca.557958","type":"comment","z":"f45d8c28502da533","name":"This is where the subscriber gets his message","info":"","x":810,"y":920,"wires":[]},{"id":"2c0ca871.4a9428","type":"comment","z":"f45d8c28502da533","name":"Echo received message timestamp back to the ACK topic","info":"","x":810,"y":780,"wires":[]},{"id":"34c6dd23.8da4d2","type":"comment","z":"f45d8c28502da533","name":"Adjust timeout time if necessary","info":"","x":930,"y":340,"wires":[]},{"id":"c77977b95104c547","type":"change","z":"f45d8c28502da533","name":"generate Object","rules":[{"t":"set","p":"payload","pt":"msg","to":"{}","tot":"json"},{"t":"set","p":"payload.temperature","pt":"msg","to":"25.4","tot":"num"},{"t":"set","p":"payload.humidity","pt":"msg","to":"43","tot":"num"},{"t":"set","p":"payload.timestamp","pt":"msg","to":"","tot":"date"}],"action":"","property":"","from":"","to":"","reg":false,"x":600,"y":100,"wires":[["9f35964.b7ad9e8"]]},{"id":"4694895e7067cdc7","type":"inject","z":"f45d8c28502da533","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":100,"wires":[["c77977b95104c547"]]},{"id":"fdcb0331395bdb18","type":"change","z":"f45d8c28502da533","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.timestamp","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":770,"y":820,"wires":[["9138a4a4.2ee658"]]},{"id":"f79821a194243a2a","type":"change","z":"f45d8c28502da533","name":"payload-to-timestamp","rules":[{"t":"set","p":"temp","pt":"msg","to":"payload","tot":"msg"},{"t":"set","p":"payload","pt":"msg","to":"{}","tot":"json"},{"t":"set","p":"payload.timestamp","pt":"msg","to":"temp","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":340,"y":520,"wires":[["25ad5f77.0b3a58"]]}]

Sending is not the problem (maybe 100 or 200 MQTT messages in 5 minutes)
Receiving at the other side from several hundred or even several thousend senders at the same time maybe run soon in load problems.

It's just in my genes to save everything I can: Why should I send more Bytes, if I can do the same with less?

That should work, as far as I can see, provided the timestamps in the messages are guaranteed unique.
The disadvantage over the original code is that if you change the structure of the data, or want, in addition, to publish a different format of data, then it will not work. The original implementation would work with any data structure you wanted to publish.

It is caused premature optimisation. You have spent time optimising some code and retesting it before you know if it is ever going to be a problem. I can absolutely guarantee that if you ever get to the point that resources become a problem that the problem will not be where you expected it to be. That is one of the rules of software development. The bottlenecks will not be where you expect. You would have been better spending your time doing further development of the project.

If MQTT network bandwidth should ever become a problem then a solution would be to block a number of messages together and send them as an array of records, then unpack them at the other end. Then you would have to revert to the original version of the guaranteed delivery flow as the structure of the data would be different.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.