Is it unsafe to access same object in context and add / delete in parallel?

Got a bit of a tricky situation here. Have a subflow with it's own flow context. When message arrive, context object is loaded and looped through. But at the end of that flow, I load the same context object again to delete old entries (keys). These keys are actually timestamps, so are guaranteed not to be read. So when a message arrive, it is loaded, then loaded again and deleted from, but at the same time, a second message may arrive and load that context again?

How will these concurrent operations affect stability and correctness, and to avoid race conditions? I know js is single-threaded, but does that eliminate the problem entirely? For example when using Object.entries ( Object.entries() - JavaScript | MDN) to loop through an object to read it, but at the same time another place also loop through object to delete from it?

However, I may have a solution to avoid this situation, because I only need to read from context at a specific time each hour, so can safely delete from it another time of the hour. But I'd still like to know if this can cause problems in general.

Depends on your code, so if you share your function node one might confirm. But in general, if your code in synchronous (you don't use any callbacks, promises, setTimeouts, ...) then you're on a safe side and can be pretty sure your code will run through before new message will start processing.

1 Like

Assuming that your subflow has multiple nodes in it then no it is not safe. As soon as the first node in the subflow has completed then another waiting node anywhere in your flow may run, using a round robin type approach. The result is that it is possible to have multiple messages running through your subflow at the same time. This is one of the major disadvantages of using context. It is usually much preferable to pass data in messages rather than using context.

If you cannot avoid context then a solution is to make sure that there is only one message in the subflow at a time. You can do this using the simple flow described in this post.

1 Like

In fact you could put that inside your subflow, protecting the nodes where the context is referenced, so that you don't need to add it externally to every invocation of the subflow.

1 Like

Don't you then need to be careful about not having multiple instances of the subflow? Each subflow use is a new instance I think isn't it?

1 Like

Yes, but each instance has its own flow context, so that isn't an issue.

2 Likes

Yes you're correct, I was thinking in a context of one node. Reading @ThingsTinkerer post again it seems he's using multiple nodes and in that case it's as you described it.

1 Like

To give some context, I now have a working subflow:


Source:

[
    {
        "id": "a5184c5aab2f6cda",
        "type": "subflow",
        "name": "log",
        "info": "# Writes log to file\r\nMust set env variable `logTopic` in parent flow to determine filename.\r\nThis can't be done as input variable because the parent may also be a subflow.\r\n\r\n### Inputs\r\n\r\n: logMessage (string)          :  the log message to write to file can be provided in the node configuration, or as a message property. By default it will use `msg.logMessage`.\r\n: logTopic (string)            :  determines parts of the file name. Typically enter flow name for this purpose.\r\n: logCompleteMsgObj (boolean):  Add the entire msg object to the log message.\r\n: stdOut (boolean)           :  Also log to standard out (using debug node).\r\n\r\n### Details\r\n\r\nLogs are written to /data/logs/`msg.logTopic`_yyyy-MM-dd.txt. `msg.logTopic` is used as filename prefix. Typically use flow name for this purpose. Creates file and folder if they don't exist.\r\n\r\n`msg.logMessage` is used as the main content of the log to write. Datetime is automatically added as prefix. Each log is appended with newline.",
        "category": "",
        "in": [
            {
                "x": 100,
                "y": 80,
                "wires": [
                    {
                        "id": "0db769e63c22272e"
                    }
                ]
            }
        ],
        "out": [
            {
                "x": 1040,
                "y": 80,
                "wires": [
                    {
                        "id": "5f7d3b7030f7bb02",
                        "port": 0
                    },
                    {
                        "id": "0db769e63c22272e",
                        "port": 0
                    }
                ]
            }
        ],
        "env": [
            {
                "name": "logMessage",
                "type": "str",
                "value": "",
                "ui": {
                    "label": {
                        "en-US": "Log message"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str"
                        ]
                    }
                }
            },
            {
                "name": "logCompleteMsgObj",
                "type": "bool",
                "value": "false",
                "ui": {
                    "label": {
                        "en-US": "Log complete msg"
                    },
                    "type": "checkbox"
                }
            },
            {
                "name": "stdOut",
                "type": "bool",
                "value": "false",
                "ui": {
                    "label": {
                        "en-US": "Debug (sidebar)"
                    },
                    "type": "checkbox"
                }
            },
            {
                "name": "logTopic",
                "type": "env",
                "value": "$parent.logTopic",
                "ui": {
                    "type": "hide"
                }
            }
        ],
        "meta": {},
        "color": "#C7E9C0",
        "icon": "font-awesome/fa-file-text"
    },
    {
        "id": "b1add847796af765",
        "type": "function",
        "z": "a5184c5aab2f6cda",
        "name": "prepare log",
        "func": "msg._backup = RED.util.cloneMessage(msg);\n\nconst date = new Date().toISOString().replaceAll(\"-\", \"-\").replaceAll(\"T\", \"-\").replaceAll(\":\", \"-\").replaceAll(\".\", \"-\").replaceAll(\"Z\", \"-\");\nconst list = date.split(\"-\");\nconst year = list[0];\nconst month = list[1];\nconst day = list[2];\nconst hour = list[3];\nconst minute = list[4];\nconst second = list[5];\n\nlet logTopic;\ntry{\n    logTopic = env.get(\"logTopic\").toLowerCase();\n    if(!logTopic){\n        node.error(\"ERROR: Log failed getting logTopic env from parent. Did you remember to set it?\");\n        return;\n    }\n} catch (error) {\n    node.error(\"ERROR: Log failed getting logTopic env from parent. Did you remember to set it?\");\n    return;\n}\n\nmsg.stdOut = env.get(\"stdOut\");\n\nlet logMessage = env.get(\"logMessage\") ? env.get(\"logMessage\") : msg.logMessage;\nlet logCompleteMsgObj = env.get(\"logCompleteMsgObj\");\nif(!logCompleteMsgObj){\n    logCompleteMsgObj = msg?.logCompleteMsgObj ?? false;\n}\n\nif(logCompleteMsgObj){\n    logMessage += \"\\n\";\n    logMessage += JSON.stringify(msg._backup);\n}\n\nmsg.filename = `/data/logs/${logTopic}/${logTopic}_${year}-${month}-${day}.log`;\nmsg.payload = `${year}-${month}-${day} ${hour}:${minute}:${second} - ${logMessage}`;\n\nconst maxLineLength = 10000;\nif(msg.payload.length > maxLineLength){\n    // Safeguard against excessively large logs\n    msg.payload = msg.payload.slice(0, maxLineLength) + '... [TRUNCATED]';\n}\n\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 470,
        "y": 140,
        "wires": [
            [
                "3ea911242530d241",
                "2c7e47352b7df809"
            ]
        ]
    },
    {
        "id": "3ea911242530d241",
        "type": "file",
        "z": "a5184c5aab2f6cda",
        "name": "write log file",
        "filename": "filename",
        "filenameType": "msg",
        "appendNewline": true,
        "createDir": true,
        "overwriteFile": "false",
        "encoding": "utf8",
        "x": 670,
        "y": 140,
        "wires": [
            [
                "5f7d3b7030f7bb02"
            ]
        ]
    },
    {
        "id": "e96bad283c240487",
        "type": "catch",
        "z": "a5184c5aab2f6cda",
        "name": "",
        "scope": null,
        "uncaught": false,
        "x": 660,
        "y": 260,
        "wires": [
            [
                "5ac9480a70f4d93a"
            ]
        ]
    },
    {
        "id": "5ac9480a70f4d93a",
        "type": "debug",
        "z": "a5184c5aab2f6cda",
        "name": "log failed error",
        "active": true,
        "tosidebar": true,
        "console": true,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 850,
        "y": 260,
        "wires": []
    },
    {
        "id": "fac2f56d19607dc3",
        "type": "debug",
        "z": "a5184c5aab2f6cda",
        "name": "log std out",
        "active": true,
        "tosidebar": true,
        "console": true,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 830,
        "y": 200,
        "wires": []
    },
    {
        "id": "2c7e47352b7df809",
        "type": "switch",
        "z": "a5184c5aab2f6cda",
        "name": "std out?",
        "property": "stdOut",
        "propertyType": "msg",
        "rules": [
            {
                "t": "true"
            },
            {
                "t": "else"
            }
        ],
        "checkall": "true",
        "repair": false,
        "outputs": 2,
        "x": 660,
        "y": 200,
        "wires": [
            [
                "fac2f56d19607dc3"
            ],
            []
        ],
        "inputLabels": [
            "msg.stdOut"
        ],
        "outputLabels": [
            "true",
            "false"
        ]
    },
    {
        "id": "5f7d3b7030f7bb02",
        "type": "function",
        "z": "a5184c5aab2f6cda",
        "name": "revert",
        "func": "return msg._backup;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 810,
        "y": 140,
        "wires": [
            []
        ]
    },
    {
        "id": "0db769e63c22272e",
        "type": "function",
        "z": "a5184c5aab2f6cda",
        "name": "logMessage?",
        "func": "const logMessage = env.get(\"logMessage\") ? env.get(\"logMessage\") : msg.logMessage;\n\nconst skip = logMessage ? null : msg;\nconst log = logMessage ? msg : null;\nreturn [skip, log];",
        "outputs": 2,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 220,
        "y": 80,
        "wires": [
            [],
            [
                "b1add847796af765"
            ]
        ],
        "outputLabels": [
            "Skip",
            "Log"
        ]
    },
    {
        "id": "71082c29770ea643",
        "type": "inject",
        "z": "a5184c5aab2f6cda",
        "g": "6f8fb577b11364ed",
        "name": "",
        "props": [],
        "repeat": "",
        "crontab": "00 00 * * *",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 210,
        "y": 400,
        "wires": [
            [
                "09a9a5f729dd65b4"
            ]
        ]
    },
    {
        "id": "0c31a507622aeb8d",
        "type": "file",
        "z": "a5184c5aab2f6cda",
        "g": "6f8fb577b11364ed",
        "name": "write empty file",
        "filename": "filename",
        "filenameType": "msg",
        "appendNewline": false,
        "createDir": true,
        "overwriteFile": "false",
        "encoding": "utf8",
        "x": 540,
        "y": 400,
        "wires": [
            []
        ]
    },
    {
        "id": "09a9a5f729dd65b4",
        "type": "function",
        "z": "a5184c5aab2f6cda",
        "g": "6f8fb577b11364ed",
        "name": "empty log",
        "func": "const date = new Date().toISOString().replaceAll(\"-\", \"-\").replaceAll(\"T\", \"-\").replaceAll(\":\", \"-\").replaceAll(\".\", \"-\").replaceAll(\"Z\", \"-\");\nconst list = date.split(\"-\");\nconst year = list[0]; // YYYY\nconst month = list[1]; // MM\nconst day = list[2]; // DD\n\nconst logTopic = env.get(\"$parent.logTopic\").toLowerCase();\nif(!logTopic){\n  return; // silent quit\n}\n\nmsg.filename = `/data/logs/${logTopic}/${logTopic}_${year}-${month}-${day}.log`;\nmsg.payload = \"\";\n\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 370,
        "y": 400,
        "wires": [
            [
                "0c31a507622aeb8d"
            ]
        ]
    },
    {
        "id": "6f8fb577b11364ed",
        "type": "group",
        "z": "a5184c5aab2f6cda",
        "name": "Create new log file every day",
        "style": {
            "stroke": "#0070c0",
            "fill": "#bfdbef",
            "label": true,
            "color": "#000000"
        },
        "nodes": [
            "0c31a507622aeb8d",
            "09a9a5f729dd65b4",
            "71082c29770ea643"
        ],
        "x": 114,
        "y": 359,
        "w": 532,
        "h": 82
    },
    {
        "id": "06d52e0ea09761a2",
        "type": "subflow",
        "name": "safe hour",
        "info": "This subflow is used to safely handle hourly data and optional delta calculations.\r\nSpecify hourly data in config.\r\n```javascript\r\n[\r\n  {\r\n    key: \"energy_consumption_accumulated\",\r\n    deltaKey: \"energy_consumption\"\r\n  },\r\n  {\r\n    key: \"otherKey\"\r\n  }\r\n]\r\n```\r\n\r\n`key` is the name of the incoming value in `msg.payload`.\r\n`deltaKey` is optional and used to calculate the difference between current value and previous value:\r\n- delta = currentValue - previousValue\r\n\r\nFor additional redundancy, keys are stored from minute 55 to 0. For example 18:55 to 19:00.\r\nLatest incoming value will overwrite any value stored minutes earlier \r\n(values will be stored as close as possible to minute 0).\r\nThis is to have more chances of capturing the value if some requests fail.\r\nHowever, this can lead to slight inaccuracies for hourly data, \r\nwhich must be acceptable when using this subflow.\r\n\r\nNote that all keys are deleted from `msg.payload` in minutes 1-59!\r\n\r\nTo summarize what happens in different minute ranges:\r\n- 55-0: Save value to context, delete all specified keys from payload\r\n- 0: Write available values to payload\r\n- 1-54: Delete all specified keys from payload\r\n\r\nIf `msg.topic` is set, hourly data will be stored separately per topic.\r\n",
        "category": "",
        "in": [
            {
                "x": 100,
                "y": 100,
                "wires": [
                    {
                        "id": "caf729e4a1f81ea5"
                    }
                ]
            }
        ],
        "out": [
            {
                "x": 740,
                "y": 100,
                "wires": [
                    {
                        "id": "ca4545eadbbfb442",
                        "port": 0
                    }
                ]
            }
        ],
        "env": [
            {
                "name": "config",
                "type": "json",
                "value": "[{\"key\":\"put name of the value to calculate delta from here\",\"deltaKey\":\"put name of the value to save delta calculaion to here (deltaKey is optional)\"},{\"key\":\"supports multiple keys, here is an example without deltaKey\"},{\"key\":\"example: energy_consumption_accumulated (from msg.payload)\",\"deltaKey\":\"example: energy_consumption (from delta calculation)\"}]",
                "ui": {
                    "icon": "font-awesome/fa-database",
                    "type": "input",
                    "opts": {
                        "types": [
                            "json"
                        ]
                    }
                }
            },
            {
                "name": "MODE_1_SAVE",
                "type": "num",
                "value": "1",
                "ui": {
                    "icon": "font-awesome/fa-ban",
                    "type": "hide"
                }
            },
            {
                "name": "MODE_2_WRITE",
                "type": "str",
                "value": "2",
                "ui": {
                    "icon": "font-awesome/fa-ban",
                    "label": {
                        "en-US": "MODE_2_WRITE"
                    },
                    "type": "hide"
                }
            },
            {
                "name": "logTopic",
                "type": "env",
                "value": "$parent.logTopic",
                "ui": {
                    "icon": "font-awesome/fa-bell-o",
                    "type": "hide"
                }
            }
        ],
        "meta": {},
        "color": "#3FADB5",
        "icon": "font-awesome/fa-database",
        "status": {
            "x": 740,
            "y": 160,
            "wires": [
                {
                    "id": "fee2bb0d919a8e84",
                    "port": 0
                }
            ]
        }
    },
    {
        "id": "caf729e4a1f81ea5",
        "type": "function",
        "z": "06d52e0ea09761a2",
        "name": "time?",
        "func": "msg.hourlyDataOptions ??= {\n  config: env.get(\"config\")\n};\n\nif (!Array.isArray(msg.hourlyDataOptions.config)) {\n  // if object, wrap in array\n  msg.hourlyDataOptions.config = [msg.hourlyDataOptions.config]; \n}\n\nconst nowMinute = new Date(msg.ts).getMinutes();\nif (nowMinute > 0 && nowMinute < 55) {\n  return [msg, null]; // SKIP\n}\n\nconst MODE_1_SAVE = env.get(\"MODE_1_SAVE\");\nconst MODE_2_WRITE = env.get(\"MODE_2_WRITE\");\n\n// current hour\nconst nextHour = new Date(msg.ts);\nnextHour.setMinutes(0); // set minute = 0 (hourly)\n\n// previous hour\nconst previousHour = new Date(nextHour.getTime());\n\nif (nowMinute >= 55 && nowMinute <= 59) {\n  // SAVE: minute 55-59\n  msg.hourlyDataOptions.mode = MODE_1_SAVE;\n  // +1 hour to target hour\n  nextHour.setHours(nextHour.getHours() + 1);\n} else {\n  // WRITE: minute 0\n  msg.hourlyDataOptions.mode = MODE_2_WRITE;\n  // subtract 1 hour\n  previousHour.setHours(previousHour.getHours() - 1); \n}\nmsg.hourlyDataOptions.previousTs = previousHour.getTime(); // previous hour\nmsg.hourlyDataOptions.currentTs = nextHour.getTime(); // hour to be written\nmsg.hourlyDataOptions.stateName = msg.topic ?? \"default\"; // support separate states per topic\nreturn [null, msg];",
        "outputs": 2,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 190,
        "y": 100,
        "wires": [
            [
                "ca4545eadbbfb442"
            ],
            [
                "60954384203be71d"
            ]
        ],
        "outputLabels": [
            "Skip (01-54)",
            "Yes (55-00)"
        ]
    },
    {
        "id": "60954384203be71d",
        "type": "function",
        "z": "06d52e0ea09761a2",
        "name": "save/write",
        "func": "const MODE_1_SAVE = env.get(\"MODE_1_SAVE\");\n\n// always try to save latest data in range 55-00\n\nlet allStates = flow.get(\"$parent.state\");\nif (!allStates) {\n  allStates = {};\n  flow.set(\"$parent.state\", allStates); // init (if not exists)\n}\n\n// retrieve or init topicState\nconst stateName = msg.hourlyDataOptions.stateName;\nallStates[stateName] ??= {}; // init if not exists\nconst topicState = allStates[stateName];\n\n// retrieve or init hourState\nfunction getHourState(ts) {\n  topicState[ts] ??= {};\n  return topicState[ts];\n}\n\nconst currentHourState = getHourState(msg.hourlyDataOptions.currentTs);\nconst previousHourState = getHourState(msg.hourlyDataOptions.previousTs);\n\nfor (const item of msg.hourlyDataOptions.config) {\n  const key = item.key;\n  const currentValue = msg.payload[key] ?? currentHourState[key];\n\n  if (currentValue != null) {\n      // value exists, upsert to context in minute range 55-0\n    currentHourState[key] = currentValue;\n    }\n  if (msg.hourlyDataOptions.mode == MODE_1_SAVE) {\n    continue; // nothing more to do before minute 0\n  }\n \n  // write mode (minute 0)\n  // use latest value if exists, otherwise get latest value from previous 5 minutes\n  if (currentValue == null) {\n    continue; // value is missing, nothing to do\n  }\n  // write to payload\n  msg.payload[key] ??= currentValue;\n\n  // calc delta\n  const previousValue = previousHourState[key];\n  const deltaKey = item.deltaKey;\n  if (deltaKey == null || previousValue == null) {\n    // target key is null or previous value is missing\n    continue; // can't calculate delta\n  }\n  msg.payload[deltaKey] = currentValue - previousValue;\n}\n\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 340,
        "y": 140,
        "wires": [
            [
                "ca4545eadbbfb442",
                "341fab405964ef45"
            ]
        ]
    },
    {
        "id": "ca4545eadbbfb442",
        "type": "function",
        "z": "06d52e0ea09761a2",
        "name": "clean up",
        "func": "const MODE_2_WRITE = env.get(\"MODE_2_WRITE\");\n\nconst nowMinute = new Date(msg.ts).getMinutes();\nif (nowMinute == 15) {\n  // clean up old states (once per hour)\n  // do this at a time where it won't affect save/write operations (minute 55-0)\n  const allStates = flow.get(\"$parent.state\");\n  if (allStates) {\n    // delete states older than 2 hours\n    const hoursLimit = new Date();\n    hoursLimit.setHours(hoursLimit.getHours() - 2);\n    const deleteLimit = hoursLimit.getTime();\n\n    // for each topic state\n    for (const [, topicState] of Object.entries(allStates)) {\n      // for each hour state in topic state\n      for (const [ts,] of Object.entries(topicState)) {\n          if (Number(ts) < deleteLimit) {\n          delete topicState[ts];\n        }\n      }\n    }\n  }\n}\n\nif(msg.hourlyDataOptions.mode !== MODE_2_WRITE) {\n  // delete hourly data (minute is not 0)\n  for (const item of msg.hourlyDataOptions.config) {\n    delete msg.payload[item.key];\n  }\n}\n// clean up junk\ndelete msg.hourlyDataOptions;\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 540,
        "y": 100,
        "wires": [
            []
        ]
    },
    {
        "id": "341fab405964ef45",
        "type": "function",
        "z": "06d52e0ea09761a2",
        "name": "status",
        "func": "const MODE_2_WRITE = env.get(\"MODE_2_WRITE\");\nif (msg.hourlyDataOptions.mode !== MODE_2_WRITE) {\n  return null; // only update status at minute 0\n}\nconst allStates = flow.get(\"$parent.state\");\nconst topicStateCount = Object.keys(allStates).length;\n\n// only count keys, not deltaKeys (values stored in context, not payload)\nconst keyCount = msg.hourlyDataOptions.config.length * topicStateCount;\nlet valueCount = 0;\n\nconst thisHour = new Date(msg.ts);\nthisHour.setMinutes(0, 0, 0);\nconst currentTs = thisHour.getTime();\n\nconst missingKeys = [];\n\n// for each key config (key + deltaKay)\nfor (const item of msg.hourlyDataOptions.config) {\n  const key = item.key;\n\n  for (const [, topicState] of Object.entries(allStates)) {\n    const hourState = topicState[currentTs];\n    \n    if (hourState[key] != null) {\n      // count total amount of values from all topics!\n      valueCount++; // status\n    }\n  }\n  // log missing key and deltaKey per individual topic!\n  // which means that this can be logged individually multiple times each hour\n  if(msg.payload[item.key] == null){\n    missingKeys.push(item.key);\n  }\n  if (item.deltaKey && msg.payload[item.deltaKey] == null) {\n      missingKeys.push(item.deltaKey);\n  }\n}\n\nif (missingKeys.length > 0) {\n  if (msg.topic) {\n    msg.logMessage = `ERROR: Hourly data for topic ${msg.topic} is missing ${missingKeys.length} values and/or calculated delta values (${missingKeys.join(\", \")}).`;\n  } else {\n    msg.logMessage = `ERROR: Missing ${missingKeys.length} hourly values and/or calculated delta values (${missingKeys.join(\", \")}).`;\n  }\n}\n\nlet color;\nif (keyCount == valueCount) {\n  color = \"green\";\n} else if (valueCount == 0) {\n  color = \"red\";\n} else if (valueCount < keyCount) {\n  color = \"yellow\";\n}\n\nmsg = {\n  payload: {\n    fill: color,\n    shape: \"dot\",\n    text: `${valueCount} / ${keyCount} values stored`\n  }\n};\n\nreturn msg;",
        "outputs": 1,
        "timeout": 0,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 530,
        "y": 160,
        "wires": [
            [
                "fee2bb0d919a8e84"
            ]
        ]
    },
    {
        "id": "fee2bb0d919a8e84",
        "type": "subflow:a5184c5aab2f6cda",
        "z": "06d52e0ea09761a2",
        "name": "",
        "x": 650,
        "y": 160,
        "wires": [
            []
        ]
    }
]

