MQTT out node disconnects when topic field is empty

Hi, I have been looking into your MQTT reconnection issue.

Question

Can you tell me what broker type and version are you using? Mosquitto V1.6? V2?

Observations

I can see that your subflow generates an invalid topic...

image

As per MQTT spec the server disconnects the client and a reasonCode of 129 (PUBLISH with malformed UTF-8 String for topic was sent) is generated.


Here is a slimmed down test (about 30 secs, please watch)...
chrome_mBZQOnthHO

↑ As you can see, all the nodes (apart from EMQX) disconnect & dont re-connect.



@knolleary this ↓ looks like it may be a bug in 10-mqtt.js. I have looked back through commits back to before V5 support was added - i don't see where I might have missed this! Very odd.

Reading through the issues in MQTT.JS repo, I suspect we should be calling client.end() before the reconnect event. As it stands, we do, but not when the broker initiates the disconnect (disconnect event).

So I added the below code to node.client.on("disconnect", ... in packages/node_modules/@node-red/nodes/core/network/10-mqtt.js ...

  if (node.connected) {
    node.client.end(true, {});
    node.connected = false;
  }

... Resulting in this much better situation (about 30 secs, please watch)...

chrome_krBBSVgfDx

↑ You barely see the disconnection / reconnection - but it does happen....

NOTE: Moqsquito V1.6.x never re-connects - though a manual disconnect/connect does work

chrome_oHaa5QondJ



Request for help from you @bartoszek ...

  1. Please add the below code to node.client.on("disconnect", function(packet) { in packages/node_modules/@node-red/nodes/core/network/10-mqtt.js to include this...
    node.client.on("disconnect", function(packet) {
      if (node.connected) {    //<< add this
        node.client.end(true, {});    //<< add this
        node.connected = false;    //<< add this
      }    //<< add this
    }
    
  2. Restart node-red - does the issue continue or is it fixed?


Handling this issue / fixing your flow...

You should look to curb bad topics entering the MQTT node. As for why you are getting bad topics is not immediately obvious but I would simply use a switch node that checks the topic is not empty and doesnt contain \n, \t etc. Use an otherwise clause to log the (full) bad message for later inspection.



My test flow - if you are interested...

[{"id":"89fef359b4e8deb0","type":"inject","z":"eb81e71f9edd709c","name":"go","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":1370,"y":820,"wires":[["27d52903b6159003"]]},{"id":"a20fe7d4ca71f475","type":"mqtt out","z":"eb81e71f9edd709c","name":"PI mosq 2.x","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"7bab12a9.b04bcc","x":1590,"y":940,"wires":[]},{"id":"ae546d5b2aea26d9","type":"mqtt out","z":"eb81e71f9edd709c","name":"local mosq 1.6x","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"f03246cf.89b378","x":1600,"y":1000,"wires":[]},{"id":"d1bb24cddfc839b7","type":"mqtt out","z":"eb81e71f9edd709c","name":"broker.emqx.io","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"63e287c41d3182a5","x":1600,"y":1120,"wires":[]},{"id":"f5ead90074142b07","type":"mqtt out","z":"eb81e71f9edd709c","name":"test.mosquitto.org","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"a9a4ad4ffe3fe406","x":1610,"y":1060,"wires":[]},{"id":"47ed98a2be139c32","type":"debug","z":"eb81e71f9edd709c","name":"","active":true,"tosidebar":false,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"payload","statusType":"auto","x":1580,"y":820,"wires":[]},{"id":"a7ca9a8a930eed01","type":"debug","z":"eb81e71f9edd709c","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1590,"y":880,"wires":[]},{"id":"27d52903b6159003","type":"function","z":"eb81e71f9edd709c","name":"replay bad msg","func":"return { \n    \"topic\": \"$join([\\\"homes\\\", $flowContext(\\\"_room\\\"), \\\"floor\\\", \\\"temperature\\\", \\\"get\\\"], \\\"/\\\")\\t\", \n    \"payload\": {}, \n    \"qos\": 1, \n    \"retain\": false, \n    \"_msgid\": \"3bc8639b5aa82cb0\" \n}\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1380,"y":880,"wires":[["47ed98a2be139c32","a7ca9a8a930eed01","a20fe7d4ca71f475","ae546d5b2aea26d9","d1bb24cddfc839b7","f5ead90074142b07","59bddb34689a8e32"]]},{"id":"59bddb34689a8e32","type":"mqtt out","z":"eb81e71f9edd709c","name":"broker.hivemq.com","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"780d4f68b1c8e137","x":1610,"y":1180,"wires":[]},{"id":"c8f1eb314fb19abb","type":"inject","z":"eb81e71f9edd709c","name":"connect","props":[{"p":"action","v":"connect","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1330,"y":960,"wires":[["ae546d5b2aea26d9"]]},{"id":"65843dd1926d8de7","type":"inject","z":"eb81e71f9edd709c","name":"disconnect","props":[{"p":"action","v":"disconnect","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1330,"y":1000,"wires":[["ae546d5b2aea26d9"]]},{"id":"7bab12a9.b04bcc","type":"mqtt-broker","name":"PI","broker":"192.168.1.61","port":"1883","clientid":"","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"5","keepalive":"59","cleansession":true,"birthTopic":"test/lwt/birth","birthQos":"0","birthPayload":"payload of test/lwt/birth","birthMsg":{"contentType":"application/json","userProps":"{\"prop\":\"birth-prop\"}","respTopic":"test/lwt/birthResponse","correl":"test/lwt/birth:Correl","expiry":"60"},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"f03246cf.89b378","type":"mqtt-broker","name":"","broker":"localhost","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"58","cleansession":true,"birthTopic":"demo/lwt/birth","birthQos":"0","birthPayload":"birth","birthMsg":{"contentType":"application/json","userProps":"{\"lh\":\"value\"}","respTopic":"demo/lwt/birthReply","correl":"correl for demo/lwt/birth","expiry":"123"},"closeTopic":"demo/lwt/close","closeQos":"0","closePayload":"close","closeMsg":{},"willTopic":"demo/lwt/will","willQos":"0","willPayload":"will","willMsg":{},"sessionExpiry":"","credentials":{}},{"id":"63e287c41d3182a5","type":"mqtt-broker","name":"test broker","broker":"broker.emqx.io","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"a9a4ad4ffe3fe406","type":"mqtt-broker","name":"","broker":"test.mosquitto.org","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"59","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"780d4f68b1c8e137","type":"mqtt-broker","name":"broker.hivemq.com","broker":"broker.hivemq.com","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"5","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""}]
1 Like