Detecting disconnection of MQTT out node

Just as a side note: I had disconnects from MQTT as well, but I found out that this was because two clients were using the same MQTT client ID.

After checking all my MQTT clients and ensuring they had unique client IDs, everything is fine and I have no more disconnects.

The disconnects themselves are not the issue here, the disconnections are due to network issues. The issue is how to store messages until the connection is re-established and then send them.

I have a solution, but I still need to tidy it up. I should be able to do that tomorrow.

Or it may be that it never worked properly under the fairly extreme conditions you are subjecting it to.

You have me on tenterhooks :woozy_face:

Sorry, real life got in the way.

Try this. The subflow itself is unchanged; I have had to add some extra logic around it. It is working solidly for me now.

[{"id":"5ed5dcc8ccf90bfa","type":"group","z":"bdd7be38.d3b55","name":"Publishing to MQTT","style":{"label":true},"nodes":["b4227ef182f4ebef","a500a548eac99f1f","e7642367d639d549","ea94f646fa8e74e1","c04777c7cc3990a0","81428d3bcae766ee","86207c4b47bf3bd8","c458a09df04445f7","753fc1cc8863871e","e54df0d3d9480ac9","d13ea5ab5d57a0a7","43a40f5eb76748ac","cf6c087e45a7fbbf","de21d8a733a9c353","2fb7f6196432da68"],"x":54,"y":4659,"w":1172,"h":322},{"id":"ae9c6995a156ff5c","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"0b0e1e2df36d2d3a"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"0b0e1e2df36d2d3a","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"89ab963214b94f59","port":0}]}},{"id":"e6a7f648cf7f127b","type":"inject","z":"ae9c6995a156ff5c","name":"Retry 
","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["0b0e1e2df36d2d3a"]]},{"id":"89ab963214b94f59","type":"status","z":"ae9c6995a156ff5c","name":"","scope":["0b0e1e2df36d2d3a"],"x":300,"y":160,"wires":[[]]},{"id":"0b0e1e2df36d2d3a","type":"function","z":"ae9c6995a156ff5c","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send 
the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other 
states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"b4227ef182f4ebef","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":810,"y":4840,"wires":[["a500a548eac99f1f"]]},{"id":"a500a548eac99f1f","type":"link out","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","links":["e7642367d639d549"],"x":1185,"y":4840,"wires":[]},{"id":"e7642367d639d549","type":"link in","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","links":["a500a548eac99f1f"],"x":135,"y":4800,"wires":[["e54df0d3d9480ac9"]]},{"id":"ea94f646fa8e74e1","type":"comment","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Send MQTT messages to this link","info":"","x":210,"y":4700,"wires":[]},{"id":"c04777c7cc3990a0","type":"mqtt out","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"936d2ec2ee69f50f","x":710,"y":4740,"wires":[]},{"id":"81428d3bcae766ee","type":"trigger","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":660,"y":4840,"wires":[["b4227ef182f4ebef"]]},{"id":"86207c4b47bf3bd8","type":"comment","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Adjust timeout time if necessary","info":"","x":690,"y":4800,"wires":[]},{"id":"c458a09df04445f7","type":"complete","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"MQTT 
Complete","scope":["c04777c7cc3990a0"],"uncaught":false,"x":180,"y":4940,"wires":[["753fc1cc8863871e","43a40f5eb76748ac"]]},{"id":"753fc1cc8863871e","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":400,"y":4840,"wires":[["81428d3bcae766ee"]]},{"id":"e54df0d3d9480ac9","type":"subflow:ae9c6995a156ff5c","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Guaranteed delivery test","env":[{"name":"maxQueue","value":"0","type":"num"}],"x":410,"y":4740,"wires":[["81428d3bcae766ee","c04777c7cc3990a0","cf6c087e45a7fbbf"]]},{"id":"d13ea5ab5d57a0a7","type":"join","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"key","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":770,"y":4940,"wires":[["de21d8a733a9c353"]]},{"id":"43a40f5eb76748ac","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"key: completed, save topic and payload","rules":[{"t":"set","p":"key","pt":"msg","to":"completed","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":4940,"wires":[["d13ea5ab5d57a0a7"]]},{"id":"cf6c087e45a7fbbf","type":"change","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"key: sending, save topic and 
payload","rules":[{"t":"set","p":"key","pt":"msg","to":"sending","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":510,"y":4900,"wires":[["d13ea5ab5d57a0a7"]]},{"id":"de21d8a733a9c353","type":"function","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Check messages match","func":"/** Check that the message that is has been sent (completed) is the one that has just been passed to the MQTT node (sending)\n * payload.sending contains last msg sent to MQTT node\n * payload.completed contains msg indicated completed by MQTT node\n * in each case the payload contains topic of message and payload value in data\n */ \nconst previousKey = context.get(\"previousKey\") || \"\"\nconst thisKey = msg.key\n\n//node.warn(typeof msg.payload.sending.data)\nif (thisKey === \"completed\"  &&  previousKey === \"sending\") {\n    let equal = false           // whether messages are equivalent\n    // topics must be the same\n    if (msg.payload.sending.topic === msg.payload.completed.topic) {\n        if (typeof msg.payload.sending.data === \"string\") {\n            equal = compareDirect(msg.payload.sending.data, msg.payload.completed.data)\n        } else if (Buffer.isBuffer(msg.payload.sending.data)) {\n            equal = compareBuffers(msg.payload.sending.data, msg.payload.completed.data)\n        } else {\n            equal = compareJSON(msg.payload.sending.data, msg.payload.completed.data)\n        }\n    }\n    if (equal) {\n        msg.control = \"OK\"\n    } else {\n        msg.control = \"FAIL\"\n    }\n} else {\n    // out of sequence so ignore\n    msg = null\n}\ncontext.set(\"previousKey\", thisKey)\nreturn msg;\n\nfunction compareJSON(sending, completed) {\n    // compares JSON version of payload sent with that from the Complete node\n    return completed === JSON.stringify(sending)\n}\n\nfunction 
compareDirect(sending, completed) {\n    // directly compares payload sent with that from the Complete node\n    return completed === sending \n}\n\nfunction compareBuffers(sending, completed) {\n    // compares buffer payload sent with that from the Complete node\n    let answer = false\n    // check completed is also a buffer\n    if (Buffer.isBuffer(completed)) {\n        answer = Buffer.compare(sending, completed) === 0\n    }\n    return answer \n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":970,"y":4940,"wires":[["a500a548eac99f1f"]]},{"id":"2fb7f6196432da68","type":"link in","z":"bdd7be38.d3b55","g":"5ed5dcc8ccf90bfa","name":"Send to MQTT","links":[],"x":160,"y":4740,"wires":[["e54df0d3d9480ac9"]],"l":true},{"id":"936d2ec2ee69f50f","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

In the image you posted earlier you have three instances. Are you connecting to three different brokers, or is the difference just the topic? If it is just the topic, then you only have to make sure that msg.topic contains the topic, and you can send them all to the same guaranteed delivery flow. In the MQTT node, leave the topic blank.

1 Like

I'm testing it now and, at the moment, I can say that everything works as expected (with only one Guaranteed delivery node, as you suggested). I'm continuing with the cutoff tests; let's see.

P.S.: It seems we still have a "disconnection" problem. After a ten-minute cutoff, I restored the connections and, through the local Wi-Fi, connected to Node-RED to see what was happening. The Guaranteed delivery node kept increasing its counter for another 7 minutes, until the exact moment the MQTT node “realized” it was disconnected and renegotiated its connection.

P.S. 2: It also seems we are affected by something like Schrödinger's cat (the observer effect): if I do not monitor the node (i.e. I only check it after the connection is available again), it works; if I open a local browser tab to monitor it, it never works.

Did it then send all the queued messages?

What in particular never works? The operation of the MQTT node itself, or the sending of all queued data when it does eventually reconnect?

Yes it did.

When I monitor it locally, the Guaranteed delivery node never increases its counter; it never queues messages while the connection is down.

Is it still sending the data to mqtt?

What status is shown on the node?

I cannot reproduce this behavior again. It seems to work flawlessly. At least, I've tried several times and the counter starts to increase after the timeout. I'm still trying, but I can't see any difference between now and an hour ago.

P.S. I think I can see the difference: it is not a good idea to have several guaranteed delivery nodes. Although I had taken your advice and used a single node, I still had my three old nodes in the flow, unconnected, just in case I needed to go back. That's the difference: I've removed them.

I suspect it was an issue with the browser cache, or similar. Not with the flow.

I don't think that would make any difference.

Hello again. This guaranteed delivery test works great, but I need some help again. I want to have persistence instead of storing the queue in memory.
I've modified settings.js by adding

```js
contextStorage: {
        default: {
                module: 'memory'
        },
        file: {
                module: 'localfilesystem'
        }
},
```

I guess I also have to change the ‘State machine’ function node, but I'm doing something wrong.
To test that persistence is correctly configured, I changed null to 'file' in:

// Resolve the context store named by the subflow's "contextStore" setting.
// The value "default" is mapped to null (presumably so that context.get/set
// fall back to the runtime's default store — confirm against Node-RED docs).
let store = env.get("contextStore");
if (store === "default") store = null;

and I can see the file in ~/.node-red/context. Of course, doing that never sends anything through MQTT. I've tried the following, but it never works either:

// Experimental variant: "default" still maps to null, but any other
// configured store name is overridden with the "file" store that is
// declared in settings.js (localfilesystem).
let store = env.get("contextStore");
if (store === "default") {
    store = null;
} else {
    store = "file";
}

My solution (in case it helps others):

// Called when a FAIL response is received for the message currently in flight.
//
// Persistence is deliberately NOT handled here. The enclosing "State machine"
// function reads `stat` with context.get("stat", store) at the start of every
// invocation and writes it back with context.set("stat", stat, store) at the
// end, where `store` comes from the subflow's "contextStore" setting. Writing
// an extra copy to a hard-coded "file" store here bypasses that setting and is
// never read back, so it cannot restore the queue after a restart. To persist
// the queue, set "contextStore" on the subflow instance instead.
//
// @param {object} stat - state object holding `state` and `queue`; mutated in place.
function handleFAIL(stat) {
    // ignore if in wrong state
    if (stat.state === "waitingForOKFail") {
        // FAIL received: leave the message at the head of the queue and wait
        // for the next trigger (retry inject or a newly queued message)
        // before it is resent
        stat.state = "waitingForTrigger";
    } else {
        node.warn("Ignoring unexpected FAIL");
    }
}

I guess I have to do something more, as the saved data does not survive a reboot. Any ideas?

I have not been following this thread, so I don't know if your approach is valid, but to have context survive a reboot, you can:

  • enable persistent context or
  • use a database and restore context from database at startup or
  • write to file and restore upon startup or
  • use one of the many contrib nodes that persist values.

If you double-click the subflow instance, at the bottom there is a field that tells the subflow which context store to use. All you should have needed to do was change that to 'file' and add the file store to settings.js.
Put the code back as it was and try that.

Alternatively, I have now released the subflow as a node, @colinl/node-red-guaranteed-delivery and included the MQTT delivery example in the node's example, so you could use that instead.

1 Like

That was the first thing I tried, but as soon as I change ‘default’ to ‘file’, it stops sending via MQTT and starts storing in the file. I will give your node a try. Thanks

Edit: your node works like a champ.

Did you just replace the subflow with the new node, or pick up the complete example that comes with the node?

I just replaced the subflow instance "Guaranteed delivery test" (from the Edit subflow instance dialog) with your new node and deleted "Delivery subflow".

OK, that is good to know.

Perhaps you would like to rate it on the flows site (as linked earlier) so that others will know you find it useful.

Thanks.