Atomic processing of multiple nodes

The following flow tries to read data from a file, compute an update, and write the update back to the same file.


How can this flow be arranged so that multiple clients do not interfere with each other?

Can the file be updated by anything else?

If not, don't read it each time. If it isn't too large, read it into memory when you start. That way, updates will be serial anyway.

If it is too large, use a database.

The file cannot be updated by anything other than a function node. Its latest version has to be persisted to disk after each update, because it is read by other flows.

Pre-loading the file into memory and updating the in-memory copy might be an option here. But there would still be two nodes: a function node to compute the next version and a file-out node. When multiple input messages enter the function node, is there a guarantee that the file-out node receives them in the same order?

I can't see how database nodes would help here. When using two (four?) database nodes (node-red-node-mysql, for example), is it possible to couple them with a database transaction? Could you please elaborate on your suggestion a little?

If you're looking to scale - complicated
take a look at and to interact with it try

If you only have a few users - easy-er-ish
For every client, the incoming msg is tied to the client via the _msgid

So you would need something that looks like this.

but every client that tries to go to /update will be timed out until the current user is done.
You would need to get crafty to handle that issue, setting flow context if it's busy and responding to the user to try again. Or setting up a queue to handle it.

no easy fix for what you want to do

node-red + multiple clients ..... many times it's quicker and simpler to find another method. Care to explain some of the voodoo you're doing in the Compute function node .... someone may know of a simpler solution out there that handles your needs in a multi-user environment.

The quick solution is to use something like node-red-contrib-queue-gate - with that you can create a section of your flow that only has one active message in it at a time.

That doesn't matter. The only thing that matters is that it is small enough to be kept in memory and that it isn't updated elsewhere.

Load the file into memory, process it in memory but only have 1 flow that writes it to disk, use link nodes if you need to update from more than 1 flow. That way, you keep only a single write process & the read process only ever happens once. On the write file flow, you may need to control the number of messages per second that flow through but that is easy enough.

A database is designed to handle multiple simultaneous writes from different processes. The db engine would take care of it for you.

But here, I really don't think you need or want that complexity. You can do it with simple file read/write flows as indicated.

In addition to @TotallyInformation's suggestion, which is certainly the best way to solve the problem, are you sure that you need a file at all? Is it just used within node-red or is it needed elsewhere?

If you are just using it for a persistent data store in node-red then you might well be better using the persistent context feature built into node-red. Working with context : Node-RED


I couldn't figure out how to use node-red-contrib-queue-gate without shifting the synchronization issue to the new control nodes, as this node does not allow its internal state to be modified when an ordinary message passes.

Thanks to your suggestion, I've written a little function node to do the synchronization for me. Please see an example flow:

[{"id":"60d366ee.380678","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"ec568282.397b6","type":"function","z":"60d366ee.380678","name":"","func":"var queue = context.get('queue');\nif (msg.topic == 'control') {\n    switch (msg.payload) {\n        case 'open':\n            oldMsg = queue.messages.shift();\n            if (oldMsg === undefined) {\n                queue.state = 'open';\n                node.status({text: `Open (${queue.messages.length})`});\n            }\n            else {\n                node.status({text: `Closed (${queue.messages.length})`});\n                node.send(oldMsg);\n            }\n            break;\n        case 'close':\n            queue.state = 'close';\n            node.status({text: `Closed (${queue.messages.length})`});\n            break;\n    }\n}\nelse {\n    switch (queue.state) {\n        case 'open':\n            queue.state = 'closed';\n            node.status({text: `Closed (${queue.messages.length})`});\n            return msg;\n        case 'closed':\n            queue.messages.push(msg);\n            node.status({text: `Closed (${queue.messages.length})`});\n            break;\n    }\n}","outputs":1,"noerr":0,"initialize":"var queue = context.get('queue');\nif (queue === undefined) {\n    queue = { messages: [], state: 'open' };\n    context.set('queue', queue);\n}\nnode.status({text: `Open (${queue.messages.length})`});","finalize":"","libs":[],"x":520,"y":160,"wires":[["bbcde59a.d57db8"]],"icon":"node-red/serial.svg"},{"id":"63b9f3df.14e99c","type":"debug","z":"60d366ee.380678","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":550,"y":220,"wires":[]},{"id":"12341d3c.a422f3","type":"inject","z":"60d366ee.380678","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"data","payload":"","payloadType":"date","x":320,"y":160,"wires":[["ec568282.397b6"]]},{"id":"64b1662b.14aea8","type":"change","z":"60d366ee.380678","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"open","tot":"str"},{"t":"set","p":"topic","pt":"msg","to":"control","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":440,"y":280,"wires":[["ec568282.397b6"]]},{"id":"bbcde59a.d57db8","type":"delay","z":"60d366ee.380678","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":360,"y":220,"wires":[["63b9f3df.14e99c","64b1662b.14aea8"]]}]
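For readability, here is the gate logic embedded in the function node of the flow above, lifted out of the JSON, with tiny stand-ins (an assumption, for illustration only) for Node-RED's `context`, `node.send` and `node.status`. Note that the original sets `queue.state = 'close'` on a close command, while the message branch switches on `'closed'`; the lifted version uses `'closed'` consistently.

```javascript
// Stand-ins for the Node-RED runtime objects (illustration only).
const sent = [];
const node = { send: m => sent.push(m), status: () => {} };
const ctx = {};
const context = { get: k => ctx[k], set: (k, v) => { ctx[k] = v; } };

// "On Start" part of the function node: initialise the queue once.
let queue = context.get('queue');
if (queue === undefined) {
  queue = { messages: [], state: 'open' };
  context.set('queue', queue);
}

// "On Message" part: returns the message to forward, or undefined.
function onMessage(msg) {
  const queue = context.get('queue');
  if (msg.topic === 'control') {
    if (msg.payload === 'open') {
      const oldMsg = queue.messages.shift();
      if (oldMsg === undefined) queue.state = 'open'; // nothing waiting
      else node.send(oldMsg);          // stay closed, release next in line
    } else if (msg.payload === 'close') {
      queue.state = 'closed';
    }
  } else if (queue.state === 'open') {
    queue.state = 'closed';            // let this one through, close behind it
    return msg;
  } else {
    queue.messages.push(msg);          // gate closed: park the message
  }
}
```

A data message passes only when the gate is open; completing the guarded section sends `topic: 'control', payload: 'open'` back, which releases the next queued message.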

I would like to see something in core Node-RED that can be used to synchronize flows without using control messages (i.e. a serial property of a subflow/node group box).

I think I understand your point here. Using a database to synchronize message flows might be a possible solution. But I can't see how to combine the transaction construct of a database with Node-RED messages and the various existing database nodes in practice. Maybe I'm slow on the uptake here. Could you please share an example more complex than reading and incrementing a database sequence?

Can you give us an example of the sort of changes you need to make?

A little check box named Synchronize (like Java's synchronized keyword) on the node group box would be sufficient here.

Are you perhaps looking for something like node-red-contrib-semaphore?

I'm not sure that you did get the point of that I'm afraid. It isn't about synchronising message flows, it is about handling multiple or high-speed transactions. There are lots of ways to do that with a managed database system that are hard to do with just a simple file.

But a moot point since it would appear that a slightly revised flow is all you really need.

I've spent my time trying to figure out one of the "lots of ways to do that with a managed database system" and failed. It would have been great if you'd shared one of them.

The following is a revised flow containing the same synchronization problem, using node-red-node-mysql instead of simple file I/O. I've fleshed it out a little bit so it can be used to demonstrate the problem, and perhaps be modified to provide a useful solution, without modifying the "Compute" node of course.

[{"id":"22f61a0.68a7de6","type":"tab","label":"Flow 8","disabled":false,"info":""},{"id":"ce7cfd83.83a6a","type":"group","z":"22f61a0.68a7de6","name":"Serialize me","style":{"label":true,"fill":"#ffefbf","stroke":"#ff0000"},"nodes":["9c9b166e.92ceb8","c3b2ab22.e72088","350560d5.24df1","75742f08.11cb1","a666d36c.e7b7"],"x":474,"y":39,"w":392,"h":322},{"id":"186e518c.917bbe","type":"mysql","z":"22f61a0.68a7de6","mydb":"57bf72a.833518c","name":"","x":350,"y":420,"wires":[["2566dfc.75ea82"]]},{"id":"853ea53f.79f168","type":"inject","z":"22f61a0.68a7de6","name":"CREATE","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"CREATE TABLE data (id  MEDIUMINT NOT NULL AUTO_INCREMENT, data CHAR(64) NOT NULL, PRIMARY KEY (id))","payload":"","payloadType":"date","x":140,"y":420,"wires":[["186e518c.917bbe"]]},{"id":"d8cc4c8b.61211","type":"inject","z":"22f61a0.68a7de6","name":"INSERT","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"INSERT INTO data VALUES (NULL, 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855')","payload":"","payloadType":"date","x":140,"y":460,"wires":[["186e518c.917bbe"]]},{"id":"27267ef6.824c92","type":"inject","z":"22f61a0.68a7de6","name":"DROP","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"DROP TABLE data;","payload":"","payloadType":"date","x":150,"y":540,"wires":[["186e518c.917bbe"]]},{"id":"2566dfc.75ea82","type":"debug","z":"22f61a0.68a7de6","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":410,"y":480,"wires":[]},{"id":"1277b86e.fcfbf8","type":"inject","z":"22f61a0.68a7de6","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,3,4,5]","payloadType":"json","x":140,"y":120,"wires":[["f5668155.eb6c2"]]},{"id":"a666d36c.e7b7","type":"change","z":"22f61a0.68a7de6","g":"ce7cfd83.83a6a","name":"","rules":[{"t":"set","p":"topic","pt":"msg","to":"SELECT id, data FROM data ORDER BY id DESC LIMIT 1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":570,"y":80,"wires":[["9c9b166e.92ceb8"]]},{"id":"f9f0149b.c85b58","type":"inject","z":"22f61a0.68a7de6","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1]","payloadType":"json","x":130,"y":80,"wires":[["a666d36c.e7b7"]]},{"id":"f5668155.eb6c2","type":"split","z":"22f61a0.68a7de6","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":330,"y":120,"wires":[["a666d36c.e7b7"]]},{"id":"b47e2172.1ce77","type":"debug","z":"22f61a0.68a7de6","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":810,"y":420,"wires":[]},{"id":"765ebe6f.161f1","type":"inject","z":"22f61a0.68a7de6","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,3,4,5,6]","payloadType":"json","x":150,"y":160,"wires":[["f5668155.eb6c2"]]},{"id":"6a25e150.a018a","type":"catch","z":"22f61a0.68a7de6","name":"","scope":null,"uncaught":false,"x":600,"y":420,"wires":[["b47e2172.1ce77"]]},{"id":"65f00af3.b0c7c4","type":"inject","z":"22f61a0.68a7de6","name":"SELECT","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"SELECT id, data FROM data ORDER BY ID DESC LIMIT 1","payload":"","payloadType":"date","x":140,"y":500,"wires":[["186e518c.917bbe"]]},{"id":"9c9b166e.92ceb8","type":"mysql","z":"22f61a0.68a7de6","g":"ce7cfd83.83a6a","mydb":"57bf72a.833518c","name":"Read","x":630,"y":140,"wires":[["c3b2ab22.e72088"]]},{"id":"c3b2ab22.e72088","type":"function","z":"22f61a0.68a7de6","g":"ce7cfd83.83a6a","name":"Compute","func":"/* Do not modify this! */\n\nif (msg.payload[0].id == 6)\n    throw new Error('Oops!');\n\nlet hash = crypto.createHash('sha256');\nlet value = hash.update(msg.payload[0].data).digest('hex');\nconsole.log(`${msg.payload[0].data}: ${value}`);\nmsg.payload = [ value ];\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[{"var":"crypto","module":"crypto"}],"x":700,"y":200,"wires":[["350560d5.24df1"]]},{"id":"350560d5.24df1","type":"change","z":"22f61a0.68a7de6","g":"ce7cfd83.83a6a","name":"","rules":[{"t":"set","p":"topic","pt":"msg","to":"INSERT INTO `data` (`id`, `data`) VALUES (NULL, ?) RETURNING `id`, `data`;","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":750,"y":260,"wires":[["75742f08.11cb1"]]},{"id":"75742f08.11cb1","type":"mysql","z":"22f61a0.68a7de6","g":"ce7cfd83.83a6a","mydb":"57bf72a.833518c","name":"Write","x":790,"y":320,"wires":[["b47e2172.1ce77"]]},{"id":"57bf72a.833518c","type":"MySQLdatabase","name":"nodejs","host":"mariadb","port":"3306","db":"nodejs","tz":"GMT","charset":"UTF8"}]

How the game is played

  • Adjust the DB nodes to your database (current settings: { Host: mariadb, Database: nodejs, User: nodejs, Password: password })
  • Inject CREATE to create the table
  • Inject INSERT to create first row of data
  • Inject 5 times [1] and see that data of the last record has the value bdca9e8dbca354e824e67bfe1533fa4a238b9ea832f23fb4271ebeb3a5a8f720
  • Inject DROP, CREATE, and INSERT to start again
  • Inject [1,2,3,4,5] and see that the result differs: cd372fb85148700fa88095e3492d3f9f5beb43e555e5ff26d95f5a6adc36f8e6

For me this problem isn't solvable within Node-RED. It seems to be a conceptual flaw to force users to split their code/actions into multiple nodes without providing a mechanism to join them back into a logical transaction. One of Node.js' strengths is lost here. I guess I'm going to abandon Node-RED until a more mature version is available. Please prove me wrong and provide an implementation that solves the synchronization issue.

Hi @augjoh

there are lots of ways to achieve a guarded flow that only allows one message in it at a time.

I had previously suggested the queue-gate node - but looking at it in more detail I think @Colin's suggestion of node-red-contrib-semaphore gives you exactly what you want.

You add a semaphore-take node at the start of the section you want to synchronise, and a semaphore-leave node at the end. The take node will queue up messages whilst something has hold of the semaphore lock, and release the next message when the lock is released.
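For intuition, what the take/leave pair does is equivalent to a capacity-1 async semaphore; a toy illustration (not the node's actual implementation) in plain JavaScript:

```javascript
// Toy capacity-1 semaphore: take() resolves immediately if the slot is
// free, otherwise queues the caller; leave() hands the slot to the next
// waiter, preserving arrival order.
class Semaphore {
  constructor(capacity = 1) { this.free = capacity; this.waiters = []; }
  take() {
    if (this.free > 0) { this.free--; return Promise.resolve(); }
    return new Promise(resolve => this.waiters.push(resolve));
  }
  leave() {
    const next = this.waiters.shift();
    if (next) next();      // pass the slot straight to the next waiter
    else this.free++;
  }
}
```

The guarded section of the flow corresponds to the code between `await sem.take()` and `sem.leave()`: only one message can be inside it at a time.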

The added benefit of this set of nodes is you can share the semaphore between otherwise unconnected sections of your flow.

Feel free to abandon Node-RED if you really don't want to use it. Just know we provided solutions to your question when you first asked.

[{"id":"2e11283341b3c7b0","type":"inject","z":"ea56e4a5dec3dddb","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[0,1,2,3,4]","payloadType":"json","x":260,"y":40,"wires":[["b09bec848898d77e"]]},{"id":"bfc5ce1bdb514788","type":"semaphore-take","z":"ea56e4a5dec3dddb","config":"4e95ce0d3b4a1924","name":"","x":580,"y":80,"wires":[["b6712a699ec58a04"]]},{"id":"b09bec848898d77e","type":"split","z":"ea56e4a5dec3dddb","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":410,"y":60,"wires":[["bfc5ce1bdb514788"]]},{"id":"b6712a699ec58a04","type":"function","z":"ea56e4a5dec3dddb","name":"","func":"setTimeout(function() {\n    node.send(msg);\n}, Math.random()*2000)\n","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":760,"y":100,"wires":[["6bff95738dc55c8b"]]},{"id":"6bff95738dc55c8b","type":"semaphore-leave","z":"ea56e4a5dec3dddb","config":"4e95ce0d3b4a1924","name":"","x":950,"y":120,"wires":[["991cde3fd75b52ce"]]},{"id":"991cde3fd75b52ce","type":"debug","z":"ea56e4a5dec3dddb","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1150,"y":140,"wires":[]},{"id":"4e95ce0d3b4a1924","type":"semaphore-config","name":"guard","capacity":"1"}]

All of that said, I do think there is something interesting to look at regarding flow control options within groups. Whether as a built-in feature or something they could be augmented with via plugin.

It would indeed be great if I had the time to respond to everyone's complex queries. Sadly I have a paid day-job and a family and other things as well.

Honestly, I'm not sure you've really told us what the actual issue is. As we don't seem to know what you are trying to compute, we don't know what the timing issue might or might not be. Without that information, we can give you lots of general information on how to solve general problems but might still miss what you are trying to achieve.

As Node-RED is a general-purpose compute platform, it is very unlikely that it is unable to solve any specific general compute problem with the possible exception of some that require streaming interfaces.

Really? In what way is Node.js better at discrete transaction processing than Node-RED?

Please first provide people with the actual issue. Exactly what do you need to synchronise and why? The example above doesn't explain what you are actually trying to compute and record.

In general, if I wanted to maintain discrete data by user, all I probably need to do is ensure that I have an appropriate key so that I can record the data against that key. I do that all the time for multiple device data rather than users but the principle is exactly the same. No fancy processing is needed for that. So what is different for your processing?

Thanks for your time and valuable input here (and all the other questions). I didn't want to be rude or offend anyone.

Node.js is kind of single-threaded, and this makes things easy, because normally no synchronization is needed. Node-RED is more complex here, i.e. you have to take care of the order in which messages pass through multiple nodes.

Sure. I'm trying to create a flow that implements RFC 6962. It records X.509 certificates in a strictly ordered way. It requires that new entries rely on the values of older entries, like in the second example I've provided.

An SQL database's natural way of synchronizing this would be to use SELECT ... FOR UPDATE. That isn't available when using a database node.
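For reference, the SELECT ... FOR UPDATE pattern the database nodes can't express looks roughly like this in a single function node with a promise-style driver (the connection object shape, e.g. mysql2's, and the `data` table from the flow above are assumptions):

```javascript
// Sketch: lock the newest row inside a transaction, compute the next
// entry from it, insert, commit. A concurrent client running the same
// SELECT ... FOR UPDATE blocks until this transaction commits, which
// serializes the read-compute-write sequence inside the database.
async function appendEntry(conn, compute) {
  await conn.beginTransaction();
  try {
    const [rows] = await conn.query(
      'SELECT id, data FROM data ORDER BY id DESC LIMIT 1 FOR UPDATE'
    );
    const next = compute(rows[0].data);   // e.g. the SHA-256 chaining step
    await conn.query('INSERT INTO data VALUES (NULL, ?)', [next]);
    await conn.commit();
    return next;
  } catch (err) {
    await conn.rollback();
    throw err;
  }
}
```

Because the whole transaction lives in one node, there is no window between the Read and Write nodes for another message to slip through.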

A lot of node.js based systems use msg handling and asynchronous processing to get round the single thread issue since while it is single-threaded, it is also inherently looped. Oddly reminiscent of working on the old IBM mainframes I started my IT career on. :slight_smile:

So the same problems and solutions apply.

Ah, OK, now we are getting somewhere. I will need to remind myself about that RFC - not done much x509 work recently.

When you say "records" though, exactly what do you mean? Are you trying to track certificate issuance or record client TLS access for example? This makes a difference to how we would think about the process and how linear it needs to be.

This is why an understanding of data size is necessary. If the data fits comfortably into memory, you may not need anything complex since you can simply use a JavaScript object with a suitable key or even an array depending on the questions above.

@TotallyInformation @augjoh whilst it's good to understand all the details of the requirement, can I make sure the fact we've provided an actual solution for the question asked, using the semaphore nodes, is not overlooked?

I only say this as @augjoh hasn't responded to my reply where the solution was provided.


Thanks to @Colin for bringing up the suggestion of node-red-contrib-semaphore and to @knolleary for pointing to it. This node does the right thing, but eventually leads to deadlocks :frowning: .

In one of my flows I duplicate my message and do multiple things in parallel; please see the following example. Not all messages reach the debug node at the end.


[{"id":"f542fe9c.732af","type":"tab","label":"Flow 11","disabled":false,"info":""},{"id":"ce1be0dc.d9c39","type":"inject","z":"f542fe9c.732af","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":170,"y":100,"wires":[["80749176.dab0c","66effb54.4725e4"]]},{"id":"80749176.dab0c","type":"function","z":"f542fe9c.732af","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":460,"y":100,"wires":[["ae192530.d5a898"]]},{"id":"269f16d2.b9e1ba","type":"function","z":"f542fe9c.732af","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":460,"y":300,"wires":[["ae192530.d5a898"]]},{"id":"ba539239.b63a3","type":"switch","z":"f542fe9c.732af","name":"50 %","property":"payload","propertyType":"msg","rules":[{"t":"gt","v":"0.5","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":370,"y":240,"wires":[["269f16d2.b9e1ba"]]},{"id":"66effb54.4725e4","type":"change","z":"f542fe9c.732af","name":"Random","rules":[{"t":"set","p":"payload","pt":"msg","to":"$random()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":180,"wires":[["ba539239.b63a3"]]},{"id":"ae192530.d5a898","type":"debug","z":"f542fe9c.732af","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":690,"y":200,"wires":[]}]

How can I determine that all (that is, sometimes 1 and sometimes 2) messages have reached the debug node in the end?