Hi, I have been looking into your MQTT reconnection issue.
Question
Can you tell me what broker type and version are you using? Mosquitto V1.6? V2?
Observations
I can see that your subflow generates an invalid topic...
As per MQTT spec the server disconnects the client and a reasonCode of 129 (PUBLISH with malformed UTF-8 String for topic was sent) is generated.
Here is a slimmed down test (about 30 secs, please watch)...
↑ As you can see, all the nodes (apart from EMQX) disconnect & dont re-connect.
@knolleary this ↓ looks like it may be a bug in 10-mqtt.js. I have looked back through commits back to before V5 support was added - i don't see where I might have missed this! Very odd.
Reading through the issues in MQTT.JS repo, I suspect we should be calling client.end()
before the reconnect event. As it stands, we do, but not when the broker initiates the disconnect (disconnect
event).
So I added the below code to node.client.on("disconnect", ...
in packages/node_modules/@node-red/nodes/core/network/10-mqtt.js
...
if (node.connected) {
node.client.end(true, {});
node.connected = false;
}
... Resulting in this much better situation (about 30 secs, please watch)...
↑ You barely see the disconnection / reconnection - but it does happen....
NOTE: Moqsquito V1.6.x never re-connects - though a manual disconnect/connect does work
Request for help from you @bartoszek ...
- Please add the below code to
node.client.on("disconnect", function(packet) {
in packages/node_modules/@node-red/nodes/core/network/10-mqtt.js
to include this...node.client.on("disconnect", function(packet) {
if (node.connected) { //<< add this
node.client.end(true, {}); //<< add this
node.connected = false; //<< add this
} //<< add this
}
- Restart node-red - does the issue continue or is it fixed?
Handling this issue / fixing your flow...
You should look to curb bad topics entering the MQTT node. As for why you are getting bad topics is not immediately obvious but I would simply use a switch
node that checks the topic is not empty and doesnt contain \n
, \t
etc. Use an otherwise clause to log the (full) bad message for later inspection.
My test flow - if you are interested...
[{"id":"89fef359b4e8deb0","type":"inject","z":"eb81e71f9edd709c","name":"go","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":1370,"y":820,"wires":[["27d52903b6159003"]]},{"id":"a20fe7d4ca71f475","type":"mqtt out","z":"eb81e71f9edd709c","name":"PI mosq 2.x","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"7bab12a9.b04bcc","x":1590,"y":940,"wires":[]},{"id":"ae546d5b2aea26d9","type":"mqtt out","z":"eb81e71f9edd709c","name":"local mosq 1.6x","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"f03246cf.89b378","x":1600,"y":1000,"wires":[]},{"id":"d1bb24cddfc839b7","type":"mqtt out","z":"eb81e71f9edd709c","name":"broker.emqx.io","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"63e287c41d3182a5","x":1600,"y":1120,"wires":[]},{"id":"f5ead90074142b07","type":"mqtt out","z":"eb81e71f9edd709c","name":"test.mosquitto.org","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"a9a4ad4ffe3fe406","x":1610,"y":1060,"wires":[]},{"id":"47ed98a2be139c32","type":"debug","z":"eb81e71f9edd709c","name":"","active":true,"tosidebar":false,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"payload","statusType":"auto","x":1580,"y":820,"wires":[]},{"id":"a7ca9a8a930eed01","type":"debug","z":"eb81e71f9edd709c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1590,"y":880,"wires":[]},{"id":"27d52903b6159003","type":"function","z":"eb81e71f9edd709c","name":"replay bad msg","func":"return { \n \"topic\": \"$join([\\\"homes\\\", $flowContext(\\\"_room\\\"), \\\"floor\\\", \\\"temperature\\\", \\\"get\\\"], \\\"/\\\")\\t\", \n \"payload\": {}, \n \"qos\": 1, \n \"retain\": false, \n \"_msgid\": \"3bc8639b5aa82cb0\" \n}\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1380,"y":880,"wires":[["47ed98a2be139c32","a7ca9a8a930eed01","a20fe7d4ca71f475","ae546d5b2aea26d9","d1bb24cddfc839b7","f5ead90074142b07","59bddb34689a8e32"]]},{"id":"59bddb34689a8e32","type":"mqtt out","z":"eb81e71f9edd709c","name":"broker.hivemq.com","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"780d4f68b1c8e137","x":1610,"y":1180,"wires":[]},{"id":"c8f1eb314fb19abb","type":"inject","z":"eb81e71f9edd709c","name":"connect","props":[{"p":"action","v":"connect","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1330,"y":960,"wires":[["ae546d5b2aea26d9"]]},{"id":"65843dd1926d8de7","type":"inject","z":"eb81e71f9edd709c","name":"disconnect","props":[{"p":"action","v":"disconnect","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1330,"y":1000,"wires":[["ae546d5b2aea26d9"]]},{"id":"7bab12a9.b04bcc","type":"mqtt-broker","name":"PI","broker":"192.168.1.61","port":"1883","clientid":"","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"5","keepalive":"59","cleansession":true,"birthTopic":"test/lwt/birth","birthQos":"0","birthPayload":"payload of test/lwt/birth","birthMsg":{"contentType":"application/json","userProps":"{\"prop\":\"birth-prop\"}","respTopic":"test/lwt/birthResponse","correl":"test/lwt/birth:Correl","expiry":"60"},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"f03246cf.89b378","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"58","cleansession":true,"birthTopic":"demo/lwt/birth","birthQos":"0","birthPayload":"birth","birthMsg":{"contentType":"application/json","userProps":"{\"lh\":\"value\"}","respTopic":"demo/lwt/birthReply","correl":"correl for demo/lwt/birth","expiry":"123"},"closeTopic":"demo/lwt/close","closeQos":"0","closePayload":"close","closeMsg":{},"willTopic":"demo/lwt/will","willQos":"0","willPayload":"will","willMsg":{},"sessionExpiry":"","credentials":{}},{"id":"63e287c41d3182a5","type":"mqtt-broker","name":"test broker","broker":"broker.emqx.io","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"a9a4ad4ffe3fe406","type":"mqtt-broker","name":"","broker":"test.mosquitto.org","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"59","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"780d4f68b1c8e137","type":"mqtt-broker","name":"broker.hivemq.com","broker":"broker.hivemq.com","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""}]