Atomic processing of multiple nodes

Is that by design?

If you have an else/otherwise clause you could set some flag in the "missing message" and send it anyway - at the end of the flow, you can then check this flag and discard the message.

Note: I am assuming I understood your requirement. If I'm way off, please add a bit more detail to your question.

In this simplified example this is possible, but the real flow is too complex to flag all messages and introduce alternative paths for empty accounting messages. Wouldn't that impose extra work?

However you do locking there must be defined entry and exit points to lock and unlock the code. You started by suggesting using the group feature for locking and this would suffer from exactly the same problem.
However complex the flow it should not be difficult to accomplish with the semaphore node, you just have to make sure that you don't have anywhere in the flow where the flow terminates without releasing the semaphore. Is there a particular situation where this is not possible?

Dear @Colin, you see the difficulties here, you're placing on an ordinary user :slightly_smiling_face:. I want to use a low-code platform, because I don't want to take care of all this implementation details. Checking a box to get everything synchronized seems to be doable for me, though. I suppose, that the node-red code, does have a single place, where messages enter/exit nodes or groups. It would be much more easy to place a WeakSet here and take care of passing messages.

If you want to enact a complex process, a low-code solution will not help you. Because you still have to represent the logic which itself is complex. No computer system can reliably do that, and certainly not one that you do not contribute to financially.

My advice would be to hire someone to create the logic for you, wrap it into a custom node if needed and then you will be good to go.

The Semaphore node is a no code solution. Just add them around the section you want to protect. What is the problem?

The latest version (2.x)of the delay node may also help. You can configure it to rate limit mode. With the time interval set to some long timeout interval ( in case your process never completes). Then when your process does complete send just a msg.flush set to 1 back to the delay node to release the next message.

@Colin : I've tried to describe the problem with my last example flow above. The solution with a semaphore node needs a start and one endpoint inside the flow. Within my flows it is not possible to determine the endpoint reliable. Sometimes it isn't reached and the semaphore stays locked forever.

@dceejay : The delay node, seems to provide the same functionality as the semaphore node. Without the ability to specify an exit node, it is not possible to call the flush node.

Is that due to an error? You can use a catch node to clear the semaphore.

Alternatively, you could set away a trigger node to clear the semaphore (reset the trigger if your msg reaches the end)

No it doesn't, you can have as many leave nodes as you like, provided one and only one of them is triggered. Though personally I wouldn't do that, if I have multiple exit points I would just link them all the same leave node.

That is an unsolvable problem. Any sort of locking requires that you know when to unlock it. Your original suggestion was to allow locking by putting the nodes in a group. In that context how would the group know when to let the next message in if no message flows out?
If you know that messages get lost sometimes then perhaps you can use a Catch node as @Steve-Mcl suggests, or add a timeout function (using a Trigger node) such that if nothing happens within a particular time then the timeout triggers the unlock.

The delay node must be configured with a timeout, so in the worst case that event could be used to flush/trigger the next event.

@Colin The conditions to lock/unlock are quite clear. Take the following example:

  • Lock: When the first message is entering a node within the box, or the first message emerges from a node inside the box.
  • Unlock: When no messages are processed by nodes inside the box.

Can you export that demo flow (CTRL+E) and paste it in a reply

Please use the </> code button and paste flow between the backticks.

I will take a look at it and propose a solution to your requirement for atomic processing.

I have just realised what you have been trying to say. You want the system to keep track of whether there are still any messages active inside the box. I suppose that could potentially be done if all the nodes implemented the Complete action. Currently not all nodes do that, as it was only recently introduced.

@colin: As I've said previously, perhaps it is possible to place all relevant messages in a WeakSet and do not rely on a potentially missing complete action.

@Steve-Mcl Here's the sample flow:

