Split should send complete message once all has been split

Hi there,

The following flow is a filter and collect flow - I have 10 things, I want 5 buckets of 2 - how do I do this?

Using the split node, I can divide by 10 things into ten times 1 thing, then using switch, I can identify my buckets and using join, I can collect the things into my buckets. Each join is then one bucket.

But each join is awaiting all parts from the split node, so they never send a message with their collected things. No I can't count and tell the join to send a message after 2 things because I don't know how large the bucket will become.

So my solution has been to use another join that does collect all things and once it sends a message (i.e. it has received all things), a delay of 1 second is used and a final complete message is sent to all other joins. And with that I have my buckets sending out their collected things.

My request was to be: could the split node send a complete message once all items have been split but that would raise the question of message ordering and delays. I.e. an extra option on the split node: "send complete after X seconds" would be what I'm doing here and that's a hack already since the delay is assuming that all messages have been sent to the join nodes - that might not be the case.

Can a join node assume that it has received all messages if a parts labelled { index: 8, length: 9 } is received? No it can't, there being no guaranteed ordering of messages.

So the "true" solution would be to filter the things using a large array inside a function node but that's not "visual" flow code for me :frowning:

(This example flow is take layers from an image and splitting those layers into 4 json files to be used elsewhere - so one file containing multiple layers - that also overlap - becomes 4 files.)

