Handling processes out of HTTP IN calls in parallel

I have a lengthy (in terms of execution time) process triggered through an API endpoint (/api below).


The process extracts data from an InfluxDB database and pushes the results through an MQTT out node.

Here is a simplified version of it:

As part of the API call, the user can give the topic on which to receive the messages, as well as two timestamps used to query InfluxDB.
The Check function splits the request into 5-minute windows.

The number of resulting MQTT messages can be fairly large (10000 and more).
It looks like, despite the "process" being in a subflow, the messages published on MQTT are fully sequential: all messages resulting from the 1st request to /api come first, then the messages from the 2nd request, and so on.
The MQTT broker where the messages are published is the same.

Is there a way to avoid user no. 2 (making the 2nd API call) having to wait until all messages have been sent to user no. 1 (making the 1st API call)?

Thanks and I hope it is clear.


I may be missing the point of both suggestions. Sorry.
What I am looking for is NOT to wait...
I guess that what I see is the "expected" behaviour; however, I don't understand why.
And is there a way to avoid this sequential behaviour?


Sorry, I read it the other way round, as wanting to wait :face_with_peeking_eye:

I think the problem lies with the MQTT broker trying to handle all those messages and not necessarily with the http-in node (which can handle thousands of requests). Is 10000 MQTT messages the proper way to communicate the data?
Is there a specific reason for so many messages? One single object/array/csv/xml etc. makes more sense; MQTT can handle up to 256MB of data in a single message.

As bakman2 intimates, if you really want to do it this way, you probably need to tune your MQTT Broker. Most of us probably use Mosquitto as our broker and that has lots of settings you can use such as the max in-flight messages, max queued messages and bytes, etc.

mosquitto.conf man page | Eclipse Mosquitto

Also note that Node.js is (largely) single threaded. While it is excellent at hiding the fact and reducing the impact, it is certainly possible to end up with a queue of tasks waiting to be completed.

I’m 100% sure it is not an MQTT broker limitation. I am using a cluster of MQTT brokers capable of supporting 10000s of messages per second.
I have another use case showing much higher message throughput than this one.
I can’t really merge the individual messages into groups (arrays or otherwise), as each message has a particular JSON structure.
I thought about the single-threaded aspect. However, in this example, I don't see why a single thread would lead to that… two API calls and two subflows should “hide” this, I think.

In order to start processing the second set of messages before the first set has completed, some of the messages would have to overtake other messages in the flow. I don't see where in your flow this might happen.

I don't see why that would make a difference. An instance of a subflow is just a set of nodes. If you send it multiple messages, all the messages go through the same nodes.

What are you doing in a function node that takes 5 minutes?

That I know :slight_smile:

I can try to explain it again in a better way...

user1 -> accesses /api and as a result it extracts X messages from InfluxDB -> X messages published on MQTT
user2 -> accesses /api and as a result it extracts Y messages from InfluxDB -> Y messages published on MQTT

On the broker, I get all X messages followed by the Y messages, even if the request from user2 is sent immediately after the request from user1, while the user1 request is still not completed.

It doesn't. I extract messages published between timestamp t1 and timestamp t2. To avoid too large requests on influxdb (and too large answers), I split the t1 to t2 interval, in t1 to t1 + 5', then t1 + 5' to t1 + 10' and so on. And each smaller interval is then passed to influxdb to get the data from within that time window.
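The windowing described above can be sketched in plain JavaScript. This is only an illustration of the idea, not the actual Check function from the flow: the name `splitIntoWindows`, the millisecond timestamps and the `{ from, to }` shape are all assumptions made for the example.

```javascript
// Hypothetical helper (not the poster's Check function): split the interval
// [startMs, endMs) into consecutive windows of at most windowMs milliseconds.
function splitIntoWindows(startMs, endMs, windowMs = 5 * 60 * 1000) {
    // Reject empty or inverted intervals and non-positive window sizes.
    if (!(endMs > startMs) || !(windowMs > 0)) {
        return [];
    }
    const windows = [];
    for (let from = startMs; from < endMs; from += windowMs) {
        // The last window is clipped so it never extends past endMs.
        windows.push({ from: from, to: Math.min(from + windowMs, endMs) });
    }
    return windows;
}
```

Each `{ from, to }` pair would then drive one InfluxDB query, which keeps individual responses small at the cost of more requests.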

It is the morning for me, so may still not follow :crazy_face:

So you want all MQTT messages from X first as it got requested first?

I don’t use MQTT (much) - but from reading it does not guarantee message ordering?

I know... Mornings can be hard :slight_smile:
In fact, it is just the opposite.
I would think that, based on the flows, if sending the X messages takes a long time, I should see Y messages mixed in with them.
And it is not the case. Never.

Been reading this topic. I think the split and rate limit node are queuing the 2 message groups in one queue. The delay node can queue on topic, but will then drop messages. I think you will have to create a function or custom node that will queue the messages in separate queues and release from multiple queues alternately.
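A minimal sketch of that idea, assuming each message carries some per-request identifier (the `requestId` argument here is hypothetical, as is the `FairQueue` name). It keeps one queue per request and releases from them alternately. It is plain JavaScript, not a ready-made Node-RED node, and the rate limiting itself (for example a timer calling `shift()`) is left out.

```javascript
// Hypothetical per-request queue that releases messages round-robin
// across requests instead of strictly in arrival order.
class FairQueue {
    constructor() {
        this.queues = new Map(); // request id -> array of pending messages
        this.order = [];         // request ids, in the order they are served
        this.next = 0;           // index into this.order of the next queue to serve
    }

    // Add a message to the queue belonging to its request.
    push(requestId, msg) {
        if (!this.queues.has(requestId)) {
            this.queues.set(requestId, []);
            this.order.push(requestId);
        }
        this.queues.get(requestId).push(msg);
    }

    // Release one message, taking the next request's queue in turn.
    // Returns undefined when nothing is pending.
    shift() {
        if (this.order.length === 0) {
            return undefined;
        }
        this.next = this.next % this.order.length;
        const id = this.order[this.next];
        const queue = this.queues.get(id);
        const msg = queue.shift();
        if (queue.length === 0) {
            // Drop the drained queue; this.next now already points at the following id.
            this.queues.delete(id);
            this.order.splice(this.next, 1);
        } else {
            this.next += 1;
        }
        return msg;
    }
}
```

With this arrangement a second request starts receiving messages as soon as it has something queued, rather than after the first request has drained.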

If your broker can handle that many messages, why do you need the rate limit at all ?
Otherwise you need to split the calls into separate streams before passing them to separate instances of the subflow; they will then form their own queues and their outputs will overlap on MQTT. Of course, the tricky part is how to split them if both calls to the API "look" the same. Maybe you need to implement a round-robin in your Check function (and add another output), or would it need to handle 3 requests, and so on?

It is quite possible that this would move the limiting factor from Node-RED to InfluxDB by bombarding it with requests (which happen over HTTP(S), of course).

Certainly something in the back of my mind about using async functions to decouple the requests but I need to focus on paying work and I can't quite see an answer.

Here is a simple flow that will perform a round robin so you can send the requests down parallel routes. Feed it into multiple instances of your subflow.

[{"id":"86fe714001428c3c","type":"inject","z":"bdd7be38.d3b55","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":3300,"wires":[["71a3c63d1e6a1d89"]]},{"id":"71a3c63d1e6a1d89","type":"function","z":"bdd7be38.d3b55","name":"Determine route","func":"// Generate rolling route number in msg.route\nconst numRoutes = 4\nmsg.route = context.get(\"route\") || 0\nmsg.route = (msg.route + 1) % numRoutes\ncontext.set(\"route\", msg.route)\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":410,"y":3300,"wires":[["a38187b6c7efa35d"]]},{"id":"d8ed75dcf9a0b08d","type":"debug","z":"bdd7be38.d3b55","name":"debug 2468","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":810,"y":3260,"wires":[]},{"id":"a38187b6c7efa35d","type":"switch","z":"bdd7be38.d3b55","name":"Round Robin","property":"route","propertyType":"msg","rules":[{"t":"eq","v":"0","vt":"num"},{"t":"eq","v":"1","vt":"num"},{"t":"eq","v":"2","vt":"num"},{"t":"eq","v":"3","vt":"num"}],"checkall":"false","repair":false,"outputs":4,"x":590,"y":3300,"wires":[["d8ed75dcf9a0b08d"],["448afcc292d9011e"],["c78b05795ebcbe86"],["c4e8c7ae96346c2d"]]},{"id":"448afcc292d9011e","type":"debug","z":"bdd7be38.d3b55","name":"debug 2469","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":810,"y":3300,"wires":[]},{"id":"c78b05795ebcbe86","type":"debug","z":"bdd7be38.d3b55","name":"debug 2470","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":810,"y":3340,"wires":[]},{"id":"c4e8c7ae96346c2d","type":"debug","z":"bdd7be38.d3b55","name":"debug 2471","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":810,"y":3380,"wires":[]}]

Thank you for all your valuable answers.

The Node-RED service is co-located with the broker (on a 10G LAN), so sending data to the broker is super quick. However, the consumers are remote, and some of them have very limited Internet access. So I prefer to buffer ahead of the broker, to avoid having to manage large queues on the broker or risking message loss.

I thought (apparently wrongly) that, as no flow or global variable is used, two calls to /api would result in two independent contexts running in parallel. Clearly a wrong assumption.

Exactly. That is also why I am splitting the requests into smaller parts, to avoid getting back a huge payload.

Thanks for the tip and the proposed code. I'll try that.
The limitation I see here is that the number of outputs on the round-robin is fixed. It is manageable, though.
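As a rough sketch of how the route count could at least become a single parameter, the same round-robin can be written as one function that returns an array with the message in exactly one slot. `makeRoundRobin` is a hypothetical name, not part of the posted flow. In a Node-RED function node, returning an array sends each element to the matching output and `null` sends nothing, but the number of outputs still has to be set in the node's configuration, so it stays fixed at deploy time. The sketch keeps its counter in a closure, whereas a function node would keep it in context as the posted flow does.

```javascript
// Hypothetical round-robin router: each call places the message in the
// next output slot and leaves every other slot null.
function makeRoundRobin(numRoutes) {
    let next = 0; // index of the output that receives the next message
    return function route(msg) {
        const outputs = new Array(numRoutes).fill(null);
        outputs[next] = msg;
        next = (next + 1) % numRoutes;
        return outputs;
    };
}
```

This replaces the separate switch node with a multi-output function, at the price of putting the routing logic in code.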

If that is the case then the internet link speed will naturally limit the message arrival rate to each client and the broker will buffer the messages (memory/disk space allowing) - but yes would be in order of arrival - so the round robin approach probably best if you want them to overlap.

The http In node does not spin up a process to handle it, it just sends a message along the wire. Just like an Inject node does, for example.

Just a thoughtful reminder. Just because you have Node-RED, not all problems are Node-RED shaped of course. :grinning:

If you have a specific bottleneck, that is the ideal time to think about the best approach. In this case, it might be sensible to think whether a parallel microservice might be part of the answer.

However, the reason why I am a big fan of NodeRed is that I am not a developer :slight_smile:
And thanks to Node Red and the contrib nodes, I can do (almost) everything without knowing any programming language.
