Detecting when parts are complete in asynchronous flow

Hi,
This could be a long read, so thanks in advance. I will post a quick overview and then a detailed part.

Overview:
We have a flow that runs once a week. It downloads a CSV (not shown), cleans it up (not shown), and drops the cleaned CSV in a watched folder (this part is included).

The CSV has 40k-plus rows that get processed and inserted into a database. This all works as expected, and I'm happy with the performance and the process.

The Issue:
I need to send an email on completion as a sanity check that it is processing as expected: if I don't receive an email on Friday morning, I go and look.

I'm getting a lot of emails. It would send more, but the email server errors out with too many connections. I'm assuming it's trying to send one email for each of the 40K-plus inserts.

Screen Shot of the Flow, ignore warnings, I took out some things before posting the flow.

Details:
A couple of things I have tried:
Set a flow context variable that allows the email to be sent; once the email is sent, set it to false so that no more emails go out.
Set a flow context variable for parts complete and only process once complete.

My assumption is that, in an asynchronous flow with MySQL, the next row is processed while the MySQL node is still waiting for the previous insert to finish. The CSV part of the flow finishes and marks the flow complete while the MySQL node is still catching up. By the time the MySQL node's messages come back, the CSV part is already marked complete, so every one of them triggers an email.
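The suspected race can be reproduced outside Node-RED. The sketch below is a simulation only, not the actual flow: `flowContext`, `readAllRows`, `afterInsert` and `runPendingInserts` are hypothetical names, a Map stands in for flow context, and the database is replaced by a queue of callbacks that run only after the whole file has been read.

```javascript
// Simulation only: a Map stands in for Node-RED flow context and a
// queue of callbacks stands in for the MySQL node's pending queries.
const flowContext = new Map();
const pendingInserts = [];
const emailsTriggered = [];

// Reading side: queue one insert per row and mark the file complete
// as soon as the last row has been read.
function readAllRows(rowCount) {
    flowContext.set('fileReadComplete', '');
    for (let index = 0; index < rowCount; index++) {
        if (index + 1 === rowCount) {
            flowContext.set('fileReadComplete', 'complete');
        }
        // The insert has not finished yet; its callback runs later.
        pendingInserts.push(() => afterInsert(index));
    }
}

// Database side: runs when an insert returns.
function afterInsert(index) {
    if (flowContext.get('fileReadComplete') === 'complete') {
        emailsTriggered.push(index);
    }
}

// Let every queued insert "return", oldest first.
function runPendingInserts() {
    while (pendingInserts.length > 0) {
        pendingInserts.shift()();
    }
}

readAllRows(5);
runPendingInserts();
console.log(emailsTriggered.length); // 5: every row sees 'complete'
```

If the real flow behaves like this simulation, a single global "complete" flag cannot tell the last returning insert apart from the others.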

The first part of the flow is simple: watch the hot folder, check that the file is a CSV, and read it in if it is.

Prepend columns node:
This is where all the prep is done. I want to convert each row to an object, so I grab the first row and store the columns; for each subsequent row I prepend the stored columns to the data, so that after CSV parsing I end up with an object of column name = value. I also set some flow data on the first pass.

In the next node I check whether this is the last part and mark the flow complete in my flow context. I think this is where the big asynchronous issue is.

Then I store the parts in a temporary property, because the csv node will overwrite them and I don't need its version. After parsing the CSV I put the parts back.

Then we check that we have the main database value that everything keys off (the part number).
We prep the SQL and then insert. This loops until all parts are done.

After the MySQL Node:
This is the bit that is broken.
Expected behavior: check if the process is complete. If it is, send one email and delete the incoming CSV.
I check whether we have already sent an email; if not, I set the flag to false to indicate that the email has been sent, and then send it. The theory is that the next loop won't send one because the flag is now false.
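One possible reason the flag does not help is that the check (switch node) and the update (change node) are separate nodes, so several messages may pass the check before the first one flips the flag. A minimal sketch, assuming the default in-memory context store where `flow.get` and `flow.set` are synchronous, does both steps in a single function. `claimEmailSlot` and the mock `flow` object are hypothetical; inside a real function node the built-in `flow` would be used instead of the mock.

```javascript
// Mock of Node-RED's flow context so the sketch runs standalone.
const store = { sendemail: true };
const flow = {
    get: (key) => store[key],
    set: (key, value) => { store[key] = value; },
};

// Returns true for the first caller only. The check and the update
// happen in one synchronous step, so no other message can run between them.
function claimEmailSlot() {
    if (flow.get('sendemail') !== true) {
        return false;
    }
    flow.set('sendemail', false);
    return true;
}

// Four messages arrive one after another; only the first may send.
const results = [];
for (let i = 0; i < 4; i++) {
    results.push(claimEmailSlot());
}
console.log(results); // [ true, false, false, false ]
```

Note that this only limits the email to one. It would still fire when the first insert returns after reading has finished, not when the last insert completes.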

It loops and tries to send an email for each one, but Office 365 errors out (thankfully) with too many connections.

How do I detect the end of processing so that I send one email and delete the file only once?

Thanks

I removed some email branding to make the flow smaller.

[{"id":"22620fd59064e156","type":"watch","z":"ed8280c9e0778d93","name":"Watch Hotfolder","files":"/mnt/common/hotfolder/information/csvimport","recursive":"","x":2080,"y":1380,"wires":[["3736eb3bcf48b7e0"]]},{"id":"3736eb3bcf48b7e0","type":"switch","z":"ed8280c9e0778d93","name":"Test if File","property":"type","propertyType":"msg","rules":[{"t":"eq","v":"file","vt":"str"},{"t":"empty"}],"checkall":"true","repair":false,"outputs":2,"x":2260,"y":1380,"wires":[["694295c8bef6dba3"],[]]},{"id":"c710fc1816aaabcb","type":"delay","z":"ed8280c9e0778d93","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":2560,"y":1380,"wires":[["febc0ff57a66a0f7"]]},{"id":"53880e39a42d9823","type":"file in","z":"ed8280c9e0778d93","name":"Read In CSV","filename":"","format":"lines","chunk":false,"sendError":false,"encoding":"utf8","allProps":false,"x":2890,"y":1380,"wires":[["fba3098c428df58c"]]},{"id":"fba3098c428df58c","type":"function","z":"ed8280c9e0778d93","name":"prepend columns","func":"if (msg.parts.index === 0) {\n    //set columns\n    flow.set('columns', msg.payload);\n    //set complete to empty string\n    flow.set('fileReadComplete', '');\n    flow.set('fileMoved', false);\n    \n    //get hotfolder dir parent\n    let dir = msg.topic.split(\"/\").slice(0,-1).join(\"/\") + \"/\";\n\n    //set parent directory for later\n    flow.set('hotfolderLocation', dir);\n    \n    //set delete flag\n    flow.set('deletedCSVFile', false);\n    \n    //set rowcount\n    flow.set('rowcount', 0);\n    \n    return;\n}\n\nmsg.payload = flow.get('columns') + '\\r\\n' + msg.payload;\n\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3070,"y":1380,"wires":[["1be35e860427e3a1"]]},{"id":"1be35e860427e3a1","type":"function","z":"ed8280c9e0778d93","name":"set msg.complete to last item","func":"let deletedCSVFile 
= flow.get('deletedCSVFile');\nif (msg.parts.index + 1 === msg.parts.count &&\n        !deletedCSVFile ) {\n    flow.set('fileReadComplete', 'complete');\n}\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3310,"y":1380,"wires":[["b4f972a72b61d1a7"]]},{"id":"b4f972a72b61d1a7","type":"change","z":"ed8280c9e0778d93","name":"parts -> temp","rules":[{"t":"set","p":"temp","pt":"msg","to":"parts","tot":"msg"},{"t":"delete","p":"parts","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":3540,"y":1380,"wires":[["293ef064cf78f824"]]},{"id":"293ef064cf78f824","type":"csv","z":"ed8280c9e0778d93","name":"Information CSV","sep":",","hdrin":true,"hdrout":"","multi":"one","ret":"\\r","temp":"","skip":"0","strings":false,"include_empty_strings":true,"include_null_values":true,"x":3720,"y":1380,"wires":[["97d68baad8154676"]]},{"id":"97d68baad8154676","type":"change","z":"ed8280c9e0778d93","name":"temp -> parts","rules":[{"t":"set","p":"parts","pt":"msg","to":"temp","tot":"msg"},{"t":"delete","p":"temp","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":3900,"y":1380,"wires":[["2622c34123b83f37"]]},{"id":"4ea27662855017e9","type":"function","z":"ed8280c9e0778d93","name":"Insert into Partnumber & Brand Table","func":"const mysql = global.get(\"mysql\");\n//load payload into data object\nconst data = msg.payload;\n\n\nmsg.topic ='';\n\nlet rowcount = flow.get('rowcount');\nrowcount++;\nflow.set('rowcount',rowcount);\nnode.status({fill:\"green\",shape:\"dot\",text:\"Processing: \" + rowcount});\n\n//start transaction;\nlet insert = \"START TRANSACTION;\";\n\n//insert part number\ninsert += \"INSERT INTO vehicle_partnumbers (partnumber,name,ShortDescription,lastmodified) VALUES (\" + mysql.escape(data.partnumber) + \",\" + mysql.escape(data.name) + \",\";\ninsert += mysql.escape(data.shortdescription) + \",\" + mysql.escape(data.lastmodified) + \") \";\ninsert += \"ON DUPLICATE KEY UPDATE name=\" + 
mysql.escape(data.name) + \", ShortDescription=\" + mysql.escape(data.shortdescription) + \", lastmodified=\" + mysql.escape(data.lastmodified) + \",\";\ninsert += \"partnumber_id=LAST_INSERT_ID(partnumber_id);\";\n\ninsert += \"SET @last_id_in_vehicle_partnumbers = LAST_INSERT_ID();\";\n\n//check if we have brand with a value.\nif ( data.hasOwnProperty('brand' ) && data.brand != null ){ \n    \n    //insert Brand\n    insert += \"INSERT INTO vehicle_brand (brand,lastmodified) VALUES (\" + mysql.escape(data.brand) + \",\" + mysql.escape(data.lastmodified) + \") \";\n    insert += \"ON DUPLICATE KEY UPDATE lastmodified=\" + mysql.escape(data.lastmodified) + \", brand_id=LAST_INSERT_ID(brand_id);\";\n    insert += \"SET @last_id_in_vehicle_brand = LAST_INSERT_ID();\";\n    insert += \"INSERT IGNORE INTO partnumber_vehicle_brand_link (partnumber_id,brand_id) VALUES(@last_id_in_vehicle_partnumbers,@last_id_in_vehicle_brand);\";\n}\n//commit transaction\ninsert += \"COMMIT;\";\nmsg.topic = insert;\nmsg.payload = '';\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":2170,"y":1540,"wires":[["e663bf383b7d0364"]]},{"id":"e663bf383b7d0364","type":"mysql","z":"ed8280c9e0778d93","mydb":"","name":"Bulk Insert","x":2410,"y":1540,"wires":[["4240f3293bdb7406"]]},{"id":"c36552a8bb3efdde","type":"comment","z":"ed8280c9e0778d93","name":"AUTO_INCREMENT READ ME","info":"We're performing bulk table inserts each call for efficiency. This will cause the AUTO_INCREMENT of each table to be out of squance. This is normal and done for perfomance.\n\n\nThis is not unusual and there are a couple of causes. Sometimes it is due to optimisations the query runner makes to reduce contention issues with the counter resource, improving efficiency when there are concurrent updates to the affected table. 
Sometimes it is due to transactions that got explicitly rolled back (or implicitly rolled back due to encountering an error).\n\nThe only guarantees from an auto_increment column (or IDENTITY in MSSQL, and the other names the concept goes by) is that each value will be unique and never smaller than a previous one: so you can rely on the values for ordering but you can not rely on them not to have gaps.\n\nIf you need the column's values to have no gaps at all you will need to manage the values yourself, either in another layer of business logic or in the DB via a trigger (be careful of potential performance issues with triggers though), of course if you do roll your own you will have to contend with all the concurrency/rollback/cleanup-after-delete/other issues that the DB engines work around by allowing gaps).","x":2670,"y":1580,"wires":[]},{"id":"4240f3293bdb7406","type":"function","z":"ed8280c9e0778d93","name":"Check if File Read Complete","func":"//read in complete\nmsg.ReadComplete = flow.get('fileReadComplete');\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":2640,"y":1540,"wires":[["938fe3de09e24fd2"]]},{"id":"938fe3de09e24fd2","type":"switch","z":"ed8280c9e0778d93","name":"check if done with file","property":"ReadComplete","propertyType":"msg","rules":[{"t":"eq","v":"complete","vt":"str"}],"checkall":"true","repair":false,"outputs":1,"x":2880,"y":1540,"wires":[["4015da6db3061169","d4110fe015f8f648"]]},{"id":"4015da6db3061169","type":"function","z":"ed8280c9e0778d93","name":"Assemble Move Command","func":"/**\n * clear topic otherwise it will send\n * sql from privous loop\n * */\nmsg.topic = '';\n\n/**\n * Please note we have to negated test to check if file exists\n * otherwsie it will always return 1 (error) if file not found\n * meaning it was moved on privious loop\n * This way it will always return 0 success if if the file is\n * not there\n * \n * */\n//test if we have file and move to complete\nmsg.cmd = \"[ ! 
-f \\\"\" + msg.filename + \"\\\" ] || mv \\\"\" + msg.filename + \"\\\"  \\\"\" + flow.get('hotfolderLocation') + \"completed/\\\"\";\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3160,"y":1540,"wires":[["cb9df3bcf86a27fe"]]},{"id":"cb9df3bcf86a27fe","type":"exec","z":"ed8280c9e0778d93","command":"","addpay":"cmd","append":"","useSpawn":"false","timer":"5","winHide":false,"oldrc":false,"name":"Move file to Complete folder","x":3420,"y":1540,"wires":[[],[],["b5ac751e33aa99f9"]]},{"id":"a2b046bc4938a0d1","type":"comment","z":"ed8280c9e0778d93","name":"save and restore msg.parts because CSV node uses it","info":"","x":3580,"y":1420,"wires":[]},{"id":"a923b9543c781bc4","type":"function","z":"ed8280c9e0778d93","name":"Set Deleted Flag","func":"flow.set('deletedCSVFile', true);\nflow.set('fileReadComplete', '');\nmsg.ReadComplete = '';\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3790,"y":1540,"wires":[[]]},{"id":"b5ac751e33aa99f9","type":"switch","z":"ed8280c9e0778d93","name":"","property":"payload.code","propertyType":"msg","rules":[{"t":"eq","v":"0","vt":"str"}],"checkall":"true","repair":false,"outputs":1,"x":3610,"y":1540,"wires":[["a923b9543c781bc4"]]},{"id":"694295c8bef6dba3","type":"switch","z":"ed8280c9e0778d93","name":"Check if 
csv","property":"file","propertyType":"msg","rules":[{"t":"regex","v":"\\.csv$","vt":"str","case":true},{"t":"regex","v":"^\\.","vt":"str","case":false},{"t":"else"}],"checkall":"true","repair":false,"outputs":3,"x":2410,"y":1380,"wires":[["c710fc1816aaabcb"],[],["34c5b01c12988354"]]},{"id":"34c5b01c12988354","type":"delay","z":"ed8280c9e0778d93","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":2600,"y":1440,"wires":[["268021d623219ef3"]]},{"id":"268021d623219ef3","type":"exec","z":"ed8280c9e0778d93","command":"rm -f","addpay":"filename","append":"","useSpawn":"false","timer":"3","winHide":false,"oldrc":false,"name":"Not a csv so Delete File","x":2790,"y":1440,"wires":[[],[],[]]},{"id":"788a813c95b74bf1","type":"comment","z":"ed8280c9e0778d93","name":"Not a csv just delete and ignore","info":"","x":2730,"y":1480,"wires":[]},{"id":"2622c34123b83f37","type":"switch","z":"ed8280c9e0778d93","name":"Check we have partnumber","property":"payload.partnumber","propertyType":"msg","rules":[{"t":"nempty"}],"checkall":"true","repair":false,"outputs":1,"x":4120,"y":1380,"wires":[["4ea27662855017e9"]]},{"id":"630d0258b1910e74","type":"debug","z":"ed8280c9e0778d93","name":"","active":false,"tosidebar":true,"console":true,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":2570,"y":1620,"wires":[]},{"id":"c81fc14a4f311333","type":"catch","z":"ed8280c9e0778d93","name":"","scope":["e663bf383b7d0364"],"uncaught":false,"x":2410,"y":1620,"wires":[["630d0258b1910e74","4240f3293bdb7406"]]},{"id":"626f12c41f9ad239","type":"e-mail","z":"ed8280c9e0778d93","server":"","port":"587","secure":false,"tls":false,"name":"","dname":"Office 365","x":4370,"y":1640,"wires":[]},{"id":"3d5b96c180382397","type":"function","z":"ed8280c9e0778d93","name":"Transactional Emails","func":"msg={\n    
payload:msg.payload,\n    topic:\"Subject\",\n    to:msg.emailTo,\n    from:\"noreply@mydomain.com\",\n    plaintext: msg.plaintext,\n};\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":4160,"y":1640,"wires":[["626f12c41f9ad239"]]},{"id":"cfb24f6d7c46dfa5","type":"template","z":"ed8280c9e0778d93","name":"","field":"payload","fieldType":"msg","format":"html","syntax":"mustache","template":"<!DOCTYPE html>\n<html lang=\"en\" xmlns:o=\"urn:schemas-microsoft-com:office:office\" xmlns:v=\"urn:schemas-microsoft-com:vml\">\n<head>\n<title></title>\n<meta charset=\"utf-8\"/>\n<meta content=\"width=device-width, initial-scale=1.0\" name=\"viewport\"/>\n<meta name=\"color-scheme\" content=\"light dark\">\n<meta name=\"supported-color-schemes\" content=\"light dark\">\n<!--[if mso]><xml><o:OfficeDocumentSettings><o:PixelsPerInch>96</o:PixelsPerInch><o:AllowPNG/></o:OfficeDocumentSettings></xml><![endif]-->\n<style>\n\t\t* {\n\t\t\tbox-sizing: border-box;\n\t\t}\n\n\t\tbody {\n\t\t\tmargin: 0;\n\t\t\tpadding: 0;\n\t\t}\n\n\t\ta[x-apple-data-detectors] {\n\t\t\tcolor: inherit !important;\n\t\t\ttext-decoration: inherit !important;\n\t\t}\n\n\t\t#MessageViewBody a {\n\t\t\tcolor: inherit;\n\t\t\ttext-decoration: none;\n\t\t}\n\n\t\tp {\n\t\t\tline-height: inherit\n\t\t}\n\n\t\t@media (max-width:620px) {\n\t\t    \n\t\t\t.icons-inner {\n\t\t\t\ttext-align: center;\n\t\t\t}\n\n\t\t\t.icons-inner td {\n\t\t\t\tmargin: 0 auto;\n\t\t\t}\n\n\t\t\t.row-content {\n\t\t\t\twidth: 100% !important;\n\t\t\t}\n\n\t\t\t.stack .column {\n\t\t\t\twidth: 100%;\n\t\t\t\tdisplay: block;\n\t\t\t}\n\t\t}\n\t\t\n\t\t@media (prefers-color-scheme: dark) {\n\t\t    .forced-white-text {\n\n                color: rgb(255, 255, 255) !important;\n\n                -webkit-text-fill-color: rgb(255, 255, 255) !important;\n            } \n\t\t    \n\t\t}\n\t</style>\n</head>\n<body style=\"margin: 0; background-color: #091548; padding: 0; -webkit-text-size-adjust: none; 
text-size-adjust: none;\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"nl-container\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; background-color: #091548;\" width=\"100%\">\n<tbody>\n<tr>\n<td>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row row-1\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; background-color: #091548; background-image: url('{{{backgroundBase64}}}'); background-position: center top; background-repeat: no-repeat;\" width=\"100%\">\n<tbody>\n<tr>\n<td>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row-content stack\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; color: #000000; width: 600px;\" width=\"600\">\n<tbody>\n<tr>\n<td class=\"column\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; font-weight: 400; text-align: left; vertical-align: top; padding-left: 10px; padding-right: 10px; padding-top: 5px; padding-bottom: 15px; border-top: 0px; border-right: 0px; border-bottom: 0px; border-left: 0px;\" width=\"100%\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"image_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td style=\"padding-bottom:5px;padding-left:5px;padding-right:5px;padding-top:13px;width:100%;\">\n<div align=\"center\" style=\"line-height:10px\">\n<img src=\"{{{logoBase64}}}\" alt=\"\" style=\"width: 129px; max-width: 229px; height: auto; margin: auto; display: block;\">\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"text_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; word-break: break-word;\" width=\"100%\">\n<tr>\n<td style=\"padding-bottom:15px;padding-top:10px;\">\n<div style=\"font-family: sans-serif;text-align: center;\">\n<div style=\"font-size: 14px; mso-line-height-alt: 
16.8px; line-height: 1.2; font-family: Varela Round, Trebuchet MS, Helvetica, sans-serif;\">\n<p style=\"margin: 0; font-size: 14px; text-align: center; mso-line-height-alt: 16.8px;\"> </p>\n<p class=\"forced-white-text\" style=\"margin: 2;padding: 2;font-size: 14px; text-align: center;\"><span style=\"font-size:30px;\">{{{subject}}}</span></p>\n</div>\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"5\" cellspacing=\"0\" class=\"text_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; word-break: break-word;\" width=\"100%\">\n<tr>\n<td>\n<div style=\"font-family: sans-serif;text-align: center;\">\n<div style=\"font-size: 14px; mso-line-height-alt: 21px; line-height: 1.5; font-family: Varela Round, Trebuchet MS, Helvetica, sans-serif;\">\n<p class=\"forced-white-text\" style=\"margin: 2;padding: 2;font-size: 14px;text-align: center;\"><span>{{{emailMessage}}}</span></p>\n</div>\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"divider_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td style=\"padding-bottom:15px;padding-left:10px;padding-right:10px;padding-top:10px;\">\n<div align=\"center\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"60%\">\n<tr>\n<td class=\"divider_inner\" style=\"font-size: 1px; line-height: 1px; border-top: 1px solid #5A6BA8;\"><span> </span></td>\n</tr>\n</table>\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"text_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; word-break: break-word;\" width=\"100%\">\n<tr>\n<td style=\"padding-bottom:40px;padding-left:25px;padding-right:25px;padding-top:10px;\">\n<div style=\"font-family: sans-serif\">\n<div style=\"font-size: 14px; mso-line-height-alt: 21px; color: 
#7f96ef; line-height: 1.5; font-family: Varela Round, Trebuchet MS, Helvetica, sans-serif;\">\n<p style=\"margin: 0; font-size: 14px; text-align: center;\"><strong>{{{displayTime}}}</strong></p>\n<p style=\"margin: 0; font-size: 14px; text-align: center;\">This is an automated email from an unmonitored account, please do not reply.</p>\n</div>\n</div>\n</td>\n</tr>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row row-2\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tbody>\n<tr>\n<td>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row-content stack\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; color: #000000; width: 600px;\" width=\"600\">\n<tbody>\n<tr>\n<td class=\"column\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; font-weight: 400; text-align: left; vertical-align: top; padding-left: 10px; padding-right: 10px; padding-top: 15px; padding-bottom: 15px; border-top: 0px; border-right: 0px; border-bottom: 0px; border-left: 0px;\" width=\"100%\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"divider_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td style=\"padding-bottom:15px;padding-left:10px;padding-right:10px;padding-top:15px;\">\n<div align=\"center\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"60%\">\n<tr>\n<td class=\"divider_inner\" style=\"font-size: 1px; line-height: 1px; border-top: 1px solid #5A6BA8;\"><span> </span></td>\n</tr>\n</table>\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"15\" cellspacing=\"0\" class=\"text_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; word-break: 
break-word;\" width=\"100%\">\n<tr>\n<td>\n<div style=\"font-family: sans-serif\">\n<div style=\"font-size: 12px; font-family: Varela Round, Trebuchet MS, Helvetica, sans-serif; mso-line-height-alt: 14.399999999999999px; color: #4a60bb; line-height: 1.2;\">\n<p style=\"margin: 0; font-size: 12px; text-align: center;\"><span style=\"\">Copyright © 2022 Io Integration, All rights reserved.<br/><br/>Where to find us: 1903 Central Drive, Suite 201\nBedford, TX 76021</span></p>\n</div>\n</div>\n</td>\n</tr>\n</table>\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"html_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td>\n<div align=\"center\" style=\"font-family:Varela Round, Trebuchet MS, Helvetica, sans-serif;text-align:center;\"><div style=\"height-top: 20px;\"> </div></div>\n</td>\n</tr>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row row-3\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tbody>\n<tr>\n<td>\n<table align=\"center\" border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"row-content stack\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; color: #000000; width: 600px;\" width=\"600\">\n<tbody>\n<tr>\n<td class=\"column\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; font-weight: 400; text-align: left; vertical-align: top; padding-top: 5px; padding-bottom: 5px; border-top: 0px; border-right: 0px; border-bottom: 0px; border-left: 0px;\" width=\"100%\">\n<table border=\"0\" cellpadding=\"0\" cellspacing=\"0\" class=\"icons_block\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td style=\"color:#9d9d9d;font-family:inherit;font-size:15px;padding-bottom:5px;padding-top:5px;text-align:center;\">\n<table cellpadding=\"0\" 
cellspacing=\"0\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt;\" width=\"100%\">\n<tr>\n<td style=\"text-align:center;\">\n<!--[if vml]><table align=\"left\" cellpadding=\"0\" cellspacing=\"0\" role=\"presentation\" style=\"display:inline-block;padding-left:0px;padding-right:0px;mso-table-lspace: 0pt;mso-table-rspace: 0pt;\"><![endif]-->\n<!--[if !vml]><!-->\n<table cellpadding=\"0\" cellspacing=\"0\" class=\"icons-inner\" role=\"presentation\" style=\"mso-table-lspace: 0pt; mso-table-rspace: 0pt; display: inline-block; margin-right: -4px; padding-left: 0px; padding-right: 0px;\">\n<!--<![endif]-->\n<tr>\n<td colspan=\"2\" style=\"font-family:Varela Round, Trebuchet MS, Helvetica, sans-serif;font-size:15px;color:#9d9d9d;vertical-align:middle;letter-spacing:undefined;text-align:center;\"><a href=\"https://www.iointegration.com/iospace\" style=\"color: #9d9d9d;text-decoration:none;\">powered by IOAppflow</a></td>\n</tr>\n</table>\n</td>\n</tr>\n</table>\n</td>\n</tr>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n</td>\n</tr>\n</tbody>\n</table>\n</td>\n</tr>\n</tbody>\n</table><!-- End -->\n</body>\n</html>","output":"str","x":3980,"y":1640,"wires":[["3d5b96c180382397"]]},{"id":"55ed3c3c3120e084","type":"function","z":"ed8280c9e0778d93","name":"Base64 Images","func":"msg.logoBase64 = \"data:image/png;base64,removed\";\nmsg.backgroundBase64 = \"data:image/png;base64,removed\";\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3620,"y":1640,"wires":[["4239dd7f55828921"]]},{"id":"07503d7379807854","type":"function","z":"ed8280c9e0778d93","name":"Set Subject and Message","func":"msg.emailTo = \"me@mydomain.com\";\nmsg.subject = \"Completed weekly information records update\";\n\nmsg.emailMessage = \"New information csv has been downloaded, processed and completed.<br>System updated with new values.\";\n\nmsg.plaintext = \"Completed weekly information records update\\n\";\nmsg.plaintext += \"New information csv 
has been downloaded, processed and completed.\\nSystem updated with new values.\";\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":3410,"y":1640,"wires":[["55ed3c3c3120e084"]]},{"id":"4239dd7f55828921","type":"change","z":"ed8280c9e0778d93","name":"Set Display Time","rules":[{"t":"set","p":"displayTime","pt":"msg","to":"$moment().format('MMM Do YYYY LT')","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":3810,"y":1640,"wires":[["cfb24f6d7c46dfa5"]]},{"id":"febc0ff57a66a0f7","type":"change","z":"ed8280c9e0778d93","name":"Reset Send Mail","rules":[{"t":"set","p":"sendemail","pt":"flow","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":2720,"y":1380,"wires":[["53880e39a42d9823"]]},{"id":"4315abdf8a4c47fc","type":"change","z":"ed8280c9e0778d93","name":"Set Email Sent Flag","rules":[{"t":"set","p":"sendemail","pt":"flow","to":"false","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":3190,"y":1640,"wires":[["07503d7379807854"]]},{"id":"d4110fe015f8f648","type":"switch","z":"ed8280c9e0778d93","name":"Check if we need to send mail","property":"sendemail","propertyType":"flow","rules":[{"t":"true"}],"checkall":"true","repair":false,"outputs":1,"x":2950,"y":1640,"wires":[["4315abdf8a4c47fc"]]}]

I have not checked your flow, but a common problem with fs watches is that people don't always understand how a file is updated and what impact that has.

It is quite possible that a file will trigger hundreds or even thousands of update events. One will usually happen each time a fragment of the file is written (I'm a bit hazy about the actual mechanics and whether that depends on inodes, frames or buffers - not sure).

So you may often need to detect when the updates stop rather than when one happens.
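One way to detect that updates have stopped is to remember the time of the last event and treat the file as settled once a quiet period has passed. The sketch below is an illustration only: `QuietPeriodTracker`, `recordEvent`, `isSettled` and the 5-second window are made-up names and values, and timestamps are passed in explicitly so the example runs without timers.

```javascript
// Tracks the time of the most recent change event per file and reports
// a file as settled once no event has arrived for `quietMs` milliseconds.
class QuietPeriodTracker {
    constructor(quietMs) {
        this.quietMs = quietMs;
        this.lastEvent = new Map();
    }

    // Call on every watch event for the file.
    recordEvent(filename, nowMs) {
        this.lastEvent.set(filename, nowMs);
    }

    // True only if the file has been seen and has been quiet long enough.
    isSettled(filename, nowMs) {
        if (!this.lastEvent.has(filename)) {
            return false;
        }
        return nowMs - this.lastEvent.get(filename) >= this.quietMs;
    }
}

const tracker = new QuietPeriodTracker(5000);
tracker.recordEvent('import.csv', 0);
tracker.recordEvent('import.csv', 3000);
console.log(tracker.isSettled('import.csv', 6000)); // false, only 3 s of quiet
console.log(tracker.isSettled('import.csv', 8000)); // true, 5 s of quiet
```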

After detailing this I managed to fix the issue.

At the start of the flow when parts index is zero I set

flow.set('processedcount', 0);

Then in subsequent row loops I added

let processedcount = flow.get('processedcount');
processedcount++;
flow.set('processedcount',processedcount);

This count increases and finishes before all the MySQL promises have completed.

After the MySQL node I added this

let processedcount = flow.get('processedcount');
if ((msg.parts.index + 1) !== processedcount ) {
    return;
}
return msg;

This checks whether the parts index passed in after the MySQL call equals the expected processed count.
If it does, the message moves on; otherwise nothing happens.
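The check above relies on the last row's insert also being the last one to return. A variant that does not depend on ordering counts completed inserts and compares them with the number dispatched. This is a sketch of an alternative, not the code used in the flow; `createCompletionTracker`, `rowDispatched`, `readComplete` and `rowCompleted` are hypothetical names.

```javascript
// Tracks how many rows were sent to the database and how many came back.
function createCompletionTracker() {
    let dispatched = 0;
    let completed = 0;
    let readFinished = false;
    let notified = false;

    // Returns true at most once: when reading is done and every
    // dispatched insert has returned.
    function checkDone() {
        if (readFinished && completed === dispatched && !notified) {
            notified = true;
            return true;
        }
        return false;
    }

    return {
        // Call when a row is handed to the database.
        rowDispatched() { dispatched++; },
        // Call after the last row has been dispatched.
        readComplete() { readFinished = true; return checkDone(); },
        // Call after each insert returns (success or caught error).
        rowCompleted() { completed++; return checkDone(); },
    };
}

const completion = createCompletionTracker();
const signals = [];
for (let i = 0; i < 3; i++) completion.rowDispatched();
signals.push(completion.readComplete());  // inserts still pending
signals.push(completion.rowCompleted());
signals.push(completion.rowCompleted());
signals.push(completion.rowCompleted());  // last insert returns
console.log(signals); // [ false, false, false, true ]
```

In a flow, the three counters would live in flow context rather than in a closure, and rows that are filtered out before the database would simply never be counted as dispatched.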

I'm deleting the flow context after the email so that it cleans up after itself

BTW, I did try to use msg.parts.count.
I set it to 0, then ran msg.parts.count++; but it kept coming back as NaN, so I had to use flow context instead.
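A likely explanation, stated as an assumption: each line arrives as its own message object, so a value assigned to `msg.parts.count` on one message is not present on the next, and incrementing a missing property yields NaN. The snippet only demonstrates that JavaScript behaviour; the two message objects are made up.

```javascript
// Two separate message objects, as if two lines had been read.
const firstMsg = { parts: { index: 0 } };
const secondMsg = { parts: { index: 1 } };

// Setting the counter on the first message does not affect the second.
firstMsg.parts.count = 0;
firstMsg.parts.count++;
secondMsg.parts.count++; // count was undefined, so the result is NaN

console.log(firstMsg.parts.count);                // 1
console.log(Number.isNaN(secondMsg.parts.count)); // true
```

Flow context survives from one message to the next, which is why the counter works there.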

Thanks Harry
