Node-red MQTT QOS does not work?

I am publishing an MQTT message every minute from Node-red on a Raspberry Pi to Mosquitto on another Pi, both on my LAN.

If I disrupt the connection (shut down the broker Pi), the missed messages are not delivered when it restarts, regardless of the QOS setting in Node-red.

According to the MQTT v5 documentation:

QOS 1 "At least once", where messages are assured to arrive but duplicates can occur.
QOS 2 "Exactly once", where messages are assured to arrive exactly once.

In order to comply with the MQTT specification, shouldn't the MQTT-Out node queue QOS 1 or 2 messages until the broker is available?

PS I have seen this on steves-internet-guide.com:

> With node-red the client queues messages until it detects the connection failure
> after which it discards new incoming messages.

This does not seem to be true. When I shut down the broker Pi, the node status remains at "Connected" for at least a couple of minutes. It only shows "Disconnected" for a few seconds after I reboot the Pi, and even then it does not deliver the supposedly queued messages.

If you shut down the broker then the publisher can't send any data - and by default there is no buffering/retry built into the sending client.

I did have a go at supporting local queuing when I had a bit of play time in the 3.1.0 beta branch a few months back, but the current version of MQTT.js did not play nicely. I forget the details, but it was something to do with the local store not being initialised until after connection, which made the feature unusable for the time being.

It might see the light of day with MQTT.js v5 just around the corner (but I haven't tried it).

OK, I believe you: the usually reliable website I linked is wrong about queueing.

But there is surely something going wrong here:
Node-red 3.1.0 beta 3, MQTT-Out node set to QOS 2 and MQTT v5.
I pull the power on the Pi running Mosquitto and Node-red still shows "Connected" for almost 10 minutes, while apparently still sending a message every minute.

The QOS option is set to 2, so shouldn't the MQTT node do all the handshaking stuff and therefore know that the broker is offline?

If it neither queues QOS 2 messages when connected nor performs the QOS handshaking, what's the point of offering a QOS option?

@Steve-Mcl Does that mean QOS is known to be broken [with mqtt v5?] in 3.1?

MQTT is a transport - it sits on top of TCP/IP - so it sends packets in good faith. As you have "broken the wire" to the broker, it has to wait for the TCP-level timeout before it can assume the link is broken - and in the meantime it continues to "send" any packets that arrive. Only once that timeout occurs and it gets no handshake (at the MQTT layer) can it detect that the link really has gone and set its status accordingly.
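As a rough illustration of why detection takes so long (a sketch only: the 1.5x rule comes from the MQTT spec, but `pingRespWaitSecs` is an illustrative parameter I have made up, not a real MQTT.js option):

```javascript
// Rough keepalive arithmetic for broker-loss detection (illustrative).
// MQTT spec: the broker may drop a client after 1.5x the keepalive
// interval with no packets; the client only learns the link is dead
// once a PINGRESP fails to arrive after it sends a PINGREQ.

// Broker side: silence tolerated before the broker closes the session.
function brokerGivesUpAfterSecs(keepaliveSecs) {
  return 1.5 * keepaliveSecs;
}

// Client side, worst case: the wire breaks just after a successful
// exchange, so the client waits a whole keepalive interval before
// pinging, then waits out the missing PINGRESP.
function worstCaseDetectionSecs(keepaliveSecs, pingRespWaitSecs) {
  return keepaliveSecs + pingRespWaitSecs;
}
```

With the 60 s keepalive from the config node that is already well over a minute at best, and if the TCP socket itself has to time out (power pulled, so no RST is ever sent) it can be far longer, which would fit the ~10 minutes observed.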

Messages are queued at the broker, so it is on the subscriber side that you would see the effect of QoS.

So you are saying that the QOS setting on the MQTT-Out node merely sets a flag for the broker: "yes, I would like this message's delivery guaranteed"?

But MQTT-In has a QOS setting too. What if the publishing node is set to QOS 2 and the subscribing node to QOS 0, or vice versa?
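For what it's worth, the MQTT spec answers this: the publisher's QoS covers the publisher-to-broker leg, the subscription's QoS is a ceiling on the broker-to-subscriber leg, and the subscriber effectively gets the minimum of the two. A trivial sketch of the rule:

```javascript
// Effective end-to-end QoS is min(publish QoS, subscription max QoS):
// the broker downgrades delivery to the subscriber's ceiling but never
// upgrades it.
function effectiveQos(publishQos, subscribeQos) {
  return Math.min(publishQos, subscribeQos);
}

// e.g. publish at QoS 2 but subscribe at QoS 0 -> delivered at QoS 0
```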

Actually, queueing at the broker is confusing too; I think it only queues messages for subscribers it already knows about from a previous session, not for arbitrary future ones. Not a NR issue, of course.
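On that point, as far as I know the broker only queues QoS 1/2 messages for a subscriber it has previously seen connect with a persistent (non-clean) session, never for clients it has never met. An untested sketch with Mosquitto's CLI tools (host and topic taken from the flow later in this thread; adjust to suit):

```shell
# In mosquitto.conf, so queued session state survives broker restarts:
#   persistence true
#   persistence_location /var/lib/mosquitto/

# Subscribe once with a fixed client id, clean session disabled (-c)
# and QoS 1; from then on the broker queues matching QoS 1/2 messages
# for client "mysub" while it is offline.
mosquitto_sub -h owl2.local -i mysub -c -q 1 -t "tydwr/#"
```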

In the example above, the keep-alive time in the MQTT config node is 60 (seconds?), yet it stays "Connected" for several minutes. Sometimes, anyway.
Don't you think this is a bug?

In v5, I have a vague memory of needing to set a positive value for the session timeout (I forget the exact name of the property) in the broker config. A session that expires immediately is the default behaviour according to MQTT.js, iirc.
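For reference, the broker config node export later in this thread ends with `"sessionExpiry":""`, so that looks like the property in question. A hedged, untested sketch of the relevant config-node fields for queuing across an outage with MQTT v5 (the values are examples only):

```
"protocolVersion": "5",
"cleansession": false,
"sessionExpiry": "300"
```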

Something to do with reducing the chance of a DoS from flooding the broker with retained messages that are kept forever.

Sorry Steve, I only just noticed your reply.
I'll have to do some more reading on MQTT 5; I was not aware of session expiry vs keep-alive.

I was looking at another thread https://discourse.nodered.org/t/edge-to-cloud-computing/79815 about making data transfer resilient over network drop-outs.
I thought "Surely MQTT already does this?" but it seems not, except for the single most recent retained message (per topic). Hence this thread.

This flow might be useful for this. It queues MQTT messages until they are successfully sent to the broker, using a Complete node attached to the MQTT node to signal that each message has been sent. I cannot confirm that it is absolutely guaranteed never to lose messages, but it does seem to do the job as far as I can see. At least it did when I tried it some time ago.

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch 
(stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to 
waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"87d89aae.9b0db","type":"change","z":"85aecebd.19674","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":360,"wires":[["81c21dfd.874868"]]},{"id":"a12804d0.bbd0a","type":"change","z":"85aecebd.19674","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":460,"wires":[["81c21dfd.874868"]]},{"id":"81c21dfd.874868","type":"link out","z":"85aecebd.19674","name":"","links":["7ae5a5e5.be2f6c"],"x":765,"y":460,"wires":[]},{"id":"7ae5a5e5.be2f6c","type":"link in","z":"85aecebd.19674","name":"","links":["81c21dfd.874868"],"x":135,"y":320,"wires":[["7267f4ba.8104e4"]]},{"id":"fbd0d6c6.8eb88","type":"link in","z":"85aecebd.19674","name":"Email 
delivery","links":["2090b139.480446"],"x":75,"y":260,"wires":[["7267f4ba.8104e4"]]},{"id":"7267f4ba.8104e4","type":"subflow:149380c1.63e107","z":"85aecebd.19674","name":"Guaranteed delivery","env":[],"x":280,"y":280,"wires":[["3ba7e731.3b7e88","2df5a331.164ed4"]]},{"id":"1a0a6921.df8e9f","type":"comment","z":"85aecebd.19674","name":"Send MQTT messages to this link","info":"","x":150,"y":220,"wires":[]},{"id":"3ba7e731.3b7e88","type":"mqtt out","z":"85aecebd.19674","name":"","topic":"","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e3d45312.a3103","x":750,"y":280,"wires":[]},{"id":"2090b139.480446","type":"link out","z":"85aecebd.19674","name":"","links":["fbd0d6c6.8eb88"],"x":475,"y":60,"wires":[]},{"id":"2df5a331.164ed4","type":"trigger","z":"85aecebd.19674","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":500,"y":360,"wires":[["87d89aae.9b0db"]]},{"id":"6326bb2.cf1eb44","type":"debug","z":"85aecebd.19674","name":"PUB","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":510,"y":120,"wires":[]},{"id":"4255ad9.f2b01d4","type":"comment","z":"85aecebd.19674","name":"Adjust timeout time if necessary","info":"","x":550,"y":320,"wires":[]},{"id":"26aef8d0.4b58d8","type":"inject","z":"85aecebd.19674","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"timestamp","v":"7","vt":"num"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"test/test","payload":"","payloadType":"date","x":290,"y":60,"wires":[["2090b139.480446","6326bb2.cf1eb44"]]},{"id":"984febcd.60b788","type":"comment","z":"85aecebd.19674","name":"Click to send test message to MQTT","info":"","x":280,"y":20,"wires":[]},{"id":"55d227e5.4520b","type":"complete","z":"85aecebd.19674","name":"MQTT 
Complete","scope":["3ba7e731.3b7e88"],"uncaught":false,"x":180,"y":460,"wires":[["a838d587.28a058","a12804d0.bbd0a"]]},{"id":"a838d587.28a058","type":"change","z":"85aecebd.19674","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":380,"wires":[["2df5a331.164ed4"]]},{"id":"e3d45312.a3103","type":"mqtt-broker","name":"","broker":"owl2.local","port":"1883","clientid":"tigger","usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"tydwr/tigger/LWT","birthQos":"0","birthRetain":"true","birthPayload":"Online","birthMsg":{},"closeTopic":"tydwr/tigger/LWT","closeQos":"0","closePayload":"Offline","closeMsg":{},"willTopic":"tydwr/tigger/LWT","willQos":"0","willPayload":"Offline","willMsg":{},"sessionExpiry":""}]

I also have a more complex example which does end-to-end checking that each message gets through, by adding an id to each message and having the subscriber send a message back as each one is received, if that would be of interest. A sort of DIY QoS 1.
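The idea behind that end-to-end scheme can be sketched in a few lines of plain JavaScript (not the actual flow, just an illustration of the mechanism: tag each message with an id, keep it until the subscriber echoes the id back, and periodically re-send anything still pending):

```javascript
// DIY "QoS 1" sketch: at-least-once delivery via id + acknowledgement.
class ReliablePublisher {
  constructor(send) {
    this.send = send;         // callback that actually transmits a message
    this.pending = new Map(); // id -> message awaiting an ack
    this.nextId = 0;
  }
  publish(payload) {
    const msg = { id: this.nextId++, payload };
    this.pending.set(msg.id, msg); // keep until acknowledged
    this.send(msg);
    return msg.id;
  }
  ack(id) {
    // subscriber echoed this id back: delivery confirmed, forget it
    this.pending.delete(id);
  }
  retryPending() {
    // called periodically (e.g. from an inject node): re-send survivors,
    // so duplicates are possible - exactly like real QoS 1
    for (const msg of this.pending.values()) this.send(msg);
  }
}
```

As with real QoS 1, the subscriber must tolerate duplicates (or de-duplicate on the id).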
