Issue joining parallel flows into one

I want to split one flow into two parallel flows and, when both have finished, join them back into one, while also merging one object (msg.data in my case) in which I store relevant info to be used after the flows are joined.

This is what I've tried so far (I'm trying to achieve this with a join node, but I welcome any other solutions):

[
    {
        "id": "a55f9253.62fd6",
        "type": "inject",
        "z": "759de691.424df8",
        "name": "msg.data = { }",
        "props": [
            {
                "p": "data",
                "v": "{}",
                "vt": "json"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 2350,
        "y": 140,
        "wires": [
            [
                "43f34c27.69f9b4",
                "59fe93e0.950a4c"
            ]
        ]
    },
    {
        "id": "43f34c27.69f9b4",
        "type": "function",
        "z": "759de691.424df8",
        "name": "function1",
        "func": "msg.data.a = \"a\";\nmsg.data.b = \"b\";\nmsg.data.c = \"c\";\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2540,
        "y": 120,
        "wires": [
            [
                "8df7d6bf.2dc188"
            ]
        ]
    },
    {
        "id": "59fe93e0.950a4c",
        "type": "function",
        "z": "759de691.424df8",
        "name": "function2",
        "func": "msg.data.d = \"d\";\nmsg.data.e = \"e\";\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2540,
        "y": 160,
        "wires": [
            [
                "8df7d6bf.2dc188"
            ]
        ]
    },
    {
        "id": "8df7d6bf.2dc188",
        "type": "join",
        "z": "759de691.424df8",
        "name": "",
        "mode": "custom",
        "build": "merged",
        "property": "data",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": true,
        "timeout": "",
        "count": "2",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "num",
        "reduceFixup": "",
        "x": 2710,
        "y": 140,
        "wires": [
            [
                "71e14cc6.03b514"
            ]
        ]
    },
    {
        "id": "71e14cc6.03b514",
        "type": "debug",
        "z": "759de691.424df8",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 2850,
        "y": 140,
        "wires": []
    }
]

First I create an inject node and set msg.data to an empty JSON object inside it.

Then I divide the flow like this:

In function1 I have:

  msg.data.a = "a";
  msg.data.b = "b";
  msg.data.c = "c";
  return msg;

In function2 I have:

  msg.data.d = "d";
  msg.data.e = "e";
  return msg;

After this I'm attempting to join both flows into one and to merge msg.data so that it looks like this:

msg.data = {
  "a": "a",
  "b": "b",
  "c": "c",
  "d": "d",
  "e": "e"
};
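
For reference, the merge I am after is just a shallow merge of the two branch results. A minimal plain JavaScript sketch, outside Node-RED, where `fromFunction1` and `fromFunction2` are hypothetical stand-ins for msg.data as it leaves each function node:

```javascript
// Hypothetical stand-ins for msg.data as it leaves each branch.
const fromFunction1 = { a: "a", b: "b", c: "c" };
const fromFunction2 = { d: "d", e: "e" };

// Shallow merge into a fresh object; on a key clash the later source wins.
const merged = Object.assign({}, fromFunction1, fromFunction2);

console.log(merged);
```

This works for any number of fields, which matters because the real number of fields is not known in advance.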

In my join node I set 'After a number of message parts' to 2, since I have 2 separate parallel flows.

The result is two messages in the debug sidebar.

What I want is only one message in the debug sidebar instead of two (meaning the parallel flows have been reunited into one), with msg.data containing a, b, c, d and e, as it does in the second debug message.

I suspect the 2 in the join node refers to the number of properties inside my msg.data object (5 in this example) and not the number of times msg.data arrives (which would be 2, one per parallel flow). Important note: I don't know how many fields msg.data will have in my real scenario.
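
To make that suspicion concrete, here is a toy model in plain JavaScript. It is not the real join node: `makeMergedJoin` is a hypothetical function that simply assumes the merged mode compares the configured count against the number of keys collected so far, and that it keeps emitting on later messages because the flow has "accumulate" set to true.

```javascript
// Toy model only, NOT the real join node. Assumption: "merged" mode compares
// the configured count against the number of keys collected so far.
function makeMergedJoin(count) {
    const collected = {};
    return function onMessage(data) {
        Object.assign(collected, data);
        // Emit a copy once enough keys are present, and again on every
        // later message (mirrors "accumulate": true in the flow above).
        return Object.keys(collected).length >= count ? { ...collected } : null;
    };
}

const join = makeMergedJoin(2);
const first = join({ a: "a", b: "b", c: "c" }); // 3 keys >= 2, so it emits already
const second = join({ d: "d", e: "e" });        // emits again, now with five keys
console.log(first, second);
```

Under that assumption the model emits twice, and only the second output holds all five keys, which is the same pattern I see in the debug sidebar.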

Any suggestions on how to accomplish this with or without a join node would be appreciated.
Thanks.

You can join as a key/value object and then merge the two properties; take a look at the second join example. Or you can join as an array and merge the objects it contains, as in the first join example.

[{"id":"9916d5e5.46f21","type":"debug","z":"30af2d3e.d94ea2","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":794,"y":2780,"wires":[]},{"id":"9115d2c2.010da8","type":"change","z":"30af2d3e.d94ea2","name":"","rules":[{"t":"set","p":"data","pt":"msg","to":"$merge([$$.data.*])","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":620,"y":2980,"wires":[["9916d5e5.46f21"]]},{"id":"6bf0ce60.43c928","type":"change","z":"30af2d3e.d94ea2","name":"","rules":[{"t":"set","p":"data","pt":"msg","to":"$merge([$$.data])","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":2900,"wires":[["9916d5e5.46f21"]]},{"id":"5c64b5a.2b4facc","type":"join","z":"30af2d3e.d94ea2","name":"","mode":"custom","build":"object","property":"data","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":460,"y":3040,"wires":[["9115d2c2.010da8"]]},{"id":"f3db2f39.cd7ff8","type":"join","z":"30af2d3e.d94ea2","name":"","mode":"custom","build":"array","property":"data","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":500,"y":2920,"wires":[["6bf0ce60.43c928"]]},{"id":"7feb5ff8.ea8ab","type":"function","z":"30af2d3e.d94ea2","name":"","func":" msg.data ={};\n msg.data.a = \"a\";\n msg.data.b = \"b\";\n msg.data.c = \"c\";\n msg.topic = \"one\";\n return msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":320,"y":2880,"wires":[["f3db2f39.cd7ff8","5c64b5a.2b4facc"]]},{"id":"2cfb3c32.130dac","type":"function","z":"30af2d3e.d94ea2","name":"","func":"msg.data = {};\nmsg.data.d = \"d\";\nmsg.data.e = \"e\";\nmsg.topic =\"two\";\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":310,"y":2940,"wires":[["f3db2f39.cd7ff8","5c64b5a.2b4facc"]]},{"id":"d5b9daf8.b9f4b8","type":"inject","z":"30af2d3e.d94ea2","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":140,"y":2920,"wires":[["7feb5ff8.ea8ab","2cfb3c32.130dac"]]}]
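
If the JSONata in the two change nodes is unfamiliar, the same two merges can be sketched in plain JavaScript. This is only an illustration outside Node-RED: `joinedAsObject` and `joinedAsArray` are hypothetical samples of what msg.data would hold after each join node, and `mergeValues` and `mergeItems` are made-up helper names.

```javascript
// Hypothetical msg.data after the join set to build a key/value object
// (keyed by msg.topic).
const joinedAsObject = {
    one: { a: "a", b: "b", c: "c" },
    two: { d: "d", e: "e" }
};
// Hypothetical msg.data after the join set to build an array.
const joinedAsArray = [{ a: "a", b: "b", c: "c" }, { d: "d", e: "e" }];

// Roughly what $merge([$$.data.*]) does: merge every value of the object.
const mergeValues = (obj) => Object.assign({}, ...Object.values(obj));
// Roughly what $merge([$$.data]) does: merge every item of the array.
const mergeItems = (arr) => Object.assign({}, ...arr);

console.log(mergeValues(joinedAsObject));
console.log(mergeItems(joinedAsArray));
```

Inside a function node placed after the join, the equivalent body for the key/value case would be along the lines of `msg.data = Object.assign({}, ...Object.values(msg.data)); return msg;`.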

Your example still produces two debug messages, meaning two different flows.
However, you gave me an idea, and doing something similar to what you did seems to meet my needs.

[
    {
        "id": "a55f9253.62fd6",
        "type": "inject",
        "z": "759de691.424df8",
        "name": "msg.data = { }",
        "props": [
            {
                "p": "data",
                "v": "{}",
                "vt": "json"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 2350,
        "y": 140,
        "wires": [
            [
                "43f34c27.69f9b4",
                "59fe93e0.950a4c"
            ]
        ]
    },
    {
        "id": "43f34c27.69f9b4",
        "type": "function",
        "z": "759de691.424df8",
        "name": "function1",
        "func": "msg.topic = \"one\";\nmsg.data.a = \"a\";\nmsg.data.b = \"b\";\nmsg.data.c = \"c\";\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2540,
        "y": 120,
        "wires": [
            [
                "8df7d6bf.2dc188"
            ]
        ]
    },
    {
        "id": "59fe93e0.950a4c",
        "type": "function",
        "z": "759de691.424df8",
        "name": "function2",
        "func": "msg.topic = \"two\";\nmsg.data.d = \"d\";\nmsg.data.e = \"e\";\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 2540,
        "y": 160,
        "wires": [
            [
                "8df7d6bf.2dc188"
            ]
        ]
    },
    {
        "id": "8df7d6bf.2dc188",
        "type": "join",
        "z": "759de691.424df8",
        "name": "",
        "mode": "custom",
        "build": "object",
        "property": "data",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": true,
        "timeout": "",
        "count": "2",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "num",
        "reduceFixup": "",
        "x": 2710,
        "y": 140,
        "wires": [
            [
                "71e14cc6.03b514",
                "a9984e24.567b7"
            ]
        ]
    },
    {
        "id": "71e14cc6.03b514",
        "type": "debug",
        "z": "759de691.424df8",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 2850,
        "y": 140,
        "wires": []
    },
    {
        "id": "a9984e24.567b7",
        "type": "change",
        "z": "759de691.424df8",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "data",
                "pt": "msg",
                "to": "$merge([$$.data.*])",
                "tot": "jsonata"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 2870,
        "y": 180,
        "wires": [
            [
                "99e93f56.5ff3a"
            ]
        ]
    },
    {
        "id": "99e93f56.5ff3a",
        "type": "debug",
        "z": "759de691.424df8",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 3030,
        "y": 180,
        "wires": []
    }
]

However, I would like to use the 'merged object' option inside the join node instead of merging manually afterwards, and to understand in depth how it works, since the docs don't seem to fully explain it.

Nevertheless, you pointed me in the right direction to solve my particular use case, so thank you. I will close this.

My example gives two debug messages because it contains two examples launched from the same inject.

You have just chosen the second join example.

If you give the two functions valid msg.parts, you could merge internally with a reduced sequence.
Or you can let the join time out and set no count.
Or you can send a message with msg.complete set and no count.
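
As a rough sketch of the first option, each function node would stamp its message with a msg.parts object (id, index, count) before returning it. The helper name `tagBranch` is hypothetical, and the sketch assumes both branches carry the same msg._msgid because they come from the same inject; if that does not hold in your flow, set a shared id on the message before the split instead.

```javascript
// Hypothetical helper. In a function node, the body would set msg.parts
// directly and then return msg.
function tagBranch(msg, index, count) {
    msg.parts = {
        id: msg._msgid, // assumed identical in both branches (same inject)
        index: index,   // position of this branch in the sequence
        count: count    // total number of branches to wait for
    };
    return msg;
}

// Plain-object stand-ins for the two branch messages.
const branchOne = tagBranch({ _msgid: "abc123", data: { a: "a" } }, 0, 2);
const branchTwo = tagBranch({ _msgid: "abc123", data: { d: "d" } }, 1, 2);
console.log(branchOne.parts, branchTwo.parts);
```

Here the count describes the number of branches, not the number of fields in msg.data, so it stays at 2 however many fields each branch adds.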

Yeah, you are right, sorry, I didn't understand your example at first.
The timeout option is a no-go for me, since we need the API to respond as soon as possible.
As for msg.complete, I don't know how I would do that, since I don't know which of the parallel branches will finish first.

So your only option, if you want it internal, is to send msg.parts from both functions; then you can join using a reduced sequence.
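
Conceptually, joining a sequence this way means collecting messages that share a parts.id until parts.count of them have arrived, then folding their data into one object. The following is a toy model in plain JavaScript, not the join node's actual implementation; `pending` and `joinByParts` are hypothetical names used only for illustration.

```javascript
// Toy model of a sequence join, NOT the join node's real code.
const pending = new Map(); // parts.id -> messages received so far

function joinByParts(msg) {
    const { id, count } = msg.parts;
    const group = pending.get(id) || [];
    group.push(msg);
    if (group.length < count) {
        pending.set(id, group);
        return null; // still waiting for the other branch
    }
    pending.delete(id);
    // Fold the data of every branch into a single object.
    const data = group.reduce((acc, m) => Object.assign(acc, m.data), {});
    return { ...msg, data };
}

const parts = (index) => ({ id: "abc123", index, count: 2 });
// Branch two happens to finish first here; arrival order does not matter.
const early = joinByParts({ parts: parts(1), data: { d: "d", e: "e" } });
const joined = joinByParts({ parts: parts(0), data: { a: "a", b: "b", c: "c" } });
console.log(early, joined.data);
```

Only one message comes out, and only after both branches have reported, whichever branch finishes first.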

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.