Below is the flow that leads to this issue.
Basically, I have a global variable that tracks all "task_submit" data. There can be multiple unrelated task_submit processes running concurrently. In addition, each task_submit process is broken down into multiple subtasks (documents) that are processed. The example I am working with has 656 documents to process.
Since (as I understand the limits) I can only update the whole global context object and not smaller pieces, I am worried about a race condition when updates are added deleted because of multiple fast updates to the global variable.
Therefore, in this case I am rate limiting the flow into the function node to 100 msgs / sec (although it is lower for testing) and the node itself waits until 5 ms have passed since the last context update before proceeding. (Although slowed for testing)
Colin:
Currently I have queues set up hold all msgs unless manually triggered. During the process the queue is holding back about 650 msgs and the delay has none. I can't drop any msgs as they contain the data for the next step however the msgs do not multiply. This example is an edge case and I would expect the system to be able to handle it easily. The delay is to ensure I don't try to write to the same global context variable at the same time creating a race condition since the documents are processed asynchronously and different parts of the global context variable may be updated by different node on a semi ranndom order.
Previously I enlarged the heap memory with --max-old-space-size=15192 as the document processing can take a bit of memory however the process was no where near this limit and the system has more than enough memory to handle it (this has been tested)
Patial answer?
I was having a similar problem in the 57a68ec1.2dc138 - "Start PDF Processing" node which pulls down the starting data based on the state of the task. I was getting the same error when I tried to send a msg with the 656 objects in the payload. I ran across a reference saying that node.red automatically generates that error when a functions sends out a payload of greater than 600 object for performance reasons. I moved the msg.payload to msg.my_payload and then used a change node to put it back before the split. That worked.
I tried a similar thing here but the same error then occurred on the change node.
The full task submit looks like the following but with 656 objects under the payload key
{"84cac7c1-260d-45f0-8795-e7b0fa05a555":{"timestamp":1546618112463,"payload":{"0":{"source_folder":"l:\\projects\\023-069\\source","source_file":"draft cover letter rev 3.docx","temp_folder":"\\\\aquila-dc1\\datashare\\Program Management\\source-docs\\temp\\844570621","temp_file":"aqs0-draft cover letter rev 3.docx","extension":".docx","pages":"6","processing":{"load_file":"","decrypt_file":"","decode_file":"","bookmark":"","links":"","color":"","write_file":"","repair_file":"","curr_status":0,"count":0,"file_id":0}}},"my_aqs":{"msg_id":"8e85791b-22a6-41db-849e-acdb8430e389","inprocess":{"program":"clarizen","token":"task_submit"}},"aqs":{"project":"023-069","user":"jboutwell","temp_folder":"\\\\aquila-dc1\\datashare\\Program Management\\source-docs\\temp\\844570621","inprocess":{"C_DocumentRemediation":"/Milestone/6thfws0iebqu6r8gg8gkls67580","C_ReviewDocCom":"/Task/6thfws0iebqu6r8gg8gkls675113","C_user_id":"/User/5q3sogy285cbd7ksfz2563o2e3710","C_user_name":"Josh Boutwell","C_template":"ectd"},"task_submit_guid":"84cac7c1-260d-45f0-8795-e7b0fa05a555"},"count_complete":1,"pdf_started":1},"timestamp":1546618113475}
Below is the flow I am testing
[{"id":"3dcf3c3c.f656dc","type":"tab","label":"Flow 2","disabled":false,"info":""},{"id":"57a68ec1.2dc138","type":"function","z":"3dcf3c3c.f656dc","name":"Start PDF Processing","func":"//Create empty Variables\n\nvar nmsg = {}\nvar temp_payload = {}\nvar temp1 = {}\n\n//Create timestamp\n var my_time = Date.now()\n\n//Get Global variable task_submit\nvar validation_messages = global.get(\"task_submit\");\n\n//Check if it exists, if it doesn't then return nothing\n if( validation_messages == null) {\nreturn\n }\n \n \n//Check to see if the global variable has been accessed recently If time stamp is atleast 1 second then procede \n\n if(my_time-validation_messages.timestamp<1000){\n\n return [msg,null]\n } else {\n \n \n//Place hold on Global Variable Task_submit\nvalidation_messages.timestamp = my_time\nglobal.set(\"task_submit\", validation_messages);\n\n\n\n//Iterate through all objects in global task_submit \nfor (var key in validation_messages) {\n // skip loop if the property is from prototype\n if (!validation_messages.hasOwnProperty(key)) continue;\n//Check if task has been processed\nif (validation_messages[key].count_complete == 1 && validation_messages[key].pdf_started != 2 && validation_messages[key].pdf_started != 1){\n //create payload object\n temp_payload = {}\n temp1 = validation_messages[key].payload\n //check to see if the current document was read correctly\n for (var key1 in temp1) {\n if(\"processing\" in temp1)\n //add file to process list\n temp_payload[key1] = temp1[key1]\n }\n\n//create and send new msg\n nmsg = {\"my_payload\": temp1, \"my_aqs\": validation_messages[key].my_aqs, \"aqs\": validation_messages[key].aqs}\n validation_messages[key].timestamp = my_time\n validation_messages[key].pdf_started = 1\n\n\nvalidation_messages.timestamp = my_time\nglobal.set(\"task_submit\", validation_messages);\nconsole.log(\"sending task_submit\")\n return [msg,nmsg]\n\n\n\n\n\n\n}\n\n}\n}","outputs":2,"noerr":0,"x":500,"y":540,"wires":[["e3828f1f.4ea0d"],["77056da4.29689c"]]},{"id":"d9d1e15c.4fa7","type":"split","z":"3dcf3c3c.f656dc","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":870,"y":540,"wires":[["ed66d5ed.c4f6e8"]]},{"id":"e3828f1f.4ea0d","type":"delay","z":"3dcf3c3c.f656dc","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":500,"y":480,"wires":[["57a68ec1.2dc138"]]},{"id":"32940967.9b671e","type":"function","z":"3dcf3c3c.f656dc","name":"Update task_submit file count","func":"\n var my_time = Date.now()\n var task_submit = global.get(\"task_submit\");\n console.log(\"Got here.\")\n if( task_submit == null) {\n console.log(\"bad task_submit.\")\n //Report Error\n //task_submit = {}\n }\n \n \n\n if(my_time - task_submit.timestamp < 100){\n console.log(\"delayed\")\n return [msg,null]\n } else {\n task_submit.timestamp = my_time \n\ntask_submit[msg.aqs.task_submit_guid].payload[msg.payload.processing.file_id].processing.count = task_submit[msg.aqs.task_submit_guid].payload[msg.payload.processing.file_id].processing.count+1\ntask_submit[msg.aqs.task_submit_guid].payload[msg.payload.processing.file_id].processing.timestamp = my_time\n\ntask_submit[msg.aqs.task_submit_guid].timestamp = my_time\nglobal.set(\"task_submit\", task_submit);\nconsole.log(\"updateing # \" + msg.payload.processing.file_id)\nconsole.log(msg)\n\nreturn [null, msg]\n\n }\n\n \n","outputs":2,"noerr":0,"x":1830,"y":300,"wires":[["beabbd66.5654d8","71360260.faf33c"],["3270c7a.fb21d38"]]},{"id":"beabbd66.5654d8","type":"delay","z":"3dcf3c3c.f656dc","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":1870,"y":140,"wires":[["d2270b38.bf76d","c97358c6.60f6e"]]},{"id":"56c7b584.c1567c","type":"inject","z":"3dcf3c3c.f656dc","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":140,"y":540,"wires":[["44386426.3022cc"]]},{"id":"44386426.3022cc","type":"function","z":"3dcf3c3c.f656dc","name":"","func":"//Reset(delete) task_submit .pdf_started key for testing only\n\n\n\n//Create empty Variables\n\nvar nmsg = {}\nvar temp_payload = {}\nvar temp1 = {}\n\n//Create timestamp\n var my_time = Date.now()\n\n//Get Global variable task_submit\nvar validation_messages = global.get(\"task_submit\");\n\n//Check if it exists, if it doesn't then return nothing\n if( validation_messages == null) {\nreturn\n }\n \n \n//Check to see if the global variable has been accessed recently If time stamp is atleast 1 second then procede \n\n if(my_time-validation_messages.timestamp<1000){\n\n return [msg,null]\n } else {\n \n \n//Place hold on Global Variable Task_submit\nvalidation_messages.timestamp = my_time\nglobal.set(\"task_submit\", validation_messages);\n\n\n\n//Iterate through all objects in global task_submit \nfor (var key in validation_messages) {\n // skip loop if the property is from prototype\n if (!validation_messages.hasOwnProperty(key)) continue;\n//Check if task has been processed\nif (validation_messages[key].count_complete == 1 && validation_messages[key].pdf_started == 1){\n //create payload object\ndelete validation_messages[key].pdf_started\n}}}\nglobal.set(\"task_submit\", validation_messages);\nreturn msg;","outputs":1,"noerr":0,"x":290,"y":540,"wires":[["57a68ec1.2dc138"]]},{"id":"77056da4.29689c","type":"change","z":"3dcf3c3c.f656dc","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"my_payload","tot":"msg"},{"t":"delete","p":"my_payload","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":700,"y":540,"wires":[["d9d1e15c.4fa7"]]},{"id":"ed66d5ed.c4f6e8","type":"delay","z":"3dcf3c3c.f656dc","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"milliseconds","rate":"10","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":1080,"y":460,"wires":[["ca61542c.0c4b2"]]},{"id":"ca61542c.0c4b2","type":"simple-queue","z":"3dcf3c3c.f656dc","name":"queue1","firstMessageBypass":false,"bypassInterval":"0","x":1500,"y":460,"wires":[["32940967.9b671e","5f0331a6.2cae08"]]},{"id":"d75f52d9.0206","type":"inject","z":"3dcf3c3c.f656dc","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":160,"y":360,"wires":[["32a48a69.096b3e"]]},{"id":"32a48a69.096b3e","type":"change","z":"3dcf3c3c.f656dc","name":"","rules":[{"t":"set","p":"trigger","pt":"msg","to":"1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1300,"y":360,"wires":[["ca61542c.0c4b2"]]},{"id":"d2270b38.bf76d","type":"simple-queue","z":"3dcf3c3c.f656dc","name":"queue1","firstMessageBypass":false,"bypassInterval":"0","x":1260,"y":300,"wires":[["32940967.9b671e"]]},{"id":"4e4c94e9.463c5c","type":"inject","z":"3dcf3c3c.f656dc","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":160,"y":240,"wires":[["9a68ee80.d6187"]]},{"id":"9a68ee80.d6187","type":"change","z":"3dcf3c3c.f656dc","name":"","rules":[{"t":"set","p":"trigger","pt":"msg","to":"1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1060,"y":300,"wires":[["d2270b38.bf76d"]]},{"id":"c97358c6.60f6e","type":"debug","z":"3dcf3c3c.f656dc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":2310,"y":260,"wires":[]},{"id":"71360260.faf33c","type":"debug","z":"3dcf3c3c.f656dc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":2090,"y":260,"wires":[]},{"id":"3270c7a.fb21d38","type":"debug","z":"3dcf3c3c.f656dc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":2090,"y":300,"wires":[]},{"id":"5f0331a6.2cae08","type":"debug","z":"3dcf3c3c.f656dc","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":1690,"y":460,"wires":[]},{"id":"9a82bacc.25e408","type":"comment","z":"3dcf3c3c.f656dc","name":"Start Process","info":"","x":130,"y":480,"wires":[]},{"id":"46cad907.2141f","type":"comment","z":"3dcf3c3c.f656dc","name":"Send 1 msg through to problem node","info":"","x":230,"y":320,"wires":[]},{"id":"f5bdc7e2.b6c198","type":"comment","z":"3dcf3c3c.f656dc","name":"Allow a msg trhough the delay rate limit","info":"","x":240,"y":180,"wires":[]}]