How to process any messaging queues synchronously in node-red

Hi,
I'm using Node-RED version: v1.3.7 and Node.js version: v14.17.5 .
Is their any packages or node-red core nodes exists in node-red which process AQMP messages synchronously.

Because i am facing issues, while processing large amount of RabbitMQ messages, so i wanted to process all the messages synchronously.

Thanks in advanced for any help!

Can you explain what you mean by that please?

What i mean is:
For e.g. I have 200 messages to process in one of the custom node, so the node is processing all the messages at the same time, so i wanted a node-red nodes or packages which send my each messages to custom node synchronously( one after the other).

Is their any predefined node in node-red or any package similar to package: node-red-contrib-amqp.
While useing the node-red-contrib-amqp package its requires the node.js version: between version 6 to version 8 and my current node.js version is v14.17.5.

Have you tried the split node?

what does the data coming out of node-red-contrib-amqp look like?

  • Is it an array of 200 things?
  • is it an object with 200 keys?
  • Is it sending out 200 separate messages?

Do you mean 200 messages, or 200 sets of data all in one message? Node red nodes all process just one message at a time, they are not processed at the same time.

Hi @Colin

I mean 200 messages, and all process of messages should be in synchronously.

Issues: currently i am facing issues is, while process 200 messages all at one time, some messages does not get process they got skiped.
Result i want: if with the help of some packages/library or nodes of nodered flows, i want them to process only one after the other.

If you mean that you have a section of flow and you want to wait till one message has completely passed through that section before the next one is allowed through then have a look at this example of how to do it. Set the time in the Queue node to a greater time than your flow should ever take to run through. If something locks up then it will release the next message after that time in order to let things keep going.

[{"id":"b6630ded2db7d680","type":"inject","z":"bdd7be38.d3b55","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":120,"y":1700,"wires":[["ed63ee4225312b40"]]},{"id":"ed63ee4225312b40","type":"delay","z":"bdd7be38.d3b55","name":"Queue","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"minute","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":290,"y":1700,"wires":[["d4d479e614e82a49","7eb760e019b512dc"]]},{"id":"a82c03c3d34f683c","type":"delay","z":"bdd7be38.d3b55","name":"Some more stuff to do","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":780,"y":1700,"wires":[["7c6253e5d34769ac","b23cea1074943d4d"]]},{"id":"2128a855234c1016","type":"link in","z":"bdd7be38.d3b55","name":"link in 1","links":["7c6253e5d34769ac"],"x":75,"y":1780,"wires":[["3a9faf0a95b4a9bb"]]},{"id":"7c6253e5d34769ac","type":"link out","z":"bdd7be38.d3b55","name":"link out 1","mode":"link","links":["2128a855234c1016"],"x":645,"y":1780,"wires":[]},{"id":"b23cea1074943d4d","type":"debug","z":"bdd7be38.d3b55","name":"OUT","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":650,"y":1620,"wires":[]},{"id":"d4d479e614e82a49","type":"debug","z":"bdd7be38.d3b55","name":"IN","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":450,"y":1620,"wires":[]},{"id":"3a9faf0a95b4a9bb","type":"function","z":"bdd7be38.d3b55","name":"Flush","func":"return {flush: 1}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":170,"y":1780,"wires":[["ed63ee4225312b40"]]},{"id":"7eb760e019b512dc","type":"function","z":"bdd7be38.d3b55","name":"Some functions to be performed","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":530,"y":1700,"wires":[["a82c03c3d34f683c"]]}]

Not sure I am on the right track, but....

Another interesting approach (and one I use)
is using node-red-contrib-semaphore

its not perfect, but does allow to signal when another message can pass through the network.
setting the token bucket to 1, will effectively only allow 1 message at a time. That is when you signal the last message has been processed

The example I posted does effectively the same as the semaphore nodes with two advantages, firstly it uses just core nodes, and secondly it has a built in recovery option if something goes wrong. It is easy to get the semaphore nodes locked up. For example a redeploy while there is a message in progress can cause it. I used to use the semaphore nodes but I have moved all mine across to use the technique I posted.

1 Like

Was the flush 1 in the rate limit delay node introduced after Node-red 1.3.7? as the OP is on 1.3.7. I seem to recall it was V2

Nice!
I may steel this for myself!

But I agree, the semaphore node can be temperamental at times!

I don't know, in so he/she will soon find out. My test flow can be imported and tested to see if it works. I think the example has a 5 second delay in the 'do something' section so it should let one message through every 5 seconds.

@shubham-credence can you not upgrade? That is a very old version.

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.