Sorry, real life got in the way.
Try this. The subflow itself is unchanged, I have had to add some extra logic around it. It is working solidly for me now..
[{"id":"5ed5dcc8ccf90bfa","type":"group","z":"bdd7be38.d3b55","name":"Publishing to MQTT","style":{"label":true},"nodes":["b4227ef182f4ebef","a500a548eac99f1f","e7642367d639d549","ea94f646fa8e74e1","c04777c7cc3990a0","81428d3bcae766ee","86207c4b47bf3bd8","c458a09df04445f7","753fc1cc8863871e","e54df0d3d9480ac9","d13ea5ab5d57a0a7","43a40f5eb76748ac","cf6c087e45a7fbbf","de21d8a733a9c353","2fb7f6196432da68"],"x":54,"y":4659,"w":1172,"h":322},{"id":"ae9c6995a156ff5c","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"0b0e1e2df36d2d3a"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"0b0e1e2df36d2d3a","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"89ab963214b94f59","port":0}]}},{"id":"e6a7f648cf7f127b","type":"inject","z":"ae9c6995a156ff5c","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["0b0e1e2df36d2d3a"]]},{"id":"89ab963214b94f59","type":"status","z":"ae9c6995a156ff5c","name":"","scope":["0b0e1e2df36d2d3a"],"x":300,"y":160,"wires":[[]]},{"id":"0b0e1e2df36d2d3a","type":"function","z":"ae9c6995a156ff5c","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n handleFAIL(stat)\n} else if (control === \"__trigger\") {\n handleTrigger(stat)\n} else {\n // no valid control value so must be incoming message\n handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n case \"initial\":\n case \"waitingForMsg\":\n sendMsg(stat) // send next message if any\n break;\n \n case \"waitingForTrigger\":\n case \"waitingForOKFail\":\n // do nothing\n break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n //node.warn(\"handleMessage\")\n // push a clone onto the queue\n stat.queue.push(RED.util.cloneMessage(msg))\n // limit number in queue\n const max = Number(env.get(\"maxQueue\"))\n if (!isNaN(max) && max > 0) {\n // max length hit, remove oldest\n if (stat.queue.length > max) stat.queue.shift()\n }\n // Simulate a trigger event to handle any state change needed\n handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n //node.warn(\"sendMsg\")\n let thisMsg = stat.queue[0]\n if (thisMsg) {\n // send a clone\n //node.warn(\"sending\")\n node.send(RED.util.cloneMessage(thisMsg))\n stat.state = \"waitingForOKFail\"\n } else {\n // nothing in queue\n stat.state = \"waitingForMsg\"\n }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n //node.warn(\"handleOK\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // OK received so drop the top message \n stat.queue.shift()\n // set the state to waiting for message, which will allow the next one to be sent\n stat.state = \"waitingForMsg\"\n } else {\n node.warn(\"Ignoring unnexpected OK\")\n }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n //node.warn(\"handleFAIL\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // FAIL received so go to waiting for trigger state\n stat.state = \"waitingForTrigger\"\n } else {\n node.warn(\"Ignoring unnexpected FAIL\")\n }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n //node.warn(\"handleTrigger\")\n if (stat.state === \"waitingForTrigger\") {\n //node.warn(\"state to waiting\")\n // set it to watitingForMsg in order to trigger send \n stat.state = \"waitingForMsg\"\n }\n // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"b4227ef182f4ebef","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":810,"y":4840,"wires":[["a500a548eac99f1f"]]},{"id":"a500a548eac99f1f","type":"link out","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","links":["e7642367d639d549"],"x":1185,"y":4840,"wires":[]},{"id":"e7642367d639d549","type":"link in","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","links":["a500a548eac99f1f"],"x":135,"y":4800,"wires":[["e54df0d3d9480ac9"]]},{"id":"ea94f646fa8e74e1","type":"comment","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Send MQTT messages to this link","info":"","x":210,"y":4700,"wires":[]},{"id":"c04777c7cc3990a0","type":"mqtt out","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"936d2ec2ee69f50f","x":710,"y":4740,"wires":[]},{"id":"81428d3bcae766ee","type":"trigger","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":660,"y":4840,"wires":[["b4227ef182f4ebef"]]},{"id":"86207c4b47bf3bd8","type":"comment","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Adjust timeout time if necessary","info":"","x":690,"y":4800,"wires":[]},{"id":"c458a09df04445f7","type":"complete","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"MQTT Complete","scope":["c04777c7cc3990a0"],"uncaught":false,"x":180,"y":4940,"wires":[["753fc1cc8863871e","43a40f5eb76748ac"]]},{"id":"753fc1cc8863871e","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":400,"y":4840,"wires":[["81428d3bcae766ee"]]},{"id":"e54df0d3d9480ac9","type":"subflow:ae9c6995a156ff5c","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Guaranteed delivery test","env":[{"name":"maxQueue","value":"0","type":"num"}],"x":410,"y":4740,"wires":[["81428d3bcae766ee","c04777c7cc3990a0","cf6c087e45a7fbbf"]]},{"id":"d13ea5ab5d57a0a7","type":"join","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"key","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":770,"y":4940,"wires":[["de21d8a733a9c353"]]},{"id":"43a40f5eb76748ac","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"key: completed, save topic and payload","rules":[{"t":"set","p":"key","pt":"msg","to":"completed","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":4940,"wires":[["d13ea5ab5d57a0a7"]]},{"id":"cf6c087e45a7fbbf","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"key: sending, save topic and payload","rules":[{"t":"set","p":"key","pt":"msg","to":"sending","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":510,"y":4900,"wires":[["d13ea5ab5d57a0a7"]]},{"id":"de21d8a733a9c353","type":"function","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Check messages match","func":"/** Check that the message that is has been sent (completed) is the one that has just been passed to the MQTT node (sending)\n * payload.sending contains last msg sent to MQTT node\n * payload.completed contains msg indicated completed by MQTT node\n * in each case the payload contains topic of message and payload value in data\n */ \nconst previousKey = context.get(\"previousKey\") || \"\"\nconst thisKey = msg.key\n\n//node.warn(typeof msg.payload.sending.data)\nif (thisKey === \"completed\" && previousKey === \"sending\") {\n let equal = false // whether messages are equivalent\n // topics must be the same\n if (msg.payload.sending.topic === msg.payload.completed.topic) {\n if (typeof msg.payload.sending.data === \"string\") {\n equal = compareDirect(msg.payload.sending.data, msg.payload.completed.data)\n } else if (Buffer.isBuffer(msg.payload.sending.data)) {\n equal = compareBuffers(msg.payload.sending.data, msg.payload.completed.data)\n } else {\n equal = compareJSON(msg.payload.sending.data, msg.payload.completed.data)\n }\n }\n if (equal) {\n msg.control = \"OK\"\n } else {\n msg.control = \"FAIL\"\n }\n} else {\n // out of sequence so ignore\n msg = null\n}\ncontext.set(\"previousKey\", thisKey)\nreturn msg;\n\nfunction compareJSON(sending, completed) {\n // compares JSON version of payload sent with that from the Complete node\n return completed === JSON.stringify(sending)\n}\n\nfunction compareDirect(sending, completed) {\n // directly compares payload sent with that from the Complete node\n return completed === sending \n}\n\nfunction compareBuffers(sending, completed) {\n // compares buffer payload sent with that from the Complete node\n let answer = false\n // check completed is also a buffer\n if (Buffer.isBuffer(completed)) {\n answer = Buffer.compare(sending, completed) === 0\n }\n return answer \n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":970,"y":4940,"wires":[["a500a548eac99f1f"]]},{"id":"2fb7f6196432da68","type":"link in","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Send to MQTT","links":[],"x":160,"y":4740,"wires":[["e54df0d3d9480ac9"]],"l":true},{"id":"936d2ec2ee69f50f","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]
In the image you posted earlier you have three instances. Are you connecting to three different brokers or is the difference just the topic? If it is the topic then you just have to make sure that msg.topic includes the topic and you can send them all to the same guranteed delivery flow. In the MQTT node leave the topic blank.