How to protect a critical section in a node

I've just started to work with Node-RED and I've found a case where there is a potential critical section that I'd like to control (that's something easy to handle with mutex in low-level languages but I don't know if something like that can be done here).

I'm trying to get some data from a remote web server periodically and I'm using an inject node with repetitions to start the sequence (let's say once per minute). However there is a chance that I don't get the right answer and I want to retry, so I've created a sort of loop so that the same round is repeated again as fast as possible instead of having to wait for the next minute check.

To prevent the two executions from colliding, I'm using a variable stored in the global context that indicates whether the action is already in progress (the periodic inject might fire in the middle of a retry). Both the retry loop and the inject node are connected to a function node where I check this:

if (!context.global.workInProgress) {
    context.global.workInProgress = true;
    msg.canContinue = true;
}
else {
    msg.canContinue = false;
}

After that, the output of the function node is linked to a switch node that discards the execution coming from the path which didn't manage to get control of the lock (so to speak): the switch only has an output connector for messages with msg.canContinue = true.

Is this enough to handle the critical section? In a pure multithreaded program it wouldn't be and I'd have to protect this code with a mutex, I know. Do I have to add some extra protection code to guarantee the thread-safety of this section?

Oh, and I forgot to mention that I set context.global.workInProgress = false after the operation is finished no matter if the result is correct (end of loop, I use another switch node to control that) or not (retry).

Regards

Something like this should do it for you. It uses the subflow described here. It wasn't designed for exactly that, but I think using it like this should do exactly what you want.

Replace the function nodes with your flow and when it succeeds send it to the OK node, and when it fails send it to the FAIL node. When it succeeds the Guaranteed Delivery subflow will wait for the next trigger, when it fails it will retry every second (or as configured in the subflow properties). I have set the queue size limit to 1 so that it won't queue up multiple triggers in the situation where the failure occurs over an extended period.

You can import this to test and play with it.

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch 
(stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to 
waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"52001913.9013b","type":"change","z":"143d969b.e1a4a9","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":670,"y":480,"wires":[["b6271c4b.aa1db"]]},{"id":"8c4facb1.1360c8","type":"change","z":"143d969b.e1a4a9","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":670,"y":420,"wires":[["b6271c4b.aa1db"]]},{"id":"b6271c4b.aa1db","type":"link out","z":"143d969b.e1a4a9","name":"","links":["643a211d.6b4cb8"],"x":795,"y":420,"wires":[]},{"id":"643a211d.6b4cb8","type":"link in","z":"143d969b.e1a4a9","name":"","links":["b6271c4b.aa1db"],"x":215,"y":360,"wires":[["cbcc3efc.0d91a8"]]},{"id":"cbcc3efc.0d91a8","type":"subflow:149380c1.63e107","z":"143d969b.e1a4a9","name":"Guaranteed 
delivery","env":[{"name":"retrySecs","value":"1","type":"num"},{"name":"maxQueue","value":"1","type":"num"}],"x":420,"y":300,"wires":[["89ce5ba7.f87fa8"]]},{"id":"5a4269a2.da4008","type":"inject","z":"143d969b.e1a4a9","name":"One Minute repeat","props":[{"p":"payload"}],"repeat":"60","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":160,"y":300,"wires":[["cbcc3efc.0d91a8"]]},{"id":"5544de95.93cde","type":"comment","z":"143d969b.e1a4a9","name":"This is the flow that needs protecting","info":"","x":300,"y":500,"wires":[]},{"id":"89ce5ba7.f87fa8","type":"function","z":"143d969b.e1a4a9","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":220,"y":440,"wires":[["6cd065be.a37e84"]]},{"id":"6cd065be.a37e84","type":"function","z":"143d969b.e1a4a9","name":"","func":"\nreturn msg;","outputs":2,"noerr":0,"initialize":"","finalize":"","libs":[],"x":380,"y":440,"wires":[["8c4facb1.1360c8"],["52001913.9013b"]]}]
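
The behaviour described above (send one message, wait for OK or FAIL, resend on the retry trigger) can be sketched outside Node-RED as a small state machine. This is a simplified stand-in and not the subflow's actual code: createGuard, sendNext and the method names are hypothetical, and the "initial" state of the real subflow is folded into "waitingForMsg".

```javascript
// Simplified stand-in for the subflow's state machine (names are hypothetical).
// States: "waitingForMsg", "waitingForOKFail", "waitingForTrigger".
function createGuard(maxQueue, send) {
    const stat = { state: "waitingForMsg", queue: [] };

    // Send the head of the queue (if any) but keep it queued until OK arrives.
    function sendNext() {
        if (stat.state !== "waitingForMsg") return;
        if (stat.queue.length > 0) {
            send(stat.queue[0]);
            stat.state = "waitingForOKFail";
        }
    }

    return {
        stat,
        message(msg) {
            stat.queue.push(msg);
            if (stat.queue.length > maxQueue) stat.queue.shift(); // drop oldest
            if (stat.state === "waitingForTrigger") stat.state = "waitingForMsg";
            sendNext();
        },
        ok() {
            if (stat.state === "waitingForOKFail") {
                stat.queue.shift(); // delivered, remove it from the queue
                stat.state = "waitingForMsg";
            }
            sendNext();
        },
        fail() {
            if (stat.state === "waitingForOKFail") stat.state = "waitingForTrigger";
        },
        trigger() {
            if (stat.state === "waitingForTrigger") stat.state = "waitingForMsg";
            sendNext();
        },
    };
}

// Usage: one poll that fails once and then succeeds.
const sent = [];
const guard = createGuard(1, (m) => sent.push(m));
guard.message("poll"); // first attempt goes out
guard.fail();          // attempt failed, wait for the retry trigger
guard.trigger();       // retry timer resends the same message
guard.ok();            // success clears the queue
guard.trigger();       // nothing left to send
console.log(sent);
```

With a queue limit of 1, a new trigger that arrives during a long failure replaces the queued message instead of piling up behind it.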

Ok, I've found another solution that might be a direct mapping from the mutex approach I was looking for. Just let me know how it sounds to you.

  1. Install external library async-mutex
    In $HOME/.node-red run:
    sudo npm install async-mutex

  2. Enable external modules in function nodes:

Set functionExternalModules: true in settings.js

  3. Import the library in the function node: in the Setup tab, click on +add and load it. The configuration of this entry could look like this:
    const = asyncMutexModule
    require = async-mutex

  4. Create the Mutex. This can be stored either in this specific function node or in the global context.

4.1 Store it in the node
In the On Start tab of the function node where I'm going to use it, create it with:

myMutex = new asyncMutexModule.Mutex();

4.2 Store it in the context
In the loadConfig path I run at start triggered by an inject node (just one execution), do it inside a function node that will also have the library imported with:

context.global.myMutex = new asyncMutexModule.Mutex();

  5. Use it in the original function node in the On Message tab with:

Coming from 4.1

await myMutex.runExclusive(async () => {
    if (!context.global.workInProgress) {
        context.global.workInProgress = true;
        msg.canContinue = true;
    }
    else {
        msg.canContinue = false;
    }
});

Coming from 4.2

await context.global.myMutex.runExclusive(async () => {
    if (!context.global.workInProgress) {
        context.global.workInProgress = true;
        msg.canContinue = true;
    }
    else {
        msg.canContinue = false;
    }
});
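
For anyone wondering what a promise-based mutex does under the hood, here is a minimal sketch of the idea behind runExclusive. It is not the async-mutex implementation: TinyMutex, sleep and demo are hypothetical names used only for illustration, and the real library offers more (for example explicit acquire and release).

```javascript
// Hypothetical minimal mutex: each caller waits for the previous one to finish.
class TinyMutex {
    constructor() {
        this.tail = Promise.resolve(); // settles when the last holder is done
    }

    runExclusive(fn) {
        const result = this.tail.then(() => fn());
        // Keep the chain alive even if fn throws or rejects.
        this.tail = result.then(() => undefined, () => undefined);
        return result;
    }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demo() {
    const mutex = new TinyMutex();
    const log = [];
    const worker = (name) => mutex.runExclusive(async () => {
        log.push(name + " start");
        await sleep(10); // an await inside the critical section
        log.push(name + " end");
    });
    // Both workers are started at once, but B only enters after A has left.
    await Promise.all([worker("A"), worker("B")]);
    return log;
}

demo().then((log) => console.log(log));
```

A mutex like this only earns its keep when the critical section itself contains an await. A purely synchronous check-and-set, as in the snippets above, is not interrupted by other messages anyway.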

How does it sound?

Regards

Have you seen...

node-red-contrib-semaphore (node) - Node-RED

Oh, that's exactly what I was looking for, a semaphore.

Thx!

I have used that and it does (or did) work. I stopped recommending it as it hasn't been updated in a long time. That may not be an issue, but it makes me nervous.

@Mamonetti is the flow you end up with using the semaphore node simpler than the one I suggested?

I wouldn't say simpler, that's for sure, but I'd say it's more intuitive for people like me who come from the C/C++ world.

Take a look.

Now I can compact all the lock + check inProgress + unlock + check canContinue part in one subflow and the lock2 + clear inProgress + unlock2 part in another. I think this will simplify the main flow a lot.

Regards

I don't think this has to be an issue at all. Nowadays people are too obsessed with updates and some stuff doesn't need any as its behavior is not expected to change.

Take the low-level sockets API, for example. I've been using documentation from 10 to 15 years ago and I know it works perfectly. Why would a semaphore have to work differently 5 or 10 years later? A semaphore does what it does, nothing more.

Regards

It doesn't have to be, no, but it is not uncommon for nodes to stop working following an upgrade to nodejs.

I have just checked it with nodejs 16 so it should be ok for a couple of years at least.

To get a more general picture of Node-RED: how is the code executed in the background? Is it fully multithreaded? If it is, can two threads be running code from the same node at the same time, or does a node have a sort of automatic lock that prevents two execution paths from being inside it in parallel?

Regards

Node-RED runs on Node.js, which is single-threaded. It has an event loop, which you can read about here.

These copy-and-paste quotes do a fair job of briefly explaining what's going on...

The way it operates and queues the event loop via callbacks can give the illusion of multi-threading but at its core only one bit of the code written by the developer is being executed at a time.

And...

The programming model in Node.js is a single threaded event loop with access to asynchronous operations that use native code to implement asynchronous behavior for some operations (disk I/O, networking, timers, some crypto operations, etc...)
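
The quotes above can be illustrated with a tiny experiment in plain Node.js (nothing Node-RED specific; the order array is just a hypothetical name for recording what ran when). Callbacks are queued and only run once the currently executing synchronous code has finished.

```javascript
// Record the order in which the three pieces of code actually run.
const order = [];

// Queued as a timer callback: runs in a later turn of the event loop.
setTimeout(() => order.push("timer callback"), 0);

// Queued as a promise callback: runs right after the current code finishes.
Promise.resolve().then(() => order.push("promise callback"));

// Plain synchronous code: runs to completion before any queued callback.
order.push("synchronous code");

setTimeout(() => console.log(order), 20);
```

Even though the timer and the promise were set up first, the synchronous line is recorded first, because nothing can interrupt code that is already running.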

So for my specific case, using the global variable would be enough, without having to worry about the critical section, given the code I use to guarantee that only one execution path is in control:

if (!context.global.workInProgress) {
    context.global.workInProgress = true;
    msg.canContinue = true;
}
else {
    msg.canContinue = false;
}

Considering I have no timers, callbacks, etc. in there, that should be enough. I'll think about what to do because even though node.js is single-threaded right now, this might change in the future.

Well, now that I think about it, this might not be true if the execution of the code of a node with no waits such as this one wasn't completed in one go.
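
That doubt can be checked with a small experiment. In JavaScript a synchronous block always completes in one go, so the check-and-set cannot be split. The risk only appears if an await (or any other asynchronous step) sits between the check and the set. The sketch below compares both cases; acquireSync, acquireWithGap, pause and compare are hypothetical names, and a plain object stands in for the global context.

```javascript
const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Synchronous check-and-set: nothing else can run between check and set.
function acquireSync(ctx) {
    if (ctx.workInProgress) return false;
    ctx.workInProgress = true;
    return true;
}

// Check, then await, then set: the await lets another handler in.
async function acquireWithGap(ctx) {
    if (ctx.workInProgress) return false;
    await pause(1); // stand-in for any asynchronous call
    ctx.workInProgress = true;
    return true;
}

async function compare() {
    const a = { workInProgress: false };
    const syncResults = [acquireSync(a), acquireSync(a)];

    const b = { workInProgress: false };
    const gapResults = await Promise.all([acquireWithGap(b), acquireWithGap(b)]);

    return { syncResults, gapResults };
}

compare().then((r) => console.log(r));
```

In the synchronous case only the first caller gets the lock. In the version with the gap, both callers pass the check before either of them sets the flag, which is exactly the race a mutex is meant to prevent.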

Regards

In a function node if you return null or just return then no message is sent, so if you do that you may not need the switch node to discard the message.
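
A minimal sketch of that idea, written so it also runs outside Node-RED: globalCtx is a hypothetical stand-in for the function node's global object (global.get / global.set), and onMessage wraps what would be the body of the On Message tab.

```javascript
// Stand-in for Node-RED's global context so the sketch runs on its own.
const store = new Map();
const globalCtx = {
    get: (key) => store.get(key),
    set: (key, value) => store.set(key, value),
};

// Body of the function node, wrapped in a function for illustration.
function onMessage(msg) {
    if (globalCtx.get("workInProgress")) {
        return null; // lock already taken: no message is sent
    }
    globalCtx.set("workInProgress", true);
    return msg; // lock acquired: pass the message on
}

const first = onMessage({ payload: "inject" });
const second = onMessage({ payload: "retry" });
console.log(first, second);
```

With this shape the msg.canContinue flag and the switch node are no longer needed, since the message that loses the race simply never leaves the function node.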

That's good to know, thx.

Regards
