[node: "Join"] Add support for parts grouping in manual mode

Hello,
my use case is that I'm using database node which streams query result - msg per row. DB node adds to each msg msg.parts.id and when stream finishes, sends msg.complete set to true (with msg.payload set to undefined value).

For now:

  • "auto" mode will group msgs by msg.parts.id and release them as bulk array when received msg.complete set to true
  • "manual" mode doesn't support grouping messages, so when node works on two or more parallel queries, msgs from different queries will go through one "queue" and will be processes as one group, additionally bulk data will be sliced according to when specific query will finish reading stream and sends complete msg.

For the feature:

  • "manual" mode should:
    • pass msgs to one, default group if msg.parts.id is not defined
    • should group msgs if msg.parts.id is defined
      E.g. if "Join" node will be configured to send the msg after 50 parts and db node will process two queries - every returns 72 records, in result we will get 2 msgs with 50 items in and 2 msgs with 22 items.

Below two gif examples which presents current and requested "Join" node version. Function fakes db stream, it iterate 144 times, separates msgs into two groups (even and odd as parallel queries) and sends them every 50 parts or when complete property set (when the stream finishes).

Example with current node:


Example with modified node:


Code:

[{"id":"c5a3f19.a6cd01","type":"inject","z":"60787892.d68368","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":760,"y":260,"wires":[["c0b7d7cd.3e3998"]]},{"id":"c0b7d7cd.3e3998","type":"function","z":"60787892.d68368","name":"DB Read Stream","func":"let counter = 144;\nmsg.parts = {};\nlet id_1 = RED.util.generateId();\nlet id_2 = RED.util.generateId();\n\nfor (let i = 0; i < counter; i++) {\n    msg.payload = i;\n    if (i%2 === 0) {\n        msg.parts.id = id_1;\n        if (i === 142) {\n            msg.complete = true;\n            node.send(msg);\n        } else {\n            node.send(msg);\n        }\n    } else {\n        msg.parts.id = id_2;\n        if (i === 143) {\n            msg.complete = true;\n            node.send(msg);           \n        } else {\n            node.send(msg);\n        }\n    }\n\n} ","outputs":1,"noerr":0,"initialize":"","finalize":"","x":950,"y":260,"wires":[["7eff2e65.c621d"]]},{"id":"7eff2e65.c621d","type":"join","z":"60787892.d68368","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"50","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1130,"y":260,"wires":[["198cd316.aa3bcd"]]},{"id":"198cd316.aa3bcd","type":"debug","z":"60787892.d68368","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1310,"y":260,"wires":[]}]

If it's ok I can send PR, it doesn't break previous behaviour (if msg.parts.id wasn't included).

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.