Memory issue processing large csv file

Hi,
From what I have googled, it looks like a garbage collection issue.

I have multiple CSV processing flows; one of them processes 780k rows.

It gets through around 375,959 rows and then Node-RED crashes:

node-red_1  | 19 Oct 11:56:48 - [warn] [function:Insert Year, Make, Model & Engine] 375955
node-red_1  | 19 Oct 11:56:48 - [warn] [function:Insert Year, Make, Model & Engine] 375956
node-red_1  | 19 Oct 11:56:48 - [warn] [function:Insert Year, Make, Model & Engine] 375957
node-red_1  | 19 Oct 11:56:48 - [warn] [function:Insert Year, Make, Model & Engine] 375958
node-red_1  | 19 Oct 11:56:48 - [warn] [function:Insert Year, Make, Model & Engine] 375959

 <--- Last few GCs --->
node-red_1  | 
node-red_1  | [18:0x56001d48df80]   197976 ms: Scavenge 1972.5 (2065.4) -> 1960.3 (2065.4) MB, 4.7 / 0.0 ms  (average mu = 0.232, current mu = 0.040) allocation failure 
node-red_1  | [18:0x56001d48df80]   198070 ms: Scavenge 1973.2 (2065.4) -> 1964.7 (2067.4) MB, 12.9 / 0.0 ms  (average mu = 0.232, current mu = 0.040) allocation failure 
node-red_1  | [18:0x56001d48df80]   198981 ms: Mark-sweep 1977.3 (2067.4) -> 1963.0 (2072.9) MB, 857.0 / 0.0 ms  (average mu = 0.219, current mu = 0.203) allocation failure scavenge might not succeed
node-red_1  | 
node-red_1  | 
node-red_1  | <--- JS stacktrace --->
node-red_1  | 
node-red_1  | FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
node-red_1 exited with code 0

I have a 1-second delay before each row is processed into MySQL.

[{"id":"82e615bdb2a3f6be","type":"file in","z":"ed8280c9e0778d93","name":"Read In CSV","filename":"","format":"lines","chunk":false,"sendError":false,"encoding":"utf8","allProps":false,"x":770,"y":2880,"wires":[["721f97a3bbc664e9"]]},{"id":"38793faeb2d51c16","type":"watch","z":"ed8280c9e0778d93","name":"Watch Hotfolder","files":"/data/holley/vehicle/csvimport/","recursive":false,"x":160,"y":2880,"wires":[["6740f03cf0e1048e"]]},{"id":"6740f03cf0e1048e","type":"switch","z":"ed8280c9e0778d93","name":"Test if File","property":"type","propertyType":"msg","rules":[{"t":"eq","v":"file","vt":"str"},{"t":"empty"}],"checkall":"true","repair":false,"outputs":2,"x":320,"y":2880,"wires":[["75efbbe7ef06d85a"],[]]},{"id":"5f31b87a42f464bb","type":"delay","z":"ed8280c9e0778d93","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":620,"y":2880,"wires":[["82e615bdb2a3f6be"]]},{"id":"682bc75ce730c080","type":"function","z":"ed8280c9e0778d93","name":"Insert Year, Make, Model & Engine","func":"const mysql = global.get(\"mysql\");\nconst data = msg.payload;\n\nlet haveUpdate = false;\n\n//set msg.topic to empty string, used later to see if we need to\n//call MySql\nmsg.topic ='';\n\nlet insert = '';\n\nlet rowcount = flow.get('rowcount');\nrowcount++;\nflow.set('rowcount',rowcount);\n\nnode.warn(rowcount);\n\n//insert part number\nlet preinsert = \"START TRANSACTION;\";\npreinsert += \"INSERT INTO vehicle_partnumbers (partnumber) VALUES (\" + mysql.escape(data.partnumber) + \") ON DUPLICATE KEY UPDATE partnumber_id=LAST_INSERT_ID(partnumber_id);\";\npreinsert += \"SET @last_id_in_vehicle_partnumber = LAST_INSERT_ID();\";\n\n\nif ( data.hasOwnProperty('year' ) && data.year != null ){ \n    haveUpdate = true;\n    //insert year\n    insert += \"INSERT INTO vehicle_years (year) VALUES (\" + mysql.escape(data.year) + \") ON DUPLICATE KEY UPDATE year_id=LAST_INSERT_ID(year_id);\";\n    insert += \"SET @last_id_in_vehicle_year = LAST_INSERT_ID();\";\n    //insert relationship\n    insert += \"INSERT IGNORE INTO partnumber_vehicle_years_link (partnumber_id,year_id) VALUES(@last_id_in_vehicle_partnumber,@last_id_in_vehicle_year);\";\n}\n\nif ( data.hasOwnProperty('make' ) && data.make != null ){ \n    haveUpdate = true;\n    //insert make\n    insert += \"INSERT INTO vehicle_make (make) VALUES (\" + mysql.escape(data.make) + \") ON DUPLICATE KEY UPDATE make_id=LAST_INSERT_ID(make_id);\";\n    insert += \"SET @last_id_in_vehicle_make = LAST_INSERT_ID();\";\n    //insert relationship\n    insert += \"INSERT IGNORE INTO partnumber_vehicle_make_link (partnumber_id,make_id) VALUES(@last_id_in_vehicle_partnumber,@last_id_in_vehicle_make);\";\n}\n\nif ( data.hasOwnProperty('model' ) && data.model != null ){ \n    haveUpdate = true;\n    //insert Model\n    insert += \"INSERT INTO vehicle_model (model) VALUES (\" + mysql.escape(data.model) + \") ON DUPLICATE KEY UPDATE model_id=LAST_INSERT_ID(model_id);\";\n    insert += \"SET @last_id_in_vehicle_model = LAST_INSERT_ID();\";\n    //insert relationship\n    insert += \"INSERT IGNORE INTO partnumber_vehicle_model_link (partnumber_id,model_id) VALUES(@last_id_in_vehicle_partnumber,@last_id_in_vehicle_model);\";\n}\n\nif ( data.hasOwnProperty('engine' ) && data.engine != null ){ \n    haveUpdate = true;\n    //insert Engine\n    insert += \"INSERT INTO vehicle_engine (engine) VALUES (\" + mysql.escape(data.engine) + \") ON 
DUPLICATE KEY UPDATE engine_id=LAST_INSERT_ID(engine_id);\";\n    insert += \"SET @last_id_in_vehicle_engine = LAST_INSERT_ID();\";\n    //insert relationship\n    insert += \"INSERT IGNORE INTO partnumber_vehicle_engine_link (partnumber_id,engine_id) VALUES(@last_id_in_vehicle_partnumber,@last_id_in_vehicle_engine);\";\n}\nif ( haveUpdate ) {\n   //we must have an update so update\n   msg.topic = preinsert + insert + \"COMMIT;\";\n}\nmsg.payload = '';\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":260,"y":3040,"wires":[["d63b437a2684107b"]]},{"id":"23e618ea2a1f10d1","type":"mysql","z":"ed8280c9e0778d93","mydb":"19c571b552634c36","name":"Bulk Insert","x":950,"y":3100,"wires":[[]]},{"id":"a2a69003875aa87c","type":"comment","z":"ed8280c9e0778d93","name":"AUTO_INCREMENT READ ME","info":"We're performing bulk table inserts each call for efficiency. This will cause the AUTO_INCREMENT of each table to be out of squance. This is normal and done for perfomance.\n\n\nThis is not unusual and there are a couple of causes. Sometimes it is due to optimisations the query runner makes to reduce contention issues with the counter resource, improving efficiency when there are concurrent updates to the affected table. Sometimes it is due to transactions that got explicitly rolled back (or implicitly rolled back due to encountering an error).\n\nThe only guarantees from an auto_increment column (or IDENTITY in MSSQL, and the other names the concept goes by) is that each value will be unique and never smaller than a previous one: so you can rely on the values for ordering but you can not rely on them not to have gaps.\n\nIf you need the column's values to have no gaps at all you will need to manage the values yourself, either in another layer of business logic or in the DB via a trigger (be careful of potential performance issues with triggers though), of course if you do roll your own you will have to contend with all the concurrency/rollback/cleanup-after-delete/other issues that the DB engines work around by allowing gaps).","x":1190,"y":3100,"wires":[]},{"id":"721f97a3bbc664e9","type":"function","z":"ed8280c9e0778d93","name":"prepend columns","func":"if (msg.parts.index === 0) {\n    //set columns\n    flow.set('columns', msg.payload);\n    //set complete to empty string\n    flow.set('fileReadComplete', '');\n    flow.set('fileMoved', false);\n    \n    //get hotfolder dir parent\n    let dir = msg.topic.split(\"/\").slice(0,-1).join(\"/\") + \"/\";\n\n    //set parent directory for later\n    flow.set('hotfolderLocation', dir);\n    \n    //set delete flag\n    flow.set('deletedCSVFile', false);\n    \n    //set delete flag\n    flow.set('rowcount', 0);\n    \n    return;\n}\n\nmsg.payload = flow.get('columns') + '\\r\\n' + msg.payload;\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":950,"y":2880,"wires":[["c1a8a897a18e2b4a"]]},{"id":"c1a8a897a18e2b4a","type":"function","z":"ed8280c9e0778d93","name":"set msg.complete to last item","func":"let deletedCSVFile = flow.get('deletedCSVFile');\nif (msg.parts.index + 1 === msg.parts.count &&\n        !deletedCSVFile ) {\n    flow.set('fileReadComplete', 'complete');\n}\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1190,"y":2880,"wires":[["80b0f58ee1c76bc7"]]},{"id":"80b0f58ee1c76bc7","type":"change","z":"ed8280c9e0778d93","name":"parts -> 
temp","rules":[{"t":"set","p":"temp","pt":"msg","to":"parts","tot":"msg"},{"t":"delete","p":"parts","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":1420,"y":2880,"wires":[["171226d664772d00"]]},{"id":"59979b0410fa8822","type":"comment","z":"ed8280c9e0778d93","name":"save and restore msg.parts because CSV node uses it","info":"","x":1580,"y":2840,"wires":[]},{"id":"6970b1e9169edc04","type":"change","z":"ed8280c9e0778d93","name":"temp -> parts","rules":[{"t":"set","p":"parts","pt":"msg","to":"temp","tot":"msg"},{"t":"delete","p":"temp","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":1780,"y":2880,"wires":[["92147694a038c15a"]]},{"id":"2657a608e781690c","type":"switch","z":"ed8280c9e0778d93","name":"check if done with file","property":"ReadComplete","propertyType":"msg","rules":[{"t":"eq","v":"complete","vt":"str"}],"checkall":"true","repair":false,"outputs":1,"x":1240,"y":3040,"wires":[["9f32f8aa07e28a39"]]},{"id":"04852593dd91bcf0","type":"switch","z":"ed8280c9e0778d93","name":"Check if we have sql to insert","property":"topic","propertyType":"msg","rules":[{"t":"empty"},{"t":"nempty"}],"checkall":"true","repair":false,"outputs":2,"x":710,"y":3040,"wires":[["098db3e4e9247d2c"],["098db3e4e9247d2c","23e618ea2a1f10d1"]]},{"id":"171226d664772d00","type":"csv","z":"ed8280c9e0778d93","name":"Vehicle CSV","sep":",","hdrin":true,"hdrout":"","multi":"one","ret":"\\r","temp":"","skip":"0","strings":false,"include_empty_strings":true,"include_null_values":true,"x":1590,"y":2880,"wires":[["6970b1e9169edc04"]]},{"id":"d63b437a2684107b","type":"delay","z":"ed8280c9e0778d93","name":"","pauseType":"delay","timeout":"1","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"allowrate":false,"x":480,"y":3040,"wires":[["04852593dd91bcf0"]]},{"id":"098db3e4e9247d2c","type":"function","z":"ed8280c9e0778d93","name":"Check if File Read Complete","func":"//read in complete\nmsg.ReadComplete = flow.get('fileReadComplete');\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1000,"y":3040,"wires":[["2657a608e781690c"]]},{"id":"19ce6e3e2a4a41d8","type":"exec","z":"ed8280c9e0778d93","command":"","addpay":"cmd","append":"","useSpawn":"false","timer":"5","winHide":false,"oldrc":false,"name":"Move file to Complete folder","x":1740,"y":3040,"wires":[[],[],["7f8732530d6c05fe"]]},{"id":"9f32f8aa07e28a39","type":"function","z":"ed8280c9e0778d93","name":"Assemble Move Command","func":"/**\n * clear topic otherwise it will send\n * sql from privous loop\n * */\nmsg.topic = '';\n\n/**\n * Please note we have to negated test to check if file exists\n * otherwsie it will always return 1 (error) if file not found\n * meaning it was moved on privious loop\n * This way it will always return 0 success if if the file is\n * not there\n * \n * */\n//test if we have file and move to complete\nmsg.cmd = \"[ ! 
-f \\\"\" + msg.filename + \"\\\" ] || mv \\\"\" + msg.filename + \"\\\"  \\\"\" + flow.get('hotfolderLocation') + \"completed/\\\"\";\n\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1480,"y":3040,"wires":[["19ce6e3e2a4a41d8"]]},{"id":"6b4b4c7ef1b2fb63","type":"function","z":"ed8280c9e0778d93","name":"Set Deleted Flag","func":"flow.set('deletedCSVFile', true);\nflow.set('fileReadComplete', '');\nmsg.ReadComplete = '';\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":2090,"y":3040,"wires":[[]]},{"id":"7f8732530d6c05fe","type":"switch","z":"ed8280c9e0778d93","name":"","property":"payload.code","propertyType":"msg","rules":[{"t":"eq","v":"0","vt":"str"}],"checkall":"true","repair":false,"outputs":1,"x":1930,"y":3040,"wires":[["6b4b4c7ef1b2fb63"]]},{"id":"75efbbe7ef06d85a","type":"switch","z":"ed8280c9e0778d93","name":"Check if csv","property":"file","propertyType":"msg","rules":[{"t":"regex","v":"\\.csv$","vt":"str","case":true},{"t":"regex","v":"^\\.","vt":"str","case":false},{"t":"else"}],"checkall":"true","repair":false,"outputs":3,"x":470,"y":2880,"wires":[["5f31b87a42f464bb"],[],["d72a6dcef3dde5b2"]]},{"id":"d72a6dcef3dde5b2","type":"delay","z":"ed8280c9e0778d93","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":640,"y":2940,"wires":[["ec6bc3c1ecc08ca5"]]},{"id":"ec6bc3c1ecc08ca5","type":"exec","z":"ed8280c9e0778d93","command":"rm -f","addpay":"filename","append":"","useSpawn":"false","timer":"3","winHide":false,"oldrc":false,"name":"Not a csv so Delete File","x":830,"y":2940,"wires":[[],[],[]]},{"id":"57f59d16501a2401","type":"comment","z":"ed8280c9e0778d93","name":"Not a csv just delete and ignore","info":"","x":750,"y":2980,"wires":[]},{"id":"92147694a038c15a","type":"switch","z":"ed8280c9e0778d93","name":"Check we have partnumber","property":"payload.partnumber","propertyType":"msg","rules":[{"t":"nempty"}],"checkall":"true","repair":false,"outputs":1,"x":2000,"y":2880,"wires":[["682bc75ce730c080"]]},{"id":"19c571b552634c36","type":"MySQLdatabase","name":"","host":"db","port":"3306","db":"vehicle_applications","tz":"","charset":"UTF8"}]

Thanks in advance
Harry

Tbh I don't think this is the right approach for such a large dataset. MySQL can load CSV files directly (LOAD DATA INFILE) and it is extremely fast (the LOCAL keyword is essential for getting that performance); it is basically a blind import.
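
For reference, a minimal sketch of that approach from the command line. The staging table name, column list and file name below are made up for illustration; the host, database and CSV folder are taken from the flow above, and local_infile also has to be enabled on the MySQL server.

mysql --local-infile=1 -h db -u youruser -p vehicle_applications <<'SQL'
-- Hypothetical staging table: load the whole file blind, then run the
-- normalised per-table inserts as set-based SQL afterwards.
-- Adjust the line terminator to match the file (\n vs \r\n).
LOAD DATA LOCAL INFILE '/data/holley/vehicle/csvimport/file.csv'
INTO TABLE vehicle_import_staging
FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES
(partnumber, year, make, model, engine);
SQL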

Well, I ended up splitting the files into 200,000-line chunks (40 files):

awk -v l=200000 '
    # remember the header line and skip it
    (NR==1){header=$0; next}
    # every l data rows: close the previous chunk, open the next one,
    # and repeat the header at the top of it
    (NR%l==2){
        close(file)
        file=sprintf("%s.%0.5d.csv", FILENAME, ++c)
        sub(/csv[.]/, "", file)
        print header > file
    }
    # write the current data row to the current chunk
    {print > file}' file.csv
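
With an input called file.csv the chunks come out as file.00001.csv, file.00002.csv, and so on, each starting with the original header. A quick sanity check (filenames assumed, not from the original run):

wc -l file.0*.csv        # each chunk: header + up to 200,000 data rows
head -1 file.00001.csv   # header should be repeated at the top of every chunk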

It took 4 minutes 20 seconds to process each CSV and build the multi-table transactional SQL, then another 4 minutes on top of that for MySQL to finish processing.

I simply had a flow that injected a new file every 12 minutes and let it run overnight.

Now that we have the 8 million rows in the database, each month I will run a diff of the new file against the previous one. My testing shows around a 210,000-row difference between last month and this one, so I will only need to process the diff CSV each month:

diff old.csv new.csv | sed -n '/^[><]/s/[><] \(.*\)$/\1/p' > diff.csv
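
One thing to watch if diff.csv is fed back through the same flow: assuming the header line itself did not change between months, it will not appear in the diff output, so the header has to be put back before the import (file names here match the command above):

head -1 new.csv | cat - diff.csv > diff_with_header.csv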

Left here in case it helps anyone else.
Harry

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.