I think there could be two problems: your input rate might be too high, and processing the array might take too long. I tested a flow that receives an array of 100000 items every 10 ms. My machine then takes 300 ms to process one array. Dividing the array into chunks of 100 keeps the processing from blocking parallel branches, but the input can only be read every 300 ms if you want to avoid an overflow:
[
{
"id": "5bf81ce259910494",
"type": "inject",
"z": "839d658c3816e9e7",
"name": "Timestamp start each 10 ms",
"props": [
{
"p": "topic",
"vt": "str"
},
{
"p": "timeStart",
"v": "",
"vt": "date"
},
{
"p": "timeEnd",
"v": "",
"vt": "date"
}
],
"repeat": "0.01",
"crontab": "",
"once": true,
"onceDelay": 0.1,
"topic": "measure duration",
"x": 190,
"y": 1740,
"wires": [
[
"6e9696f91d3fc4c1"
]
]
},
{
"id": "6e9696f91d3fc4c1",
"type": "function",
"z": "839d658c3816e9e7",
"name": "timestamp 1.1 (array of 100000 items)",
"func": "msg.topic = \"value 1.1\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration11 = msg.timeEnd - msg.timeStart;\nmsg.payload = new Array(100000).fill(\"starting value\");\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 510,
"y": 1740,
"wires": [
[
"85333a3f7fecd05f"
]
]
},
{
"id": "17d4f4617b63fc68",
"type": "debug",
"z": "839d658c3816e9e7",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 1810,
"y": 1920,
"wires": []
},
{
"id": "4bb669bcfe173505",
"type": "function",
"z": "839d658c3816e9e7",
"name": "process the items + timestamp 1.2 (split to chunks of 100)",
"func": "const arrPart = []\nlet startIdx;\nlet endIdx;\n\nif(msg.trigger === undefined){\n context.set('array', msg.payload);\n msg.parts = {};\n msg.parts.type = \"array\";\n msg.parts.count = Math.ceil(msg.payload.length/100);\n msg.parts.len = 100;\n msg.parts.index = 0;\n msg.parts.id = msg._msgid;\n msg.arraylength = msg.payload.length;\n startIdx = msg.parts.index;\n}else{\n startIdx = msg.parts.index * 100\n}\n\n//divide into chunks of 100\nendIdx = startIdx + 100;\nif(msg.arraylength < endIdx){\n endIdx = msg.arraylength;\n msg.parts.len = endIdx - startIdx;\n}\n\nfor(let i=startIdx; i<endIdx; i++){\n //************************************************************\n //WORK TO BE DONE FOR EVERY ITEM!!!!!!\n if(msg.parts.index==0){\n msg.duration12Start = new Date().valueOf() - msg.timeStart;\n }else{\n msg.duration12End = new Date().valueOf() - msg.timeStart; \n }\n arrPart.push(context.get(\"array\")[i] + \" was changed\");\n //************************************************************\n}\n\nmsg.payload = arrPart;\nmsg._msgid = RED.util.generateId();\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1290,
"y": 1780,
"wires": [
[
"375d5a507181218e"
]
]
},
{
"id": "375d5a507181218e",
"type": "function",
"z": "839d658c3816e9e7",
"name": "process control (while do)",
"func": "if(msg.parts.index < msg.parts.count-1){\n let triggerMsg = RED.util.cloneMessage(msg);\n triggerMsg.trigger = true;\n triggerMsg.parts.index++;\n return[msg, triggerMsg];\n}\nreturn [msg, null];\n\n",
"outputs": 2,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1230,
"y": 1700,
"wires": [
[
"abdc75fee10c8291"
],
[
"4bb669bcfe173505"
]
]
},
{
"id": "abdc75fee10c8291",
"type": "join",
"z": "839d658c3816e9e7",
"name": "",
"mode": "auto",
"build": "object",
"property": "payload",
"propertyType": "msg",
"key": "topic",
"joiner": "\\n",
"joinerType": "str",
"accumulate": "false",
"timeout": "",
"count": "",
"reduceRight": false,
"x": 1490,
"y": 1700,
"wires": [
[
"17d4f4617b63fc68",
"79db583c55da54b7"
]
]
},
{
"id": "e5666ac6e3592b45",
"type": "function",
"z": "839d658c3816e9e7",
"name": "timestamp 2.1",
"func": "msg.topic = \"measure 2.1\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration21 = msg.timeEnd - msg.timeStart;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 900,
"y": 1920,
"wires": [
[
"6d7aeefef80e5b92"
]
]
},
{
"id": "6d7aeefef80e5b92",
"type": "function",
"z": "839d658c3816e9e7",
"name": "timestamp 2.2",
"func": "msg.topic = \"measure 2.2\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration22 = msg.timeEnd - msg.timeStart;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1140,
"y": 1920,
"wires": [
[
"6031269dfd3065d3"
]
]
},
{
"id": "6031269dfd3065d3",
"type": "function",
"z": "839d658c3816e9e7",
"name": "timestamp 2.3",
"func": "msg.topic = \"measure 2.3\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration23 = msg.timeEnd - msg.timeStart;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1380,
"y": 1920,
"wires": [
[
"17d4f4617b63fc68"
]
]
},
{
"id": "7482684a98383440",
"type": "comment",
"z": "839d658c3816e9e7",
"name": "Process the first 100 array items. Then split them off and direct them to the Join Node. Feed the message back to the Function Node to process the next 100 items. Split those items off, etc. ==> No blocking of parallel branches (or the Node.js event loop) + faster handling of huge arrays",
"info": "blockiert parallele Nodes stark",
"x": 930,
"y": 1560,
"wires": []
},
{
"id": "91fb4e0bc0b15941",
"type": "simple-queue",
"z": "839d658c3816e9e7",
"name": "",
"firstMessageBypass": true,
"bypassInterval": "0",
"x": 950,
"y": 1780,
"wires": [
[
"4bb669bcfe173505"
]
]
},
{
"id": "79db583c55da54b7",
"type": "function",
"z": "839d658c3816e9e7",
"name": "trigger next array",
"func": "\nreturn {trigger:true};",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 970,
"y": 1600,
"wires": [
[
"91fb4e0bc0b15941"
]
]
},
{
"id": "85333a3f7fecd05f",
"type": "change",
"z": "839d658c3816e9e7",
"name": "set ttl",
"rules": [
{
"t": "set",
"p": "ttl",
"pt": "msg",
"to": "10",
"tot": "num"
}
],
"action": "",
"property": "",
"from": "",
"to": "",
"reg": false,
"x": 730,
"y": 1780,
"wires": [
[
"91fb4e0bc0b15941"
]
]
},
{
"id": "2be3e438fb8f1ec0",
"type": "inject",
"z": "839d658c3816e9e7",
"name": "Timestamp start each 1000 ms",
"props": [
{
"p": "topic",
"vt": "str"
},
{
"p": "timeStart",
"v": "",
"vt": "date"
},
{
"p": "timeEnd",
"v": "",
"vt": "date"
}
],
"repeat": "1",
"crontab": "",
"once": true,
"onceDelay": 0.1,
"topic": "Laufzeit messen",
"x": 210,
"y": 1920,
"wires": [
[
"e5666ac6e3592b45"
]
]
},
{
"id": "a130d79ac0568a80",
"type": "comment",
"z": "839d658c3816e9e7",
"name": "Throttle the input rate!",
"info": "",
"x": 840,
"y": 1740,
"wires": []
}
]