[{"id":"f542fe9c.732af","type":"tab","label":"Flow 11","disabled":false,"info":""},{"id":"819633a4b4b672e7","type":"group","z":"f542fe9c.732af","name":"Serialize me","style":{"stroke":"#ff0000","fill":"#ffffbf","label":true},"nodes":["80749176.dab0c","269f16d2.b9e1ba","ba539239.b63a3","66effb54.4725e4","b2a178e7ae426431"],"x":234,"y":59,"w":312,"h":322},{"id":"ce1be0dc.d9c39","type":"inject","z":"f542fe9c.732af","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":120,"y":100,"wires":[["80749176.dab0c","66effb54.4725e4"]]},{"id":"80749176.dab0c","type":"function","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":460,"y":100,"wires":[["ae192530.d5a898"]]},{"id":"269f16d2.b9e1ba","type":"function","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":460,"y":340,"wires":[["ae192530.d5a898"]]},{"id":"ba539239.b63a3","type":"switch","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"50 %","property":"payload","propertyType":"msg","rules":[{"t":"gt","v":"0.5","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":370,"y":280,"wires":[["269f16d2.b9e1ba"]]},{"id":"66effb54.4725e4","type":"change","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"Random","rules":[{"t":"set","p":"payload","pt":"msg","to":"$random()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":220,"wires":[["ba539239.b63a3"]]},{"id":"ae192530.d5a898","type":"debug","z":"f542fe9c.732af","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":690,"y":200,"wires":[]},{"id":"b2a178e7ae426431","type":"inject","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"3600","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":390,"y":160,"wires":[["80749176.dab0c"]]}]

This works...

[{"id":"ce1be0dc.d9c39","type":"inject","z":"f542fe9c.732af","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":380,"y":180,"wires":[["272aca98d76d7bf4","40acda1f8ef2ae5c"]]},{"id":"ae192530.d5a898","type":"debug","z":"f542fe9c.732af","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1650,"y":340,"wires":[]},{"id":"1657046985ddb193","type":"status","z":"f542fe9c.732af","name":"","scope":["272aca98d76d7bf4","151fdc9a359343a0"],"x":580,"y":500,"wires":[["883c18f1881ff21d"]]},{"id":"75552836b99d60e6","type":"debug","z":"f542fe9c.732af","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":1210,"y":500,"wires":[]},{"id":"883c18f1881ff21d","type":"switch","z":"f542fe9c.732af","name":"semaphore active?","property":"status.text","propertyType":"msg","rules":[{"t":"nempty"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":750,"y":500,"wires":[["04f984463b6b12fd"],["9d4969d173d43939"]]},{"id":"04f984463b6b12fd","type":"trigger","z":"f542fe9c.732af","name":"","op1":"","op2":"true","op1type":"nul","op2type":"bool","duration":"10","extend":true,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":990,"y":520,"wires":[["e9ac5f7c48456ed4"]]},{"id":"9d4969d173d43939","type":"change","z":"f542fe9c.732af","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":780,"y":560,"wires":[["04f984463b6b12fd"]]},{"id":"214564d761b2015c","type":"catch","z":"f542fe9c.732af","name":"","scope":["80749176.dab0c","269f16d2.b9e1ba","ba539239.b63a3","66effb54.4725e4"],"uncaught":false,"x":570,"y":440,"wires":[["e9ac5f7c48456ed4"]]},{"id":"e9ac5f7c48456ed4","type":"semaphore-leave","z":"f542fe9c.732af","config":"78bba808ba5995c5","name":"","x":1190,"y":440,"wires":[["75552836b99d60e6"]]},{"id":"151fdc9a359343a0","type":"semaphore-leave","z":"f542fe9c.732af","config":"78bba808ba5995c5","name":"","x":1450,"y":340,"wires":[["ae192530.d5a898"]]},{"id":"819633a4b4b672e7","type":"group","z":"f542fe9c.732af","name":"Serialize me","style":{"stroke":"#ff0000","fill":"#ffffbf","label":true},"nodes":["80749176.dab0c","269f16d2.b9e1ba","ba539239.b63a3","66effb54.4725e4","b2a178e7ae426431","272aca98d76d7bf4","40acda1f8ef2ae5c","1aa613a64937c127","304ed7f874953707"],"x":494,"y":139,"w":792,"h":242},{"id":"80749176.dab0c","type":"function","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1200,"y":180,"wires":[["151fdc9a359343a0"]]},{"id":"269f16d2.b9e1ba","type":"function","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","func":"if (msg.payload > 0.8) {\n    throw new Error(\"Number too high\")\n}\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1200,"y":300,"wires":[["151fdc9a359343a0"]]},{"id":"ba539239.b63a3","type":"switch","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"50 %","property":"payload","propertyType":"msg","rules":[{"t":"gt","v":"0.5","vt":"num"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":1050,"y":340,"wires":[["269f16d2.b9e1ba"],["151fdc9a359343a0"]]},{"id":"66effb54.4725e4","type":"change","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"Random","rules":[{"t":"set","p":"payload","pt":"msg","to":"$random()","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":900,"y":300,"wires":[["ba539239.b63a3"]]},{"id":"b2a178e7ae426431","type":"inject","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"3600","crontab":"","once":false,"onceDelay":0.1,"topic":"","payloadType":"date","x":610,"y":220,"wires":[["272aca98d76d7bf4"]]},{"id":"272aca98d76d7bf4","type":"semaphore-take","z":"f542fe9c.732af","g":"819633a4b4b672e7","config":"78bba808ba5995c5","name":"","x":820,"y":180,"wires":[["1aa613a64937c127"]]},{"id":"40acda1f8ef2ae5c","type":"semaphore-take","z":"f542fe9c.732af","g":"819633a4b4b672e7","config":"78bba808ba5995c5","name":"","x":600,"y":300,"wires":[["304ed7f874953707"]]},{"id":"1aa613a64937c127","type":"delay","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":980,"y":180,"wires":[["80749176.dab0c"]]},{"id":"304ed7f874953707","type":"delay","z":"f542fe9c.732af","g":"819633a4b4b672e7","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"x":760,"y":300,"wires":[["66effb54.4725e4"]]},{"id":"78bba808ba5995c5","type":"semaphore-config","name":"MAX-2","capacity":"2"}]

however i am not 100% happy with it. Gonna take another look later.

Thanks for posting this flow, got some error when running nrlint:

TypeError: Cannot read property 'insert' of undefined
    at Object.node (/home/augjoh/.node-red/projects/atomic/node_modules/nrlint/lib/rules/no-overlapping-nodes.js:183:39)
    at /home/augjoh/.node-red/projects/atomic/node_modules/nrlint/lib/linter.js:119:35
    at Array.forEach (<anonymous>)
    at dispatchVisitors (/home/augjoh/.node-red/projects/atomic/node_modules/nrlint/lib/linter.js:115:26)
    at /home/augjoh/.node-red/projects/atomic/node_modules/nrlint/lib/linter.js:158:17
    at NRNode.walk (/home/augjoh/.node-red/projects/atomic/node_modules/@node-red/flow-parser/lib/NRObject.js:65:13)
    at /home/augjoh/.node-red/projects/atomic/node_modules/@node-red/flow-parser/lib/NRContainer.js:37:35
    at Map.forEach (<anonymous>)
    at NRFlow.walkContents (/home/augjoh/.node-red/projects/atomic/node_modules/@node-red/flow-parser/lib/NRContainer.js:37:20)
    at NRFlow.walk (/home/augjoh/.node-red/projects/atomic/node_modules/@node-red/flow-parser/lib/NRFlow.js:88:15)

At a guess, you have found a bug in nrlint. You should raise an issue on the repository.

@Steve-Mcl If your flow triggers a bug in nrlint, I think you should file the issue, because you're the only one who can provide more details here.

@augjoh I evaluated all the info in the thread and "guessed" it might be a bug. I dont have nrlink installed and I dont have the information necessary to file an issue. You on the other hand (who started this thread) do have and it is you seeing this issue.

It is pretty simple to raise an issue - just go to the issues page and fill out the details.

2 Likes