Split - process separately - recombine

Need a bit of help
In this example I'm splitting a msg, doing stuff in parallel, and then I want to recombine.


[{"id":"f14cd0c.f486c3","type":"inject","z":"29e0a8a9.d58188","name":"","topic":"","payload":"3","payloadType":"num","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":190,"y":220,"wires":[["53d3335f.af04cc"]]},{"id":"7417b524.f9dabc","type":"change","z":"29e0a8a9.d58188","name":"double","rules":[{"t":"set","p":"payload","pt":"msg","to":"6","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":530,"y":120,"wires":[["2935e45e.77973c"]]},{"id":"53d3335f.af04cc","type":"function","z":"29e0a8a9.d58188","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"x":355,"y":220,"wires":[["7417b524.f9dabc","cb36ea0b.3fa758","c0ddc6d0.284578"]],"icon":"node-red/arrow-in.svg","l":false},{"id":"cb36ea0b.3fa758","type":"change","z":"29e0a8a9.d58188","name":"square","rules":[{"t":"set","p":"payload","pt":"msg","to":"9","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":530,"y":320,"wires":[["ef6bcef.6a4753"]]},{"id":"9cdb0e8.a21d3f","type":"debug","z":"29e0a8a9.d58188","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":1190,"y":220,"wires":[]},{"id":"70806030.761ab","type":"join","z":"29e0a8a9.d58188","name":"","mode":"custom","build":"merged","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1010,"y":220,"wires":[["9cdb0e8.a21d3f"]]},{"id":"2935e45e.77973c","type":"template","z":"29e0a8a9.d58188","name":"","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\"double\": \"{{payload}}\"}","output":"json","x":700,"y":120,"wires":[["afead1c6.2ac5c"]]},{"id":"ef6bcef.6a4753","type":"template","z":"29e0a8a9.d58188","name":"","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\"square\": \"{{payload}}\"}","output":"json","x":700,"y":320,"wires":[["70806030.761ab"]]},{"id":"afead1c6.2ac5c","type":"delay","z":"29e0a8a9.d58188","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":860,"y":120,"wires":[["70806030.761ab"]]},{"id":"c0ddc6d0.284578","type":"template","z":"29e0a8a9.d58188","name":"","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\"payload\": \"{{payload}}\"}","output":"json","x":700,"y":220,"wires":[["70806030.761ab"]]},{"id":"33b61f01.1928a","type":"comment","z":"29e0a8a9.d58188","name":"Delay just to prove it can handle timing diff","info":"","x":940,"y":60,"wires":[]}]

I actually want the output msg to be

payload : "3",
double : "6",
square : "9"

rather than

payload :
    payload : "3",
    double : "6",
    square : "9"

but my brain seems to have stopped working after its initial efforts :).

Also, is there a better way of doing this in the first place?

It would have to be a hell of a lot more complex a set of calculations to make it worth the complexity of trying to do parallel processing.

Just do it in a single change node.

The real world problem is a hell of a lot more complex :slight_smile:

(It's adding an image file to a tweet - the processing destroys msg.payload, hence the need for a sidechain)

Why not move msg.payload to msg.orig_payload and keep it in the msg object?


If you really need to do parallel processing, either start with a split node or add the msg.parts object to the msg on each parallel leg of the flow, so you can use a join node (set to 'Automatic' mode) at the end of the flow.
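To make the per-leg approach concrete, here is a plain Node.js simulation (not a real Node-RED runtime) of what a small Function node at the end of each parallel leg would do: tag its msg with a msg.parts object sharing one group id, so a Join node in "automatic" mode can reassemble the legs into a single object keyed by parts.key. The helper name makeLeg and the group id are illustrative, not Node-RED APIs.

```javascript
// Each leg tags its msg with msg.parts so an "automatic" Join can
// recombine them into one object keyed by parts.key.
function makeLeg(key, index, count, groupId) {
    // what the Function node at the end of each leg would do:
    return function (msg) {
        msg.parts = { id: groupId, type: "object", key, index, count };
        return msg;
    };
}

// simulate three legs producing their results from the same input
const groupId = "abc123"; // in Node-RED this would be msg._msgid
const legs = [
    makeLeg("payload", 0, 3, groupId)({ payload: "3" }),
    makeLeg("double", 1, 3, groupId)({ payload: "6" }),
    makeLeg("square", 2, 3, groupId)({ payload: "9" }),
];

// roughly what the automatic Join would then produce:
const joined = Object.fromEntries(legs.map(m => [m.parts.key, m.payload]));
console.log(joined); // { payload: '3', double: '6', square: '9' }
```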

But in general, if each leg is a synchronous task, they will have to be executed serially by the Node.js event loop anyway... I would think that a single change node with three or more steps (as Julian mentioned) would be more efficient.

If you can include some actual input data, and a sample of what you need at the end, it would make a nice little "contest" for the community to come up with different solutions. ;*)

In my head, I was looking for a generic solution in case I needed to do something similar again.

(I've used flow variables and delay timers in the past to make sure all processing is done by a certain point in the flow, and then re-combined things that way)

Brain has re-engaged :slight_smile:
Just need a change node to move things around

Relying on msgs arriving at the right times is almost never the best solution. And with the new async flow processing in v1.0 it may not even work the way you expect.

That's why I wanted to try a different approach :slight_smile:

I used flow context and timing when I was a newbie :slight_smile:

You might try this though I don't know whether it will actually give you better performance:

This discussion highlights a bit of a missing piece of core functionality: how to "fan out" 1 msg into N identical msg objects with the msg.parts set correctly, so a join in "automatic" mode will do the right thing...

In the past I've used a function node: clone the incoming msg objects, set msg.parts.count to match the number of output ports, loop through the array of new msgs setting the loop index in msg.parts.index, and return the new array. But that's quite a bit of boilerplate to repeat, and it relies on me keeping the output port count synced with the loop counter total in code. Is there no way for a function node to get its own configured output port count?
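The boilerplate described above, sketched as a plain Node.js simulation. In a real Function node you would clone with RED.util.cloneMessage and `return msgs;` from a node configured with N output ports; the local cloneMessage stub here just stands in for that. (I believe newer Node-RED releases also expose `node.outputCount` inside the Function node, which would remove the hard-coded count.)

```javascript
// Fan-out: clone one msg into N copies with msg.parts set so an
// "automatic" Join can reassemble them as an array.
const COUNT = 3; // must match the node's configured output count

function cloneMessage(msg) {
    // stand-in for RED.util.cloneMessage
    return JSON.parse(JSON.stringify(msg));
}

function fanOut(msg, count) {
    const msgs = [];
    for (let i = 0; i < count; i++) {
        const m = cloneMessage(msg);
        m.parts = { id: msg._msgid, index: i, count: count };
        msgs.push(m);
    }
    return msgs;
}

const out = fanOut({ _msgid: "abc123", payload: 3 }, COUNT);
console.log(out.length);         // 3
console.log(out[2].parts.index); // 2
```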

So I guess what I'd like to see is something like a new mode in the split node -- to duplicate the input msg N times (to match the number of N output wires) while adding the msg.parts to each "leg" of the output. Of course, that is not practical with only a single output port, as the output wires are not "ordered" by index... and adding variable output ports to that core node is probably also a non-starter.

So... what am I asking for? A new core node to "fan out" a single msg to N msgs? Or perhaps a new function node method like cloneMsgWithParts(count)? A new checkbox on the change node to add msg.parts to each output port? Perhaps there are other ways that I've just overlooked...


It's a nice-looking way of effectively having callable functions, but you have to use the msg.payload_orig method so each function gets msg.payload as its input

I agree that this is a missing piece of core functionality. I had a similar idea of a branch node a while ago.

I often need to split a single execution of a flow into several subtasks with possibly long-running operations (database queries or RPCs), and combine the results when all have finished (or raised an error, timed out, ...).

The number of subtasks/branches is known, so the idea was to create a node with a configurable number of outputs (like the switch node). Each output (branch) was to have a name (used as property in the combined result), and a configurable timeout for the corresponding branch.

This seems similar to promises, so I wonder if it would be possible to build a node that lets you use a promise/async style interface? That would actually be really cool.
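A sketch of what that promise-style "branch" behaviour could look like in plain Node.js (not a real Node-RED node): run named subtasks in parallel, give each its own timeout, and merge the results into one object keyed by branch name. withTimeout and runBranches are illustrative names, not an existing API.

```javascript
// Reject a promise if it hasn't settled within ms milliseconds.
function withTimeout(promise, ms, name) {
    let timer;
    const guard = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(name + " timed out")), ms);
    });
    return Promise.race([promise, guard]).finally(() => clearTimeout(timer));
}

// branches: { name: { task: () => Promise, timeout: ms } }
async function runBranches(branches) {
    const names = Object.keys(branches);
    const results = await Promise.all(
        names.map(n => withTimeout(branches[n].task(), branches[n].timeout, n))
    );
    return Object.fromEntries(names.map((n, i) => [n, results[i]]));
}

// two fast subtasks standing in for database queries / RPCs
runBranches({
    double: { task: async () => 3 * 2, timeout: 1000 },
    square: { task: async () => 3 * 3, timeout: 1000 },
}).then(result => console.log(result)); // { double: 6, square: 9 }
```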

@shrickus While you're here, oh JSONata Master :slight_smile:

Is there an expression that will reduce

  payload :
    foo : "xxx",
    bar : "yyy",
    etc. etc.

down to

  foo : "xxx",
  bar : "yyy",
  etc. etc.

independent of what keys are within the payload?

I don't think so since the change node requires msg.______ as the output. So you cannot set the msg itself.

On the other hand, if you had:

var msg1 = { payload: { payload: { foo: "xxx", bar: "yyy" } } }
return msg1

Then you certainly could flatten that to msg.payload using the JSONata expression payload.payload

Otherwise you would have to use a function node.


Yes, a simple function node is the easiest:

msg = msg.payload;
return msg;

BUT - we will automatically add a msg._msgid, and you will lose any other top level properties (like the incoming topic, for example)
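One way around both issues (a sketch, not the only option) is to merge the nested keys onto the existing msg instead of replacing it, so _msgid, topic, etc. survive. Simulated here in plain Node.js; in a Function node the body would just be `Object.assign(msg, msg.payload); return msg;`.

```javascript
// Hoist the keys of msg.payload onto msg itself without discarding
// the other top level properties. Note that any keys already present
// on msg (including payload itself) are overwritten by the inner ones.
const msg = {
    _msgid: "abc123",
    topic: "tweets",
    payload: { payload: "3", double: "6", square: "9" },
};
Object.assign(msg, msg.payload);
console.log(msg.payload); // "3"
console.log(msg.topic);   // "tweets"
```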


My use case wasn't as complicated as I thought - making a copy of payload and re-using it later on in the flow works fine :slight_smile:

And the parallel processing method needed the extra complexity of converting to base64 and back again, as an image buffer won't go through a template untouched
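That base64 round-trip, sketched in plain Node.js: encode the Buffer to a base64 string before the string-based node (e.g. the template), then decode it back afterwards. The sample bytes are just a stand-in for real image data.

```javascript
// Round-trip a binary Buffer through base64 so it survives passing
// through a string-based node untouched.
const original = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); // fake JPEG header
const asText = original.toString("base64");   // safe to embed in a template
const restored = Buffer.from(asText, "base64");
console.log(asText);                    // "/9j/4A=="
console.log(original.equals(restored)); // true
```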

But it was an interesting exercise :slight_smile:

Yes, a similar approach. The intent was to make forking and rejoining of parallel branches much easier, something like Promise.all() in your example. I use that pattern in my code a lot, too.

I already started coding a prototype of my "branch" node a while ago, but haven't had the time to continue. The tricky part is the error and timeout handling. And I am not the kind of person who wants to publish something half-baked. :smiley_cat:


Modified my method to use a function node to move the payload rather than a template, so as to be data-type agnostic (e.g. it will deal with image buffers directly)

Also, you can simply use the node name to specify what key name you want in the combined output
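A minimal sketch of that per-leg Function node (simulated in plain Node.js; the `node` object with its `name` property is normally provided by Node-RED inside a Function node, so it is stubbed here). In the Function node itself the body would just be `msg.payload = { [node.name]: msg.payload }; return msg;`.

```javascript
// Wrap the leg's result under a key taken from the node's own name, so
// the Join ("merged" build) output uses that key. Nothing is
// stringified, so it works for image Buffers too.
const node = { name: "double" }; // stub; supplied by Node-RED at runtime
const msg = { payload: 6 };
msg.payload = { [node.name]: msg.payload };
console.log(msg.payload); // { double: 6 }
```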