Node-red-contrib-msg-queue - Only storing not forwarding w/ active connection

Give this a go. It uses the Guaranteed Delivery subflow from here, with added logic to make it work with MQTT. Configure the MQTT node as required, and feed the messages to be sent in via the top-left link node. A Complete node attached to the MQTT node signals when a message has been sent successfully, but there is no way to know when a send has failed, so a 5-second timeout is used to indicate failure. When a send fails, it is retried every 60 seconds; the retry period can be configured in the subflow settings. Be aware, though, that you may get repeated messages when MQTT reconnects, as the MQTT node may buffer up some messages and send them when the connection recovers. If repeated messages are a problem for you, then you will have to deal with that at the receiving end. You may also get warning messages about unexpected OK messages after a recovery. You can safely ignore those (or edit the function in the subflow to stop the warning).

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch 
(stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to 
waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"87d89aae.9b0db","type":"change","z":"85aecebd.19674","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":360,"wires":[["81c21dfd.874868"]]},{"id":"a12804d0.bbd0a","type":"change","z":"85aecebd.19674","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":460,"wires":[["81c21dfd.874868"]]},{"id":"81c21dfd.874868","type":"link out","z":"85aecebd.19674","name":"","links":["7ae5a5e5.be2f6c"],"x":775,"y":400,"wires":[]},{"id":"7ae5a5e5.be2f6c","type":"link in","z":"85aecebd.19674","name":"","links":["81c21dfd.874868"],"x":135,"y":320,"wires":[["7267f4ba.8104e4"]]},{"id":"fbd0d6c6.8eb88","type":"link in","z":"85aecebd.19674","name":"Email 
delivery","links":["2090b139.480446"],"x":75,"y":260,"wires":[["7267f4ba.8104e4"]]},{"id":"7267f4ba.8104e4","type":"subflow:149380c1.63e107","z":"85aecebd.19674","name":"Guaranteed delivery","env":[],"x":280,"y":280,"wires":[["3ba7e731.3b7e88","2df5a331.164ed4"]]},{"id":"1a0a6921.df8e9f","type":"comment","z":"85aecebd.19674","name":"Send MQTT messages to this link","info":"","x":150,"y":220,"wires":[]},{"id":"3ba7e731.3b7e88","type":"mqtt out","z":"85aecebd.19674","name":"","topic":"","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e3d45312.a3103","x":750,"y":280,"wires":[]},{"id":"2df5a331.164ed4","type":"trigger","z":"85aecebd.19674","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":500,"y":360,"wires":[["87d89aae.9b0db"]]},{"id":"4255ad9.f2b01d4","type":"comment","z":"85aecebd.19674","name":"Adjust timeout time if necessary","info":"","x":550,"y":320,"wires":[]},{"id":"55d227e5.4520b","type":"complete","z":"85aecebd.19674","name":"MQTT Complete","scope":["3ba7e731.3b7e88"],"uncaught":false,"x":180,"y":460,"wires":[["a838d587.28a058","a12804d0.bbd0a"]]},{"id":"a838d587.28a058","type":"change","z":"85aecebd.19674","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":380,"wires":[["2df5a331.164ed4"]]},{"id":"e3d45312.a3103","type":"mqtt-broker","name":"","broker":"owl2.local","port":"1883","clientid":"tigger","usetls":false,"compatmode":true,"keepalive":"60","cleansession":false,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]