A new type of flow

First, I have to share my appreciation of node red for how easy it is to jump into, and for all the reasons it's so popular. Now I'd like to share some thoughts on a closely related use case I know from a similar low-code system called Pentaho ETL (mostly dead and abandoned these days, unfortunately). The two work very similarly in most respects, but I'd like to highlight one key difference:

Node red is stream focused: each flow runs forever, typically listening continuously for input. What about a new type of flow with a defined start (and eventually an end)? I.e. it generates a number of input messages (for example one), those messages pass through all the nodes (possibly generating more messages along the way), and eventually everything completes and the flow exits/stops.

Previously I'd schedule this kind of program from cron, but of course it could be scheduled from another flow within node red itself.

The main benefit is that you always know what the last message is. Take for example a flow which injects a message once per day; that message triggers a job to delete old log files, and eventually I get one message per file. I can log when the job starts and when each log file is deleted, but I can't log when it ends (at least not in any feasible way).

With this new type of flow with a defined start and end, it is also a lot easier to aggregate/group messages on certain fields to calculate sum/max/min/avg etc. Node red can do something similar, but it is much more difficult since the system is designed completely differently, running forever.

Not sure if this makes sense? Just thinking outside the box here; it is perhaps out of scope for node red, but it is the biggest feature I'm missing.

This is already achievable. Just use something like node-red-contrib-cron-plus and do what you need, once per day (or whatever).

How then can I log when the flow is finished? I.e. what is the last message? My understanding of node red is that a flow never finishes; there is no "last message".

I can easily recreate the scheduler and log when it starts, and have that one message end up as a message per file (to be deleted), for example. But I can't log when the job to delete old log files is finished.

The same would be good for a lot of use cases, most importantly aggregation of messages (group by): sum, min, max, avg, etc.

That entirely depends on how/what you enter in your flow.

There is, if you write your flows to know when the last message of that operation occurs.

Why not? Write your flow so that the last node/operation stores a flag or result in context.


Node-RED is an FBP tool with event-based flows. It is how you work that matters.

You can easily store any info you desire (e.g. the result of your operation) in context at the end of the flow(s).

e.g. cron (13:00 every day) -> node -> node -> store_result_in_context
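
For illustration, that final store_result_in_context step could be a Function node along these lines (just a sketch; the context key and fields are made up):

    // Function node at the end of the flow: record that the run finished
    // and keep whatever result you care about in flow context.
    flow.set("lastRun", {
        finishedAt: new Date().toISOString(),
        result: msg.payload
    });
    return msg;

Anything later (another flow, a debug node, a dashboard) can then read flow.get("lastRun") to see when and how the last run ended.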

Perhaps if you share a flow of what you are doing you can get specific advice.


Yeah, you can do the same, but you would have to write a lot of custom logic stored in context to make it work. I.e. more code, more complications, more room for bugs, etc. How easy is it to aggregate in node red, for example? What I'm proposing is a different kind of flow where all this is automatically included and available. No need for context or additional logic.

I guess what I'm proposing is something like batch processing vs node red's continuous stream.

Take the file deletion flow as an example:

Inject node (sends 1 message once per day) --> Log start --> Get files in folder (generates x amount of messages) --> Delete 1 file per message (x amount of messages) --> Log each file deleted (x messages) --> Log stop (for the last of x messages)

How would you go about logging when this program ends, i.e. on the last of the x messages? You could do it in context, but that's an additional node: write a timestamp and increment a message count in context, then fetch that data at the end, count the messages again to identify the last message, set a flag on it, and have another node afterwards filter on that flag. It's a lot of additional logic. And it's fragile if some messages disappear along the way (an error thrown due to the unavoidable uncertainty of I/O).
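
Roughly the kind of Function node logic I mean, purely as a sketch (the context keys are invented, and it assumes an earlier node stored how many files were found):

    // Hypothetical "is this the last message?" check using context counters.
    const expected = flow.get("expectedFileCount") || 0; // set by the listing step
    const done = (flow.get("deletedFileCount") || 0) + 1;
    flow.set("deletedFileCount", done);

    if (expected > 0 && done >= expected) {
        msg.isLast = true;                // a later switch node filters on this
        flow.set("deletedFileCount", 0);  // reset for the next daily run
    }
    return msg;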

For that example flow I would do:

  1. Inject
  2. Log Start
  3. Get files in folder, send one msg with payload containing array of files names
  4. Split node to convert that array into a message sequence - one msg per filename
  5. Delete file
  6. Log
  7. Join node to convert the msg sequence back to a single message
  8. Log End

By converting the single message to a message sequence with the Split node, we can use the Join node to wait until all messages in that sequence have been 'handled'. No need for context.
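
And if you ever have the array but no node that adds the metadata, this is roughly what the Split node does for you; a Function node sketch (assuming msg.payload is the array of file names):

    // Emit one message per file name, attaching msg.parts so a downstream
    // Join node in automatic mode knows when the whole batch has been handled.
    const files = msg.payload || [];
    files.forEach((file, index) => {
        node.send({
            payload: file,
            parts: {
                id: msg._msgid,      // any id shared by the whole sequence
                index: index,
                count: files.length,
                type: "array"
            }
        });
    });
    return null; // messages have already been sent above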


The node I found gets the files and returns one message per file, so there's no need to split an array:

You mention the Join node, but doesn't that require a node beforehand to add the parts object? And that requires knowing how many messages there are, which I don't. So how do I use Join when the parts object is not set and there is no way of knowing how many messages there are? In batch processing this is easy and automatic; you get it for free. I don't know how to do that in node red.

Regardless, you describe a hack around the fact that this sort of behaviour isn't directly supported. In my proposal, identifying the last file is just as easy whether the messages come as an array or not. Each node always knows what the last message is and when to stop. No need for complicated joins and context stores.

I thought about doing something similar to what you proposed. But then I thought it was just too much work and not worth it. So my program runs without logging when it stops.

If there are msg.parts then the join can be automatic. (If you wish to delete msgs in the split, do so with a Switch node and recreate the message sequence.)
You can use a split node after the fs node to split the file, or use the built-in file read node.

That will come from the Split node.

But there is no split node here?!? There is nothing to split, really. The node that gets the files in a folder just spits out individual messages.

What built-in file read node? I didn't find any that came with node red, so I used one from the library. The messages that come out of fs are already split - one message per file. Not sure if I should split anyway, i.e. add a split node after it?


It is installed as part of the node-red standard nodes.

How did you install node-red?

If fs sends a message per file with no msg.parts then you would need to use another method, possibly a Trigger node to send msg.complete after a timeout.
The maintainer of the fs node is @TotallyInformation; if it is not adding msg.parts to each output message then you should raise an issue against the node on the node's GitHub page.

I thought the read file node was for reading the contents of a file? Can it read the contents of a folder instead?

I installed node red as a modified version of the official node red Docker image, using a Dockerfile.

Yeah, the only way I could think of is some sort of timer, which is fragile and hacky. It also causes a delay, so I won't get the last message cleanly on time.

To clarify, this isn't primarily about how to log the last message. It's about how things could be different: automatically and easily aggregate, get the last message, etc. for free. No split, no join, no timer, and it doesn't matter whether it's an array or individual messages. Does batch processing sound so awful that you'd rather add a bunch of nodes and/or context-store logic every time you needed this? I think it would be pretty neat :stuck_out_tongue:

It is. I did not realise you wanted the list of files in a directory; you could use the exec node for that. So no, the read file node will not do what you wish.

You can use the watch node and the exec node with the ls command to get a list of files, which you can split. There are many possible methods.
The Complete node may work with the fs node as well.


The Complete node is interesting and may work! Thanks for the tip. Hopefully there are no timing issues, so that it comes last, after all the file-deleted messages from the main line of nodes.

Now onto aggregation :joy:

I tried the Complete node, but its message came in before the other messages, so it doesn't work in a robust way.

No, it tells you when the node has finished processing the current message. It has no way of knowing if another message may arrive a second later.


I think the problem here is how you chose to define 'last message'.

The way nodes work, you cannot 100% correlate an input message to the messages it sends.

For example, if a node receives two messages in quick succession and, in response to each, sends multiple messages interleaved in send order, the runtime has no way of automatically knowing which output message was triggered by which input. Equally, the runtime needs some way of knowing what the 'last' message means.

This relies on some level of metadata being attached to the messages by the node (as ultimately, it's only the node that knows all the information needed).

We have an established way of defining 'batches' of messages - using the msg.parts metadata. Some nodes set that automatically, some don't. If the information is available, then the Join node will Just Work to act as a gate until the batch has completed processing.
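
As a rough illustration, any node (including a Function node) can use that metadata to recognise the end of a batch without any context bookkeeping:

    // If the upstream node attached msg.parts, we can tell whether this is
    // the final message of its sequence.
    if (msg.parts && typeof msg.parts.count === "number") {
        msg.lastInBatch = (msg.parts.index === msg.parts.count - 1);
    }
    return msg;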


My general point here is that to achieve the type of thing you describe would require a different operating model in the runtime, and for nodes to be implemented differently. That is not a trivial matter. We already have the tools to do the type of batch processing you describe, albeit not quite the form you are looking for.


Try the exec node to list files and then split.
e.g.

[{"id":"b8cff8b867b151d1","type":"inject","z":"d1395164b4eec73e","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":200,"y":7360,"wires":[["2a4207ca95b9c572"]]},{"id":"2a4207ca95b9c572","type":"exec","z":"d1395164b4eec73e","command":"ls  .node-red -p | grep -v /","addpay":"","append":"","useSpawn":"false","timer":"","winHide":false,"oldrc":false,"name":"","x":410,"y":7360,"wires":[["d395b83b9b38cb12"],["1df7155b532838b3"],["1df7155b532838b3"]]},{"id":"1df7155b532838b3","type":"debug","z":"d1395164b4eec73e","name":"debug 2571","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":730,"y":7380,"wires":[]},{"id":"d395b83b9b38cb12","type":"split","z":"d1395164b4eec73e","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":710,"y":7320,"wires":[["a2c9c9973203adda","40b1eadec9a3c81e"]]},{"id":"a2c9c9973203adda","type":"join","z":"d1395164b4eec73e","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":"false","timeout":"","count":"","reduceRight":false,"x":830,"y":7320,"wires":[["dcc0ceb5a1e44fcf"]]},{"id":"40b1eadec9a3c81e","type":"debug","z":"d1395164b4eec73e","name":"debug 2574","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":810,"y":7260,"wires":[]},{"id":"dcc0ceb5a1e44fcf","type":"debug","z":"d1395164b4eec73e","name":"debug 2573","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":910,"y":7360,"wires":[]}]

ls <path_to_directory> -p | grep -v /

-p adds / to directories
grep -v / returns only lines without /

grep -E .jpg will return all files containing .jpg in the name

My general point here is that to achieve the type of thing you describe would require a different operating model in the runtime, and for nodes to be implemented differently. That is not a trivial matter. We already have the tools to do the type of batch processing you describe, albeit not quite the form you are looking for.

Yeah, the new flow I'm suggesting is exactly as you say: a different operating model in the runtime. Certainly not a trivial matter. But from my past experience it is still extremely useful when you need batch processing instead of a continuous stream. You lose some things and gain other things. For some tasks, I miss the simplicity and possibilities of batch processing.

The batch processing tools available are cumbersome and fragile, requiring extra code before and after to add parts, split, join, etc. And if you accidentally make the wrong choice, like using the fs node which doesn't add parts, then it's damn near impossible.

I get that it is what it is. This is just me thinking out loud about other possibilities. I enjoy node red, but I also enjoyed slightly different systems before.