[{"id":"fdcf5d2989a1bc3f","type":"split","z":"eb2518b78d92c110","name":"split on layers","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload.payload.layers","x":104,"y":368,"wires":[["61b8db53fc7da012","287989e94c61e33f"]]},{"id":"61b8db53fc7da012","type":"switch","z":"eb2518b78d92c110","name":"filter layers","property":"payload.payload.layers.name","propertyType":"msg","rules":[{"t":"regex","v":"^filter1","vt":"str","case":true},{"t":"regex","v":"^filter2","vt":"str","case":true},{"t":"regex","v":"^filter3","vt":"str","case":true},{"t":"regex","v":"^layer1","vt":"str","case":true},{"t":"regex","v":"^layer2","vt":"str","case":true},{"t":"regex","v":"^layer3","vt":"str","case":true},{"t":"regex","v":"^layer4","vt":"str","case":true}],"checkall":"false","repair":false,"outputs":7,"x":405,"y":368,"wires":[["02507f416cb8c956"],["6ed9721bec8407d0"],["bb36069257f3d9c2"],["04cd59b10eddb46a"],["c646e02acedb6cd8"],["c646e02acedb6cd8"],["c646e02acedb6cd8"]]},{"id":"02507f416cb8c956","type":"join","z":"eb2518b78d92c110","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1171,"y":129.25,"wires":[[]]},{"id":"6ed9721bec8407d0","type":"join","z":"eb2518b78d92c110","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1171,"y":253.25,"wires":[[]]},{"id":"bb36069257f3d9c2","type":"join","z":"eb2518b78d92c110","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1171,"y":372,"wires":[[]]},{"id":"04cd59b10eddb46a","type":"join","z":"eb2518b78d92c110","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1171,"y":541,"wires":[[]]},{"id":"c646e02acedb6cd8","type":"change","z":"eb2518b78d92c110","name":"mark as overlay layer","rules":[{"t":"set","p":"payload.component","pt":"msg","to":"overlay","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":720,"y":540,"wires":[["5ce548081b392fe0"]]},{"id":"5ce548081b392fe0","type":"junction","z":"eb2518b78d92c110","x":916.998779296875,"y":540.6844395399094,"wires":[["02507f416cb8c956","6ed9721bec8407d0","bb36069257f3d9c2","04cd59b10eddb46a"]]},{"id":"287800d7f6075fc5","type":"group","z":"eb2518b78d92c110","name":"signal to the colletors that everything has been seen","style":{"label":true},"nodes":["287989e94c61e33f","a046cdac74882720","7f7d2c3a7f8eedce"],"x":548,"y":702,"w":344,"h":129},{"id":"287989e94c61e33f","type":"join","z":"eb2518b78d92c110","g":"287800d7f6075fc5","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":624,"y":774,"wires":[["a046cdac74882720"]]},{"id":"a046cdac74882720","type":"change","z":"eb2518b78d92c110","g":"287800d7f6075fc5","name":"","rules":[{"t":"delete","p":"payload","pt":"msg"},{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":771,"y":743,"wires":[["7f7d2c3a7f8eedce"]]},{"id":"7f7d2c3a7f8eedce","type":"delay","z":"eb2518b78d92c110","g":"287800d7f6075fc5","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":806,"y":790,"wires":[["5ce548081b392fe0"]]}]

Hi.

Sorry, but can't you set it to manual and say how many parts are needed?
After a number of message parts

(Ok, oops. You already have it set to manual.)

But you are missing the 2 needed for the number of parts....

Maybe.

Could you include an inject node to give an example of the message?

I set to manual because I manually send the complete message and the join node should ignore the parts stuff attached to the message. Also I want an array and not object.

In fact this would probably also work in automatic mode, but it's not strictly working in automatic because the join isn't receiving all the messages generated by the corresponding split node.

But no, I don't have a fixed number of items I am expecting. Well I do since I know how many layers I have now but I don't know this number in the future so I don't want to depend on that number now since it will change (most likely) in the future. But I don't want to change the code in the future just the number! :wink:

Ah.

Ok, so before they payloads are received by the join node:
remove the msg.parts to not confused things. (As you mentioned the messages may be coming from other split nodes.)

Have a time out time set?

Maybe the batch node can help ?

Both solution are time-based .. which is suboptimal. Although the batch can also be used as counter of X messages - so also count based.

So either

  • there is a guaranteed order of messages (which can be generated by using a join that recombines all expected) or
  • a upper limit on expected time (timeout on join or batch or as in my solution using a final message) or
  • there is a count i.e. upper limit on messages received (join on X messages and send message or batch collect X messages)

I'm now just theorising about the problem of having independent events, which have to be synchronised. Which is the case here even if we're talking about messages.

Any other approaches?

Sorry. Above me.
Just thought I would mention what I did - no offence. Just I know how many times I've missed the elephant myself. So just in case you missed those things I mentioned.

THis is not time based

[{"id":"063fc03c6eee2c86","type":"inject","z":"613df62afc8a16bf","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,3,4,5,1,2,3,4,5]","payloadType":"json","x":130,"y":120,"wires":[["1dcc8582b64dca2f"]]},{"id":"1dcc8582b64dca2f","type":"change","z":"613df62afc8a16bf","name":"count element","rules":[{"t":"set","p":"count","pt":"msg","to":"$count($$.payload)-1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":240,"y":180,"wires":[["fdcf5d2989a1bc3f"]]},{"id":"fdcf5d2989a1bc3f","type":"split","z":"613df62afc8a16bf","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":330,"y":120,"wires":[["d0f086474c289f47","61b8db53fc7da012"]]},{"id":"d0f086474c289f47","type":"switch","z":"613df62afc8a16bf","name":"","property":"count","propertyType":"msg","rules":[{"t":"eq","v":"parts.index","vt":"msg"}],"checkall":"true","repair":false,"outputs":1,"x":470,"y":160,"wires":[["064dee6f8850fed2"]]},{"id":"61b8db53fc7da012","type":"switch","z":"613df62afc8a16bf","name":"filter layers","property":"payload","propertyType":"msg","rules":[{"t":"eq","v":"1","vt":"num"},{"t":"eq","v":"2","vt":"num"},{"t":"eq","v":"3","vt":"num"},{"t":"eq","v":"4","vt":"num"},{"t":"eq","v":"5","vt":"num"}],"checkall":"false","repair":true,"outputs":5,"x":730,"y":120,"wires":[["70b40e7d7d1fe200"],[],[],[],["af6bc54346c5499e"]]},{"id":"064dee6f8850fed2","type":"change","z":"613df62afc8a16bf","name":"send complete","rules":[{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"},{"t":"delete","p":"payload","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":680,"y":200,"wires":[["70b40e7d7d1fe200","af6bc54346c5499e"]]},{"id":"af6bc54346c5499e","type":"join","z":"613df62afc8a16bf","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"10","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":890,"y":160,"wires":[["7cca17aa84f768d6"]]},{"id":"70b40e7d7d1fe200","type":"join","z":"613df62afc8a16bf","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"10","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":890,"y":80,"wires":[["b8bb240e447319bf"]]},{"id":"7cca17aa84f768d6","type":"debug","z":"613df62afc8a16bf","name":"debug 2590","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1070,"y":140,"wires":[]},{"id":"b8bb240e447319bf","type":"debug","z":"613df62afc8a16bf","name":"debug 2589","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1070,"y":100,"wires":[]}]

Or as you have already stated, prior to split filter the data using JS or JSONata into 4 separate objects and the split that.

No no, absolute great that we throw around ideas, I don't want to offend and wasn't offended :slight_smile:

Everyone has their own ideas and those ideas I want to hear!

It isn't and therefore can have a bug because the assumption is that when you send the complete message parts.count == parts.index (aside: I think this might need to be index - 1 because index starts at zero), that the downstream joins have received all their messages. But that depends on how fast the switch node is.

All of this is academic if the computation is minimal, issues arise when the switch (or anything else) before the join nodes becomes complex and starts taking time. Then the final complete message will result in an empty message from the joins (potentially).

Imagine this flow instead:

[{"id":"1c2de0e948fed2c8","type":"split","z":"e9f01bcd4521f157","name":"split on layers","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload.payload.layers","x":431,"y":445.75,"wires":[["2c31832eb7a582d2","04c371d074249879"]]},{"id":"2c31832eb7a582d2","type":"switch","z":"e9f01bcd4521f157","name":"filter layers","property":"payload.payload.layers.name","propertyType":"msg","rules":[{"t":"regex","v":"^filter1","vt":"str","case":true},{"t":"regex","v":"^filter2","vt":"str","case":true},{"t":"regex","v":"^filter3","vt":"str","case":true},{"t":"regex","v":"^layer1","vt":"str","case":true},{"t":"regex","v":"^layer2","vt":"str","case":true},{"t":"regex","v":"^layer3","vt":"str","case":true},{"t":"regex","v":"^layer4","vt":"str","case":true}],"checkall":"false","repair":false,"outputs":7,"x":732,"y":445.75,"wires":[["f4a496e56a563d23"],["cc0e4d314674a792"],["167a77a3ca794ed3"],["fa28aba0352f2569"],["4488c72ede5b4c53"],["4488c72ede5b4c53"],["4488c72ede5b4c53"]]},{"id":"ac9ffec84fc087d8","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1498,"y":207,"wires":[[]]},{"id":"103ce07fc85af62c","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1498,"y":331,"wires":[[]]},{"id":"507cc9db082aa589","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1498,"y":449.75,"wires":[[]]},{"id":"295f3d491aab2ead","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1498,"y":618.75,"wires":[[]]},{"id":"4488c72ede5b4c53","type":"change","z":"e9f01bcd4521f157","name":"mark as overlay layer","rules":[{"t":"set","p":"payload.component","pt":"msg","to":"overlay","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1047,"y":617.75,"wires":[["4954cdbbeff30d23"]]},{"id":"f4a496e56a563d23","type":"delay","z":"e9f01bcd4521f157","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1136.5,"y":306,"wires":[["ac9ffec84fc087d8"]]},{"id":"cc0e4d314674a792","type":"delay","z":"e9f01bcd4521f157","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1142,"y":363,"wires":[["103ce07fc85af62c"]]},{"id":"167a77a3ca794ed3","type":"delay","z":"e9f01bcd4521f157","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1138,"y":437,"wires":[["507cc9db082aa589"]]},{"id":"fa28aba0352f2569","type":"delay","z":"e9f01bcd4521f157","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1140,"y":500,"wires":[["295f3d491aab2ead"]]},{"id":"4954cdbbeff30d23","type":"junction","z":"e9f01bcd4521f157","x":1243.998779296875,"y":618.4344395399094,"wires":[["ac9ffec84fc087d8","103ce07fc85af62c","507cc9db082aa589","295f3d491aab2ead"]]},{"id":"deb5003ecea5cb67","type":"group","z":"e9f01bcd4521f157","name":"signal to the colletors that everything has been seen","style":{"label":true},"nodes":["04c371d074249879","13a8f5f731f55e5c","6e31e1d897dd2bbf"],"x":875,"y":779.75,"w":344,"h":129},{"id":"04c371d074249879","type":"join","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":951,"y":851.75,"wires":[["13a8f5f731f55e5c"]]},{"id":"13a8f5f731f55e5c","type":"change","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"","rules":[{"t":"delete","p":"payload","pt":"msg"},{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":1098,"y":820.75,"wires":[["6e31e1d897dd2bbf"]]},{"id":"6e31e1d897dd2bbf","type":"delay","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1133,"y":867.75,"wires":[["4954cdbbeff30d23"]]}]

where the delay is 5s between the filter and the joins and the delay at the bottom is 1s ...

Sorry, but something I'm seeing which to me is Problematic.

You say the number of parts can change, and you want the flow to work with varying numbers of parts.

But without editing the flow it can't work as is. As far as I can see.

So there is a deeper problem - yes?

My example sends complete on the last split item, if you require it to be after use a trigger node to send 100ms after the last spit item passes.

I may of miss read your intent here though.

when the count message is created it has been -1.

Please supply data for flows if you wish others to look at them, it is really helpful.

The flow works fine as long as the time taken for all messages to travel to their respect join nodes is less than the delay set at the bottom, i.e. the delay that delays the sending of the complete message to the join nodes.

[{"id":"1c2de0e948fed2c8","type":"split","z":"e9f01bcd4521f157","name":"split on layers","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload.payload.layers","x":265.8575439453125,"y":445.3401794433594,"wires":[["2c31832eb7a582d2"]]},{"id":"2c31832eb7a582d2","type":"switch","z":"e9f01bcd4521f157","name":"filter layers","property":"payload.payload.layers.name","propertyType":"msg","rules":[{"t":"regex","v":"^filter1","vt":"str","case":true},{"t":"regex","v":"^filter2","vt":"str","case":true},{"t":"regex","v":"^filter3","vt":"str","case":true},{"t":"regex","v":"^layer1","vt":"str","case":true},{"t":"regex","v":"^layer2","vt":"str","case":true},{"t":"regex","v":"^layer3","vt":"str","case":true},{"t":"regex","v":"^layer4","vt":"str","case":true}],"checkall":"false","repair":false,"outputs":7,"x":589.8575439453125,"y":445.3401794433594,"wires":[["f4a496e56a563d23"],["cc0e4d314674a792"],["167a77a3ca794ed3"],["fa28aba0352f2569"],["4488c72ede5b4c53"],["4488c72ede5b4c53"],["4488c72ede5b4c53"]]},{"id":"ac9ffec84fc087d8","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1896.8575439453125,"y":356.59014892578125,"wires":[[]]},{"id":"103ce07fc85af62c","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1896.8575439453125,"y":435.5901794433594,"wires":[[]]},{"id":"507cc9db082aa589","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1896.8575439453125,"y":521.3402099609375,"wires":[[]]},{"id":"295f3d491aab2ead","type":"join","z":"e9f01bcd4521f157","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1896.8575439453125,"y":606.3401489257812,"wires":[[]]},{"id":"4488c72ede5b4c53","type":"change","z":"e9f01bcd4521f157","name":"mark as overlay layer","rules":[{"t":"set","p":"payload.component","pt":"msg","to":"overlay","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":833.8575439453125,"y":718.3401489257812,"wires":[["4954cdbbeff30d23","04c371d074249879"]]},{"id":"f4a496e56a563d23","type":"delay","z":"e9f01bcd4521f157","name":"delay 5s","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":837.3575439453125,"y":299.5902099609375,"wires":[["ac9ffec84fc087d8","04c371d074249879"]]},{"id":"cc0e4d314674a792","type":"delay","z":"e9f01bcd4521f157","name":"delay 5s","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":842.8575439453125,"y":356.5902099609375,"wires":[["103ce07fc85af62c","04c371d074249879"]]},{"id":"167a77a3ca794ed3","type":"delay","z":"e9f01bcd4521f157","name":"delay 5s","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":838.8575439453125,"y":430.5902099609375,"wires":[["507cc9db082aa589","04c371d074249879"]]},{"id":"fa28aba0352f2569","type":"delay","z":"e9f01bcd4521f157","name":"delay 5s","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":840.8575439453125,"y":493.5902099609375,"wires":[["295f3d491aab2ead","04c371d074249879"]]},{"id":"4954cdbbeff30d23","type":"junction","z":"e9f01bcd4521f157","x":1696.8041865825653,"y":722.6656010150909,"wires":[["ac9ffec84fc087d8","103ce07fc85af62c","507cc9db082aa589","295f3d491aab2ead"]]},{"id":"deb5003ecea5cb67","type":"group","z":"e9f01bcd4521f157","name":"signal to the colletors that everything has been seen","style":{"label":true},"nodes":["04c371d074249879","13a8f5f731f55e5c","6e31e1d897dd2bbf"],"x":1049.8575439453125,"y":799.3401489257812,"w":613,"h":158},{"id":"04c371d074249879","type":"join","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1125.8575439453125,"y":840.3401489257812,"wires":[["13a8f5f731f55e5c"]]},{"id":"13a8f5f731f55e5c","type":"change","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"send final complete message","rules":[{"t":"delete","p":"payload","pt":"msg"},{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":1354.8575439453125,"y":870.3401489257812,"wires":[["6e31e1d897dd2bbf"]]},{"id":"6e31e1d897dd2bbf","type":"delay","z":"e9f01bcd4521f157","g":"deb5003ecea5cb67","name":"delay 1s","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1576.8575439453125,"y":916.3401489257812,"wires":[["4954cdbbeff30d23"]]}]

This flow takes care of the issue if all messages are passed through only once. I.e., no message is dropped and no message is duplicated by the filter (switch node) matching multiple times on a single message. In that flow I can be semi-certain that the final complete message will arrive after all the other messages because I'm also taking into account the delay caused by computation before the message arrives at the destination bucket (join node)

Ah sorry, I assumed you were using the split node to generate that count - mea culpa.

I described the problem above, the data isn't important. You're assumption using an array is quite right but the time factor issue is the same whether you use generated data, real data or your own data - splitting an array an assuring that all data is received is the underlying question.

To me data is important if i want to run the flow. People helping should not have to create data you all ready have. It prevents false assumptions.
An example flow should be as simple as possible, containing data, and possible delays nodes to simulate any latency the flow might have to handle. This way there is all info required to give a informed response.

possible delays nodes to simulate any latency the flow might have to handle.

Yer that's the last flow I posted - that simulates the situation I was referring to. But in the end, there is no solution to the problem because that's the nature of flows - each branch is independent of any other branch, each message is independent of all other messages. Each branch will eventually complete but there isn't a guarantee that they all will complete in X units of time.

It all looks so simple but in the end, there is complexity that is hidden from the viewer.

That wasn't my intention, the required information was related to the nature of the problem not to the data used because, in my view, the data isn't important for the problem. It's the synchronisation of flow branches that is the problem.

I don't believe that you need the 1 second delay before sending your final complete message, and that it is deterministic.

Consider the last message that gets to the master Join node. It will get placed in that Join's queue at the same time as it will get placed in whichever other join it is destined for, and all previous messages will already have been handled by their join nodes, or will be sitting in one of the input queues.

Therefore, when the master Join gets round to servicing the last message you can be certain that all messages have either been handled by the other joins, or are sitting in their input queues. If the master join then sends the complete message to the other joins, that message will get added to the back of their input queues, it will not overtake messages already in the queue, so each join node will complete the joining of all the messages sent to it before it sees the Complete message

I would prepare the data for the split in JS or JSONata as it is just cleaner. And if labeled correctly the flow is still readable fairly easily.

But it is possible with delays,
e.g.

[{"id":"063fc03c6eee2c86","type":"inject","z":"613df62afc8a16bf","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,3,4,5,1,2,3,4,5]","payloadType":"json","x":130,"y":60,"wires":[["1dcc8582b64dca2f"]]},{"id":"1dcc8582b64dca2f","type":"change","z":"613df62afc8a16bf","name":"count element","rules":[{"t":"set","p":"count","pt":"msg","to":"$count($$.payload)","tot":"jsonata"},{"t":"set","p":"count","pt":"flow","to":"0","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":140,"y":100,"wires":[["fdcf5d2989a1bc3f"]]},{"id":"fdcf5d2989a1bc3f","type":"split","z":"613df62afc8a16bf","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":290,"y":100,"wires":[["61b8db53fc7da012"]]},{"id":"61b8db53fc7da012","type":"switch","z":"613df62afc8a16bf","name":"filter layers","property":"payload","propertyType":"msg","rules":[{"t":"eq","v":"1","vt":"num"},{"t":"eq","v":"2","vt":"num"},{"t":"eq","v":"3","vt":"num"},{"t":"eq","v":"4","vt":"num"},{"t":"eq","v":"5","vt":"num"}],"checkall":"false","repair":true,"outputs":5,"x":430,"y":100,"wires":[["51a6b0ac07fb29cc"],["42c6996da9048dec"],["42c6996da9048dec"],["42c6996da9048dec"],["177b85b1f43b42e8"]]},{"id":"51a6b0ac07fb29cc","type":"delay","z":"613df62afc8a16bf","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":580,"y":20,"wires":[["3864a34a4d437ace"]]},{"id":"177b85b1f43b42e8","type":"change","z":"613df62afc8a16bf","name":"","rules":[{"t":"set","p":"flowcount","pt":"msg","to":"count","tot":"flow","dc":true},{"t":"set","p":"count","pt":"flow","to":"$$.flowcount + 1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":660,"y":140,"wires":[["af6bc54346c5499e","d0f086474c289f47"]]},{"id":"42c6996da9048dec","type":"change","z":"613df62afc8a16bf","name":"","rules":[{"t":"set","p":"flowcount","pt":"msg","to":"count","tot":"flow","dc":true},{"t":"set","p":"count","pt":"flow","to":"$$.flowcount + 1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":660,"y":100,"wires":[[]]},{"id":"3864a34a4d437ace","type":"change","z":"613df62afc8a16bf","name":"","rules":[{"t":"set","p":"flowcount","pt":"msg","to":"count","tot":"flow","dc":true},{"t":"set","p":"count","pt":"flow","to":"$$.flowcount + 1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":660,"y":60,"wires":[["70b40e7d7d1fe200","d0f086474c289f47"]]},{"id":"af6bc54346c5499e","type":"join","z":"613df62afc8a16bf","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":890,"y":140,"wires":[["7cca17aa84f768d6"]]},{"id":"d0f086474c289f47","type":"switch","z":"613df62afc8a16bf","name":"","property":"count","propertyType":"msg","rules":[{"t":"eq","v":"count","vt":"flow"}],"checkall":"true","repair":false,"outputs":1,"x":610,"y":220,"wires":[["cb4a10f17528e07f"]]},{"id":"70b40e7d7d1fe200","type":"join","z":"613df62afc8a16bf","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":890,"y":60,"wires":[["b8bb240e447319bf"]]},{"id":"cb4a10f17528e07f","type":"function","z":"613df62afc8a16bf","name":"function 165","func":"return {complete: true}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":750,"y":220,"wires":[["70b40e7d7d1fe200","af6bc54346c5499e"]]},{"id":"7cca17aa84f768d6","type":"debug","z":"613df62afc8a16bf","name":"debug 2590","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1030,"y":120,"wires":[]},{"id":"b8bb240e447319bf","type":"debug","z":"613df62afc8a16bf","name":"debug 2589","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1030,"y":80,"wires":[]}]

THis flow would have to be rate limited for concurrency or the flow would need id's for each input, then deleted by id. Which would just make the flow even more complexed.

Apologies for going back to basics - what exactly are the things you are spitting ? into what ? and what are you creating ? Is it always 5 buckets, always just 2 items to join ? - Joined into what ? an array ? an object ?

The parts as well as having an index (and maybe a count) also have an id - If joining back into an array - it will use the id to regroup messages and the index to place them in the original order - so depending on what you trying to create you may be able to realign them as you want.

[{"id":"2612bee523a0b19a","type":"inject","z":"8f7d7811a9253b8f","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[\"A\",\"B\",\"C\",\"D\",\"E\",\"F\",\"G\",\"I\",\"J\",\"K\"]","payloadType":"json","x":112.5,"y":87.5,"wires":[["a034812667b8e7e9"]]},{"id":"a034812667b8e7e9","type":"split","z":"8f7d7811a9253b8f","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":250,"y":87.5,"wires":[["445e0585c82a8416"]]},{"id":"3de47d8842f5e68a","type":"debug","z":"8f7d7811a9253b8f","name":"debug 1","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":547.5,"y":162.5,"wires":[]},{"id":"e2908753ac2a5802","type":"join","z":"8f7d7811a9253b8f","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":"false","timeout":"","count":"","reduceRight":false,"x":412.5,"y":162.5,"wires":[["3de47d8842f5e68a"]]},{"id":"45dcbb4cc7e568e7","type":"function","z":"8f7d7811a9253b8f","name":"separate even/odd","func":"if ( msg.parts.index % 2 === 0 ) {\n    return [msg,null]\n}\nelse {\n    return [null,msg]\n}","outputs":2,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":170,"y":175,"wires":[["e2908753ac2a5802"],["d921949c1417d8e9"]],"outputLabels":["even","odd"]},{"id":"d921949c1417d8e9","type":"delay","z":"8f7d7811a9253b8f","name":"","pauseType":"delay","timeout":"3","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":310,"y":225,"wires":[["e2908753ac2a5802"]]},{"id":"445e0585c82a8416","type":"batch","z":"8f7d7811a9253b8f","name":"","mode":"count","count":"2","overlap":0,"interval":"5","allowEmptySequence":false,"honourParts":false,"topics":[],"x":402.5,"y":87.5,"wires":[["45dcbb4cc7e568e7"]]}]

Who's on First - it's a structural problem of when do messages arrive and who can synchronisation be assured.

split-filter-collect: a split node splits, a switch node filters and a join node collects a subset of the original messages. Question: how does the join node know it has received all the relevant messages for it?

You can have X tens/hundreds/thousands of joins, one filter and one split.

How can you guarantee that the messages have all arrived at the join after going through the filter (and perhaps some other computational nodes).

Practical demonstration:

[0,1,2,3,4,5,6,7,8,9] --> split --> filter number > 4 --> 5,6,7,8,9 --> some random computation that takes a random amount of time --> join

The split sends 10 messages, the filter sends 5 messages and the join - eventually - receives five messages (or perhaps less because the "random computation" drops messages).

when does join send it's message? How does it know it won't receive anything after 9?

No function node involved, a purist approach to solve a non-trivial problem in concurrent computing.

There is no solution to that. If the computation time is random then there is no way of knowing whether a message has been dropped or is still being processed. The join node (ie logic around it) needs to be made aware that particular messages have been dropped, or at least a count of them.

One solution would be to mark messages to be dropped rather than dropping them, then drop them after the join.