The background is, in many different flows, me and my team collect data from devices. Many of these connections are faulty (modbus/bacnet/mbus), but really same could apply for any network request really. So we come to terms by having several retries to mitigate the problem and accept some degree of loss. Most data is collected minutely, and to lose a minute here and there isn't too important. But we also collect hourly data! And losing an hour is much more noticable and can make ripple effect when we use delta calculations from accumulated energy consumption. We often calculate current value (this hour) with previous value (last hour). And everytime I made a function node, and it was messy, hard to manage and fragile. So this subflow is the solution.

In this subflow, you specify which values (in payload) that are hourly. Additionally, you can specify which values you want delta calcatuions for (diff between current and previous value). Additionaly, this subflow stores hourly data to context in minute range of 55-0 for backup in case minute 0 data is lost. At a cost of slight time inaccuracy. Finally, this subflows deletes hourly data from output at any other time than minute 0. In effect, it gives additional backup of reading values.

There are no callbacks or async stuff here, but context is accessed and modified in separate function nodes.

It doesn't really make sense to store values in msg themselves here, because the data stored originates from other messages. For example:

  • msg at 12:55 has value 10
  • msg at 12:56 has value 11
  • msg at 12:57 has value 12
  • msg at 12:58 has value 13
  • msg at 12:59 has value 14
  • msg at 13:00 has value 15 <-- this is where the hourly data is output and additional delta calculations made

It should technically be possible to write the entire thing in a single function node. But it would suffer in terms of easy to read and maintainability you get by splitting things up.

Anyway, I'm not worried about all race conditions, because the keys that are deleted in the object are not the same as those that are read. Example:

// 'state' is the object stored in context
state: {
  energy:
  {
    1740268800000: { // <-- this object is deleted (old/outdated)
      energy: 123
    },
    1740272400000: { // <-- this object is being read (code has filter to prevent deleting this)
      energy: 456
    }
}

I'm more worried about race conditions that would make stuff like Object.entries skip a beat and traverse irradically or even crash when one function node deletes from it while another function node reads it. And yeah I know keys as numbers are bad, but made an exception here.

The situation seems to be worse than you describe as you are also updating the parent flow context, so if there is any possibility of there being multiple messages in the parent flow at any time then you may have to wrap the whole thing in a protection flow such as the one I posted.

1 Like

No, this is the only place that context is used. I haven't had time to upgrade to 4.0.9 where you can debug subflow context. So to make it easy to see what is going on, the subflow stores directly to parent/caller. And as far as I can see, everything runs smooth.