Wait for payload to finish flow before passing another payload

Hello all!
I would kindly appreciate any help or advice.

I have a flow that takes a payload from an Excel file and splits it.
The number of new payloads created is dynamic, not fixed.

Then, each payload goes through a series of nodes a couple of times (with a switch node), gets modified and eventually is written to an SQL table via an MSSQL node.

I would like each payload to wait for the previous payload to finish going through the entire flow/loop before proceeding on its own. How can I do that?

Is this to maintain database integrity, or to prevent messages/variables interfering with each other somehow within Node-RED (variables passed by reference)?

If it's for db integrity, I wonder if you can do something with transactions (I guess MSSQL has transactions - not my db vendor of choice).

How can you identify when a split payload is completely processed?

You can use the delay node in rate limit mode as a queue. Set the time to be longer than the loop would ever take. Then, when a loop completes, use that signal to set msg.flush = 1 to release the next item.


Thank you for your response.
I would greatly appreciate it if you could provide a small example of that.
Also, I join jbudd's question - what would indicate that the item has completed the loop?

I tried adding a delay node as you said and for instance I set it to send 1 msg per 30 seconds. What it actually did is hold all messages and release them all together after 30 seconds.

Many thanks again.

Perhaps you did not set it to Rate mode.

Also have a look at node-red-contrib-semaphore which can be used to allow only one message at a time through a section of flow.

I actually have, but will check again. I will also have a look at your recommendation. Thanks!

If the messages are coming out of the delay node, set to Rate Limit mode, quicker than the rate specified in the node then I shall be very surprised.

Here is a simple example just using a delay node in rate limit mode...

[{"id":"48d660b3a4109400","type":"inject","z":"d5b4a507fb8086e8","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":160,"y":150,"wires":[["e0f9e206681f3504"]]},{"id":"e0f9e206681f3504","type":"delay","z":"d5b4a507fb8086e8","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"minute","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":395,"y":150,"wires":[["e470f1d794e1bef9"]]},{"id":"943543cf7a1958e4","type":"change","z":"d5b4a507fb8086e8","name":"","rules":[{"t":"set","p":"flush","pt":"msg","to":"1","tot":"num"},{"t":"delete","p":"payload","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":485,"y":270,"wires":[["e0f9e206681f3504"]]},{"id":"e470f1d794e1bef9","type":"function","z":"d5b4a507fb8086e8","name":"Do something that takes a while","func":"\n\nsetTimeout(function() { node.send(msg)}, 3000)\nreturn null;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":680,"y":180,"wires":[["943543cf7a1958e4","690a432132f0cc08"]]},{"id":"690a432132f0cc08","type":"debug","z":"d5b4a507fb8086e8","name":"debug 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":850,"y":105,"wires":[]}]

Thank you for your example. Oddly enough, the delay node in rate limit mode just holds all of my payloads and then lets them go when the timer expires.

Either way, I have done some digging and found that my issue has a different root cause:

If I generate X number of payloads in my flow, is there a way to sort their order in the flow? That may solve my case.

Do you mean that you have a sequence of messages and you want to re-order them, so later ones overtake earlier ones in the flow? That is complex as first you would have to collect them all together, sort them, and then send them on. Can you change the order in which they appear instead?

[Edit] Looking back I see you posted

Can you sort them before passing them on as a sequence of messages?

I concur with Colin on this... one of my favorite contributed nodes! And actually, it is configurable to allow any number of "in-flight" messages, not just 1 at a time. You can set the pool of tokens to any number, and once they are given out, the rest of the requests wait in line until another msg returns its token to the pool.

Works well with very little overhead -- as long as every node that takes a token eventually puts it back (hint: use a catch node for any nodes inside your loop that could throw an exception).

If you output an array of lines from your csv node, then use a sort node to rearrange the lines => split them into separate messages => push them all into the semaphore take node => process one at a time => use semaphore leave to return the token, you should be guaranteed to process them in order one at a time.
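To make the token idea concrete, here is a rough sketch in plain JavaScript of a pool of tokens with a waiting line. It is only an illustration of the concept: `TokenPool`, `take` and `leave` are names made up for this example, and this is not how node-red-contrib-semaphore is implemented internally.

```javascript
// Hypothetical token pool; illustrates the idea only, not the contrib node's internals.
class TokenPool {
  constructor(size) {
    this.size = size;     // total number of tokens
    this.free = size;     // tokens currently available
    this.waiting = [];    // work waiting for a token, in arrival order
  }
  // Request a token: run `work` now if one is free, otherwise queue it.
  take(work) {
    if (this.free > 0) {
      this.free -= 1;
      work();
    } else {
      this.waiting.push(work);
    }
  }
  // Return a token: hand it straight to the oldest waiter, if any.
  leave() {
    const next = this.waiting.shift();
    if (next) {
      next();
    } else {
      // Never hold more tokens than the pool started with.
      this.free = Math.min(this.size, this.free + 1);
    }
  }
}

const pool = new TokenPool(1);           // 1 token = one message at a time
const started = [];
const rows = ["c", "a", "b"].sort();     // sort first, then feed in order
rows.forEach((row) => pool.take(() => started.push(row)));
// Only "a" has started at this point; "b" and "c" are waiting in line.
pool.leave();                            // "a" is finished, "b" starts
pool.leave();                            // "b" is finished, "c" starts
```

With a pool size of 1 the rows are started strictly in sorted order. A larger size would allow that many rows in flight at once.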

One thing I advise is to have an inject node connected to the semaphore release node so that when it locks up you can easily set it going again. If you do a partial redeploy whilst a message is in transit then this lockup can happen, as the semaphore has been acquired but will not be released again because Node-RED was redeployed. The alternative is to do a full redeploy if this happens.

There are some nodes which take an incoming message and put it in a queue. The messages are then processed sequentially, so message 2 is processed only after message 1 has been processed.

https://flows.nodered.org/flow/43501a1b424434de0ffb

I suspect they will all suffer from the same problem with partial deploys.

The node-red-contrib-serial-iterator is no longer supported (at least its GitHub page does not exist).

I have no experience with the loop node so I cannot comment on that one.

What exactly would be the problems of partial deploys from your point of view? I would be very interested in this so I can try it out on my end.

I only found the three nodes in a 10 second search. Maybe there are more recent versions or implementations.

A similar implementation could also be created with the function node. Using the context, the queue can even be persisted if needed.
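As a rough sketch of what that could look like: the function below holds the queue logic, with `context` expected to offer `get`/`set` like the function node's context object and `send` standing in for `node.send`. The `msg.done` flag is a convention invented for this example; the end of the loop would have to send such a message back to the function node. The `fakeContext` object is only there so the sketch runs outside Node-RED.

```javascript
// Sketch of queue logic for a function node. msg.done is a made-up convention.
function handleMessage(msg, context, send) {
  const queue = context.get("queue") || [];
  let busy = context.get("busy") || false;

  if (msg.done) {
    // The previous item finished its loop: release the next one, if any.
    const next = queue.shift();
    busy = next !== undefined;
    if (next !== undefined) send(next);
  } else if (busy) {
    queue.push(msg);   // something is in flight, so hold this one
  } else {
    busy = true;       // nothing in flight, so pass it straight on
    send(msg);
  }
  context.set("queue", queue);
  context.set("busy", busy);
}

// In-memory stand-in for the context store, for running outside Node-RED.
const store = new Map();
const fakeContext = { get: (k) => store.get(k), set: (k, v) => store.set(k, v) };
const sent = [];
const send = (m) => sent.push(m.payload);

handleMessage({ payload: 1 }, fakeContext, send);  // passed on at once
handleMessage({ payload: 2 }, fakeContext, send);  // queued
handleMessage({ payload: 3 }, fakeContext, send);  // queued
handleMessage({ done: true }, fakeContext, send);  // releases 2
handleMessage({ done: true }, fakeContext, send);  // releases 3
handleMessage({ done: true }, fakeContext, send);  // queue empty, back to idle
```

Inside a real function node the body of `handleMessage` would sit directly in the node, using the node's own `context` and `node.send`. Whether the queue survives a restart depends on a persistent context store being configured; with the default in-memory store it does not.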

I only wanted to show an alternative to the implementation with waiting times with the nodes. I don't like the approach of assuming a fixed estimated runtime as a delay. Either you wait unnecessarily to continue the flow, or the wait time is not sufficient. Either way, waiting based on an estimated wait time is inefficient and unreliable.

As I said, I would use the semaphore node for this, but the loop node looks as if it should work. One advantage of the semaphore is that you can have multiple end points to the protected section by using multiple release nodes. That can make the wiring less complex when it comes to using Catch nodes for when a node in the protected section throws an error of some sort. The catch can be fed directly into a release node rather than having to be fed back round to the start of the loop. For your case it may be that all you need to do is to add a semaphore acquire node at the start of the flow to be protected, a release node at the end, and catch any errors that may occur on the way through and direct those to release nodes too.

The problem with partial deploys is that once a message enters the loop (or the semaphore protected section) then no more can be released from the queue till that message is complete. If a partial deploy occurs then the message currently in the loop may get discarded so it never gets to the end in order to release the next message. I have not found it a big deal, just something to bear in mind.

Using the delay node as I showed may get around this as there is a built in timeout in the delay node - so that as long as that isn't the node being partial deployed then it will always timeout at the max time you set and free up for the next item... no need for these extra nodes as far as I can see :wink: (unless you really need multiple semaphores active at once...)
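The "release on completion, or after a maximum time, whichever comes first" behaviour can be sketched in plain JavaScript as below. This is not the delay node's code, just an illustration of the idea; `TimedGate` and its methods are names made up here, and time is passed in explicitly so the sketch runs without real timers.

```javascript
// Hypothetical gate: one item in flight, released early on done() or after maxWaitMs.
class TimedGate {
  constructor(maxWaitMs, send) {
    this.maxWaitMs = maxWaitMs;
    this.send = send;
    this.queue = [];
    this.startedAt = null;   // release time of the in-flight item, null when idle
  }
  push(item, now) {
    this.queue.push(item);
    this.poll(now);
  }
  // Called when the in-flight item reports that it has finished.
  done(now) {
    this.startedAt = null;
    this.poll(now);
  }
  // Release the next item if idle, or if the in-flight one exceeded maxWaitMs.
  poll(now) {
    const timedOut =
      this.startedAt !== null && now - this.startedAt >= this.maxWaitMs;
    if ((this.startedAt === null || timedOut) && this.queue.length > 0) {
      this.startedAt = now;
      this.send(this.queue.shift());
    } else if (timedOut) {
      this.startedAt = null; // nothing waiting, just go back to idle
    }
  }
}

const released = [];
const gate = new TimedGate(30000, (item) => released.push(item));
gate.push("row1", 0);     // idle, released at once
gate.push("row2", 100);   // held: row1 is still in flight
gate.push("row3", 200);   // held
gate.done(3000);          // row1 finished early, row2 is released
gate.poll(20000);         // row2 still within its 30 s budget, nothing happens
gate.poll(33000);         // row2 never reported back, the timeout frees row3
```

In a live flow `poll` would be driven by a timer rather than called by hand. Note that in this simple version a late `done` from an item that already timed out would release the next item early, so a real implementation would need to match completions to items.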

Yes, if you know there is a maximum time that is required for a message to be actioned, and are happy for all messages to be handled at this slow rate, then a Delay node in Rate Limit mode will do the job. Is that what you meant @dceejay?

@Colin
Thanks for the clarifications on the concerns with partial deploys. A loop node would of course need an integrated timeout so that the next message is processed in case of an error.
I had read over the suggestion of the semaphore node. I will have to take a close look at it and try it out. What I do not like so much about it is that it adds another dependency. The loop nodes, programmed correctly, get along without additional dependencies.

@dceejay
The delay node would have to pass a function in the msg object, which can be called to tell the delay node that it can process the next message ahead of time. The delay in the node would then act as a kind of timeout. This would make it possible to control further message processing in the flow. Continuous waiting until the delay has expired could be avoided, which would make the flow more efficient.
In addition, the user would have the choice: either always wait for the delay and not call the function from the msg object, or actively control the further message flow via that function.

I don't understand what you mean by that. Can you explain in more detail?