MQTT broker messages getting lost

Hello,

I've created a test case with Node-RED to check how our broker handles large quantities of data.
In the flow below there are 10 flows, each containing 200 clients, and every client sends 20 topics (so each trigger fans out to 10 × 200 × 20 = 40,000 messages).

I cannot copy-paste all the flows, so here is a sample part:

[{"id":"44a3b07ac9e44468","type":"subflow","name":"GenerateMessage","info":"","category":"","in":[{"x":100,"y":120,"wires":[{"id":"8ad30a3498d875b1"}]}],"out":[{"x":440,"y":60,"wires":[{"id":"8ad30a3498d875b1","port":0}]}],"env":[{"name":"IP","type":"str","value":"192.168.0.82"}],"meta":{},"color":"#DDAA99"},{"id":"69f649fe30b787d6","type":"mqtt out","z":"44a3b07ac9e44468","name":"","topic":"","qos":"2","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"44a2b6db4f9d9282","x":670,"y":120,"wires":[]},{"id":"8ad30a3498d875b1","type":"function","z":"44a3b07ac9e44468","name":"","func":"num= msg.payload;\n\nfor (let i = 0; i < 20; i++) {\n  msg.payload=\"Hello_Sub\"+i.toString();\n  msg.topic = \"tgw/data/lika/\"+i.toString();\n  node.send(msg);\n//   return msg\n} \n\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":280,"y":120,"wires":[["69f649fe30b787d6"]]},{"id":"44a2b6db4f9d9282","type":"mqtt-broker","z":"44a3b07ac9e44468","name":"Emqx","broker":"${IP}","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"88370236fb6586b4","type":"inject","z":"dabcedc3b1e417af","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"0.5","crontab":"","once":false,"onceDelay":0.1,"topic":"count","payload":"50","payloadType":"num","x":100,"y":100,"wires":[["7d830749219bf7b7"]]},{"id":"7d830749219bf7b7","type":"function","z":"dabcedc3b1e417af","name":"","func":"switch (msg.topic){\n    case \"count\":\n        var count = flow.get('count') || 0;\n        count ++\n        flow.set('count',count)\n        if (count<=msg.payload){\n            msg.payload = \"Count:\"+count.toString()\n            node.status({text:count});\n            node.send(msg)\n        }\n        break;\n    \n    case \"Reset\":\n         count = 0;\n         flow.set('count',count)   \n        //  msg.payload = count;\n         node.status({text:count});\n        //  return msg\n         break;\n}\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":280,"y":120,"wires":[["1319f4fc81e4b6d9","3838ed7becc39972","6396444e59f3b26f","ba6e0aac211a76f5","a253e19d2534eae3","4ed33fcfb99920e1","20df8be1f4339ea9","7d8f9f464e274361","a431f443ae419580","b06ba81b4017e9e6","cc7c84f04046d29a"]]},{"id":"bb66ccfcfcf93772","type":"inject","z":"dabcedc3b1e417af","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Reset","payload":"0","payloadType":"num","x":110,"y":140,"wires":[["7d830749219bf7b7"]]},{"id":"cc7c84f04046d29a","type":"link out","z":"dabcedc3b1e417af","name":"call","mode":"link","links":["8f0d4999f7e1e16c","27c534cd9ea95588","8765b5b5d4402e68","fdc0429e22c5a5f5","3db2f23b12a1d4c5","08edd9d884a58c67","9695754fa21c5f52","db6006b73c7e6015","07cf904ebf360c63"],"x":535,"y":80,"wires":[]},{"id":"1319f4fc81e4b6d9","type":"function","z":"dabcedc3b1e417af","name":"","func":"msg.payload=msg.payload;\nreturn 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":280,"y":180,"wires":[["0e796608280f77d2","34e209a8279cdd64","9379e6f2e794f844","1a0e8356b5d564e3","fe643e0b5ac15013","1b4f308e71800b30","663ee8d6bef13f34","091e87a3dfd5b6a2","86657e2dee198411","bd4e1a9fb9628f01","ef21a73d5aa0db19","f823872b97e701f6","608125ac7bb84629","fefd0f44445aad22","8ab0dc9b4adc6df4","411d6b98c6a77679","574103df7acf3245","1aceda39df938bc9","cfc5515e5d44e671","f19717774b97d80a"]]},{"id":"0e796608280f77d2","type":"subflow:44a3b07ac9e44468","z":"dabcedc3b1e417af","name":"","x":470,"y":160,"wires":[[]]}]

The strange thing is that everything works fine with QoS 0 and QoS 1, but when QoS 2 is selected, it generates some number of message batches (clients × topics) and then stops sending messages.

It starts generating at a rate of 1-2 batches per second; the counter goes up and messages keep arriving at the broker. After 25-40 batches (mostly in that range; very rarely it manages to send everything) the counter races through the remaining count in less than a second and no more messages arrive.

The bigger the number of topics per client, the faster it "breaks".

Any ideas why this only happens with QoS 2?

Hi, if you could share the full demo flow (attach it as a file) I will take a look.

As for reasons - there could be many factors, including software bugs (in Node-RED, MQTT.js, the broker), the hardware (small-scale RPi/RISC vs AMD/Intel/CISC etc.), the network (wired vs wireless, including noise and traffic), and broker location (local vs cloud).

Lastly, as I am certain you know, QoS 2 will tax the system(s) hardest, as there is more to do: more CPU and memory is required to monitor acks, remember the send-once-only state, etc.
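To make that concrete, here is a minimal sketch using mqtt.js (the client library underneath Node-RED's MQTT nodes); the broker address and topic are just placeholders. At QoS 2 every publish has to complete a four-step PUBLISH/PUBREC/PUBREL/PUBCOMP exchange, and both ends hold per-packet state until it finishes:

// Minimal mqtt.js sketch - broker URL and topic are placeholders.
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://192.168.0.82:1883');

client.on('connect', () => {
    // At QoS 2 the client keeps state for this packet id until the
    // PUBLISH -> PUBREC -> PUBREL -> PUBCOMP exchange completes;
    // the callback fires only once that QoS handling is done.
    client.publish('tgw/data/lika/0', 'Hello_Sub0', { qos: 2 }, (err) => {
        if (err) console.error('publish failed:', err);
        else console.log('QoS 2 handshake complete for this packet');
        client.end();
    });
});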

Thanks for taking a look into it,

Node-RED v2.2.2
Broker: EMQX
Everything is set up on the same Ubuntu virtual machine.

I understand that QoS 2 taxes the system hardest; that's why I wanted to test it. We don't plan to use it, but I just wanted to share the results of the experiment.

If it helps anybody:
2,000,000 messages total:

  • 200 clients / 200 topics / QoS 0 / 50 batches - time to send 58 s, time to arrive at broker 118 s
  • 200 clients / 200 topics / QoS 1 / 50 batches - time to send 75 s, time to arrive at broker 359 s
  • 2000 clients / 20 topics / QoS 0 / 50 batches - time to send 93 s, time to arrive at broker 122 s
  • 2000 clients / 20 topics / QoS 1 / 50 batches - time to send 97 s, time to arrive at broker 375 s

MultipleClients.json (307.4 KB)

I saw that it takes about 3 times longer for the broker to crunch through all the messages with QoS 1, so I wanted to see how it would react to QoS 2.

I don't understand why there is such a gap between sending the messages and their arrival at the broker. There is only a network in between.

Thanks for sharing.

Would you mind detailing the spec of the virtual machine and the host please?
In particular, how much memory and CPU were allocated to the guest. Also, did you happen to notice or log CPU utilisation while running the tests? Thanks

@Colin I believe the broker cannot handle that many requests coming in.

As shown here, when the processor cannot keep up, the queue starts to fill up.

I believe I'm seeing this behaviour because I'm sending ~33,000 msg/second (roughly 2,000,000 messages in about 60 s).

@Steve-Mcl The VM has 16 GB of RAM and 1 processor allocated.
Host spec: (screenshot attached)


Have you used a network monitor tool to check whether the delay is in the MQTT Out node or in the broker itself?

Actually you should be able to see which process is hogging the processor by using top or a similar tool.

No, I haven't used any external tools. Just to clarify: when I say processor, I mean the broker's processor.

Anyway, the problem is not how to increase throughput, but why specifically QoS 2 messages get lost.

Is it the same if you try Mosquitto? If not, it may be a problem/limitation with the broker.

If this is for professional usage coping with heavy loads and many messages, maybe look at ActiveMQ as a broker. We used that for high demands in a critical application.

QoS 2 transactions are a lot heavier in terms of processor utilisation and comms. Presumably the messages are not able to be processed in time, so eventually a buffer overflows somewhere and they get lost. Part of the reason I suggested checking the processor utilisation is to determine whether it is the broker or the client end that is losing the messages. Trying an alternative broker is certainly worth doing.

If you don't want to lose messages, maybe you should try another message broker. There is a solution where one can transfer the MQTT messages into Kafka messages. There is an open-source solution for the whole setup from UMH. You can find advice regarding the installation in their documentation (linked). There is also a learning website with some useful hints, as well as a Discord channel where their team answers questions and collects feedback. Maybe this could help to keep track of every single message. (In detail, it is the UMH microservice "mqttkafkabridge" that will do the job.)

@KarolisL having imported your large flow and had a play, I can see the following...

  1. You are triggering 20 MQTT messages to 2000 MQTT client connections
    • in total, 40,000 messages
  2. You have a 0.5 s pulse repeating this 50 times
    • in total, 2,000,000 messages

In my testing, on an i7 Windows 11 box (no VM, but with lots of dev-time stuff running in the background (the day job)), by adding some logging I could see ...

Let's round that down to 2 secs for the sake of easy maths - i.e. Node.js/Node-RED is pushing 20,000 messages per second (10,000 per 500 ms) through your flows. Let's use that to see what is happening under the hood...

  • Every 0.5 s you queue up 40,000 messages in the Node.js stack
  • at the 0.0 s mark, 40,000 are queued
  • at the 0.5 s mark, 10,000 have been delivered and another 40,000 are queued (total queued now 70,000)
  • at the 1.0 s mark, 20,000 have been delivered and another 40,000 are queued (total queued now 100,000)
  • at the 1.5 s mark, 30,000 have been delivered and another 40,000 are queued (total queued now 130,000)
  • at the 2.0 s mark, 40,000 have been delivered and another 40,000 are queued (total queued now 160,000)

Based on that, by the time the 50th iteration is triggered, you have about 1,440,000 pieces of data queued in memory.
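A few lines of JavaScript reproduce that back-of-envelope estimate (a sketch using the rounded figures above - 40,000 messages queued every 0.5 s, ~20,000 per second drained; with those rounded numbers it lands around 1.5 million, the same ballpark as the figure quoted, and the exact value depends on the measured rate):

// Back-of-envelope model of the queue growth described above.
const perPulse = 40000;        // messages queued by each 0.5 s inject pulse
const drainPerHalfSec = 10000; // ~20,000 msg/s pushed through => 10,000 per 0.5 s
const pulses = 50;

let queued = 0;
for (let pulse = 1; pulse <= pulses; pulse++) {
    queued += perPulse;                            // this pulse queues another batch
    if (pulse < pulses) queued -= drainPerHalfSec; // drained before the next pulse
}
console.log(`backlog when pulse ${pulses} fires: ~${queued} messages`); // ~1,510,000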

What I am trying to say is, this is not a real test of the broker; it is more a test of Node.js, MQTT.js and Node-RED throughput. Yes, it may well be the MQTT.js client library or even Node-RED that is the bottleneck, but that is still not the broker limiting things.

If you really want to stress test the EMQX broker, then use something like emqtt_bench - or believe the benchmarks.

Based on that, by the time the 50th iteration is triggered, you have about 1,440,000 pieces of data queued in memory.

Thanks, that makes sense. But I had run the test with the emqtt_bench tool before Node-RED and got similar results.
Here is a short snippet of the log:

58s pub total=1731199 rate=31447/sec
58s pub_overrun total=1712419 rate=31441/sec
59s pub total=1760276 rate=29077/sec
59s pub_overrun total=1741496 rate=29077/sec
1m0s pub total=1791528 rate=31252/sec

It shows output at discrete 1 s intervals, and I got similar results: after the benchmark tool was stopped, the dashboard displayed 405,621 messages received out of 1,802,746. It took ~47 s for all the remaining messages to arrive.

It is the same behaviour with 2 different tools; that's why I stayed with Node-RED, to have a more controlled environment.

I did a test with this tool and QoS 2; this was my command:

kl@kl-VirtualBox:/opt/emqx$ emqtt_bench pub -t test/lika/data/%i -h localhost -s 250 -q 2 -c 500 -n 100 -I 1

but I received this error from the beginning:

emqtt(kl-VirtualBox_bench_pub_7_2815588628): State: connected, Unexpected Event: (cast, {mqtt_packet,
    {mqtt_packet_header, 5, false, 0, false},
    {mqtt_packet_puback, 11, 16, #{}},
    undefined})

Since I'm not experienced enough to understand what was going on here, I did the same QoS 2 test with Node-RED. It was able to generate messages for some time.

I think we got too deep into this. I wanted to test how the broker reacts in a real-life situation when the worst-case scenario appears: ~10,000 clients each sending 3-4 topics when some kind of global alarm is triggered.


So is this question actually anything to do with Node-RED?

Yes: why would MQTT QoS 2 messaging break down in Node-RED after sending some data?

QoS 0 - just fire-and-forget - makes sense.
QoS 1 - has a PUBACK message, so it takes longer to process all the messages.
QoS 2 - has even more acknowledgements (PUBREC, PUBREL, PUBCOMP), so I expected even longer wait times, but instead it started dropping messages. I will make a video on Monday to explain it better.

This is my only question related to Node-RED.

No need for videos.

So are you saying you do not see the same problem with the other tool and QoS 2? If you don't see it with that tool, then it is a Node-RED issue and not the broker. If you do see it with the other tool, then it is not Node-RED.

I have a theory.

Since the Node.js event queue is getting thoroughly flooded, it's possible the keep-alive between client and broker is not being serviced and thus a disconnect is occurring.

If you do your tests with Mosquitto you will be able to monitor the $SYS topics and see the connections drop.

As a test, try your QoS 2 test again but set your timer to poll every 10 s. Also, add some counters (global variables) to monitor the count of messages sent into the subflow, and use Complete and Catch nodes to watch the MQTT node and count the number of completes and errors. Then tweak your inject frequency until it fails and see how many messages are in the (Node-RED) system. A sketch of those counters is below.
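For example (the counter name here is just illustrative), a function node fed by a Complete node scoped to the MQTT out node could look like this:

// Function node fed by a Complete node scoped to the mqtt out node:
// counts messages the MQTT node has finished handling.
// (A second function node fed by a Catch node can count errors the same
// way, e.g. into an 'mqtt_errors' global.)
global.set('mqtt_completed', (global.get('mqtt_completed') || 0) + 1);
node.status({ text: 'done: ' + global.get('mqtt_completed') });
return null;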
