I just get it running myself
No idea if its a good solution, or one can do this with much less workarounds...
but it's running
Maybe someone still can have a look at it and give me advice where to do things better...
Maybe the someone is @Colin ?
[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"62560ae6ab8737dc"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"62560ae6ab8737dc","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["62560ae6ab8737dc"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["62560ae6ab8737dc"],"x":300,"y":160,"wires":[[]]},{"id":"62560ae6ab8737dc","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n handleFAIL(stat)\n} else if (control === \"__trigger\") {\n handleTrigger(stat)\n} else {\n // no valid control value so must be incoming message\n handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n case \"initial\":\n case \"waitingForMsg\":\n sendMsg(stat) // send next message if any\n break;\n \n case \"waitingForTrigger\":\n case \"waitingForOKFail\":\n // do nothing\n break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n //node.warn(\"handleMessage\")\n // push a clone onto the queue\n stat.queue.push(RED.util.cloneMessage(msg))\n // limit number in queue\n const max = Number(env.get(\"maxQueue\"))\n if (!isNaN(max) && max > 0) {\n // max length hit, remove oldest\n if (stat.queue.length > max) stat.queue.shift()\n }\n // Simulate a trigger event to handle any state change needed\n handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n //node.warn(\"sendMsg\")\n let thisMsg = stat.queue[0]\n if (thisMsg) {\n // send a clone\n //node.warn(\"sending\")\n node.send(RED.util.cloneMessage(thisMsg))\n stat.state = \"waitingForOKFail\"\n } else {\n // nothing in queue\n stat.state = \"waitingForMsg\"\n }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n //node.warn(\"handleOK\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // OK received so drop the top message \n stat.queue.shift()\n // set the state to waiting for message, which will allow the next one to be sent\n stat.state = \"waitingForMsg\"\n } else {\n node.warn(\"Ignoring unnexpected OK\")\n }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n //node.warn(\"handleFAIL\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // FAIL received so go to waiting for trigger state\n stat.state = \"waitingForTrigger\"\n } else {\n node.warn(\"Ignoring unnexpected FAIL\")\n }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n //node.warn(\"handleTrigger\")\n if (stat.state === \"waitingForTrigger\") {\n //node.warn(\"state to waiting\")\n // set it to watitingForMsg in order to trigger send \n stat.state = \"waitingForMsg\"\n }\n // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"f45d8c28502da533","type":"tab","label":"Flow 2","disabled":false,"info":"","env":[]},{"id":"b58e837f.778bd","type":"change","z":"f45d8c28502da533","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1030,"y":380,"wires":[["fd181d1c.1f5a3"]]},{"id":"716a7534.48bac4","type":"change","z":"f45d8c28502da533","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1110,"y":500,"wires":[["fd181d1c.1f5a3"]]},{"id":"fd181d1c.1f5a3","type":"link out","z":"f45d8c28502da533","name":"","links":["a699948f.6e0598"],"x":1215,"y":380,"wires":[]},{"id":"a699948f.6e0598","type":"link in","z":"f45d8c28502da533","name":"","links":["fd181d1c.1f5a3"],"x":215,"y":300,"wires":[["37f08ec1.d26a62"]]},{"id":"66644e37.fc0ad8","type":"link in","z":"f45d8c28502da533","name":"Email delivery","links":["9f35964.b7ad9e8"],"x":215,"y":260,"wires":[["37f08ec1.d26a62"]]},{"id":"37f08ec1.d26a62","type":"subflow:149380c1.63e107","z":"f45d8c28502da533","name":"Guaranteed delivery","env":[{"name":"retrySecs","value":"30","type":"num"},{"name":"maxQueue","value":"3000","type":"num"},{"name":"contextStore","value":"default","type":"env"}],"x":420,"y":260,"wires":[["1bd92bef.7abae4","ac3d08c0.3adb68","65ab6794.354d98","28593b1.d771bc4"]]},{"id":"f5f7f352.4ab6d8","type":"comment","z":"f45d8c28502da533","name":"Send MQTT messages to this link","info":"","x":250,"y":220,"wires":[]},{"id":"1bd92bef.7abae4","type":"mqtt out","z":"f45d8c28502da533","name":"","topic":"test/guaranteed","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"","x":1040,"y":260,"wires":[]},{"id":"9f35964.b7ad9e8","type":"link out","z":"f45d8c28502da533","name":"","links":["66644e37.fc0ad8"],"x":855,"y":100,"wires":[]},{"id":"efbe502f.9c581","type":"mqtt in","z":"f45d8c28502da533","name":"","topic":"test/guaranteed_ack","qos":"1","datatype":"json","broker":"","nl":false,"rap":false,"inputs":0,"x":110,"y":520,"wires":[["f79821a194243a2a"]]},{"id":"cec8afe1.087ee8","type":"join","z":"f45d8c28502da533","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"guarantee","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":710,"y":500,"wires":[["210ccba3.9e50b4"]]},{"id":"65ab6794.354d98","type":"change","z":"f45d8c28502da533","name":"outgoing","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"outgoing","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":460,"wires":[["cec8afe1.087ee8"]]},{"id":"25ad5f77.0b3a58","type":"change","z":"f45d8c28502da533","name":"incoming","rules":[{"t":"set","p":"guarantee","pt":"msg","to":"incoming","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":540,"y":520,"wires":[["cec8afe1.087ee8"]]},{"id":"210ccba3.9e50b4","type":"function","z":"f45d8c28502da533","name":"Check for match","func":"let state = context.get(\"state\") || \"quiescent\"\n// quiescent means that it is waiting for a message to be published\n// waiting means waiting for an ack back\n//node.warn(state)\nif (msg.reset) {\n // this is a reset fed back from the timeout node\n //node.warn(\"reset, state to quiescent\")\n state = \"quiescent\"\n msg = null\n} else if (state == \"quiescent\") {\n // waiting for a message to be sent\n // ignore if this is not an outgoing message\n if (msg.guarantee === \"incoming\") {\n //node.warn(\"Ignoring unnexpected incoming\")\n } else {\n //node.warn(\"outgoing, switch state\")\n state = \"waiting\"\n }\n // don't send anything\n msg = null\n} else {\n // waiting for response\n // Ignore unless this is an incoming ack message \n if (msg.guarantee === \"incoming\") {\n //node.warn(\"incoming\")\n // is the incoming timestamp the one we are expecting?\n if (msg.payload.incoming.timestamp != msg.payload.outgoing.timestamp) {\n // no, so this is presumably repeat of an earlier message, ignore it\n //node.warn(\"no match\")\n msg = null\n } else {\n // exact match so pass it on\n //node.warn(\"Match\")\n // set msg.reset for resetting the timeout trigger\n msg.reset = true\n //node.warn(\"setting quiescent\")\n state = \"quiescent\"\n }\n } else {\n //node.warn(\"outgoing, ignored\")\n msg = null\n }\n}\ncontext.set(\"state\", state)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":900,"y":500,"wires":[["716a7534.48bac4","ac3d08c0.3adb68"]]},{"id":"ac3d08c0.3adb68","type":"trigger","z":"f45d8c28502da533","name":"Timeout 20 s","op1":"","op2":"0","op1type":"nul","op2type":"json","duration":"20","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":850,"y":380,"wires":[["b58e837f.778bd","401c69b2.eaa2e"]]},{"id":"6bc3b00b.b52e08","type":"mqtt in","z":"f45d8c28502da533","name":"","topic":"test/guaranteed","qos":"1","datatype":"json","broker":"","nl":false,"rap":false,"inputs":0,"x":300,"y":820,"wires":[["fdcb0331395bdb18","897a9e4f.e0457"]]},{"id":"9138a4a4.2ee658","type":"mqtt out","z":"f45d8c28502da533","name":"","topic":"test/guaranteed_ack","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"","x":1180,"y":820,"wires":[]},{"id":"80f13173.6b1df","type":"comment","z":"f45d8c28502da533","name":"Put this at the receiving end","info":"","x":320,"y":760,"wires":[]},{"id":"401c69b2.eaa2e","type":"change","z":"f45d8c28502da533","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":1010,"y":420,"wires":[["210ccba3.9e50b4"]]},{"id":"897a9e4f.e0457","type":"debug","z":"f45d8c28502da533","name":"SUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":690,"y":960,"wires":[]},{"id":"28593b1.d771bc4","type":"debug","z":"f45d8c28502da533","name":"PUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1010,"y":220,"wires":[]},{"id":"41de56ca.557958","type":"comment","z":"f45d8c28502da533","name":"This is where the subscriber gets his message","info":"","x":810,"y":920,"wires":[]},{"id":"2c0ca871.4a9428","type":"comment","z":"f45d8c28502da533","name":"Echo received message timestamp back to the ACK topic","info":"","x":810,"y":780,"wires":[]},{"id":"34c6dd23.8da4d2","type":"comment","z":"f45d8c28502da533","name":"Adjust timeout time if necessary","info":"","x":930,"y":340,"wires":[]},{"id":"c77977b95104c547","type":"change","z":"f45d8c28502da533","name":"generate Object","rules":[{"t":"set","p":"payload","pt":"msg","to":"{}","tot":"json"},{"t":"set","p":"payload.temperature","pt":"msg","to":"25.4","tot":"num"},{"t":"set","p":"payload.humidity","pt":"msg","to":"43","tot":"num"},{"t":"set","p":"payload.timestamp","pt":"msg","to":"","tot":"date"}],"action":"","property":"","from":"","to":"","reg":false,"x":600,"y":100,"wires":[["9f35964.b7ad9e8"]]},{"id":"4694895e7067cdc7","type":"inject","z":"f45d8c28502da533","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":100,"wires":[["c77977b95104c547"]]},{"id":"fdcb0331395bdb18","type":"change","z":"f45d8c28502da533","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.timestamp","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":770,"y":820,"wires":[["9138a4a4.2ee658"]]},{"id":"f79821a194243a2a","type":"change","z":"f45d8c28502da533","name":"payload-to-timestamp","rules":[{"t":"set","p":"temp","pt":"msg","to":"payload","tot":"msg"},{"t":"set","p":"payload","pt":"msg","to":"{}","tot":"json"},{"t":"set","p":"payload.timestamp","pt":"msg","to":"temp","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":340,"y":520,"wires":[["25ad5f77.0b3a58"]]}]