Hello,
My use case: I'm using a database node that streams a query result, one msg per row. The DB node adds msg.parts.id to each msg and, when the stream finishes, sends a msg with msg.complete set to true (and msg.payload set to undefined).
For now:
- "auto" mode groups msgs by msg.parts.id and releases them as one bulk array when it receives msg.complete set to true
- "manual" mode doesn't support grouping messages, so when the node handles two or more parallel queries, msgs from different queries go through one "queue" and are processed as one group. Additionally, the bulk data gets sliced at whatever point a specific query finishes reading its stream and sends its complete msg.
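To make the "manual" mode problem concrete, here is a rough simulation in plain JavaScript. It is only a simplified model of a single-queue, count-based join, not the actual Join node code; joinSingleQueue and fakeStream are names made up for this sketch, and the real node's output may differ in details.

```javascript
// Simplified model (assumption): one shared queue, released when it reaches
// `count` items or when a msg with complete === true arrives.
function joinSingleQueue(msgs, count) {
    const out = [];
    let queue = [];
    for (const msg of msgs) {
        if (msg.payload !== undefined) {
            queue.push(msg.payload);
        }
        if (queue.length === count || msg.complete) {
            out.push(queue);
            queue = [];
        }
    }
    return out;
}

// Fake two interleaved query streams: even payloads belong to "q1", odd to "q2".
// The last msg of each stream carries complete === true.
function fakeStream(total) {
    const msgs = [];
    for (let i = 0; i < total; i++) {
        msgs.push({
            payload: i,
            parts: { id: i % 2 === 0 ? "q1" : "q2" },
            complete: i >= total - 2
        });
    }
    return msgs;
}

const singleQueueSizes = joinSingleQueue(fakeStream(144), 50).map(g => g.length);
console.log(singleQueueSizes); // [ 50, 50, 43, 1 ]
```

In this model every array mixes rows from both queries, and the last two arrays are cut wherever a complete msg happens to arrive.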
For the feature:
- "manual" mode should:
  - pass msgs to one, default group if msg.parts.id is not defined
  - group msgs by msg.parts.id if it is defined

E.g. if the "Join" node is configured to send the msg after 50 parts and the db node processes two queries, each returning 72 records, the result will be 2 msgs with 50 items and 2 msgs with 22 items.
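The requested behaviour could be sketched roughly like this in plain JavaScript. This is only an illustration of the idea, not the code from the proposed PR; joinGrouped and the DEFAULT_GROUP key are hypothetical names, and the sketch assumes a group is released either when it reaches the configured count or when a msg with complete set to true arrives for it.

```javascript
// Hypothetical key for msgs that carry no msg.parts.id.
const DEFAULT_GROUP = "_default";

// Sketch: keep one queue per msg.parts.id instead of a single shared queue.
function joinGrouped(msgs, count) {
    const out = [];
    const groups = new Map();
    for (const msg of msgs) {
        const hasId = msg.parts && msg.parts.id !== undefined;
        const key = hasId ? msg.parts.id : DEFAULT_GROUP;
        if (!groups.has(key)) {
            groups.set(key, []);
        }
        const queue = groups.get(key);
        if (msg.payload !== undefined) {
            queue.push(msg.payload);
        }
        // Release this group only; other groups keep accumulating.
        if (queue.length === count || msg.complete) {
            out.push({ id: key, payload: queue });
            groups.delete(key);
        }
    }
    return out;
}

// Two interleaved queries with 72 rows each; the last row of each is complete.
const interleaved = [];
for (let i = 0; i < 72; i++) {
    for (const id of ["q1", "q2"]) {
        interleaved.push({ payload: i, parts: { id }, complete: i === 71 });
    }
}

const grouped = joinGrouped(interleaved, 50).map(m => `${m.id}:${m.payload.length}`);
console.log(grouped); // [ 'q1:50', 'q2:50', 'q1:22', 'q2:22' ]
```

Msgs without msg.parts.id all land in the default group, so a flow that never sets it behaves like a single queue.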
Below are two GIF examples showing the current and the requested "Join" node behaviour. The function fakes a db stream: it iterates 144 times and splits the msgs into two groups (even and odd, standing in for parallel queries). The msgs are then sent on every 50 parts or when the complete property is set (when the stream finishes).
Example with current node:
Example with modified node:
Code:
[{"id":"c5a3f19.a6cd01","type":"inject","z":"60787892.d68368","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":760,"y":260,"wires":[["c0b7d7cd.3e3998"]]},{"id":"c0b7d7cd.3e3998","type":"function","z":"60787892.d68368","name":"DB Read Stream","func":"let counter = 144;\nmsg.parts = {};\nlet id_1 = RED.util.generateId();\nlet id_2 = RED.util.generateId();\n\nfor (let i = 0; i < counter; i++) {\n msg.payload = i;\n if (i%2 === 0) {\n msg.parts.id = id_1;\n if (i === 142) {\n msg.complete = true;\n node.send(msg);\n } else {\n node.send(msg);\n }\n } else {\n msg.parts.id = id_2;\n if (i === 143) {\n msg.complete = true;\n node.send(msg); \n } else {\n node.send(msg);\n }\n }\n\n} ","outputs":1,"noerr":0,"initialize":"","finalize":"","x":950,"y":260,"wires":[["7eff2e65.c621d"]]},{"id":"7eff2e65.c621d","type":"join","z":"60787892.d68368","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"50","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1130,"y":260,"wires":[["198cd316.aa3bcd"]]},{"id":"198cd316.aa3bcd","type":"debug","z":"60787892.d68368","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1310,"y":260,"wires":[]}]
If it's OK, I can send a PR. It doesn't break the previous behaviour (when msg.parts.id isn't included).