MQTT data loss at reconnection

Hi everybody,

I have a problem with the MQTT publish node.

The flow is simple: every second I inject a timestamp (converted to date-time format) into the MQTT publish node (QoS = 2, Retain = true) and into the debug console.

Then I cut the network connection and wait until I receive the last will message from the broker.

On reconnection I receive all the data generated during the disconnection (which is great!!), but then the data generated during the next 10-15 seconds is missing before it comes back to real time.

Side note: I receive continuous data in the debug console during the entire process.

It looks like the lost data is generated while the client is resending the old data. Does anybody have an idea? Maybe I misconfigured a node? Or is it a bug?

Regards,

Martin

I am having trouble following your description. Some questions.
How many computers are involved in this? There are three things going on:

  1. The device doing the publishing
  2. The device running the mqtt server
  3. The client device you are using to subscribe from

Some of those may be the same device, of course.
Finally, which network connection are you breaking?

Also a question for one of the experts. What happens to messages sent to the MQTT Out node when the node cannot communicate with the server? Are they buffered or rejected or what?

Hi colin,

I agree it was not clear:
I have one computer running Node-RED doing the publishing (every second), one server running the broker (Mosquitto on a Synology), and one client connected to the broker (my phone).

I cut the connection between Node-RED and the broker. My phone remains connected.

OK, are you expecting to see all the values published while the network is disconnected? If so, are you expecting the MQTT node to save them and publish them all later? Suppose you were publishing thousands of messages a second and it was disconnected for days. If they were saved in Node-RED you would run out of memory.

Yes, that is the behaviour I was expecting, and it looks like it is working (leaving memory limitations aside). The problem is that while Node-RED is busy publishing the data stored while the connection was lost, real-time data is neither stored nor sent.

I suspect the node just buffers up a few then gives up. What happens if you leave the network connection out for a couple of minutes?
In fact, I think that arguably it should not remember messages at all once the connection is lost. I can see it could cause problems for a process if messages from hours ago are suddenly received.

Until this has been tested and the behaviour re-confirmed, it is difficult to judge whether there is something wrong with the MQTT node implementation. What sounds a bit worrying to me is, of course, if fresher data gets lost in favor of pushing out older data. Is this really so?

I assume that during your tests you did break the network connection between Node-RED and the MQTT broker? If so, the MQTT broker was never involved in buffering and could never do anything about it, it was Node-RED itself that buffered the time stamp data messages in that situation

NR MQTT-node --------xxxxx--------- MQTT Broker -------------------- Your phone with MQTT Client

If you instead would break the network connection to your phone, I don't think you will lose the fresh data and you would also receive the last sent message (for each topic) with the retain flag set

NR MQTT-node ---------------- MQTT Broker -------xxxx-------- Your phone with MQTT Client

Yes, only Node-RED was storing the data. I will rerun some tests and publish the results.

@mdresf has confirmed that it is the link between node red and the mqtt server that is getting broken. As I understand it what he is seeing is consistent with the MQTT Out node buffering up a number of messages and then discarding later ones. When he reconnects it then sends out the buffered ones followed by new ones as they arrive.
I don't know how the node was designed to behave when it is disconnected and further messages arrive, and I am not certain what the ideal behaviour would be either.
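
To make that hypothesis concrete, here is a minimal sketch in Python of a bounded offline buffer that keeps the first few messages and discards later ones. It is not the MQTT Out node's actual implementation: the `BoundedOutbox` class, its `capacity` parameter, and the message numbering are all assumptions made purely for illustration.

```python
from collections import deque


class BoundedOutbox:
    """Hypothetical offline buffer: keeps the first `capacity` messages
    published while disconnected and drops any that arrive after that."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.connected = True
        self.pending = deque()   # buffered while offline
        self.delivered = []      # what the broker ends up seeing
        self.dropped = []        # what is silently lost

    def publish(self, msg):
        if self.connected:
            self.delivered.append(msg)
        elif len(self.pending) < self.capacity:
            self.pending.append(msg)
        else:
            self.dropped.append(msg)

    def disconnect(self):
        self.connected = False

    def reconnect(self):
        # Flush the buffered messages first, oldest first.
        self.connected = True
        while self.pending:
            self.delivered.append(self.pending.popleft())


outbox = BoundedOutbox(capacity=5)
for t in range(0, 3):        # link is up: 0, 1, 2 go straight out
    outbox.publish(t)
outbox.disconnect()
for t in range(3, 13):       # link is down: 3..7 buffered, 8..12 dropped
    outbox.publish(t)
outbox.reconnect()
for t in range(13, 16):      # link is back: live data again
    outbox.publish(t)

print(outbox.delivered)
print(outbox.dropped)
```

Under this assumed policy the subscriber would see the early part of the outage, then a gap, then live data, which is one possible reading of the symptom reported above.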

Yes, I think this is fundamental to understand, so maybe Nick or someone with deeper knowledge will read this as well and give their point of view.

A first preliminary thought from my side would be that the node (when off-line) could/should behave like the broker does, i.e. if the retention flag is set, store the latest message per topic and then, when on-line again, send those to the broker. Only those last messages per topic, nothing else.
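
As a rough illustration of that idea, the following Python sketch keeps only the latest payload per topic while offline and sends just those on reconnection. The `LastValueStore` class and the `send` callback are hypothetical names and do not correspond to anything in Node-RED.

```python
class LastValueStore:
    """Hypothetical offline store: remembers only the newest payload per topic."""

    def __init__(self):
        self._latest = {}

    def remember(self, topic, payload):
        # A newer payload for the same topic overwrites the older one.
        self._latest[topic] = payload

    def flush(self, send):
        # Called once the link is back: send one message per topic, then forget.
        for topic, payload in self._latest.items():
            send(topic, payload)
        self._latest.clear()


store = LastValueStore()
store.remember("sensors/temp", 20.5)
store.remember("sensors/humidity", 41)
store.remember("sensors/temp", 21.0)   # replaces 20.5

sent = []
store.flush(lambda topic, payload: sent.append((topic, payload)))
print(sent)
```

Memory use is then bounded by the number of topics rather than by the length of the outage, at the cost of losing the history in between.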

@mdresf just an addition to the discussion.
When we use MQTT and want to make sure that all messages are received, we normally use the node-red-contrib-safe-queue node. It guarantees that all messages are delivered to your server. It stores the messages on your hard drive and only deletes them after an acknowledgement from the receiving side.

I think there is something wrong with the link, it doesn't work when clicked, you have to copy/paste instead.
How do you determine that the message has been successfully delivered to the MQTT server?

Sorry, just fixed the link :slight_smile:
To determine that the message was delivered, the server has to reply with an acknowledgement containing the identifier of the message.
Basically the flow is:

  1. All messages have to go to the "queue in" node
  2. The "queue out" will send, in sequence, all messages that are stored in the queue
  3. After the message is sent, an ACK should come from the receiving side and go straight to the "queue ack" node. The "queue ack" node is responsible for deleting the message from disk.
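
The three steps above can be sketched in Python as a small disk-backed queue. This is only an illustration of the store-until-acknowledged pattern, not the internals of node-red-contrib-safe-queue: the `AckQueue` class, its method names, and the file naming scheme are assumptions.

```python
import json
import tempfile
import uuid
from pathlib import Path


class AckQueue:
    """Hypothetical disk-backed queue: a message stays on disk until it is acked."""

    def __init__(self, folder):
        self.folder = Path(folder)
        self.folder.mkdir(parents=True, exist_ok=True)
        # Per-process sequence number so files sort oldest first (sketch only;
        # a real queue would recover it from the files already on disk).
        self._seq = 0

    def queue_in(self, payload):
        # Step 1: persist the message together with a unique id.
        msg_id = str(uuid.uuid4())
        self._seq += 1
        path = self.folder / f"{self._seq:012d}-{msg_id}.json"
        path.write_text(json.dumps({"uuid": msg_id, "payload": payload}))
        return msg_id

    def queue_out(self):
        # Step 2: return the stored messages in sequence, without deleting them.
        files = sorted(self.folder.glob("*.json"))
        return [json.loads(p.read_text()) for p in files]

    def queue_ack(self, msg_id):
        # Step 3: the receiver confirmed this id, so the file can be removed.
        for p in self.folder.glob(f"*-{msg_id}.json"):
            p.unlink()


with tempfile.TemporaryDirectory() as folder:
    q = AckQueue(folder)
    first = q.queue_in("reading 1")
    second = q.queue_in("reading 2")
    q.queue_ack(first)   # only the first message was confirmed
    remaining = [m["payload"] for m in q.queue_out()]

print(remaining)
```

Anything still returned by `queue_out` after a timeout would be a candidate for resending, which is what makes the delivery survive a restart or a long outage.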

OK, I thought you must be putting an id in with the message. If all you wanted was to guarantee that it got to the MQTT server I suppose you could just subscribe to the topic and use that to ack it. It would still need the id in the message of course.
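
A minimal Python sketch of that echo-based idea, with no network code: each outgoing message is wrapped with an id, and a message coming back on the subscribed topic clears the matching entry. The `pack` and `on_echo` functions and the `pending` table are hypothetical; the `uuid`/`payload` envelope simply mirrors the "pack message" node in the example flow.

```python
import json
import uuid

# Hypothetical table of messages published but not yet seen coming back.
pending = {}


def pack(payload):
    """Wrap a payload with a unique id and remember it as unconfirmed."""
    msg_id = str(uuid.uuid4())
    pending[msg_id] = payload
    return json.dumps({"uuid": msg_id, "payload": payload})


def on_echo(raw):
    """Call with each message received on the subscribed topic.

    Returns True if it confirmed one of our pending messages."""
    msg_id = json.loads(raw).get("uuid")
    if msg_id in pending:
        del pending[msg_id]
        return True
    return False


wire = pack(42)            # what would be published to the topic
acked = on_echo(wire)      # the broker echoes it back to our own subscription
duplicate = on_echo(wire)  # a second copy of the same message is ignored
print(acked, duplicate, pending)
```

Note that an echo only shows that the message reached the MQTT server, not that any other client processed it.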

Hi, how can I use this node (node-red-contrib-safe-queue)? Do you have an example? Which nodes are "Confirm of receipt" and "treat messages"? Where can I find them?

"Confirm of receipt" and "treat messages" are generic names for what you can do with the messages. You can do something like this:

[{"id":"79b7eccc.7e0aa4","type":"tab","label":"Client","disabled":false,"info":""},{"id":"f52a4ff2.f5026","type":"inject","z":"79b7eccc.7e0aa4","name":"Message","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":140,"y":140,"wires":[["25f186d9.0cb17a"]]},{"id":"25f186d9.0cb17a","type":"queue in","z":"79b7eccc.7e0aa4","name":"queue in","config":"234672ff.22406e","sendError":true,"x":560,"y":140,"wires":[[]]},{"id":"514e8648.37a058","type":"queue out","z":"79b7eccc.7e0aa4","name":"","config":"234672ff.22406e","x":180,"y":220,"wires":[["dd0d1b35.f17ee8"]]},{"id":"bb53f59.2f93008","type":"mqtt out","z":"79b7eccc.7e0aa4","name":"Server","topic":"data","qos":"1","retain":"false","broker":"4509155d.f4762c","x":610,"y":220,"wires":[]},{"id":"dd0d1b35.f17ee8","type":"change","z":"79b7eccc.7e0aa4","name":"pack message","rules":[{"t":"set","p":"message","pt":"msg","to":"{}","tot":"json"},{"t":"set","p":"message.uuid","pt":"msg","to":"uuid","tot":"msg"},{"t":"move","p":"payload","pt":"msg","to":"message.payload","tot":"msg"},{"t":"set","p":"payload","pt":"msg","to":"message","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":420,"y":220,"wires":[["bb53f59.2f93008"]]},{"id":"aae3ea9e.d31268","type":"mqtt in","z":"79b7eccc.7e0aa4","name":"","topic":"data/ack","qos":"2","datatype":"auto","broker":"4509155d.f4762c","x":120,"y":280,"wires":[["6b40a0c8.a837e"]]},{"id":"6b40a0c8.a837e","type":"queue ack","z":"79b7eccc.7e0aa4","name":"","config":"234672ff.22406e","x":540,"y":280,"wires":[]},{"id":"234672ff.22406e","type":"queue config","z":"","name":"","storage":"fs","path":"safe-queue-folder","timeoutAck":"1000","startJob":true,"typeTimeout":"retry-times","typeError":"move-error","retryTimeout":"3","retryError":"","maxInMemory":""},{"id":"4509155d.f4762c","type":"mqtt-broker","z":"","name":"","broker":"lager","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

At the server side you should do something like this to send the ack back

[{"id":"11e4c166.e87c2f","type":"tab","label":"Server","disabled":false,"info":""},{"id":"612bc0e5.357b7","type":"mqtt in","z":"11e4c166.e87c2f","name":"","topic":"data","qos":"2","datatype":"json","broker":"4509155d.f4762c","x":150,"y":200,"wires":[["ef5aa389.1bc04","9d9bcc6c.00c7a"]]},{"id":"ef5aa389.1bc04","type":"change","z":"11e4c166.e87c2f","name":"Prepare ack","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.ack","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":240,"wires":[["668140dd.194ae"]]},{"id":"668140dd.194ae","type":"mqtt out","z":"11e4c166.e87c2f","name":"","topic":"data/ack","qos":"","retain":"","broker":"4509155d.f4762c","x":550,"y":240,"wires":[]},{"id":"794ac09f.7d142","type":"comment","z":"11e4c166.e87c2f","name":"send ack back to the clients","info":"","x":780,"y":240,"wires":[]},{"id":"bfd11dd7.83529","type":"comment","z":"11e4c166.e87c2f","name":"Treat the data in the server","info":"","x":770,"y":160,"wires":[]},{"id":"9d9bcc6c.00c7a","type":"change","z":"11e4c166.e87c2f","name":"Manipulate data","rules":[],"action":"","property":"","from":"","to":"","reg":false,"x":360,"y":160,"wires":[[]]},{"id":"4509155d.f4762c","type":"mqtt-broker","z":"","name":"","broker":"lager","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Thanks for the explanation!

Hello Tiago, I tried to deploy your flow but unfortunately it doesn't work. I tried to find out what is wrong but did not succeed.
In your example the "Prepare ack" node returns "undefined" and "queue ack" doesn't get the acknowledgement at all. :frowning:
Could you please help us solve this issue? It is not clear what type of data the "queue ack" node expects.
Thanks in advance,
Evgenii Borodin

Welcome to the forum @euborodin

This thread is two years old and there may well be better ways of doing what you want now. Please start a new thread explaining exactly what problem you are trying to solve and you will no doubt get some help.

I am closing this thread.