Message Throttling/limits and Message Totals

Hello,

Hopefully, I can explain my issue. Please feel free to ask any questions.

I'm migrating millions of files from one volume to another. However, it's not a straightforward bulk copy. Each file's destination is dynamic and is determined by data values.
For each file, I read the checksum on the source, copy the file, read the checksum on the destination, and validate that the file copied correctly.
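
For readers following along, the checksum, copy, checksum, compare sequence could look roughly like the sketch below in plain Node.js. The SHA-256 algorithm and the names fileChecksum and copyAndVerify are assumptions for illustration; the post does not say which checksum tool or algorithm is actually used.

```javascript
// Sketch only: the algorithm choice and function names are assumptions.
const crypto = require("node:crypto");
const fs = require("node:fs");

// Hash a file in 1 MiB chunks so huge files never sit fully in memory.
function fileChecksum(filePath, algorithm = "sha256") {
  const hash = crypto.createHash(algorithm);
  const buffer = Buffer.alloc(1024 * 1024);
  const fd = fs.openSync(filePath, "r");
  try {
    let bytesRead;
    // position = null means "continue from the current file position".
    while ((bytesRead = fs.readSync(fd, buffer, 0, buffer.length, null)) > 0) {
      hash.update(buffer.subarray(0, bytesRead));
    }
  } finally {
    fs.closeSync(fd);
  }
  return hash.digest("hex");
}

// Checksum the source, copy, checksum the destination, then compare.
function copyAndVerify(source, destination) {
  const sourceSum = fileChecksum(source);
  fs.copyFileSync(source, destination);
  const destinationSum = fileChecksum(destination);
  return { sourceSum, destinationSum, valid: sourceSum === destinationSum };
}
```

These calls are synchronous, so this version would block the Node-RED event loop on large files; it is meant to show the order of the steps, not to replace an exec node or an asynchronous implementation.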

I have an inject node running on intervals, and I'm currently using a delay node to allow four messages a second. This has been working fine for the past few days.

However, recently, we have been copying some huge files, which take longer at every step: longer to read the source checksum, longer to copy, and longer to read the destination checksum.

Because the inject node keeps sending more files at an interval, eventually the CPU maxes out at 100% usage, and we lock up.

Is there a way to manage the backlog? In other words, if the number of messages that have passed the delay node but not yet finished exceeds n, can I pause the inject node, or simply queue new messages and wait for the nodes in front to finish?

I've looked at the status node and cannot figure out how to pause the inject node.
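
To make the question concrete, the behaviour being asked for is roughly an in-flight counter with a holding queue. The sketch below is plain JavaScript, not a Node-RED API; BacklogGate, offer, and done are hypothetical names used only for illustration.

```javascript
// Sketch only: class and method names are hypothetical, not a Node-RED API.
class BacklogGate {
  constructor(maxInFlight) {
    this.maxInFlight = maxInFlight;
    this.inFlight = 0; // messages released but not yet finished
    this.waiting = []; // messages held back until a slot frees up
  }

  // Called for every incoming message.
  // Returns the message if it may proceed now, otherwise null (it is held).
  offer(msg) {
    if (this.inFlight < this.maxInFlight) {
      this.inFlight += 1;
      return msg;
    }
    this.waiting.push(msg);
    return null;
  }

  // Called when downstream finishes one message.
  // Returns the next held message to release, or null if none is waiting.
  done() {
    if (this.waiting.length > 0) {
      // The freed slot is handed straight to the next message,
      // so the in-flight count stays the same.
      return this.waiting.shift();
    }
    this.inFlight = Math.max(0, this.inFlight - 1);
    return null;
  }
}
```

With maxInFlight set to n, nothing new starts while n items are still being processed, regardless of how fast the inject node fires.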

I really appreciate any help you can provide.

Harry

The simple flow described in this post shows how you can easily make the operations happen sequentially, not starting the next one until the previous has completed.

I don't think this is going to work for my issue.

I have installed node-red-contrib-home-assistant-websocket through the palette.

There is no remove button?

How do I remove it? Thanks.

Figured it out: I had to delete the server node, then the remove button appeared.

Why will it not work?

OK, for clarification: I don't think I have the skill set to get this to work.

From what I can see, it is looking for triggers from an automation, and I could not figure out how to make it watch an exec node.

All you need is to know when it has finished so that you can release the next one.

OK, so I'll point out the rather large Rhino in the room.

Why are you using Node-RED for this task? There are far better tools for this kind of thing.

Trust me, with the complexity around this project, there is no better tool.

Each file needs to be evaluated and different routes taken.

I think I have found the solution, and testing so far is positive:

node-red-contrib-queue-gate

[{"id":"a46ee04e.b1a9b8","type":"inject","z":"e5c32195.7ce7b8","name":"open","topic":"control","payload":"OPEN","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":80,"wires":[["c54e919c.18d7f"]]},{"id":"c54e919c.18d7f","type":"q-gate","z":"e5c32195.7ce7b8","name":"q-gate demo","controlTopic":"control","defaultState":"open","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"0","keepNewest":false,"qToggle":false,"persist":false,"x":410,"y":300,"wires":[["d2740316.7035c"]]},{"id":"c04db5ad.da76b8","type":"inject","z":"e5c32195.7ce7b8","name":"input","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":130,"y":300,"wires":[["c54e919c.18d7f"]]},{"id":"287fae2e.cb245a","type":"inject","z":"e5c32195.7ce7b8","name":"toggle","topic":"control","payload":"toggle","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":160,"wires":[["c54e919c.18d7f"]]},{"id":"8fd8943b.e60c28","type":"inject","z":"e5c32195.7ce7b8","name":"close","topic":"control","payload":"close","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":120,"wires":[["c54e919c.18d7f"]]},{"id":"44541499.4ac23c","type":"inject","z":"e5c32195.7ce7b8","name":"default","topic":"control","payload":"default","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":200,"wires":[["c54e919c.18d7f"]]},{"id":"92b01a9d.669a58","type":"inject","z":"e5c32195.7ce7b8","name":"queue","topic":"control","payload":"queue","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":360,"wires":[["c54e919c.18d7f"]]},{"id":"5f8708d.abc59f8","type":"inject","z":"e5c32195.7ce7b8","name":"flush","topic":"control","payload":"flush","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":440,"wires":[["c54e919c.18d7f"]]},{"id":"fb8c7d0f.40eec","type":"inject","z":"e5c32195.7ce7b8","name":"trigger","topic":"control","payload":"trigger","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":400,"wires":[["c54e919c.18d7f"]]},{"id":"b7688480.6338","type":"inject","z":"e5c32195.7ce7b8","name":"reset","topic":"control","payload":"reset","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":480,"wires":[["c54e919c.18d7f"]]},{"id":"44127021.988758","type":"inject","z":"e5c32195.7ce7b8","name":"peek","topic":"control","payload":"peek","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":520,"wires":[["c54e919c.18d7f"]]},{"id":"cc39c2df.44d0b8","type":"inject","z":"e5c32195.7ce7b8","name":"drop","topic":"control","payload":"drop","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":560,"wires":[["c54e919c.18d7f"]]},{"id":"a94f3db7.55ae18","type":"inject","z":"e5c32195.7ce7b8","name":"status","topic":"control","payload":"status","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":240,"wires":[["c54e919c.18d7f"]]},{"id":"d2740316.7035c","type":"debug","z":"e5c32195.7ce7b8","name":"output","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","x":570,"y":300,"wires":[]},{"id":"ae120d0f.ade318","type":"status","z":"e5c32195.7ce7b8","name":"q-gate status","scope":null,"x":410,"y":360,"wires":[["ea4d90f9.61a5b"]]},{"id":"ea4d90f9.61a5b","type":"debug","z":"e5c32195.7ce7b8","name":"status text","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"status.text","targetType":"msg","x":570,"y":360,"wires":[]}]

I'm sure it will work with a message queue gate.

Isn't it better not to repeat the inject?
Since the exec node returns the command's exit status as msg.payload.code, you could use that to trigger the next process loop.
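
As a rough illustration of that idea, a function node placed after the exec node's return-code output could turn a successful exit into the gate's release command. This sketch wraps the logic in a plain function so it runs standalone; the "control" topic and "trigger" command are taken from the q-gate demo flow posted above, while the function name and the "copy-failed" topic are hypothetical.

```javascript
// Sketch only: in a real function node the body would operate on `msg` and `return` directly.
// Assumes msg comes from the exec node's return-code output (msg.payload.code).
function nextStepFromExec(msg) {
  const code = msg.payload ? msg.payload.code : undefined;
  if (code === 0) {
    // Success: ask the gate to release the next queued message.
    return { topic: "control", payload: "trigger" };
  }
  // Non-zero or missing exit code: surface it rather than silently moving on.
  // The "copy-failed" topic is a made-up name for illustration.
  return { topic: "copy-failed", payload: { code } };
}
```

Whether a failed copy should also release the next item is a design choice; one option is to route the failure message to logging and send the trigger as well, so a single bad file does not stall the whole run.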

You're correct, I'm not repeating the inject node.

I have replaced the delay node with the queue gate node.

The gate node is in queue mode and the inject runs at intervals. If the gate holds fewer than 5,000 queued messages, new ones are added to the queue. Once the queue reaches 5,000, the rest are dropped and will be picked up the next time the interval fires.

The file is copied, the checksum is taken, the source vs. destination checksum results (valid copy) are stored in a database, and then we send a trigger operation to the gate queue, which releases an item from the queue.

The queue does not release the next item until the current process has finished, and it stays filled to roughly 5,000 items until the job is complete.

Small files will copy in milliseconds, hence the large queue, and the larger files will take longer. I can have four processes running at once.
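
For anyone trying to picture the mechanics, here is a small stand-in for the queue behaviour described above: a capped queue that drops new arrivals when full and releases one item per trigger. It is a sketch in plain JavaScript, not the q-gate implementation, and BoundedQueue and its methods are hypothetical names.

```javascript
// Sketch only: a stand-in for a gate in queue mode, not the q-gate source.
class BoundedQueue {
  constructor(maxLength) {
    this.maxLength = maxLength;
    this.items = [];
    this.dropped = 0; // arrivals rejected because the queue was full
  }

  // Add a message unless the queue is full. Dropped messages are only counted;
  // in the setup above they are found again on the next interval.
  enqueue(msg) {
    if (this.items.length >= this.maxLength) {
      this.dropped += 1;
      return false;
    }
    this.items.push(msg);
    return true;
  }

  // Equivalent of one "trigger" command: release the oldest queued message.
  trigger() {
    return this.items.length > 0 ? this.items.shift() : undefined;
  }
}
```

Under this model, running four copies at once amounts to sending four triggers at start-up and then exactly one trigger each time a copy finishes, so the number of files in progress never rises above four.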